diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-03-29 19:35:09 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-03-30 12:38:56 -0700 |
| commit | 99907b52872d9fc9c68f1af60299dd8476c0a603 (patch) | |
| tree | ef465cb6c71efe9b5294069627be57f99a8d9f8f /src/transport/ipc | |
| parent | 5f69a5ba55058c7f5bfc4d5a250e72c56f31e0eb (diff) | |
| download | nng-99907b52872d9fc9c68f1af60299dd8476c0a603.tar.gz nng-99907b52872d9fc9c68f1af60299dd8476c0a603.tar.bz2 nng-99907b52872d9fc9c68f1af60299dd8476c0a603.zip | |
fixes #317 TLS, TCP, and IPC should support multiple outstanding ops
fixes #22 Consider using synchronous completions sometimes
Transport improvements for IPC, TCP, and TLS.
This change does three things.
First it permits multiple outstanding receives or sends on the transport.
This change is being made to accomodate some other changes in the protocols
where it might be advantageous to post send or receives directly against
the transport pipe without going through another level of indirection.
Second, it changes the normal completions to be performed synchronously.
This translates into a rather major performance improvement, reducing
latency by some 27%, and thereby improving performance altogether. (This
elminates two extra context switches per transaction!)
FInally, we can save some extra checks and conditions because we know
that completions cannot happen if we don't have a pending operation
(we no longer complete out of sequence), and we only call the dosend
operation when we have something to send. This can eliminate some
pipeline stalls.
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 157 |
1 files changed, 100 insertions, 57 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 9d25ed72..e2bb5f99 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -37,8 +37,8 @@ struct nni_ipc_pipe { size_t wanttxhead; size_t wantrxhead; - nni_aio *user_txaio; - nni_aio *user_rxaio; + nni_list recvq; + nni_list sendq; nni_aio *user_negaio; nni_aio *txaio; nni_aio *rxaio; @@ -57,6 +57,8 @@ struct nni_ipc_ep { nni_mtx mtx; }; +static void nni_ipc_pipe_dosend(nni_ipc_pipe *, nni_aio *); +static void nni_ipc_pipe_dorecv(nni_ipc_pipe *); static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); static void nni_ipc_pipe_nego_cb(void *); @@ -119,6 +121,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) nni_ipc_pipe_fini(p); return (rv); } + nni_aio_list_init(&p->sendq); + nni_aio_list_init(&p->recvq); p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -215,17 +219,16 @@ nni_ipc_pipe_send_cb(void *arg) size_t n; nni_mtx_lock(&pipe->mtx); - if ((aio = pipe->user_txaio) == NULL) { - nni_mtx_unlock(&pipe->mtx); - return; - } + aio = nni_list_first(&pipe->sendq); if ((rv = nni_aio_result(txaio)) != 0) { - pipe->user_txaio = NULL; + // Intentionally we do not queue up another transfer. + // There's an excellent chance that the pipe is no longer + // usable, with a partial transfer. + // The protocol should see this error, and close the + // pipe itself, we hope. + nni_aio_list_remove(aio); nni_mtx_unlock(&pipe->mtx); - msg = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - nni_msg_free(msg); nni_aio_finish_error(aio, rv); return; } @@ -238,11 +241,18 @@ nni_ipc_pipe_send_cb(void *arg) return; } + nni_aio_list_remove(aio); + if (!nni_list_empty(&pipe->sendq)) { + // schedule next send + nni_ipc_pipe_dosend(pipe, nni_list_first(&pipe->sendq)); + } nni_mtx_unlock(&pipe->mtx); + msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); nni_msg_free(msg); + nni_aio_set_synch(aio); nni_aio_finish(aio, 0, n); } @@ -257,11 +267,7 @@ nni_ipc_pipe_recv_cb(void *arg) nni_aio * rxaio = pipe->rxaio; nni_mtx_lock(&pipe->mtx); - if ((aio = pipe->user_rxaio) == NULL) { - // aio was canceled - nni_mtx_unlock(&pipe->mtx); - return; - } + aio = nni_list_first(&pipe->recvq); if ((rv = nni_aio_result(rxaio)) != 0) { // Error on receive. This has to cause an error back @@ -325,19 +331,28 @@ nni_ipc_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. - pipe->user_rxaio = NULL; - msg = pipe->rxmsg; - pipe->rxmsg = NULL; + + nni_aio_list_remove(aio); + msg = pipe->rxmsg; + pipe->rxmsg = NULL; + if (!nni_list_empty(&pipe->recvq)) { + nni_ipc_pipe_dorecv(pipe); + } nni_mtx_unlock(&pipe->mtx); - nni_aio_finish_msg(aio, msg); + nni_aio_set_msg(aio, msg); + nni_aio_set_synch(aio); + nni_aio_finish(aio, 0, nni_msg_len(msg)); return; recv_error: - pipe->user_rxaio = NULL; - msg = pipe->rxmsg; - pipe->rxmsg = NULL; + nni_aio_list_remove(aio); + msg = pipe->rxmsg; + pipe->rxmsg = NULL; + // Intentionally, we do not queue up another receive. + // The protocol should notice this error and close the pipe. nni_mtx_unlock(&pipe->mtx); + nni_msg_free(msg); nni_aio_finish_error(aio, rv); } @@ -348,37 +363,36 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv) nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); - if (pipe->user_txaio != aio) { + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&pipe->mtx); + return; + } + // If this is being sent, then cancel the pending transfer. + // The callback on the txaio will cause the user aio to + // be canceled too. + if (nni_list_first(&pipe->sendq) == aio) { + nni_aio_abort(pipe->txaio, rv); nni_mtx_unlock(&pipe->mtx); return; } - pipe->user_txaio = NULL; + nni_aio_list_remove(aio); nni_mtx_unlock(&pipe->mtx); - - nni_aio_abort(pipe->txaio, rv); nni_aio_finish_error(aio, rv); } static void -nni_ipc_pipe_send(void *arg, nni_aio *aio) +nni_ipc_pipe_dosend(nni_ipc_pipe *pipe, nni_aio *aio) { - nni_ipc_pipe *pipe = arg; - nni_msg * msg = nni_aio_get_msg(aio); - uint64_t len; - nni_aio * txaio; - int niov; - nni_iov iov[3]; + nni_aio *txaio; + nni_msg *msg; + int niov; + nni_iov iov[3]; + uint64_t len; + // This runs to send the message. + msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); - nni_mtx_lock(&pipe->mtx); - if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } - - pipe->user_txaio = aio; - pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); @@ -398,43 +412,54 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) niov++; } nni_aio_set_iov(txaio, niov, iov); - nni_plat_ipc_pipe_send(pipe->ipp, txaio); - nni_mtx_unlock(&pipe->mtx); } static void -nni_ipc_cancel_rx(nni_aio *aio, int rv) +nni_ipc_pipe_send(void *arg, nni_aio *aio) { - nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); + nni_ipc_pipe *pipe = arg; nni_mtx_lock(&pipe->mtx); - if (pipe->user_rxaio != aio) { + if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) { nni_mtx_unlock(&pipe->mtx); return; } - pipe->user_rxaio = NULL; + nni_list_append(&pipe->sendq, aio); + if (nni_list_first(&pipe->sendq) == aio) { + nni_ipc_pipe_dosend(pipe, aio); + } nni_mtx_unlock(&pipe->mtx); - - nni_aio_abort(pipe->rxaio, rv); - nni_aio_finish_error(aio, rv); } static void -nni_ipc_pipe_recv(void *arg, nni_aio *aio) +nni_ipc_cancel_rx(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = arg; - nni_aio * rxaio; - nni_iov iov; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); - - if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) { + if (!nni_aio_list_active(aio)) { nni_mtx_unlock(&pipe->mtx); return; } + // If receive in progress, then cancel the pending transfer. + // The callback on the rxaio will cause the user aio to + // be canceled too. + if (nni_list_first(&pipe->recvq) == aio) { + nni_aio_abort(pipe->rxaio, rv); + nni_mtx_unlock(&pipe->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_error(aio, rv); +} - pipe->user_rxaio = aio; +static void +nni_ipc_pipe_dorecv(nni_ipc_pipe *pipe) +{ + nni_aio *rxaio; + nni_iov iov; NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. @@ -444,6 +469,24 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) nni_aio_set_iov(rxaio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); +} + +static void +nni_ipc_pipe_recv(void *arg, nni_aio *aio) +{ + nni_ipc_pipe *pipe = arg; + + nni_mtx_lock(&pipe->mtx); + + if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) { + nni_mtx_unlock(&pipe->mtx); + return; + } + + nni_list_append(&pipe->recvq, aio); + if (nni_list_first(&pipe->recvq) == aio) { + nni_ipc_pipe_dorecv(pipe); + } nni_mtx_unlock(&pipe->mtx); } |
