aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-11-13 21:10:03 -0800
committerGarrett D'Amore <garrett@damore.org>2017-11-13 21:10:03 -0800
commite8694d15d0a108895bf869f292d59e11d834361e (patch)
treed87b8d396953fee653fbcbee92521395d0cec1fe /src
parentac6019bfabac887274fb9d8b2a167df940ba6121 (diff)
downloadnng-e8694d15d0a108895bf869f292d59e11d834361e.tar.gz
nng-e8694d15d0a108895bf869f292d59e11d834361e.tar.bz2
nng-e8694d15d0a108895bf869f292d59e11d834361e.zip
fixes #154 underlyng TCP & IPC transports should support partial recv/send
fixes #155 POSIX TCP & IPC could avoid a lot of context switches
Diffstat (limited to 'src')
-rw-r--r--src/core/defs.h4
-rw-r--r--src/core/platform.h13
-rw-r--r--src/platform/posix/posix_epdesc.c6
-rw-r--r--src/platform/posix/posix_pipedesc.c128
-rw-r--r--src/platform/windows/win_ipc.c59
-rw-r--r--src/platform/windows/win_tcp.c55
-rw-r--r--src/transport/ipc/ipc.c150
-rw-r--r--src/transport/tcp/tcp.c137
8 files changed, 290 insertions, 262 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 57f7f06a..5a9ded92 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -62,8 +62,8 @@ typedef void (*nni_cb)(void *);
// Used by transports for scatter gather I/O.
typedef struct {
- void * iov_buf;
- size_t iov_len;
+ uint8_t *iov_buf;
+ size_t iov_len;
} nni_iov;
// Notify descriptor.
diff --git a/src/core/platform.h b/src/core/platform.h
index bff7f709..9e193175 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -229,8 +229,17 @@ extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *);
// The platform may modify the iovs.
extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *);
-// nni_plat_tcp_pipe_recv recvs data into the buffers provided by the iovs.
-// The platform may modify the iovs.
+// nni_plat_tcp_pipe_recv receives data into the buffers provided by the
+// I/O vector (iovs). The platform should attempt to scatter the received
+// data into the iovs if possible.
+//
+// It is an error for the caller to supply any IO vector elements with
+// zero length.
+//
+// It is possible for the TCP reader to return less data than is requested,
+// in which case the caller is responsible for resubmitting. The platform
+// should not return "zero" data however. (It is an error to attempt to
+// receive zero bytes.) The platform may not modify the I/O vector.
extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *);
// nni_plat_tcp_pipe_peername gets the peer name.
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 0524fe30..80711ed0 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -279,6 +279,12 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
}
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+#ifdef SO_NOSIGPIPE
+ // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+ int one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
+
if (bind(fd, (struct sockaddr *) ss, len) < 0) {
nni_mtx_unlock(&ed->mtx);
rv = nni_plat_errno(errno);
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index d25a9d96..6ae0d752 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -66,93 +66,75 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
static void
nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
{
- int n;
- int rv;
- int i;
- struct iovec iovec[4];
- struct iovec *iovp;
- nni_aio * aio;
+ int n;
+ struct iovec iovec[4];
+ nni_aio * aio;
+ int niov;
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- for (i = 0; i < aio->a_niov; i++) {
- iovec[i].iov_len = aio->a_iov[i].iov_len;
- iovec[i].iov_base = aio->a_iov[i].iov_buf;
+ int i;
+ for (niov = 0, i = 0; i < aio->a_niov; i++) {
+ iovec[niov].iov_len = aio->a_iov[i].iov_len;
+ iovec[niov].iov_base = aio->a_iov[i].iov_buf;
+ niov++;
+ }
+ if (niov == 0) {
+ nni_posix_pipedesc_finish(aio, NNG_EINVAL);
+ continue;
}
- iovp = &iovec[0];
- rv = 0;
- n = writev(pd->node.fd, iovp, aio->a_niov);
+ n = writev(pd->node.fd, iovec, niov);
if (n < 0) {
if ((errno == EAGAIN) || (errno == EINTR)) {
// Can't write more right now. We're done
// on this fd for now.
return;
}
- rv = nni_plat_errno(errno);
-
- nni_posix_pipedesc_finish(aio, rv);
+ nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
nni_posix_pipedesc_doclose(pd);
return;
}
aio->a_count += n;
- while (n > 0) {
- // If we didn't write the first full iov,
- // then we're done for now. Record progress
- // and return to caller.
- if (n < aio->a_iov[0].iov_len) {
- aio->a_iov[0].iov_buf += n;
- aio->a_iov[0].iov_len -= n;
- return;
- }
-
- // We consumed the full iovec, so just move the
- // remaininng ones up, and decrement count handled.
- n -= aio->a_iov[0].iov_len;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- NNI_ASSERT(aio->a_niov > 0);
- aio->a_niov--;
- }
-
// We completed the entire operation on this aioq.
nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
- // aioq ready for us to process.
+ // aio ready for us to process.
}
}
static void
nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
{
- int n;
- int rv;
- int i;
- struct iovec iovec[4];
- struct iovec *iovp;
- nni_aio * aio;
+ int n;
+ struct iovec iovec[4];
+ nni_aio * aio;
+ int niov;
while ((aio = nni_list_first(&pd->readq)) != NULL) {
- for (i = 0; i < aio->a_niov; i++) {
- iovec[i].iov_len = aio->a_iov[i].iov_len;
- iovec[i].iov_base = aio->a_iov[i].iov_buf;
+ int i;
+ for (i = 0, niov = 0; i < aio->a_niov; i++) {
+ if (aio->a_iov[i].iov_len != 0) {
+ iovec[niov].iov_len = aio->a_iov[i].iov_len;
+ iovec[niov].iov_base = aio->a_iov[i].iov_buf;
+ niov++;
+ }
+ }
+ if (niov == 0) {
+ nni_posix_pipedesc_finish(aio, NNG_EINVAL);
+ continue;
}
- iovp = &iovec[0];
- rv = 0;
- n = readv(pd->node.fd, iovp, aio->a_niov);
+ n = readv(pd->node.fd, iovec, niov);
if (n < 0) {
if ((errno == EAGAIN) || (errno == EINTR)) {
// Can't write more right now. We're done
// on this fd for now.
return;
}
- rv = nni_plat_errno(errno);
-
- nni_posix_pipedesc_finish(aio, rv);
+ nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
nni_posix_pipedesc_doclose(pd);
return;
}
@@ -160,36 +142,17 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
if (n == 0) {
// No bytes indicates a closed descriptor.
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+ nni_posix_pipedesc_doclose(pd);
return;
}
aio->a_count += n;
- while (n > 0) {
- // If we didn't write the first full iov,
- // then we're done for now. Record progress
- // and return to caller.
- if (n < aio->a_iov[0].iov_len) {
- aio->a_iov[0].iov_buf += n;
- aio->a_iov[0].iov_len -= n;
- return;
- }
-
- // We consumed the full iovec, so just move the
- // remaininng ones up, and decrement count handled.
- n -= aio->a_iov[0].iov_len;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- NNI_ASSERT(aio->a_niov > 0);
- aio->a_niov--;
- }
-
// We completed the entire operation on this aioq.
nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
- // aioq ready for us to process.
+ // aio ready for us to process.
}
}
@@ -262,7 +225,17 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
}
nni_aio_list_append(&pd->readq, aio);
- nni_posix_pollq_arm(&pd->node, POLLIN);
+ // If we are only job on the list, go ahead and try to do an immediate
+ // transfer. This allows for faster completions in many cases. We
+ // also need not arm a list if it was already armed.
+ if (nni_list_first(&pd->readq) == aio) {
+ nni_posix_pipedesc_doread(pd);
+ // If we are still the first thing on the list, that means we
+ // didn't finish the job, so arm the poller to complete us.
+ if (nni_list_first(&pd->readq) == aio) {
+ nni_posix_pollq_arm(&pd->node, POLLIN);
+ }
+ }
nni_mtx_unlock(&pd->mtx);
}
@@ -283,7 +256,14 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
}
nni_aio_list_append(&pd->writeq, aio);
- nni_posix_pollq_arm(&pd->node, POLLOUT);
+ if (nni_list_first(&pd->writeq) == aio) {
+ nni_posix_pipedesc_dowrite(pd);
+ // If we are still the first thing on the list, that means we
+ // didn't finish the job, so arm the poller to complete us.
+ if (nni_list_first(&pd->writeq) == aio) {
+ nni_posix_pollq_arm(&pd->node, POLLOUT);
+ }
+ }
nni_mtx_unlock(&pd->mtx);
}
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index c1e2cd31..68a6ea2c 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -59,11 +59,10 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
BOOL ok;
int rv;
nni_plat_ipc_pipe *pipe = evt->ptr;
+ int idx;
NNI_ASSERT(aio != NULL);
NNI_ASSERT(aio->a_niov > 0);
- NNI_ASSERT(aio->a_iov[0].iov_len > 0);
- NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
if (pipe->p == INVALID_HANDLE_VALUE) {
evt->status = NNG_ECLOSED;
@@ -71,13 +70,20 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio)
return (1);
}
+ idx = 0;
+ while ((idx < aio->a_niov) && (aio->a_iov[idx].iov_len == 0)) {
+ idx++;
+ }
+ NNI_ASSERT(idx < aio->a_niov);
// Now start a transfer. We assume that only one send can be
// outstanding on a pipe at a time. This is important to avoid
// scrambling the data anyway. Note that Windows named pipes do
// not appear to support scatter/gather, so we have to process
// each element in turn.
- buf = aio->a_iov[0].iov_buf;
- len = (DWORD) aio->a_iov[0].iov_len;
+ buf = aio->a_iov[idx].iov_buf;
+ len = (DWORD) aio->a_iov[idx].iov_len;
+ NNI_ASSERT(buf != NULL);
+ NNI_ASSERT(len != 0);
// We limit ourselves to writing 16MB at a time. Named Pipes
// on Windows have limits of between 31 and 64MB.
@@ -115,50 +121,7 @@ nni_win_ipc_pipe_cancel(nni_win_event *evt)
static void
nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio)
{
- int rv;
- DWORD cnt;
-
- cnt = evt->count;
- if ((rv = evt->status) == 0) {
-
- int i;
- aio->a_count += cnt;
-
- while (cnt > 0) {
- // If we didn't transfer the first full iov,
- // then we're done for now. Record progress
- // and move on.
- if (cnt < aio->a_iov[0].iov_len) {
- aio->a_iov[0].iov_len -= cnt;
- aio->a_iov[0].iov_buf =
- (char *) aio->a_iov[0].iov_buf + cnt;
- break;
- }
-
- // We consumed the full iov, so just move the
- // remaining ones up, and decrement count handled.
- cnt -= aio->a_iov[0].iov_len;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- NNI_ASSERT(aio->a_niov > 0);
- aio->a_niov--;
- }
-
- while (aio->a_niov > 0) {
- // If we have more to do, submit it!
- if (aio->a_iov[0].iov_len > 0) {
- nni_win_event_resubmit(evt, aio);
- return;
- }
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- }
- }
-
- // All done; hopefully successfully.
- nni_aio_finish(aio, rv, aio->a_count);
+ nni_aio_finish(aio, evt->status, evt->count);
}
static int
diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c
index 375a72c3..01818af4 100644
--- a/src/platform/windows/win_tcp.c
+++ b/src/platform/windows/win_tcp.c
@@ -112,12 +112,13 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio)
NNI_ASSERT(aio->a_iov[0].iov_len > 0);
NNI_ASSERT(aio->a_iov[0].iov_buf != NULL);
- niov = aio->a_niov;
-
// Put the AIOs in Windows form.
- for (i = 0; i < aio->a_niov; i++) {
- iov[i].buf = aio->a_iov[i].iov_buf;
- iov[i].len = (ULONG) aio->a_iov[i].iov_len;
+ for (niov = 0, i = 0; i < aio->a_niov; i++) {
+ if (aio->a_iov[i].iov_len != 0) {
+ iov[niov].buf = aio->a_iov[i].iov_buf;
+ iov[niov].len = (ULONG) aio->a_iov[i].iov_len;
+ niov++;
+ }
}
if ((s = pipe->s) == INVALID_SOCKET) {
@@ -162,49 +163,7 @@ nni_win_tcp_pipe_cancel(nni_win_event *evt)
static void
nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio)
{
- int rv;
- size_t cnt;
-
- cnt = evt->count;
- if ((rv = evt->status) == 0) {
- int i;
- aio->a_count += cnt;
-
- while (cnt > 0) {
- // If we didn't transfer the first full iov,
- // then we're done for now. Record progress
- // and move on.
- if (cnt < aio->a_iov[0].iov_len) {
- aio->a_iov[0].iov_len -= cnt;
- aio->a_iov[0].iov_buf =
- (char *) aio->a_iov[0].iov_buf + cnt;
- break;
- }
-
- // We consumed the full iov, so just move the
- // remaining ones up, and decrement count handled.
- cnt -= aio->a_iov[0].iov_len;
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- NNI_ASSERT(aio->a_niov > 0);
- aio->a_niov--;
- }
-
- while (aio->a_niov > 0) {
- // If we have more to do, submit it!
- if (aio->a_iov[0].iov_len > 0) {
- nni_win_event_resubmit(evt, aio);
- return;
- }
- for (i = 1; i < aio->a_niov; i++) {
- aio->a_iov[i - 1] = aio->a_iov[i];
- }
- }
- }
-
- // All done; hopefully successfully.
- nni_aio_finish(aio, rv, aio->a_count);
+ nni_aio_finish(aio, evt->status, evt->count);
}
static int
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index c13dbd34..32ff1c0e 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -207,25 +207,52 @@ nni_ipc_pipe_send_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
nni_aio * aio;
+ nni_aio * txaio = pipe->txaio;
+ nni_msg * msg;
int rv;
- size_t len;
+ size_t n;
nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_txaio) == NULL) {
nni_mtx_unlock(&pipe->mtx);
return;
}
- pipe->user_txaio = NULL;
- if ((rv = nni_aio_result(pipe->txaio)) != 0) {
- len = 0;
- } else {
- nni_msg *msg = nni_aio_get_msg(aio);
- len = nni_msg_len(msg);
- nni_msg_free(msg);
+
+ if ((rv = nni_aio_result(txaio)) != 0) {
+ pipe->user_txaio = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+ msg = nni_aio_get_msg(aio);
nni_aio_set_msg(aio, NULL);
+ nni_msg_free(msg);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ n = nni_aio_count(txaio);
+ while (n) {
+ NNI_ASSERT(txaio->a_niov != 0);
+ if (txaio->a_iov[0].iov_len > n) {
+ txaio->a_iov[0].iov_len -= n;
+ txaio->a_iov[0].iov_buf += n;
+ break;
+ }
+ n -= txaio->a_iov[0].iov_len;
+ for (int i = 0; i < txaio->a_niov; i++) {
+ txaio->a_iov[i] = txaio->a_iov[i + 1];
+ }
+ txaio->a_niov--;
+ }
+ if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) {
+ nni_plat_ipc_pipe_send(pipe->ipp, txaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
}
- nni_aio_finish(aio, rv, len);
+
nni_mtx_unlock(&pipe->mtx);
+ msg = nni_aio_get_msg(aio);
+ n = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, n);
}
static void
@@ -234,7 +261,9 @@ nni_ipc_pipe_recv_cb(void *arg)
nni_ipc_pipe *pipe = arg;
nni_aio * aio;
int rv;
+ size_t n;
nni_msg * msg;
+ nni_aio * rxaio = pipe->rxaio;
nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_rxaio) == NULL) {
@@ -243,32 +272,45 @@ nni_ipc_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(pipe->rxaio)) != 0) {
+ if ((rv = nni_aio_result(rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
- if (pipe->rxmsg != NULL) {
- nni_msg_free(pipe->rxmsg);
- pipe->rxmsg = NULL;
+ goto recv_error;
+ }
+
+ n = nni_aio_count(rxaio);
+ while (n) {
+ NNI_ASSERT(rxaio->a_niov != 0);
+ if (rxaio->a_iov[0].iov_len > n) {
+ rxaio->a_iov[0].iov_len -= n;
+ rxaio->a_iov[0].iov_buf += n;
+ break;
}
- pipe->user_rxaio = NULL;
- nni_aio_finish_error(aio, rv);
+ n -= rxaio->a_iov[0].iov_len;
+ for (int i = 0; i < rxaio->a_niov; i++) {
+ rxaio->a_iov[i] = rxaio->a_iov[i + 1];
+ }
+ rxaio->a_niov--;
+ }
+
+ // Was this a partial read? If so then resubmit for the rest.
+ if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) {
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
- // If we don't have a message yet, we were reading the TCP message
+ // If we don't have a message yet, we were reading the message
// header, which is just the length. This tells us the size of the
// message to allocate and how much more to expect.
if (pipe->rxmsg == NULL) {
uint64_t len;
- nni_aio *rxaio;
// Check to make sure we got msg type 1.
if (pipe->rxhead[0] != 1) {
- nni_aio_finish_error(aio, NNG_EPROTO);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ rv = NNG_EPROTO;
+ goto recv_error;
}
// We should have gotten a message header.
@@ -277,10 +319,8 @@ nni_ipc_pipe_recv_cb(void *arg)
// Make sure the message payload is not too big. If it is
// the caller will shut down the pipe.
if (len > pipe->rcvmax) {
- pipe->user_rxaio = NULL;
- nni_aio_finish_error(aio, NNG_EMSGSIZE);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ rv = NNG_EMSGSIZE;
+ goto recv_error;
}
// Note that all IO on this pipe is blocked behind this
@@ -289,22 +329,20 @@ nni_ipc_pipe_recv_cb(void *arg)
// transmits to proceed normally. In practice this is
// unlikely to be much of an issue though.
if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
- pipe->user_rxaio = NULL;
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ goto recv_error;
}
- // Submit the rest of the data for a read -- we want to
- // read the entire message now.
- rxaio = pipe->rxaio;
- rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
- rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
- rxaio->a_niov = 1;
+ if (len != 0) {
+ // Submit the rest of the data for a read -- we want to
+ // read the entire message now.
+ rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
+ rxaio->a_iov[0].iov_len = (size_t) len;
+ rxaio->a_niov = 1;
- nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
- nni_mtx_unlock(&pipe->mtx);
- return;
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
+ nni_mtx_unlock(&pipe->mtx);
+ return;
+ }
}
// Otherwise we got a message read completely. Let the user know the
@@ -312,8 +350,18 @@ nni_ipc_pipe_recv_cb(void *arg)
pipe->user_rxaio = NULL;
msg = pipe->rxmsg;
pipe->rxmsg = NULL;
+ nni_mtx_unlock(&pipe->mtx);
+
nni_aio_finish_msg(aio, msg);
+ return;
+
+recv_error:
+ pipe->user_rxaio = NULL;
+ msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
nni_mtx_unlock(&pipe->mtx);
+ nni_msg_free(msg);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -340,6 +388,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
nni_aio * txaio;
+ int niov;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -354,14 +403,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
- txaio = pipe->txaio;
- txaio->a_iov[0].iov_buf = pipe->txhead;
- txaio->a_iov[0].iov_len = sizeof(pipe->txhead);
- txaio->a_iov[1].iov_buf = nni_msg_header(msg);
- txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
- txaio->a_iov[2].iov_buf = nni_msg_body(msg);
- txaio->a_iov[2].iov_len = nni_msg_len(msg);
- txaio->a_niov = 3;
+ txaio = pipe->txaio;
+ niov = 0;
+ txaio->a_iov[niov].iov_buf = pipe->txhead;
+ txaio->a_iov[niov].iov_len = sizeof(pipe->txhead);
+ niov++;
+ if (nni_msg_header_len(msg) > 0) {
+ txaio->a_iov[niov].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[niov].iov_len = nni_msg_header_len(msg);
+ niov++;
+ }
+ if (nni_msg_len(msg) > 0) {
+ txaio->a_iov[niov].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[niov].iov_len = nni_msg_len(msg);
+ niov++;
+ }
+ txaio->a_niov = niov;
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
@@ -625,7 +682,8 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- // If we can't start, then its dying and we can't report either.
+ // If we can't start, then its dying and we can't report
+ // either.
if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
return;
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index e33db865..d6a51faa 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -207,24 +207,51 @@ nni_tcp_pipe_send_cb(void *arg)
nni_tcp_pipe *p = arg;
int rv;
nni_aio * aio;
- size_t len;
+ size_t n;
+ nng_msg * msg;
+ nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
if ((aio = p->user_txaio) == NULL) {
nni_mtx_unlock(&p->mtx);
return;
}
- p->user_txaio = NULL;
- if ((rv = nni_aio_result(p->txaio)) != 0) {
- len = 0;
- } else {
- len = nni_msg_len(aio->a_msg);
- nni_msg_free(nni_aio_get_msg(aio));
+ if ((rv = nni_aio_result(txaio)) != 0) {
+ p->user_txaio = NULL;
+ nni_mtx_unlock(&p->mtx);
+ msg = nni_aio_get_msg(aio);
nni_aio_set_msg(aio, NULL);
+ nni_msg_free(msg);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ n = nni_aio_count(txaio);
+ while (n) {
+ NNI_ASSERT(txaio->a_niov != 0);
+ if (txaio->a_iov[0].iov_len > n) {
+ txaio->a_iov[0].iov_len -= n;
+ txaio->a_iov[0].iov_buf += n;
+ break;
+ }
+ n -= txaio->a_iov[0].iov_len;
+ for (int i = 0; i < txaio->a_niov; i++) {
+ txaio->a_iov[i] = txaio->a_iov[i + 1];
+ }
+ txaio->a_niov--;
}
- nni_aio_finish(aio, 0, len);
+ if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) {
+ nni_plat_tcp_pipe_send(p->tpp, txaio);
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+
nni_mtx_unlock(&p->mtx);
+ msg = nni_aio_get_msg(aio);
+ n = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, n);
}
static void
@@ -233,26 +260,39 @@ nni_tcp_pipe_recv_cb(void *arg)
nni_tcp_pipe *p = arg;
nni_aio * aio;
int rv;
+ size_t n;
nni_msg * msg;
+ nni_aio * rxaio = p->rxaio;
nni_mtx_lock(&p->mtx);
- aio = p->user_rxaio;
- if (aio == NULL) {
+ if ((aio = p->user_rxaio) == NULL) {
+ // Canceled.
nni_mtx_unlock(&p->mtx);
return;
}
if ((rv = nni_aio_result(p->rxaio)) != 0) {
- // Error on receive. This has to cause an error back
- // to the user. Also, if we had allocated an rxmsg, lets
- // toss it.
- if (p->rxmsg != NULL) {
- nni_msg_free(p->rxmsg);
- p->rxmsg = NULL;
+ goto recv_error;
+ }
+
+ n = nni_aio_count(p->rxaio);
+ while (n) {
+ NNI_ASSERT(rxaio->a_niov != 0);
+ if (rxaio->a_iov[0].iov_len > n) {
+ rxaio->a_iov[0].iov_len -= n;
+ rxaio->a_iov[0].iov_buf += n;
+ break;
}
- p->user_rxaio = NULL;
- nni_aio_finish_error(aio, rv);
+ n -= rxaio->a_iov[0].iov_len;
+ rxaio->a_niov--;
+ for (int i = 0; i < rxaio->a_niov; i++) {
+ rxaio->a_iov[i] = rxaio->a_iov[i + 1];
+ }
+ }
+ // Was this a partial read? If so then resubmit for the rest.
+ if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) {
+ nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -261,7 +301,6 @@ nni_tcp_pipe_recv_cb(void *arg)
// header, which is just the length. This tells us the size of the
// message to allocate and how much more to expect.
if (p->rxmsg == NULL) {
- nni_aio *rxaio;
uint64_t len;
// We should have gotten a message header.
NNI_GET64(p->rxlen, len);
@@ -269,37 +308,42 @@ nni_tcp_pipe_recv_cb(void *arg)
// Make sure the message payload is not too big. If it is
// the caller will shut down the pipe.
if (len > p->rcvmax) {
- p->user_rxaio = NULL;
- nni_aio_finish_error(aio, NNG_EMSGSIZE);
- nni_mtx_unlock(&p->mtx);
- return;
+ rv = NNG_EMSGSIZE;
+ goto recv_error;
}
if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
- p->user_rxaio = NULL;
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&p->mtx);
- return;
+ goto recv_error;
}
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- rxaio = p->rxaio;
- rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
- rxaio->a_iov[0].iov_len = nni_msg_len(p->rxmsg);
- rxaio->a_niov = 1;
+ if (len != 0) {
+ rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
+ rxaio->a_iov[0].iov_len = (size_t) len;
+ rxaio->a_niov = 1;
- nni_plat_tcp_pipe_recv(p->tpp, rxaio);
- nni_mtx_unlock(&p->mtx);
- return;
+ nni_plat_tcp_pipe_recv(p->tpp, rxaio);
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
}
// We read a message completely. Let the user know the good news.
p->user_rxaio = NULL;
msg = p->rxmsg;
p->rxmsg = NULL;
+ nni_mtx_unlock(&p->mtx);
nni_aio_finish_msg(aio, msg);
+ return;
+
+recv_error:
+ p->user_rxaio = NULL;
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
nni_mtx_unlock(&p->mtx);
+ nni_msg_free(msg);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -327,6 +371,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
nni_aio * txaio;
+ int niov;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -341,14 +386,22 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
NNI_PUT64(p->txlen, len);
- txaio = p->txaio;
- txaio->a_iov[0].iov_buf = p->txlen;
- txaio->a_iov[0].iov_len = sizeof(p->txlen);
- txaio->a_iov[1].iov_buf = nni_msg_header(msg);
- txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
- txaio->a_iov[2].iov_buf = nni_msg_body(msg);
- txaio->a_iov[2].iov_len = nni_msg_len(msg);
- txaio->a_niov = 3;
+ niov = 0;
+ txaio = p->txaio;
+ txaio->a_iov[niov].iov_buf = p->txlen;
+ txaio->a_iov[niov].iov_len = sizeof(p->txlen);
+ niov++;
+ if (nni_msg_header_len(msg) > 0) {
+ txaio->a_iov[niov].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[niov].iov_len = nni_msg_header_len(msg);
+ niov++;
+ }
+ if (nni_msg_len(msg) > 0) {
+ txaio->a_iov[niov].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[niov].iov_len = nni_msg_len(msg);
+ niov++;
+ }
+ txaio->a_niov = niov;
nni_plat_tcp_pipe_send(p->tpp, txaio);
nni_mtx_unlock(&p->mtx);