From e8694d15d0a108895bf869f292d59e11d834361e Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 13 Nov 2017 21:10:03 -0800 Subject: fixes #154 underlyng TCP & IPC transports should support partial recv/send fixes #155 POSIX TCP & IPC could avoid a lot of context switches --- src/transport/tcp/tcp.c | 137 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 95 insertions(+), 42 deletions(-) (limited to 'src/transport/tcp') diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index e33db865..d6a51faa 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -207,24 +207,51 @@ nni_tcp_pipe_send_cb(void *arg) nni_tcp_pipe *p = arg; int rv; nni_aio * aio; - size_t len; + size_t n; + nng_msg * msg; + nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); if ((aio = p->user_txaio) == NULL) { nni_mtx_unlock(&p->mtx); return; } - p->user_txaio = NULL; - if ((rv = nni_aio_result(p->txaio)) != 0) { - len = 0; - } else { - len = nni_msg_len(aio->a_msg); - nni_msg_free(nni_aio_get_msg(aio)); + if ((rv = nni_aio_result(txaio)) != 0) { + p->user_txaio = NULL; + nni_mtx_unlock(&p->mtx); + msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); + return; + } + + n = nni_aio_count(txaio); + while (n) { + NNI_ASSERT(txaio->a_niov != 0); + if (txaio->a_iov[0].iov_len > n) { + txaio->a_iov[0].iov_len -= n; + txaio->a_iov[0].iov_buf += n; + break; + } + n -= txaio->a_iov[0].iov_len; + for (int i = 0; i < txaio->a_niov; i++) { + txaio->a_iov[i] = txaio->a_iov[i + 1]; + } + txaio->a_niov--; } - nni_aio_finish(aio, 0, len); + if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_plat_tcp_pipe_send(p->tpp, txaio); + nni_mtx_unlock(&p->mtx); + return; + } + nni_mtx_unlock(&p->mtx); + msg = nni_aio_get_msg(aio); + n = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, n); } static void @@ -233,26 +260,39 @@ nni_tcp_pipe_recv_cb(void *arg) nni_tcp_pipe *p = arg; nni_aio * aio; int rv; + size_t n; nni_msg * msg; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); - aio = p->user_rxaio; - if (aio == NULL) { + if ((aio = p->user_rxaio) == NULL) { + // Canceled. nni_mtx_unlock(&p->mtx); return; } if ((rv = nni_aio_result(p->rxaio)) != 0) { - // Error on receive. This has to cause an error back - // to the user. Also, if we had allocated an rxmsg, lets - // toss it. - if (p->rxmsg != NULL) { - nni_msg_free(p->rxmsg); - p->rxmsg = NULL; + goto recv_error; + } + + n = nni_aio_count(p->rxaio); + while (n) { + NNI_ASSERT(rxaio->a_niov != 0); + if (rxaio->a_iov[0].iov_len > n) { + rxaio->a_iov[0].iov_len -= n; + rxaio->a_iov[0].iov_buf += n; + break; } - p->user_rxaio = NULL; - nni_aio_finish_error(aio, rv); + n -= rxaio->a_iov[0].iov_len; + rxaio->a_niov--; + for (int i = 0; i < rxaio->a_niov; i++) { + rxaio->a_iov[i] = rxaio->a_iov[i + 1]; + } + } + // Was this a partial read? If so then resubmit for the rest. + if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -261,7 +301,6 @@ nni_tcp_pipe_recv_cb(void *arg) // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. if (p->rxmsg == NULL) { - nni_aio *rxaio; uint64_t len; // We should have gotten a message header. NNI_GET64(p->rxlen, len); @@ -269,37 +308,42 @@ nni_tcp_pipe_recv_cb(void *arg) // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. if (len > p->rcvmax) { - p->user_rxaio = NULL; - nni_aio_finish_error(aio, NNG_EMSGSIZE); - nni_mtx_unlock(&p->mtx); - return; + rv = NNG_EMSGSIZE; + goto recv_error; } if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { - p->user_rxaio = NULL; - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&p->mtx); - return; + goto recv_error; } // Submit the rest of the data for a read -- we want to // read the entire message now. - rxaio = p->rxaio; - rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); - rxaio->a_iov[0].iov_len = nni_msg_len(p->rxmsg); - rxaio->a_niov = 1; + if (len != 0) { + rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); + rxaio->a_iov[0].iov_len = (size_t) len; + rxaio->a_niov = 1; - nni_plat_tcp_pipe_recv(p->tpp, rxaio); - nni_mtx_unlock(&p->mtx); - return; + nni_plat_tcp_pipe_recv(p->tpp, rxaio); + nni_mtx_unlock(&p->mtx); + return; + } } // We read a message completely. Let the user know the good news. p->user_rxaio = NULL; msg = p->rxmsg; p->rxmsg = NULL; + nni_mtx_unlock(&p->mtx); nni_aio_finish_msg(aio, msg); + return; + +recv_error: + p->user_rxaio = NULL; + msg = p->rxmsg; + p->rxmsg = NULL; nni_mtx_unlock(&p->mtx); + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); } static void @@ -327,6 +371,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; nni_aio * txaio; + int niov; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -341,14 +386,22 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) NNI_PUT64(p->txlen, len); - txaio = p->txaio; - txaio->a_iov[0].iov_buf = p->txlen; - txaio->a_iov[0].iov_len = sizeof(p->txlen); - txaio->a_iov[1].iov_buf = nni_msg_header(msg); - txaio->a_iov[1].iov_len = nni_msg_header_len(msg); - txaio->a_iov[2].iov_buf = nni_msg_body(msg); - txaio->a_iov[2].iov_len = nni_msg_len(msg); - txaio->a_niov = 3; + niov = 0; + txaio = p->txaio; + txaio->a_iov[niov].iov_buf = p->txlen; + txaio->a_iov[niov].iov_len = sizeof(p->txlen); + niov++; + if (nni_msg_header_len(msg) > 0) { + txaio->a_iov[niov].iov_buf = nni_msg_header(msg); + txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + txaio->a_iov[niov].iov_buf = nni_msg_body(msg); + txaio->a_iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + txaio->a_niov = niov; nni_plat_tcp_pipe_send(p->tpp, txaio); nni_mtx_unlock(&p->mtx); -- cgit v1.2.3-70-g09d2