diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/transport/ipc/ipc.c | 157 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 172 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 169 |
3 files changed, 306 insertions, 192 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 9d25ed72..e2bb5f99 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -37,8 +37,8 @@ struct nni_ipc_pipe { size_t wanttxhead; size_t wantrxhead; - nni_aio *user_txaio; - nni_aio *user_rxaio; + nni_list recvq; + nni_list sendq; nni_aio *user_negaio; nni_aio *txaio; nni_aio *rxaio; @@ -57,6 +57,8 @@ struct nni_ipc_ep { nni_mtx mtx; }; +static void nni_ipc_pipe_dosend(nni_ipc_pipe *, nni_aio *); +static void nni_ipc_pipe_dorecv(nni_ipc_pipe *); static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); static void nni_ipc_pipe_nego_cb(void *); @@ -119,6 +121,8 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) nni_ipc_pipe_fini(p); return (rv); } + nni_aio_list_init(&p->sendq); + nni_aio_list_init(&p->recvq); p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -215,17 +219,16 @@ nni_ipc_pipe_send_cb(void *arg) size_t n; nni_mtx_lock(&pipe->mtx); - if ((aio = pipe->user_txaio) == NULL) { - nni_mtx_unlock(&pipe->mtx); - return; - } + aio = nni_list_first(&pipe->sendq); if ((rv = nni_aio_result(txaio)) != 0) { - pipe->user_txaio = NULL; + // Intentionally we do not queue up another transfer. + // There's an excellent chance that the pipe is no longer + // usable, with a partial transfer. + // The protocol should see this error, and close the + // pipe itself, we hope. + nni_aio_list_remove(aio); nni_mtx_unlock(&pipe->mtx); - msg = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - nni_msg_free(msg); nni_aio_finish_error(aio, rv); return; } @@ -238,11 +241,18 @@ nni_ipc_pipe_send_cb(void *arg) return; } + nni_aio_list_remove(aio); + if (!nni_list_empty(&pipe->sendq)) { + // schedule next send + nni_ipc_pipe_dosend(pipe, nni_list_first(&pipe->sendq)); + } nni_mtx_unlock(&pipe->mtx); + msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); nni_msg_free(msg); + nni_aio_set_synch(aio); nni_aio_finish(aio, 0, n); } @@ -257,11 +267,7 @@ nni_ipc_pipe_recv_cb(void *arg) nni_aio * rxaio = pipe->rxaio; nni_mtx_lock(&pipe->mtx); - if ((aio = pipe->user_rxaio) == NULL) { - // aio was canceled - nni_mtx_unlock(&pipe->mtx); - return; - } + aio = nni_list_first(&pipe->recvq); if ((rv = nni_aio_result(rxaio)) != 0) { // Error on receive. This has to cause an error back @@ -325,19 +331,28 @@ nni_ipc_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. - pipe->user_rxaio = NULL; - msg = pipe->rxmsg; - pipe->rxmsg = NULL; + + nni_aio_list_remove(aio); + msg = pipe->rxmsg; + pipe->rxmsg = NULL; + if (!nni_list_empty(&pipe->recvq)) { + nni_ipc_pipe_dorecv(pipe); + } nni_mtx_unlock(&pipe->mtx); - nni_aio_finish_msg(aio, msg); + nni_aio_set_msg(aio, msg); + nni_aio_set_synch(aio); + nni_aio_finish(aio, 0, nni_msg_len(msg)); return; recv_error: - pipe->user_rxaio = NULL; - msg = pipe->rxmsg; - pipe->rxmsg = NULL; + nni_aio_list_remove(aio); + msg = pipe->rxmsg; + pipe->rxmsg = NULL; + // Intentionally, we do not queue up another receive. + // The protocol should notice this error and close the pipe. nni_mtx_unlock(&pipe->mtx); + nni_msg_free(msg); nni_aio_finish_error(aio, rv); } @@ -348,37 +363,36 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv) nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); - if (pipe->user_txaio != aio) { + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&pipe->mtx); + return; + } + // If this is being sent, then cancel the pending transfer. + // The callback on the txaio will cause the user aio to + // be canceled too. + if (nni_list_first(&pipe->sendq) == aio) { + nni_aio_abort(pipe->txaio, rv); nni_mtx_unlock(&pipe->mtx); return; } - pipe->user_txaio = NULL; + nni_aio_list_remove(aio); nni_mtx_unlock(&pipe->mtx); - - nni_aio_abort(pipe->txaio, rv); nni_aio_finish_error(aio, rv); } static void -nni_ipc_pipe_send(void *arg, nni_aio *aio) +nni_ipc_pipe_dosend(nni_ipc_pipe *pipe, nni_aio *aio) { - nni_ipc_pipe *pipe = arg; - nni_msg * msg = nni_aio_get_msg(aio); - uint64_t len; - nni_aio * txaio; - int niov; - nni_iov iov[3]; + nni_aio *txaio; + nni_msg *msg; + int niov; + nni_iov iov[3]; + uint64_t len; + // This runs to send the message. + msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); - nni_mtx_lock(&pipe->mtx); - if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } - - pipe->user_txaio = aio; - pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); @@ -398,43 +412,54 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) niov++; } nni_aio_set_iov(txaio, niov, iov); - nni_plat_ipc_pipe_send(pipe->ipp, txaio); - nni_mtx_unlock(&pipe->mtx); } static void -nni_ipc_cancel_rx(nni_aio *aio, int rv) +nni_ipc_pipe_send(void *arg, nni_aio *aio) { - nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); + nni_ipc_pipe *pipe = arg; nni_mtx_lock(&pipe->mtx); - if (pipe->user_rxaio != aio) { + if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) { nni_mtx_unlock(&pipe->mtx); return; } - pipe->user_rxaio = NULL; + nni_list_append(&pipe->sendq, aio); + if (nni_list_first(&pipe->sendq) == aio) { + nni_ipc_pipe_dosend(pipe, aio); + } nni_mtx_unlock(&pipe->mtx); - - nni_aio_abort(pipe->rxaio, rv); - nni_aio_finish_error(aio, rv); } static void -nni_ipc_pipe_recv(void *arg, nni_aio *aio) +nni_ipc_cancel_rx(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = arg; - nni_aio * rxaio; - nni_iov iov; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); - - if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) { + if (!nni_aio_list_active(aio)) { nni_mtx_unlock(&pipe->mtx); return; } + // If receive in progress, then cancel the pending transfer. + // The callback on the rxaio will cause the user aio to + // be canceled too. + if (nni_list_first(&pipe->recvq) == aio) { + nni_aio_abort(pipe->rxaio, rv); + nni_mtx_unlock(&pipe->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_error(aio, rv); +} - pipe->user_rxaio = aio; +static void +nni_ipc_pipe_dorecv(nni_ipc_pipe *pipe) +{ + nni_aio *rxaio; + nni_iov iov; NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. @@ -444,6 +469,24 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) nni_aio_set_iov(rxaio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); +} + +static void +nni_ipc_pipe_recv(void *arg, nni_aio *aio) +{ + nni_ipc_pipe *pipe = arg; + + nni_mtx_lock(&pipe->mtx); + + if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) { + nni_mtx_unlock(&pipe->mtx); + return; + } + + nni_list_append(&pipe->recvq, aio); + if (nni_list_first(&pipe->recvq) == aio) { + nni_ipc_pipe_dorecv(pipe); + } nni_mtx_unlock(&pipe->mtx); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 4837c8d3..7f819d4f 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -27,8 +27,8 @@ struct nni_tcp_pipe { uint16_t proto; size_t rcvmax; - nni_aio *user_txaio; - nni_aio *user_rxaio; + nni_list recvq; + nni_list sendq; nni_aio *user_negaio; uint8_t txlen[sizeof(uint64_t)]; @@ -57,6 +57,8 @@ struct nni_tcp_ep { nni_mtx mtx; }; +static void nni_tcp_pipe_dosend(nni_tcp_pipe *, nni_aio *); +static void nni_tcp_pipe_dorecv(nni_tcp_pipe *); static void nni_tcp_pipe_send_cb(void *); static void nni_tcp_pipe_recv_cb(void *); static void nni_tcp_pipe_nego_cb(void *); @@ -119,6 +121,8 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) nni_tcp_pipe_fini(p); return (rv); } + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -213,17 +217,16 @@ nni_tcp_pipe_send_cb(void *arg) nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); - if ((aio = p->user_txaio) == NULL) { - nni_mtx_unlock(&p->mtx); - return; - } + aio = nni_list_first(&p->sendq); if ((rv = nni_aio_result(txaio)) != 0) { - p->user_txaio = NULL; + // Intentionally we do not queue up another transfer. + // There's an excellent chance that the pipe is no longer + // usable, with a partial transfer. + // The protocol should see this error, and close the + // pipe itself, we hope. + nni_aio_list_remove(aio); nni_mtx_unlock(&p->mtx); - msg = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - nni_msg_free(msg); nni_aio_finish_error(aio, rv); return; } @@ -236,11 +239,18 @@ nni_tcp_pipe_send_cb(void *arg) return; } + nni_aio_list_remove(aio); + if (!nni_list_empty(&p->sendq)) { + // schedule next send + nni_tcp_pipe_dosend(p, nni_list_first(&p->sendq)); + } nni_mtx_unlock(&p->mtx); + msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); nni_msg_free(msg); + nni_aio_set_synch(aio); nni_aio_finish(aio, 0, n); } @@ -252,17 +262,10 @@ nni_tcp_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg * msg; - nni_aio * rxaio; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); - - if ((aio = p->user_rxaio) == NULL) { - // Canceled. - nni_mtx_unlock(&p->mtx); - return; - } - - rxaio = p->rxaio; + aio = nni_list_first(&p->recvq); if ((rv = nni_aio_result(rxaio)) != 0) { goto recv_error; @@ -310,18 +313,27 @@ nni_tcp_pipe_recv_cb(void *arg) } // We read a message completely. Let the user know the good news. - p->user_rxaio = NULL; - msg = p->rxmsg; - p->rxmsg = NULL; + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + if (!nni_list_empty(&p->recvq)) { + nni_tcp_pipe_dorecv(p); + } nni_mtx_unlock(&p->mtx); - nni_aio_finish_msg(aio, msg); + + nni_aio_set_synch(aio); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); return; recv_error: - p->user_rxaio = NULL; - msg = p->rxmsg; - p->rxmsg = NULL; + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + // Intentionally, we do not queue up another receive. + // The protocol should notice this error and close the pipe. nni_mtx_unlock(&p->mtx); + nni_msg_free(msg); nni_aio_finish_error(aio, rv); } @@ -332,45 +344,43 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv) nni_tcp_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); - if (p->user_txaio != aio) { + if (!nni_aio_list_active(aio)) { nni_mtx_unlock(&p->mtx); return; } - p->user_txaio = NULL; + // If this is being sent, then cancel the pending transfer. + // The callback on the txaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->sendq) == aio) { + nni_aio_abort(p->txaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); nni_mtx_unlock(&p->mtx); - // cancel the underlying operation. - nni_aio_abort(p->txaio, rv); nni_aio_finish_error(aio, rv); } static void -nni_tcp_pipe_send(void *arg, nni_aio *aio) +nni_tcp_pipe_dosend(nni_tcp_pipe *p, nni_aio *aio) { - nni_tcp_pipe *p = arg; - nni_msg * msg = nni_aio_get_msg(aio); - uint64_t len; - nni_aio * txaio; - int niov; - nni_iov iov[3]; + nni_aio *txaio; + nni_msg *msg; + int niov; + nni_iov iov[3]; + uint64_t len; + // This runs to send the message. + msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); - nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) { - nni_mtx_unlock(&p->mtx); - return; - } - - p->user_txaio = aio; - NNI_PUT64(p->txlen, len); - niov = 0; - txaio = p->txaio; - iov[niov].iov_buf = p->txlen; - iov[niov].iov_len = sizeof(p->txlen); + txaio = p->txaio; + niov = 0; + iov[0].iov_buf = p->txlen; + iov[0].iov_len = sizeof(p->txlen); niov++; if (nni_msg_header_len(msg) > 0) { iov[niov].iov_buf = nni_msg_header(msg); @@ -383,53 +393,79 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) niov++; } nni_aio_set_iov(txaio, niov, iov); - nni_plat_tcp_pipe_send(p->tpp, txaio); - nni_mtx_unlock(&p->mtx); } static void -nni_tcp_cancel_rx(nni_aio *aio, int rv) +nni_tcp_pipe_send(void *arg, nni_aio *aio) { - nni_tcp_pipe *p = nni_aio_get_prov_data(aio); + nni_tcp_pipe *p = arg; nni_mtx_lock(&p->mtx); - if (p->user_rxaio != aio) { + if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) { nni_mtx_unlock(&p->mtx); return; } - p->user_rxaio = NULL; + nni_list_append(&p->sendq, aio); + if (nni_list_first(&p->sendq) == aio) { + nni_tcp_pipe_dosend(p, aio); + } nni_mtx_unlock(&p->mtx); - - // cancel the underlying operation. - nni_aio_abort(p->rxaio, rv); - nni_aio_finish_error(aio, rv); } static void -nni_tcp_pipe_recv(void *arg, nni_aio *aio) +nni_tcp_cancel_rx(nni_aio *aio, int rv) { - nni_tcp_pipe *p = arg; - nni_aio * rxaio; - nni_iov iov; + nni_tcp_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) { + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&p->mtx); + return; + } + // If receive in progress, then cancel the pending transfer. + // The callback on the rxaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->recvq) == aio) { + nni_aio_abort(p->rxaio, rv); nni_mtx_unlock(&p->mtx); return; } - p->user_rxaio = aio; + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); +} +static void +nni_tcp_pipe_dorecv(nni_tcp_pipe *p) +{ + nni_aio *rxaio; + nni_iov iov; NNI_ASSERT(p->rxmsg == NULL); - // Schedule a read of the TCP header. + // Schedule a read of the IPC header. rxaio = p->rxaio; iov.iov_buf = p->rxlen; iov.iov_len = sizeof(p->rxlen); nni_aio_set_iov(rxaio, 1, &iov); nni_plat_tcp_pipe_recv(p->tpp, rxaio); +} + +static void +nni_tcp_pipe_recv(void *arg, nni_aio *aio) +{ + nni_tcp_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) { + nni_mtx_unlock(&p->mtx); + return; + } + nni_list_append(&p->recvq, aio); + if (nni_list_first(&p->recvq) == aio) { + nni_tcp_pipe_dorecv(p); + } nni_mtx_unlock(&p->mtx); } diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 84125c48..610a7f7c 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -32,8 +32,8 @@ struct nni_tls_pipe { uint16_t proto; size_t rcvmax; - nni_aio *user_txaio; - nni_aio *user_rxaio; + nni_list sendq; + nni_list recvq; nni_aio *user_negaio; uint8_t txlen[sizeof(uint64_t)]; @@ -65,6 +65,8 @@ struct nni_tls_ep { int mode; }; +static void nni_tls_pipe_dorecv(nni_tls_pipe *); +static void nni_tls_pipe_dosend(nni_tls_pipe *, nni_aio *); static void nni_tls_pipe_send_cb(void *); static void nni_tls_pipe_recv_cb(void *); static void nni_tls_pipe_nego_cb(void *); @@ -128,6 +130,8 @@ nni_tls_pipe_init(nni_tls_pipe **pipep, nni_tls_ep *ep, void *tpp) nni_tls_pipe_fini(p); return (rv); } + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -222,17 +226,16 @@ nni_tls_pipe_send_cb(void *arg) nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); - if ((aio = p->user_txaio) == NULL) { - nni_mtx_unlock(&p->mtx); - return; - } + aio = nni_list_first(&p->sendq); if ((rv = nni_aio_result(txaio)) != 0) { - p->user_txaio = NULL; + // Intentionally we do not queue up another transfer. + // There's an excellent chance that the pipe is no longer + // usable, with a partial transfer. + // The protocol should see this error, and close the + // pipe itself, we hope. + nni_aio_list_remove(aio); nni_mtx_unlock(&p->mtx); - msg = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - nni_msg_free(msg); nni_aio_finish_error(aio, rv); return; } @@ -244,12 +247,17 @@ nni_tls_pipe_send_cb(void *arg) nni_mtx_unlock(&p->mtx); return; } - + nni_aio_list_remove(aio); + if (!nni_list_empty(&p->sendq)) { + nni_tls_pipe_dosend(p, nni_list_first(&p->sendq)); + } nni_mtx_unlock(&p->mtx); + msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); nni_msg_free(msg); + nni_aio_set_synch(aio); nni_aio_finish(aio, 0, n); } @@ -261,17 +269,10 @@ nni_tls_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg * msg; - nni_aio * rxaio; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); - - if ((aio = p->user_rxaio) == NULL) { - // Canceled. - nni_mtx_unlock(&p->mtx); - return; - } - - rxaio = p->rxaio; + aio = nni_list_first(&p->recvq); if ((rv = nni_aio_result(p->rxaio)) != 0) { goto recv_error; @@ -320,17 +321,25 @@ nni_tls_pipe_recv_cb(void *arg) } // We read a message completely. Let the user know the good news. - p->user_rxaio = NULL; - msg = p->rxmsg; - p->rxmsg = NULL; + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + if (!nni_list_empty(&p->recvq)) { + nni_tls_pipe_dorecv(p); + } nni_mtx_unlock(&p->mtx); - nni_aio_finish_msg(aio, msg); + + nni_aio_set_synch(aio); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); return; recv_error: - p->user_rxaio = NULL; - msg = p->rxmsg; - p->rxmsg = NULL; + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + // Intentionally, we do not queue up another receive. + // The protocol should notice this error and close the pipe. nni_mtx_unlock(&p->mtx); nni_msg_free(msg); nni_aio_finish_error(aio, rv); @@ -342,43 +351,40 @@ nni_tls_cancel_tx(nni_aio *aio, int rv) nni_tls_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); - if (p->user_txaio != aio) { + if (!nni_aio_list_active(aio)) { nni_mtx_unlock(&p->mtx); return; } - p->user_txaio = NULL; + // If this is being sent, then cancel the pending transfer. + // The callback on the txaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->sendq) == aio) { + nni_aio_abort(p->txaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); nni_mtx_unlock(&p->mtx); - // cancel the underlying operation. - nni_aio_abort(p->txaio, rv); nni_aio_finish_error(aio, rv); } static void -nni_tls_pipe_send(void *arg, nni_aio *aio) +nni_tls_pipe_dosend(nni_tls_pipe *p, nni_aio *aio) { - nni_tls_pipe *p = arg; - nni_msg * msg = nni_aio_get_msg(aio); - uint64_t len; - nni_aio * txaio; - int niov; - nni_iov iov[3]; + nni_aio *txaio; + nni_msg *msg; + int niov; + nni_iov iov[3]; + uint64_t len; + msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); - nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) { - nni_mtx_unlock(&p->mtx); - return; - } - - p->user_txaio = aio; - NNI_PUT64(p->txlen, len); - niov = 0; txaio = p->txaio; + niov = 0; iov[niov].iov_buf = p->txlen; iov[niov].iov_len = sizeof(p->txlen); niov++; @@ -395,52 +401,81 @@ nni_tls_pipe_send(void *arg, nni_aio *aio) nni_aio_set_iov(txaio, niov, iov); nni_tls_send(p->tls, txaio); - nni_mtx_unlock(&p->mtx); } static void -nni_tls_cancel_rx(nni_aio *aio, int rv) +nni_tls_pipe_send(void *arg, nni_aio *aio) { - nni_tls_pipe *p = nni_aio_get_prov_data(aio); + nni_tls_pipe *p = arg; nni_mtx_lock(&p->mtx); - if (p->user_rxaio != aio) { + + if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) { nni_mtx_unlock(&p->mtx); return; } - p->user_rxaio = NULL; - nni_mtx_unlock(&p->mtx); - // cancel the underlying operation. - nni_aio_abort(p->rxaio, rv); - nni_aio_finish_error(aio, rv); + nni_list_append(&p->sendq, aio); + if (nni_list_first(&p->sendq) == aio) { + nni_tls_pipe_dosend(p, aio); + } + nni_mtx_unlock(&p->mtx); } static void -nni_tls_pipe_recv(void *arg, nni_aio *aio) +nni_tls_cancel_rx(nni_aio *aio, int rv) { - nni_tls_pipe *p = arg; - nni_aio * rxaio; - nni_iov iov; + nni_tls_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) { + if (!nni_aio_list_active(aio)) { nni_mtx_unlock(&p->mtx); return; } - p->user_rxaio = aio; + // If receive in progress, then cancel the pending transfer. + // The callback on the rxaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->recvq) == aio) { + nni_aio_abort(p->rxaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); +} +static void +nni_tls_pipe_dorecv(nni_tls_pipe *p) +{ + nni_aio *rxaio; + nni_iov iov; NNI_ASSERT(p->rxmsg == NULL); - // Schedule a read of the TCP header. - rxaio = p->rxaio; - + // Schedule a read of the IPC header. + rxaio = p->rxaio; iov.iov_buf = p->rxlen; iov.iov_len = sizeof(p->rxlen); nni_aio_set_iov(rxaio, 1, &iov); nni_tls_recv(p->tls, rxaio); +} + +static void +nni_tls_pipe_recv(void *arg, nni_aio *aio) +{ + nni_tls_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + + if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) { + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_append(&p->recvq, aio); + if (nni_list_first(&p->recvq) == aio) { + nni_tls_pipe_dorecv(p); + } nni_mtx_unlock(&p->mtx); } |
