From 36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 29 Jun 2017 11:33:01 -0700 Subject: Pass cancel of IPC and TCP all the way down to POSIX pipedescs. --- src/transport/tcp/tcp.c | 138 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 94 insertions(+), 44 deletions(-) (limited to 'src/transport/tcp') diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 1c1eabbf..4aecef65 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -35,6 +35,7 @@ struct nni_tcp_pipe { nni_aio txaio; nni_aio rxaio; nni_msg * rxmsg; + nni_mtx mtx; }; struct nni_tcp_ep { @@ -72,6 +73,24 @@ nni_tcp_pipe_close(void *arg) } +static void +nni_tcp_pipe_fini(void *arg) +{ + nni_tcp_pipe *pipe = arg; + + nni_aio_fini(&pipe->rxaio); + nni_aio_fini(&pipe->txaio); + if (pipe->tsp != NULL) { + nni_plat_tcp_fini(pipe->tsp); + } + if (pipe->rxmsg) { + nni_msg_free(pipe->rxmsg); + } + + NNI_FREE_STRUCT(pipe); +} + + static int nni_tcp_pipe_init(void **argp) { @@ -81,39 +100,26 @@ nni_tcp_pipe_init(void **argp) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { + goto fail; + } if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); + goto fail; } rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe); if (rv != 0) { - nni_plat_tcp_fini(pipe->tsp); - NNI_FREE_STRUCT(pipe); - return (rv); + goto fail; } rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe); if (rv != 0) { - nni_aio_fini(&pipe->txaio); - nni_plat_tcp_fini(pipe->tsp); - NNI_FREE_STRUCT(pipe); + goto fail; } *argp = pipe; return (0); -} - - -static void -nni_tcp_pipe_fini(void *arg) -{ - nni_tcp_pipe *pipe = arg; - if (pipe->rxmsg) { - nni_msg_free(pipe->rxmsg); - } - nni_aio_fini(&pipe->rxaio); - nni_aio_fini(&pipe->txaio); - nni_plat_tcp_fini(pipe->tsp); - NNI_FREE_STRUCT(pipe); +fail: + nni_tcp_pipe_fini(pipe); + return (rv); } @@ -125,23 +131,22 @@ nni_tcp_pipe_send_cb(void *arg) nni_aio *aio; size_t len; + nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_txaio) == NULL) { - // This should never ever happen. - NNI_ASSERT(aio != NULL); + nni_mtx_unlock(&pipe->mtx); return; } pipe->user_txaio = NULL; if ((rv = nni_aio_result(&pipe->txaio)) != 0) { - nni_aio_finish(aio, rv, 0); - return; + len = 0; + } else { + len = nni_msg_len(aio->a_msg); + nni_msg_free(aio->a_msg); + aio->a_msg = NULL; } - - len = nni_msg_len(aio->a_msg); - nni_msg_free(aio->a_msg); - aio->a_msg = NULL; - nni_aio_finish(aio, 0, len); + nni_mtx_unlock(&pipe->mtx); } @@ -152,10 +157,11 @@ nni_tcp_pipe_recv_cb(void *arg) nni_aio *aio; int rv; + nni_mtx_lock(&pipe->mtx); + aio = pipe->user_rxaio; if (aio == NULL) { - // This should never ever happen. - NNI_ASSERT(aio != NULL); + nni_mtx_unlock(&pipe->mtx); return; } @@ -169,6 +175,7 @@ nni_tcp_pipe_recv_cb(void *arg) } pipe->user_rxaio = NULL; nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); return; } @@ -185,12 +192,14 @@ nni_tcp_pipe_recv_cb(void *arg) if (len > pipe->rcvmax) { pipe->user_rxaio = NULL; nni_aio_finish(aio, NNG_EMSGSIZE, 0); + nni_mtx_unlock(&pipe->mtx); return; } if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { pipe->user_rxaio = NULL; nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); return; } @@ -200,14 +209,8 @@ nni_tcp_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - rv = nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); - if (rv != 0) { - pipe->user_rxaio = NULL; - nni_msg_free(pipe->rxmsg); - pipe->rxmsg = NULL; - nni_aio_finish(aio, rv, 0); - return; - } + nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_mtx_unlock(&pipe->mtx); return; } @@ -217,6 +220,21 @@ nni_tcp_pipe_recv_cb(void *arg) aio->a_msg = pipe->rxmsg; pipe->rxmsg = NULL; nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_tcp_cancel_tx(nni_aio *aio) +{ + nni_tcp_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + pipe->user_txaio = NULL; + nni_mtx_unlock(&pipe->mtx); + + // stop the underlying aio ... we don't want a result for it. + nni_aio_stop(&pipe->txaio); } @@ -227,9 +245,17 @@ nni_tcp_pipe_aio_send(void *arg, nni_aio *aio) nni_msg *msg = aio->a_msg; uint64_t len; + len = nni_msg_len(msg) + nni_msg_header_len(msg); + + nni_mtx_lock(&pipe->mtx); + + if (nni_aio_start(aio, nni_tcp_cancel_tx, pipe) != 0) { + nni_mtx_unlock(&pipe->mtx); + return (0); + } + pipe->user_txaio = aio; - len = nni_msg_len(msg) + nni_msg_header_len(msg); NNI_PUT64(pipe->txlen, len); pipe->txaio.a_iov[0].iov_buf = pipe->txlen; @@ -240,7 +266,23 @@ nni_tcp_pipe_aio_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - return (nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio)); + nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio); + nni_mtx_unlock(&pipe->mtx); + return (0); +} + + +static void +nni_tcp_cancel_rx(nni_aio *aio) +{ + nni_tcp_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + pipe->user_rxaio = NULL; + nni_mtx_unlock(&pipe->mtx); + + // stop the underlying aio ... we don't want a result for it. + nni_aio_stop(&pipe->rxaio); } @@ -249,6 +291,12 @@ nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *pipe = arg; + nni_mtx_lock(&pipe->mtx); + + if (nni_aio_start(aio, nni_tcp_cancel_rx, pipe) != 0) { + nni_mtx_unlock(&pipe->mtx); + return (0); + } pipe->user_rxaio = aio; NNI_ASSERT(pipe->rxmsg == NULL); @@ -258,7 +306,9 @@ nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen); pipe->rxaio.a_niov = 1; - return (nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio)); + nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_mtx_unlock(&pipe->mtx); + return (0); } -- cgit v1.2.3-70-g09d2