diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 30 | ||||
| -rw-r--r-- | src/core/aio.h | 11 | ||||
| -rw-r--r-- | src/core/defs.h | 13 | ||||
| -rw-r--r-- | src/nng.c | 6 | ||||
| -rw-r--r-- | src/nng.h | 12 | ||||
| -rw-r--r-- | src/supplemental/http/http.c | 6 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 4 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 4 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 4 |
9 files changed, 70 insertions, 20 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index c6e0ed97..a036a606 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -66,8 +66,10 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) } memset(aio, 0, sizeof(*aio)); nni_cv_init(&aio->a_cv, &nni_aio_lk); - aio->a_expire = NNI_TIME_NEVER; - aio->a_timeout = NNG_DURATION_INFINITE; + aio->a_expire = NNI_TIME_NEVER; + aio->a_timeout = NNG_DURATION_INFINITE; + aio->a_iov = aio->a_iovinl; + aio->a_niovalloc = 0; if (arg == NULL) { arg = aio; } @@ -85,10 +87,34 @@ nni_aio_fini(nni_aio *aio) // At this point the AIO is done. nni_cv_fini(&aio->a_cv); + if (aio->a_niovalloc > 0) { + NNI_FREE_STRUCTS(aio->a_iov, aio->a_niovalloc); + } + NNI_FREE_STRUCT(aio); } } +int +nni_aio_set_iov(nni_aio *aio, int niov, nng_iov *iov) +{ + if ((niov > 4) && (niov > aio->a_niovalloc)) { + nni_iov *newiov = NNI_ALLOC_STRUCTS(newiov, niov); + if (newiov == NULL) { + return (NNG_ENOMEM); + } + if (aio->a_niovalloc > 0) { + NNI_FREE_STRUCTS(aio->a_iov, aio->a_niovalloc); + } + aio->a_iov = newiov; + aio->a_niovalloc = niov; + } + + memcpy(aio->a_iov, iov, niov * sizeof(nng_iov)); + aio->a_niov = niov; + return (0); +} + void nni_aio_fini_cb(nni_aio *aio) { diff --git a/src/core/aio.h b/src/core/aio.h index b5db29c9..33fe07cb 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -39,8 +39,10 @@ struct nni_aio { nni_task a_task; // Read/write operations. - nni_iov a_iov[4]; - int a_niov; + nni_iov *a_iov; + int a_niov; + nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is short + int a_niovalloc; // number of allocated IOVs // Message operations. nni_msg *a_msg; @@ -126,6 +128,11 @@ extern void nni_aio_set_output(nni_aio *, int, void *); // nni_get_output returns an output previously stored on the AIO. extern void *nni_aio_get_output(nni_aio *, int); +// nni_aio_set_iov sets an IOV (scatter/gather vector) on the AIO. +// Up to 4 may be set without any possibility of failure, more than that +// may require an allocation and hence fail due to NNG_ENOMEM. +extern int nni_aio_set_iov(nni_aio *, int, nng_iov *); + // XXX: These should be refactored in terms of generic inputs and outputs. extern void nni_aio_set_msg(nni_aio *, nni_msg *); extern nni_msg *nni_aio_get_msg(nni_aio *); diff --git a/src/core/defs.h b/src/core/defs.h index cecb4825..fbf074b4 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -1,5 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitoar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -34,6 +35,7 @@ typedef struct nng_sockaddr nni_sockaddr; typedef struct nng_event nni_event; typedef struct nng_notify nni_notify; typedef struct nng_url nni_url; +typedef struct nng_iov nni_iov; // These are our own names. typedef struct nni_socket nni_sock; @@ -64,12 +66,6 @@ typedef struct nni_aio nni_aio; typedef void (*nni_cb)(void *); -// Used by transports for scatter gather I/O. -typedef struct { - uint8_t *iov_buf; - size_t iov_len; -} nni_iov; - // Notify descriptor. typedef struct { int sn_wfd; // written to in order to flag an event @@ -134,6 +130,9 @@ typedef struct { (((uint64_t)((uint8_t)(ptr)[6])) << 8) + \ (((uint64_t)(uint8_t)(ptr)[7])) +// This increments a pointer a fixed number of byte cells. +#define NNI_INCPTR(ptr, n) ((ptr) = (void *) ((char *) (ptr) + (n))) + // A few assorted other items. #define NNI_FLAG_IPV4ONLY 1 @@ -1082,6 +1082,12 @@ nng_aio_set_timeout(nng_aio *ap, nng_duration dur) nni_aio_set_timeout((nni_aio *) ap, dur); } +int +nng_aio_set_iov(nng_aio *ap, int niov, nng_iov *iov) +{ + return (nni_aio_set_iov((nni_aio *) ap, niov, iov)); +} + #if 0 int nng_snapshot_create(nng_socket sock, nng_snapshot **snapp) @@ -53,6 +53,12 @@ typedef struct nng_snapshot nng_snapshot; typedef struct nng_stat nng_stat; typedef struct nng_aio nng_aio; +// Scatter/gather I/O. +typedef struct nng_iov { + void * iov_buf; + size_t iov_len; +} nng_iov; + // Some definitions for durations used with timeouts. #define NNG_DURATION_INFINITE (-1) #define NNG_DURATION_DEFAULT (-2) @@ -298,6 +304,12 @@ NNG_DECL nng_msg *nng_aio_get_msg(nng_aio *); // that any socket specific timeout should be used. NNG_DECL void nng_aio_set_timeout(nng_aio *, nng_duration); +// nng_aio_set_iov sets a scatter/gather vector on the aio. The iov array +// itself is copied. Data members (the memory regions referenced) *may* be +// copied as well, depending on the operation. This operation is guaranteed +// to succeed if n <= 4, otherwise it may fail due to NNG_ENOMEM. +NNG_DECL int nng_aio_set_iov(nng_aio *, int, nng_iov *); + // Message API. NNG_DECL int nng_msg_alloc(nng_msg **, size_t); NNG_DECL void nng_msg_free(nng_msg *); diff --git a/src/supplemental/http/http.c b/src/supplemental/http/http.c index 43db1d15..2ba5b274 100644 --- a/src/supplemental/http/http.c +++ b/src/supplemental/http/http.c @@ -145,7 +145,7 @@ http_rd_buf(nni_http *http, nni_aio *aio) } memcpy(aio->a_iov[0].iov_buf, rbuf, n); aio->a_iov[0].iov_len -= n; - aio->a_iov[0].iov_buf += n; + NNI_INCPTR(aio->a_iov[0].iov_buf, n); http->rd_get += n; rbuf += n; aio->a_count += n; @@ -302,7 +302,7 @@ http_rd_cb(void *arg) n = cnt; } uaio->a_iov[0].iov_len -= n; - uaio->a_iov[0].iov_buf += n; + NNI_INCPTR(uaio->a_iov[0].iov_buf, n); uaio->a_count += n; cnt -= n; @@ -420,7 +420,7 @@ http_wr_cb(void *arg) if (aio->a_iov[0].iov_len > n) { aio->a_iov[0].iov_len -= n; - aio->a_iov[0].iov_buf += n; + NNI_INCPTR(aio->a_iov[0].iov_buf, n); break; } n -= aio->a_iov[0].iov_len; diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 62751d86..9e8a3829 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -233,7 +233,7 @@ nni_ipc_pipe_send_cb(void *arg) NNI_ASSERT(txaio->a_niov != 0); if (txaio->a_iov[0].iov_len > n) { txaio->a_iov[0].iov_len -= n; - txaio->a_iov[0].iov_buf += n; + NNI_INCPTR(txaio->a_iov[0].iov_buf, n); break; } n -= txaio->a_iov[0].iov_len; @@ -285,7 +285,7 @@ nni_ipc_pipe_recv_cb(void *arg) NNI_ASSERT(rxaio->a_niov != 0); if (rxaio->a_iov[0].iov_len > n) { rxaio->a_iov[0].iov_len -= n; - rxaio->a_iov[0].iov_buf += n; + NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); break; } n -= rxaio->a_iov[0].iov_len; diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 0a123a79..28ec9438 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -230,7 +230,7 @@ nni_tcp_pipe_send_cb(void *arg) NNI_ASSERT(txaio->a_niov != 0); if (txaio->a_iov[0].iov_len > n) { txaio->a_iov[0].iov_len -= n; - txaio->a_iov[0].iov_buf += n; + NNI_INCPTR(txaio->a_iov[0].iov_buf, n); break; } n -= txaio->a_iov[0].iov_len; @@ -280,7 +280,7 @@ nni_tcp_pipe_recv_cb(void *arg) NNI_ASSERT(rxaio->a_niov != 0); if (rxaio->a_iov[0].iov_len > n) { rxaio->a_iov[0].iov_len -= n; - rxaio->a_iov[0].iov_buf += n; + NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); break; } n -= rxaio->a_iov[0].iov_len; diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 6bc884e7..f6c5bc6e 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -238,7 +238,7 @@ nni_tls_pipe_send_cb(void *arg) NNI_ASSERT(txaio->a_niov != 0); if (txaio->a_iov[0].iov_len > n) { txaio->a_iov[0].iov_len -= n; - txaio->a_iov[0].iov_buf += n; + NNI_INCPTR(txaio->a_iov[0].iov_buf, n); break; } n -= txaio->a_iov[0].iov_len; @@ -288,7 +288,7 @@ nni_tls_pipe_recv_cb(void *arg) NNI_ASSERT(rxaio->a_niov != 0); if (rxaio->a_iov[0].iov_len > n) { rxaio->a_iov[0].iov_len -= n; - rxaio->a_iov[0].iov_buf += n; + NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); break; } n -= rxaio->a_iov[0].iov_len; |
