diff options
Diffstat (limited to 'src/transport/tls')
| -rw-r--r-- | src/transport/tls/tls.c | 169 |
1 files changed, 102 insertions, 67 deletions
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 84125c48..610a7f7c 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -32,8 +32,8 @@ struct nni_tls_pipe { uint16_t proto; size_t rcvmax; - nni_aio *user_txaio; - nni_aio *user_rxaio; + nni_list sendq; + nni_list recvq; nni_aio *user_negaio; uint8_t txlen[sizeof(uint64_t)]; @@ -65,6 +65,8 @@ struct nni_tls_ep { int mode; }; +static void nni_tls_pipe_dorecv(nni_tls_pipe *); +static void nni_tls_pipe_dosend(nni_tls_pipe *, nni_aio *); static void nni_tls_pipe_send_cb(void *); static void nni_tls_pipe_recv_cb(void *); static void nni_tls_pipe_nego_cb(void *); @@ -128,6 +130,8 @@ nni_tls_pipe_init(nni_tls_pipe **pipep, nni_tls_ep *ep, void *tpp) nni_tls_pipe_fini(p); return (rv); } + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -222,17 +226,16 @@ nni_tls_pipe_send_cb(void *arg) nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); - if ((aio = p->user_txaio) == NULL) { - nni_mtx_unlock(&p->mtx); - return; - } + aio = nni_list_first(&p->sendq); if ((rv = nni_aio_result(txaio)) != 0) { - p->user_txaio = NULL; + // Intentionally we do not queue up another transfer. + // There's an excellent chance that the pipe is no longer + // usable, with a partial transfer. + // The protocol should see this error, and close the + // pipe itself, we hope. + nni_aio_list_remove(aio); nni_mtx_unlock(&p->mtx); - msg = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - nni_msg_free(msg); nni_aio_finish_error(aio, rv); return; } @@ -244,12 +247,17 @@ nni_tls_pipe_send_cb(void *arg) nni_mtx_unlock(&p->mtx); return; } - + nni_aio_list_remove(aio); + if (!nni_list_empty(&p->sendq)) { + nni_tls_pipe_dosend(p, nni_list_first(&p->sendq)); + } nni_mtx_unlock(&p->mtx); + msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); nni_msg_free(msg); + nni_aio_set_synch(aio); nni_aio_finish(aio, 0, n); } @@ -261,17 +269,10 @@ nni_tls_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg * msg; - nni_aio * rxaio; + nni_aio * rxaio = p->rxaio; nni_mtx_lock(&p->mtx); - - if ((aio = p->user_rxaio) == NULL) { - // Canceled. - nni_mtx_unlock(&p->mtx); - return; - } - - rxaio = p->rxaio; + aio = nni_list_first(&p->recvq); if ((rv = nni_aio_result(p->rxaio)) != 0) { goto recv_error; @@ -320,17 +321,25 @@ nni_tls_pipe_recv_cb(void *arg) } // We read a message completely. Let the user know the good news. - p->user_rxaio = NULL; - msg = p->rxmsg; - p->rxmsg = NULL; + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + if (!nni_list_empty(&p->recvq)) { + nni_tls_pipe_dorecv(p); + } nni_mtx_unlock(&p->mtx); - nni_aio_finish_msg(aio, msg); + + nni_aio_set_synch(aio); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); return; recv_error: - p->user_rxaio = NULL; - msg = p->rxmsg; - p->rxmsg = NULL; + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + // Intentionally, we do not queue up another receive. + // The protocol should notice this error and close the pipe. nni_mtx_unlock(&p->mtx); nni_msg_free(msg); nni_aio_finish_error(aio, rv); @@ -342,43 +351,40 @@ nni_tls_cancel_tx(nni_aio *aio, int rv) nni_tls_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); - if (p->user_txaio != aio) { + if (!nni_aio_list_active(aio)) { nni_mtx_unlock(&p->mtx); return; } - p->user_txaio = NULL; + // If this is being sent, then cancel the pending transfer. + // The callback on the txaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->sendq) == aio) { + nni_aio_abort(p->txaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); nni_mtx_unlock(&p->mtx); - // cancel the underlying operation. - nni_aio_abort(p->txaio, rv); nni_aio_finish_error(aio, rv); } static void -nni_tls_pipe_send(void *arg, nni_aio *aio) +nni_tls_pipe_dosend(nni_tls_pipe *p, nni_aio *aio) { - nni_tls_pipe *p = arg; - nni_msg * msg = nni_aio_get_msg(aio); - uint64_t len; - nni_aio * txaio; - int niov; - nni_iov iov[3]; + nni_aio *txaio; + nni_msg *msg; + int niov; + nni_iov iov[3]; + uint64_t len; + msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); - nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) { - nni_mtx_unlock(&p->mtx); - return; - } - - p->user_txaio = aio; - NNI_PUT64(p->txlen, len); - niov = 0; txaio = p->txaio; + niov = 0; iov[niov].iov_buf = p->txlen; iov[niov].iov_len = sizeof(p->txlen); niov++; @@ -395,52 +401,81 @@ nni_tls_pipe_send(void *arg, nni_aio *aio) nni_aio_set_iov(txaio, niov, iov); nni_tls_send(p->tls, txaio); - nni_mtx_unlock(&p->mtx); } static void -nni_tls_cancel_rx(nni_aio *aio, int rv) +nni_tls_pipe_send(void *arg, nni_aio *aio) { - nni_tls_pipe *p = nni_aio_get_prov_data(aio); + nni_tls_pipe *p = arg; nni_mtx_lock(&p->mtx); - if (p->user_rxaio != aio) { + + if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) { nni_mtx_unlock(&p->mtx); return; } - p->user_rxaio = NULL; - nni_mtx_unlock(&p->mtx); - // cancel the underlying operation. - nni_aio_abort(p->rxaio, rv); - nni_aio_finish_error(aio, rv); + nni_list_append(&p->sendq, aio); + if (nni_list_first(&p->sendq) == aio) { + nni_tls_pipe_dosend(p, aio); + } + nni_mtx_unlock(&p->mtx); } static void -nni_tls_pipe_recv(void *arg, nni_aio *aio) +nni_tls_cancel_rx(nni_aio *aio, int rv) { - nni_tls_pipe *p = arg; - nni_aio * rxaio; - nni_iov iov; + nni_tls_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) { + if (!nni_aio_list_active(aio)) { nni_mtx_unlock(&p->mtx); return; } - p->user_rxaio = aio; + // If receive in progress, then cancel the pending transfer. + // The callback on the rxaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->recvq) == aio) { + nni_aio_abort(p->rxaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); +} +static void +nni_tls_pipe_dorecv(nni_tls_pipe *p) +{ + nni_aio *rxaio; + nni_iov iov; NNI_ASSERT(p->rxmsg == NULL); - // Schedule a read of the TCP header. - rxaio = p->rxaio; - + // Schedule a read of the IPC header. + rxaio = p->rxaio; iov.iov_buf = p->rxlen; iov.iov_len = sizeof(p->rxlen); nni_aio_set_iov(rxaio, 1, &iov); nni_tls_recv(p->tls, rxaio); +} + +static void +nni_tls_pipe_recv(void *arg, nni_aio *aio) +{ + nni_tls_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + + if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) { + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_append(&p->recvq, aio); + if (nni_list_first(&p->recvq) == aio) { + nni_tls_pipe_dorecv(p); + } nni_mtx_unlock(&p->mtx); } |
