diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-11-13 21:10:03 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-11-13 21:10:03 -0800 |
| commit | e8694d15d0a108895bf869f292d59e11d834361e (patch) | |
| tree | d87b8d396953fee653fbcbee92521395d0cec1fe | |
| parent | ac6019bfabac887274fb9d8b2a167df940ba6121 (diff) | |
| download | nng-e8694d15d0a108895bf869f292d59e11d834361e.tar.gz nng-e8694d15d0a108895bf869f292d59e11d834361e.tar.bz2 nng-e8694d15d0a108895bf869f292d59e11d834361e.zip | |
fixes #154 underlyng TCP & IPC transports should support partial recv/send
fixes #155 POSIX TCP & IPC could avoid a lot of context switches
| -rw-r--r-- | src/core/defs.h | 4 | ||||
| -rw-r--r-- | src/core/platform.h | 13 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 128 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 59 | ||||
| -rw-r--r-- | src/platform/windows/win_tcp.c | 55 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 150 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 137 | ||||
| -rw-r--r-- | tests/udp.c | 20 |
9 files changed, 300 insertions, 272 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index 57f7f06a..5a9ded92 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -62,8 +62,8 @@ typedef void (*nni_cb)(void *); // Used by transports for scatter gather I/O. typedef struct { - void * iov_buf; - size_t iov_len; + uint8_t *iov_buf; + size_t iov_len; } nni_iov; // Notify descriptor. diff --git a/src/core/platform.h b/src/core/platform.h index bff7f709..9e193175 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -229,8 +229,17 @@ extern void nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *); // The platform may modify the iovs. extern void nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *, nni_aio *); -// nni_plat_tcp_pipe_recv recvs data into the buffers provided by the iovs. -// The platform may modify the iovs. +// nni_plat_tcp_pipe_recv receives data into the buffers provided by the +// I/O vector (iovs). The platform should attempt to scatter the received +// data into the iovs if possible. +// +// It is an error for the caller to supply any IO vector elements with +// zero length. +// +// It is possible for the TCP reader to return less data than is requested, +// in which case the caller is responsible for resubmitting. The platform +// should not return "zero" data however. (It is an error to attempt to +// receive zero bytes.) The platform may not modify the I/O vector. extern void nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *, nni_aio *); // nni_plat_tcp_pipe_peername gets the peer name. diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 0524fe30..80711ed0 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -279,6 +279,12 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) } (void) fcntl(fd, F_SETFD, FD_CLOEXEC); +#ifdef SO_NOSIGPIPE + // Darwin lacks MSG_NOSIGNAL, but has a socket option. + int one = 1; + (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); +#endif + if (bind(fd, (struct sockaddr *) ss, len) < 0) { nni_mtx_unlock(&ed->mtx); rv = nni_plat_errno(errno); diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index d25a9d96..6ae0d752 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -66,93 +66,75 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) static void nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) { - int n; - int rv; - int i; - struct iovec iovec[4]; - struct iovec *iovp; - nni_aio * aio; + int n; + struct iovec iovec[4]; + nni_aio * aio; + int niov; while ((aio = nni_list_first(&pd->writeq)) != NULL) { - for (i = 0; i < aio->a_niov; i++) { - iovec[i].iov_len = aio->a_iov[i].iov_len; - iovec[i].iov_base = aio->a_iov[i].iov_buf; + int i; + for (niov = 0, i = 0; i < aio->a_niov; i++) { + iovec[niov].iov_len = aio->a_iov[i].iov_len; + iovec[niov].iov_base = aio->a_iov[i].iov_buf; + niov++; + } + if (niov == 0) { + nni_posix_pipedesc_finish(aio, NNG_EINVAL); + continue; } - iovp = &iovec[0]; - rv = 0; - n = writev(pd->node.fd, iovp, aio->a_niov); + n = writev(pd->node.fd, iovec, niov); if (n < 0) { if ((errno == EAGAIN) || (errno == EINTR)) { // Can't write more right now. We're done // on this fd for now. return; } - rv = nni_plat_errno(errno); - - nni_posix_pipedesc_finish(aio, rv); + nni_posix_pipedesc_finish(aio, nni_plat_errno(errno)); nni_posix_pipedesc_doclose(pd); return; } aio->a_count += n; - while (n > 0) { - // If we didn't write the first full iov, - // then we're done for now. Record progress - // and return to caller. - if (n < aio->a_iov[0].iov_len) { - aio->a_iov[0].iov_buf += n; - aio->a_iov[0].iov_len -= n; - return; - } - - // We consumed the full iovec, so just move the - // remaininng ones up, and decrement count handled. - n -= aio->a_iov[0].iov_len; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - NNI_ASSERT(aio->a_niov > 0); - aio->a_niov--; - } - // We completed the entire operation on this aioq. nni_posix_pipedesc_finish(aio, 0); // Go back to start of loop to see if there is another - // aioq ready for us to process. + // aio ready for us to process. } } static void nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) { - int n; - int rv; - int i; - struct iovec iovec[4]; - struct iovec *iovp; - nni_aio * aio; + int n; + struct iovec iovec[4]; + nni_aio * aio; + int niov; while ((aio = nni_list_first(&pd->readq)) != NULL) { - for (i = 0; i < aio->a_niov; i++) { - iovec[i].iov_len = aio->a_iov[i].iov_len; - iovec[i].iov_base = aio->a_iov[i].iov_buf; + int i; + for (i = 0, niov = 0; i < aio->a_niov; i++) { + if (aio->a_iov[i].iov_len != 0) { + iovec[niov].iov_len = aio->a_iov[i].iov_len; + iovec[niov].iov_base = aio->a_iov[i].iov_buf; + niov++; + } + } + if (niov == 0) { + nni_posix_pipedesc_finish(aio, NNG_EINVAL); + continue; } - iovp = &iovec[0]; - rv = 0; - n = readv(pd->node.fd, iovp, aio->a_niov); + n = readv(pd->node.fd, iovec, niov); if (n < 0) { if ((errno == EAGAIN) || (errno == EINTR)) { // Can't write more right now. We're done // on this fd for now. return; } - rv = nni_plat_errno(errno); - - nni_posix_pipedesc_finish(aio, rv); + nni_posix_pipedesc_finish(aio, nni_plat_errno(errno)); nni_posix_pipedesc_doclose(pd); return; } @@ -160,36 +142,17 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) if (n == 0) { // No bytes indicates a closed descriptor. nni_posix_pipedesc_finish(aio, NNG_ECLOSED); + nni_posix_pipedesc_doclose(pd); return; } aio->a_count += n; - while (n > 0) { - // If we didn't write the first full iov, - // then we're done for now. Record progress - // and return to caller. - if (n < aio->a_iov[0].iov_len) { - aio->a_iov[0].iov_buf += n; - aio->a_iov[0].iov_len -= n; - return; - } - - // We consumed the full iovec, so just move the - // remaininng ones up, and decrement count handled. - n -= aio->a_iov[0].iov_len; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - NNI_ASSERT(aio->a_niov > 0); - aio->a_niov--; - } - // We completed the entire operation on this aioq. nni_posix_pipedesc_finish(aio, 0); // Go back to start of loop to see if there is another - // aioq ready for us to process. + // aio ready for us to process. } } @@ -262,7 +225,17 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) } nni_aio_list_append(&pd->readq, aio); - nni_posix_pollq_arm(&pd->node, POLLIN); + // If we are only job on the list, go ahead and try to do an immediate + // transfer. This allows for faster completions in many cases. We + // also need not arm a list if it was already armed. + if (nni_list_first(&pd->readq) == aio) { + nni_posix_pipedesc_doread(pd); + // If we are still the first thing on the list, that means we + // didn't finish the job, so arm the poller to complete us. + if (nni_list_first(&pd->readq) == aio) { + nni_posix_pollq_arm(&pd->node, POLLIN); + } + } nni_mtx_unlock(&pd->mtx); } @@ -283,7 +256,14 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) } nni_aio_list_append(&pd->writeq, aio); - nni_posix_pollq_arm(&pd->node, POLLOUT); + if (nni_list_first(&pd->writeq) == aio) { + nni_posix_pipedesc_dowrite(pd); + // If we are still the first thing on the list, that means we + // didn't finish the job, so arm the poller to complete us. + if (nni_list_first(&pd->writeq) == aio) { + nni_posix_pollq_arm(&pd->node, POLLOUT); + } + } nni_mtx_unlock(&pd->mtx); } diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index c1e2cd31..68a6ea2c 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -59,11 +59,10 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) BOOL ok; int rv; nni_plat_ipc_pipe *pipe = evt->ptr; + int idx; NNI_ASSERT(aio != NULL); NNI_ASSERT(aio->a_niov > 0); - NNI_ASSERT(aio->a_iov[0].iov_len > 0); - NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); if (pipe->p == INVALID_HANDLE_VALUE) { evt->status = NNG_ECLOSED; @@ -71,13 +70,20 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) return (1); } + idx = 0; + while ((idx < aio->a_niov) && (aio->a_iov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < aio->a_niov); // Now start a transfer. We assume that only one send can be // outstanding on a pipe at a time. This is important to avoid // scrambling the data anyway. Note that Windows named pipes do // not appear to support scatter/gather, so we have to process // each element in turn. - buf = aio->a_iov[0].iov_buf; - len = (DWORD) aio->a_iov[0].iov_len; + buf = aio->a_iov[idx].iov_buf; + len = (DWORD) aio->a_iov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); // We limit ourselves to writing 16MB at a time. Named Pipes // on Windows have limits of between 31 and 64MB. @@ -115,50 +121,7 @@ nni_win_ipc_pipe_cancel(nni_win_event *evt) static void nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) { - int rv; - DWORD cnt; - - cnt = evt->count; - if ((rv = evt->status) == 0) { - - int i; - aio->a_count += cnt; - - while (cnt > 0) { - // If we didn't transfer the first full iov, - // then we're done for now. Record progress - // and move on. - if (cnt < aio->a_iov[0].iov_len) { - aio->a_iov[0].iov_len -= cnt; - aio->a_iov[0].iov_buf = - (char *) aio->a_iov[0].iov_buf + cnt; - break; - } - - // We consumed the full iov, so just move the - // remaining ones up, and decrement count handled. - cnt -= aio->a_iov[0].iov_len; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - NNI_ASSERT(aio->a_niov > 0); - aio->a_niov--; - } - - while (aio->a_niov > 0) { - // If we have more to do, submit it! - if (aio->a_iov[0].iov_len > 0) { - nni_win_event_resubmit(evt, aio); - return; - } - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - } - } - - // All done; hopefully successfully. - nni_aio_finish(aio, rv, aio->a_count); + nni_aio_finish(aio, evt->status, evt->count); } static int diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c index 375a72c3..01818af4 100644 --- a/src/platform/windows/win_tcp.c +++ b/src/platform/windows/win_tcp.c @@ -112,12 +112,13 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) NNI_ASSERT(aio->a_iov[0].iov_len > 0); NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - niov = aio->a_niov; - // Put the AIOs in Windows form. - for (i = 0; i < aio->a_niov; i++) { - iov[i].buf = aio->a_iov[i].iov_buf; - iov[i].len = (ULONG) aio->a_iov[i].iov_len; + for (niov = 0, i = 0; i < aio->a_niov; i++) { + if (aio->a_iov[i].iov_len != 0) { + iov[niov].buf = aio->a_iov[i].iov_buf; + iov[niov].len = (ULONG) aio->a_iov[i].iov_len; + niov++; + } } if ((s = pipe->s) == INVALID_SOCKET) { @@ -162,49 +163,7 @@ nni_win_tcp_pipe_cancel(nni_win_event *evt) static void nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) { - int rv; - size_t cnt; - - cnt = evt->count; - if ((rv = evt->status) == 0) { - int i; - aio->a_count += cnt; - - while (cnt > 0) { - // If we didn't transfer the first full iov, - // then we're done for now. Record progress - // and move on. - if (cnt < aio->a_iov[0].iov_len) { - aio->a_iov[0].iov_len -= cnt; - aio->a_iov[0].iov_buf = - (char *) aio->a_iov[0].iov_buf + cnt; - break; - } - - // We consumed the full iov, so just move the - // remaining ones up, and decrement count handled. - cnt -= aio->a_iov[0].iov_len; - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - NNI_ASSERT(aio->a_niov > 0); - aio->a_niov--; - } - - while (aio->a_niov > 0) { - // If we have more to do, submit it! - if (aio->a_iov[0].iov_len > 0) { - nni_win_event_resubmit(evt, aio); - return; - } - for (i = 1; i < aio->a_niov; i++) { - aio->a_iov[i - 1] = aio->a_iov[i]; - } - } - } - - // All done; hopefully successfully. - nni_aio_finish(aio, rv, aio->a_count); + nni_aio_finish(aio, evt->status, evt->count); } static int diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index c13dbd34..32ff1c0e 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -207,25 +207,52 @@ nni_ipc_pipe_send_cb(void *arg) { nni_ipc_pipe *pipe = arg; nni_aio * aio; + nni_aio * txaio = pipe->txaio; + nni_msg * msg; int rv; - size_t len; + size_t n; nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_txaio) == NULL) { nni_mtx_unlock(&pipe->mtx); return; } - pipe->user_txaio = NULL; - if ((rv = nni_aio_result(pipe->txaio)) != 0) { - len = 0; - } else { - nni_msg *msg = nni_aio_get_msg(aio); - len = nni_msg_len(msg); - nni_msg_free(msg); + + if ((rv = nni_aio_result(txaio)) != 0) { + pipe->user_txaio = NULL; + nni_mtx_unlock(&pipe->mtx); + msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); + return; + } + + n = nni_aio_count(txaio); + while (n) { + NNI_ASSERT(txaio->a_niov != 0); + if (txaio->a_iov[0].iov_len > n) { + txaio->a_iov[0].iov_len -= n; + txaio->a_iov[0].iov_buf += n; + break; + } + n -= txaio->a_iov[0].iov_len; + for (int i = 0; i < txaio->a_niov; i++) { + txaio->a_iov[i] = txaio->a_iov[i + 1]; + } + txaio->a_niov--; + } + if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_plat_ipc_pipe_send(pipe->ipp, txaio); + nni_mtx_unlock(&pipe->mtx); + return; } - nni_aio_finish(aio, rv, len); + nni_mtx_unlock(&pipe->mtx); + msg = nni_aio_get_msg(aio); + n = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, n); } static void @@ -234,7 +261,9 @@ nni_ipc_pipe_recv_cb(void *arg) nni_ipc_pipe *pipe = arg; nni_aio * aio; int rv; + size_t n; nni_msg * msg; + nni_aio * rxaio = pipe->rxaio; nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_rxaio) == NULL) { @@ -243,32 +272,45 @@ nni_ipc_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(pipe->rxaio)) != 0) { + if ((rv = nni_aio_result(rxaio)) != 0) { // Error on receive. This has to cause an error back // to the user. Also, if we had allocated an rxmsg, lets // toss it. - if (pipe->rxmsg != NULL) { - nni_msg_free(pipe->rxmsg); - pipe->rxmsg = NULL; + goto recv_error; + } + + n = nni_aio_count(rxaio); + while (n) { + NNI_ASSERT(rxaio->a_niov != 0); + if (rxaio->a_iov[0].iov_len > n) { + rxaio->a_iov[0].iov_len -= n; + rxaio->a_iov[0].iov_buf += n; + break; } - pipe->user_rxaio = NULL; - nni_aio_finish_error(aio, rv); + n -= rxaio->a_iov[0].iov_len; + for (int i = 0; i < rxaio->a_niov; i++) { + rxaio->a_iov[i] = rxaio->a_iov[i + 1]; + } + rxaio->a_niov--; + } + + // Was this a partial read? If so then resubmit for the rest. + if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; } - // If we don't have a message yet, we were reading the TCP message + // If we don't have a message yet, we were reading the message // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. if (pipe->rxmsg == NULL) { uint64_t len; - nni_aio *rxaio; // Check to make sure we got msg type 1. if (pipe->rxhead[0] != 1) { - nni_aio_finish_error(aio, NNG_EPROTO); - nni_mtx_unlock(&pipe->mtx); - return; + rv = NNG_EPROTO; + goto recv_error; } // We should have gotten a message header. @@ -277,10 +319,8 @@ nni_ipc_pipe_recv_cb(void *arg) // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. if (len > pipe->rcvmax) { - pipe->user_rxaio = NULL; - nni_aio_finish_error(aio, NNG_EMSGSIZE); - nni_mtx_unlock(&pipe->mtx); - return; + rv = NNG_EMSGSIZE; + goto recv_error; } // Note that all IO on this pipe is blocked behind this @@ -289,22 +329,20 @@ nni_ipc_pipe_recv_cb(void *arg) // transmits to proceed normally. In practice this is // unlikely to be much of an issue though. if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { - pipe->user_rxaio = NULL; - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&pipe->mtx); - return; + goto recv_error; } - // Submit the rest of the data for a read -- we want to - // read the entire message now. - rxaio = pipe->rxaio; - rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); - rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); - rxaio->a_niov = 1; + if (len != 0) { + // Submit the rest of the data for a read -- we want to + // read the entire message now. + rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); + rxaio->a_iov[0].iov_len = (size_t) len; + rxaio->a_niov = 1; - nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); - nni_mtx_unlock(&pipe->mtx); - return; + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); + nni_mtx_unlock(&pipe->mtx); + return; + } } // Otherwise we got a message read completely. Let the user know the @@ -312,8 +350,18 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->user_rxaio = NULL; msg = pipe->rxmsg; pipe->rxmsg = NULL; + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_msg(aio, msg); + return; + +recv_error: + pipe->user_rxaio = NULL; + msg = pipe->rxmsg; + pipe->rxmsg = NULL; nni_mtx_unlock(&pipe->mtx); + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); } static void @@ -340,6 +388,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; nni_aio * txaio; + int niov; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -354,14 +403,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); - txaio = pipe->txaio; - txaio->a_iov[0].iov_buf = pipe->txhead; - txaio->a_iov[0].iov_len = sizeof(pipe->txhead); - txaio->a_iov[1].iov_buf = nni_msg_header(msg); - txaio->a_iov[1].iov_len = nni_msg_header_len(msg); - txaio->a_iov[2].iov_buf = nni_msg_body(msg); - txaio->a_iov[2].iov_len = nni_msg_len(msg); - txaio->a_niov = 3; + txaio = pipe->txaio; + niov = 0; + txaio->a_iov[niov].iov_buf = pipe->txhead; + txaio->a_iov[niov].iov_len = sizeof(pipe->txhead); + niov++; + if (nni_msg_header_len(msg) > 0) { + txaio->a_iov[niov].iov_buf = nni_msg_header(msg); + txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + txaio->a_iov[niov].iov_buf = nni_msg_body(msg); + txaio->a_iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + txaio->a_niov = niov; nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); @@ -625,7 +682,8 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - // If we can't start, then its dying and we can't report either. + // If we can't start, then its dying and we can't report + // either. if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { nni_mtx_unlock(&ep->mtx); return; diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index e33db865..d6a51faa 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -207,24 +207,51 @@ nni_tcp_pipe_send_cb(void *arg) nni_tcp_pipe *p = arg; int rv; nni_aio * aio; - size_t len; + size_t n; + nng_msg * msg; + nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); if ((aio = p->user_txaio) == NULL) { nni_mtx_unlock(&p->mtx); return; } - p->user_txaio = NULL; - if ((rv = nni_aio_result(p->txaio)) != 0) { - len = 0; - } else { - len = nni_msg_len(aio->a_msg); - nni_msg_free(nni_aio_get_msg(aio)); + if ((rv = nni_aio_result(txaio)) != 0) { + p->user_txaio = NULL; + nni_mtx_unlock(&p->mtx); + msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); + return; + } + + n = nni_aio_count(txaio); + while (n) { + NNI_ASSERT(txaio->a_niov != 0); + if (txaio->a_iov[0].iov_len > n) { + txaio->a_iov[0].iov_len -= n; + txaio->a_iov[0].iov_buf += n; + break; + } + n -= txaio->a_iov[0].iov_len; + for (int i = 0; i < txaio->a_niov; i++) { + txaio->a_iov[i] = txaio->a_iov[i + 1]; + } + txaio->a_niov--; } - nni_aio_finish(aio, 0, len); + if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_plat_tcp_pipe_send(p->tpp, txaio); + nni_mtx_unlock(&p->mtx); + return; + } + nni_mtx_unlock(&p->mtx); + msg = nni_aio_get_msg(aio); + n = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, n); } static void @@ -233,26 +260,39 @@ nni_tcp_pipe_recv_cb(void *arg) nni_tcp_pipe *p = arg; nni_aio * aio; int rv; + size_t n; nni_msg * msg; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); - aio = p->user_rxaio; - if (aio == NULL) { + if ((aio = p->user_rxaio) == NULL) { + // Canceled. nni_mtx_unlock(&p->mtx); return; } if ((rv = nni_aio_result(p->rxaio)) != 0) { - // Error on receive. This has to cause an error back - // to the user. Also, if we had allocated an rxmsg, lets - // toss it. - if (p->rxmsg != NULL) { - nni_msg_free(p->rxmsg); - p->rxmsg = NULL; + goto recv_error; + } + + n = nni_aio_count(p->rxaio); + while (n) { + NNI_ASSERT(rxaio->a_niov != 0); + if (rxaio->a_iov[0].iov_len > n) { + rxaio->a_iov[0].iov_len -= n; + rxaio->a_iov[0].iov_buf += n; + break; } - p->user_rxaio = NULL; - nni_aio_finish_error(aio, rv); + n -= rxaio->a_iov[0].iov_len; + rxaio->a_niov--; + for (int i = 0; i < rxaio->a_niov; i++) { + rxaio->a_iov[i] = rxaio->a_iov[i + 1]; + } + } + // Was this a partial read? If so then resubmit for the rest. + if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -261,7 +301,6 @@ nni_tcp_pipe_recv_cb(void *arg) // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. if (p->rxmsg == NULL) { - nni_aio *rxaio; uint64_t len; // We should have gotten a message header. NNI_GET64(p->rxlen, len); @@ -269,37 +308,42 @@ nni_tcp_pipe_recv_cb(void *arg) // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. if (len > p->rcvmax) { - p->user_rxaio = NULL; - nni_aio_finish_error(aio, NNG_EMSGSIZE); - nni_mtx_unlock(&p->mtx); - return; + rv = NNG_EMSGSIZE; + goto recv_error; } if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { - p->user_rxaio = NULL; - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&p->mtx); - return; + goto recv_error; } // Submit the rest of the data for a read -- we want to // read the entire message now. - rxaio = p->rxaio; - rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); - rxaio->a_iov[0].iov_len = nni_msg_len(p->rxmsg); - rxaio->a_niov = 1; + if (len != 0) { + rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); + rxaio->a_iov[0].iov_len = (size_t) len; + rxaio->a_niov = 1; - nni_plat_tcp_pipe_recv(p->tpp, rxaio); - nni_mtx_unlock(&p->mtx); - return; + nni_plat_tcp_pipe_recv(p->tpp, rxaio); + nni_mtx_unlock(&p->mtx); + return; + } } // We read a message completely. Let the user know the good news. p->user_rxaio = NULL; msg = p->rxmsg; p->rxmsg = NULL; + nni_mtx_unlock(&p->mtx); nni_aio_finish_msg(aio, msg); + return; + +recv_error: + p->user_rxaio = NULL; + msg = p->rxmsg; + p->rxmsg = NULL; nni_mtx_unlock(&p->mtx); + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); } static void @@ -327,6 +371,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; nni_aio * txaio; + int niov; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -341,14 +386,22 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) NNI_PUT64(p->txlen, len); - txaio = p->txaio; - txaio->a_iov[0].iov_buf = p->txlen; - txaio->a_iov[0].iov_len = sizeof(p->txlen); - txaio->a_iov[1].iov_buf = nni_msg_header(msg); - txaio->a_iov[1].iov_len = nni_msg_header_len(msg); - txaio->a_iov[2].iov_buf = nni_msg_body(msg); - txaio->a_iov[2].iov_len = nni_msg_len(msg); - txaio->a_niov = 3; + niov = 0; + txaio = p->txaio; + txaio->a_iov[niov].iov_buf = p->txlen; + txaio->a_iov[niov].iov_len = sizeof(p->txlen); + niov++; + if (nni_msg_header_len(msg) > 0) { + txaio->a_iov[niov].iov_buf = nni_msg_header(msg); + txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + txaio->a_iov[niov].iov_buf = nni_msg_body(msg); + txaio->a_iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + txaio->a_niov = niov; nni_plat_tcp_pipe_send(p->tpp, txaio); nni_mtx_unlock(&p->mtx); diff --git a/tests/udp.c b/tests/udp.c index 639d8da8..3097476f 100644 --- a/tests/udp.c +++ b/tests/udp.c @@ -67,12 +67,12 @@ TestMain("UDP support", { to = sa2; aio1->a_niov = 1; - aio1->a_iov[0].iov_buf = msg; + aio1->a_iov[0].iov_buf = (void *) msg; aio1->a_iov[0].iov_len = strlen(msg) + 1; aio1->a_addr = &to; aio2->a_niov = 1; - aio2->a_iov[0].iov_buf = rbuf; + aio2->a_iov[0].iov_buf = (void *) rbuf; aio2->a_iov[0].iov_len = 1024; aio2->a_addr = &from; @@ -107,7 +107,7 @@ TestMain("UDP support", { nni_aio_init(&aio1, NULL, NULL); aio1->a_niov = 1; - aio1->a_iov[0].iov_buf = msg; + aio1->a_iov[0].iov_buf = (void *) msg; aio1->a_iov[0].iov_len = strlen(msg) + 1; nni_plat_udp_send(u1, aio1); @@ -137,23 +137,23 @@ TestMain("UDP support", { to1 = sa2; aio1->a_niov = 1; - aio1->a_iov[0].iov_buf = msg1; + aio1->a_iov[0].iov_buf = (void *) msg1; aio1->a_iov[0].iov_len = strlen(msg1) + 1; aio1->a_addr = &to1; to2 = sa2; aio2->a_niov = 1; - aio2->a_iov[0].iov_buf = msg2; + aio2->a_iov[0].iov_buf = (void *) msg2; aio2->a_iov[0].iov_len = strlen(msg2) + 1; aio2->a_addr = &to2; aio3->a_niov = 1; - aio3->a_iov[0].iov_buf = rbuf1; + aio3->a_iov[0].iov_buf = (void *) rbuf1; aio3->a_iov[0].iov_len = 1024; aio3->a_addr = &from1; aio4->a_niov = 1; - aio4->a_iov[0].iov_buf = rbuf2; + aio4->a_iov[0].iov_buf = (void *) rbuf2; aio4->a_iov[0].iov_len = 1024; aio4->a_addr = &from2; @@ -192,7 +192,7 @@ TestMain("UDP support", { nni_aio_init(&aio1, NULL, NULL); aio1->a_niov = 1; - aio1->a_iov[0].iov_buf = msg; + aio1->a_iov[0].iov_buf = (void *) msg; aio1->a_iov[0].iov_len = strlen(msg) + 1; nni_plat_udp_send(u1, aio1); @@ -214,7 +214,7 @@ TestMain("UDP support", { sa.s_un.s_in6.sa_port = 80; nni_aio_init(&aio1, NULL, NULL); aio1->a_niov = 1; - aio1->a_iov[0].iov_buf = msg; + aio1->a_iov[0].iov_buf = (void *) msg; aio1->a_iov[0].iov_len = strlen(msg) + 1; aio1->a_addr = &sa; @@ -239,7 +239,7 @@ TestMain("UDP support", { sa.s_un.s_in6.sa_port = 80; nni_aio_init(&aio1, NULL, NULL); aio1->a_niov = 1; - aio1->a_iov[0].iov_buf = msg; + aio1->a_iov[0].iov_buf = (void *) msg; aio1->a_iov[0].iov_len = strlen(msg) + 1; aio1->a_addr = &sa; |
