diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-11-13 21:10:03 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-11-13 21:10:03 -0800 |
| commit | e8694d15d0a108895bf869f292d59e11d834361e (patch) | |
| tree | d87b8d396953fee653fbcbee92521395d0cec1fe /src/transport/ipc | |
| parent | ac6019bfabac887274fb9d8b2a167df940ba6121 (diff) | |
| download | nng-e8694d15d0a108895bf869f292d59e11d834361e.tar.gz nng-e8694d15d0a108895bf869f292d59e11d834361e.tar.bz2 nng-e8694d15d0a108895bf869f292d59e11d834361e.zip | |
fixes #154 underlyng TCP & IPC transports should support partial recv/send
fixes #155 POSIX TCP & IPC could avoid a lot of context switches
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 150 |
1 files changed, 104 insertions, 46 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index c13dbd34..32ff1c0e 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -207,25 +207,52 @@ nni_ipc_pipe_send_cb(void *arg) { nni_ipc_pipe *pipe = arg; nni_aio * aio; + nni_aio * txaio = pipe->txaio; + nni_msg * msg; int rv; - size_t len; + size_t n; nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_txaio) == NULL) { nni_mtx_unlock(&pipe->mtx); return; } - pipe->user_txaio = NULL; - if ((rv = nni_aio_result(pipe->txaio)) != 0) { - len = 0; - } else { - nni_msg *msg = nni_aio_get_msg(aio); - len = nni_msg_len(msg); - nni_msg_free(msg); + + if ((rv = nni_aio_result(txaio)) != 0) { + pipe->user_txaio = NULL; + nni_mtx_unlock(&pipe->mtx); + msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); + return; + } + + n = nni_aio_count(txaio); + while (n) { + NNI_ASSERT(txaio->a_niov != 0); + if (txaio->a_iov[0].iov_len > n) { + txaio->a_iov[0].iov_len -= n; + txaio->a_iov[0].iov_buf += n; + break; + } + n -= txaio->a_iov[0].iov_len; + for (int i = 0; i < txaio->a_niov; i++) { + txaio->a_iov[i] = txaio->a_iov[i + 1]; + } + txaio->a_niov--; + } + if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_plat_ipc_pipe_send(pipe->ipp, txaio); + nni_mtx_unlock(&pipe->mtx); + return; } - nni_aio_finish(aio, rv, len); + nni_mtx_unlock(&pipe->mtx); + msg = nni_aio_get_msg(aio); + n = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, n); } static void @@ -234,7 +261,9 @@ nni_ipc_pipe_recv_cb(void *arg) nni_ipc_pipe *pipe = arg; nni_aio * aio; int rv; + size_t n; nni_msg * msg; + nni_aio * rxaio = pipe->rxaio; nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_rxaio) == NULL) { @@ -243,32 +272,45 @@ nni_ipc_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(pipe->rxaio)) != 0) { + if ((rv = nni_aio_result(rxaio)) != 0) { // Error on receive. This has to cause an error back // to the user. Also, if we had allocated an rxmsg, lets // toss it. - if (pipe->rxmsg != NULL) { - nni_msg_free(pipe->rxmsg); - pipe->rxmsg = NULL; + goto recv_error; + } + + n = nni_aio_count(rxaio); + while (n) { + NNI_ASSERT(rxaio->a_niov != 0); + if (rxaio->a_iov[0].iov_len > n) { + rxaio->a_iov[0].iov_len -= n; + rxaio->a_iov[0].iov_buf += n; + break; } - pipe->user_rxaio = NULL; - nni_aio_finish_error(aio, rv); + n -= rxaio->a_iov[0].iov_len; + for (int i = 0; i < rxaio->a_niov; i++) { + rxaio->a_iov[i] = rxaio->a_iov[i + 1]; + } + rxaio->a_niov--; + } + + // Was this a partial read? If so then resubmit for the rest. + if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; } - // If we don't have a message yet, we were reading the TCP message + // If we don't have a message yet, we were reading the message // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. if (pipe->rxmsg == NULL) { uint64_t len; - nni_aio *rxaio; // Check to make sure we got msg type 1. if (pipe->rxhead[0] != 1) { - nni_aio_finish_error(aio, NNG_EPROTO); - nni_mtx_unlock(&pipe->mtx); - return; + rv = NNG_EPROTO; + goto recv_error; } // We should have gotten a message header. @@ -277,10 +319,8 @@ nni_ipc_pipe_recv_cb(void *arg) // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. if (len > pipe->rcvmax) { - pipe->user_rxaio = NULL; - nni_aio_finish_error(aio, NNG_EMSGSIZE); - nni_mtx_unlock(&pipe->mtx); - return; + rv = NNG_EMSGSIZE; + goto recv_error; } // Note that all IO on this pipe is blocked behind this @@ -289,22 +329,20 @@ nni_ipc_pipe_recv_cb(void *arg) // transmits to proceed normally. In practice this is // unlikely to be much of an issue though. if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { - pipe->user_rxaio = NULL; - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&pipe->mtx); - return; + goto recv_error; } - // Submit the rest of the data for a read -- we want to - // read the entire message now. - rxaio = pipe->rxaio; - rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); - rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); - rxaio->a_niov = 1; + if (len != 0) { + // Submit the rest of the data for a read -- we want to + // read the entire message now. + rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); + rxaio->a_iov[0].iov_len = (size_t) len; + rxaio->a_niov = 1; - nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); - nni_mtx_unlock(&pipe->mtx); - return; + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); + nni_mtx_unlock(&pipe->mtx); + return; + } } // Otherwise we got a message read completely. Let the user know the @@ -312,8 +350,18 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->user_rxaio = NULL; msg = pipe->rxmsg; pipe->rxmsg = NULL; + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_msg(aio, msg); + return; + +recv_error: + pipe->user_rxaio = NULL; + msg = pipe->rxmsg; + pipe->rxmsg = NULL; nni_mtx_unlock(&pipe->mtx); + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); } static void @@ -340,6 +388,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; nni_aio * txaio; + int niov; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -354,14 +403,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); - txaio = pipe->txaio; - txaio->a_iov[0].iov_buf = pipe->txhead; - txaio->a_iov[0].iov_len = sizeof(pipe->txhead); - txaio->a_iov[1].iov_buf = nni_msg_header(msg); - txaio->a_iov[1].iov_len = nni_msg_header_len(msg); - txaio->a_iov[2].iov_buf = nni_msg_body(msg); - txaio->a_iov[2].iov_len = nni_msg_len(msg); - txaio->a_niov = 3; + txaio = pipe->txaio; + niov = 0; + txaio->a_iov[niov].iov_buf = pipe->txhead; + txaio->a_iov[niov].iov_len = sizeof(pipe->txhead); + niov++; + if (nni_msg_header_len(msg) > 0) { + txaio->a_iov[niov].iov_buf = nni_msg_header(msg); + txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + txaio->a_iov[niov].iov_buf = nni_msg_body(msg); + txaio->a_iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + txaio->a_niov = niov; nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); @@ -625,7 +682,8 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - // If we can't start, then its dying and we can't report either. + // If we can't start, then its dying and we can't report + // either. if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { nni_mtx_unlock(&ep->mtx); return; |
