diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-29 11:33:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-29 11:33:01 -0700 |
| commit | 36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64 (patch) | |
| tree | a2ab064810f269fac8e9b0473685a02c5643f5c9 /src/transport | |
| parent | d0c0c9969ab5552889f91d09db6dbf6b79f6705c (diff) | |
| download | nng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.tar.gz nng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.tar.bz2 nng-36bd0c71ce7a0bfbb22ffc99aa0dd94ed95cfc64.zip | |
Pass cancel of IPC and TCP all the way down to POSIX pipedescs.
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/ipc/ipc.c | 147 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 138 |
2 files changed, 195 insertions, 90 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index ada7a87c..7727b375 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -36,6 +36,7 @@ struct nni_ipc_pipe { nni_aio txaio; nni_aio rxaio; nni_msg * rxmsg; + nni_mtx mtx; }; struct nni_ipc_ep { @@ -72,6 +73,24 @@ nni_ipc_pipe_close(void *arg) } +static void +nni_ipc_pipe_fini(void *arg) +{ + nni_ipc_pipe *pipe = arg; + + nni_aio_fini(&pipe->rxaio); + nni_aio_fini(&pipe->txaio); + if (pipe->isp != NULL) { + nni_plat_ipc_fini(pipe->isp); + } + if (pipe->rxmsg) { + nni_msg_free(pipe->rxmsg); + } + nni_mtx_fini(&pipe->mtx); + NNI_FREE_STRUCT(pipe); +} + + static int nni_ipc_pipe_init(void **argp) { @@ -81,41 +100,27 @@ nni_ipc_pipe_init(void **argp) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { + goto fail; + } if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); + goto fail; } rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe); if (rv != 0) { - nni_plat_ipc_fini(pipe->isp); - NNI_FREE_STRUCT(pipe); - return (rv); + goto fail; } rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe); if (rv != 0) { - nni_aio_fini(&pipe->txaio); - nni_plat_ipc_fini(pipe->isp); - NNI_FREE_STRUCT(pipe); - return (rv); + goto fail; } *argp = pipe; return (0); -} - - -static void -nni_ipc_pipe_fini(void *arg) -{ - nni_ipc_pipe *pipe = arg; - if (pipe->rxmsg) { - nni_msg_free(pipe->rxmsg); - } - nni_aio_fini(&pipe->rxaio); - nni_aio_fini(&pipe->txaio); - nni_plat_ipc_fini(pipe->isp); - NNI_FREE_STRUCT(pipe); +fail: + nni_ipc_pipe_fini(pipe); + return (rv); } @@ -127,21 +132,21 @@ nni_ipc_pipe_send_cb(void *arg) int rv; size_t len; + nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_txaio) == NULL) { - NNI_ASSERT(aio != NULL); + nni_mtx_unlock(&pipe->mtx); return; } pipe->user_txaio = NULL; if ((rv = nni_aio_result(&pipe->txaio)) != 0) { - nni_aio_finish(aio, rv, 0); - return; + len = 0; + } else { + len = nni_msg_len(aio->a_msg); + nni_msg_free(aio->a_msg); + aio->a_msg = NULL; } - - len = nni_msg_len(aio->a_msg); - nni_msg_free(aio->a_msg); - aio->a_msg = NULL; - - nni_aio_finish(aio, 0, len); + nni_aio_finish(aio, rv, len); + nni_mtx_unlock(&pipe->mtx); } @@ -152,10 +157,11 @@ nni_ipc_pipe_recv_cb(void *arg) nni_aio *aio; int rv; + nni_mtx_lock(&pipe->mtx); aio = pipe->user_rxaio; if (aio == NULL) { - // This should never ever happen. - NNI_ASSERT(aio != NULL); + // aio was canceled + nni_mtx_unlock(&pipe->mtx); return; } @@ -169,6 +175,7 @@ nni_ipc_pipe_recv_cb(void *arg) } pipe->user_rxaio = NULL; nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); return; } @@ -181,6 +188,7 @@ nni_ipc_pipe_recv_cb(void *arg) // Check to make sure we got msg type 1. if (pipe->rxhead[0] != 1) { nni_aio_finish(aio, NNG_EPROTO, 0); + nni_mtx_unlock(&pipe->mtx); return; } @@ -192,12 +200,19 @@ nni_ipc_pipe_recv_cb(void *arg) if (len > pipe->rcvmax) { pipe->user_rxaio = NULL; nni_aio_finish(aio, NNG_EMSGSIZE, 0); + nni_mtx_unlock(&pipe->mtx); return; } + // Note that all IO on this pipe is blocked behind this + // allocation. We could possibly look at using a separate + // lock for the read side in the future, so that we allow + // transmits to proceed normally. In practice this is + // unlikely to be much of an issue though. if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { pipe->user_rxaio = NULL; nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); return; } @@ -207,14 +222,8 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - rv = nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio); - if (rv != 0) { - pipe->user_rxaio = NULL; - nni_msg_free(pipe->rxmsg); - pipe->rxmsg = NULL; - nni_aio_finish(aio, rv, 0); - return; - } + nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio); + nni_mtx_unlock(&pipe->mtx); return; } @@ -224,6 +233,21 @@ nni_ipc_pipe_recv_cb(void *arg) aio->a_msg = pipe->rxmsg; pipe->rxmsg = NULL; nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_ipc_cancel_tx(nni_aio *aio) +{ + nni_ipc_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + pipe->user_txaio = NULL; + nni_mtx_unlock(&pipe->mtx); + + // stop the underlying aio ... we don't want a result for it. + nni_aio_stop(&pipe->txaio); } @@ -234,10 +258,17 @@ nni_ipc_pipe_aio_send(void *arg, nni_aio *aio) nni_msg *msg = aio->a_msg; uint64_t len; + len = nni_msg_len(msg) + nni_msg_header_len(msg); + + nni_mtx_lock(&pipe->mtx); + if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) { + nni_mtx_unlock(&pipe->mtx); + return (0); + } + pipe->user_txaio = aio; pipe->txhead[0] = 1; // message type, 1. - len = nni_msg_len(msg) + nni_msg_header_len(msg); NNI_PUT64(pipe->txhead + 1, len); pipe->txaio.a_iov[0].iov_buf = pipe->txhead; @@ -248,7 +279,23 @@ nni_ipc_pipe_aio_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - return (nni_plat_ipc_aio_send(pipe->isp, &pipe->txaio)); + nni_plat_ipc_aio_send(pipe->isp, &pipe->txaio); + nni_mtx_unlock(&pipe->mtx); + return (0); +} + + +static void +nni_ipc_cancel_rx(nni_aio *aio) +{ + nni_ipc_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + pipe->user_rxaio = NULL; + nni_mtx_unlock(&pipe->mtx); + + // stop the underlying aio ... we don't want a result for it. + nni_aio_stop(&pipe->rxaio); } @@ -257,8 +304,14 @@ nni_ipc_pipe_aio_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - pipe->user_rxaio = aio; + nni_mtx_lock(&pipe->mtx); + if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) { + nni_mtx_unlock(&pipe->mtx); + return (0); + } + + pipe->user_rxaio = aio; NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. @@ -266,7 +319,9 @@ nni_ipc_pipe_aio_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead); pipe->rxaio.a_niov = 1; - return (nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio)); + nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio); + nni_mtx_unlock(&pipe->mtx); + return (0); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 1c1eabbf..4aecef65 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -35,6 +35,7 @@ struct nni_tcp_pipe { nni_aio txaio; nni_aio rxaio; nni_msg * rxmsg; + nni_mtx mtx; }; struct nni_tcp_ep { @@ -72,6 +73,24 @@ nni_tcp_pipe_close(void *arg) } +static void +nni_tcp_pipe_fini(void *arg) +{ + nni_tcp_pipe *pipe = arg; + + nni_aio_fini(&pipe->rxaio); + nni_aio_fini(&pipe->txaio); + if (pipe->tsp != NULL) { + nni_plat_tcp_fini(pipe->tsp); + } + if (pipe->rxmsg) { + nni_msg_free(pipe->rxmsg); + } + + NNI_FREE_STRUCT(pipe); +} + + static int nni_tcp_pipe_init(void **argp) { @@ -81,39 +100,26 @@ nni_tcp_pipe_init(void **argp) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { + goto fail; + } if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) { - NNI_FREE_STRUCT(pipe); - return (rv); + goto fail; } rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe); if (rv != 0) { - nni_plat_tcp_fini(pipe->tsp); - NNI_FREE_STRUCT(pipe); - return (rv); + goto fail; } rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe); if (rv != 0) { - nni_aio_fini(&pipe->txaio); - nni_plat_tcp_fini(pipe->tsp); - NNI_FREE_STRUCT(pipe); + goto fail; } *argp = pipe; return (0); -} - - -static void -nni_tcp_pipe_fini(void *arg) -{ - nni_tcp_pipe *pipe = arg; - if (pipe->rxmsg) { - nni_msg_free(pipe->rxmsg); - } - nni_aio_fini(&pipe->rxaio); - nni_aio_fini(&pipe->txaio); - nni_plat_tcp_fini(pipe->tsp); - NNI_FREE_STRUCT(pipe); +fail: + nni_tcp_pipe_fini(pipe); + return (rv); } @@ -125,23 +131,22 @@ nni_tcp_pipe_send_cb(void *arg) nni_aio *aio; size_t len; + nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_txaio) == NULL) { - // This should never ever happen. - NNI_ASSERT(aio != NULL); + nni_mtx_unlock(&pipe->mtx); return; } pipe->user_txaio = NULL; if ((rv = nni_aio_result(&pipe->txaio)) != 0) { - nni_aio_finish(aio, rv, 0); - return; + len = 0; + } else { + len = nni_msg_len(aio->a_msg); + nni_msg_free(aio->a_msg); + aio->a_msg = NULL; } - - len = nni_msg_len(aio->a_msg); - nni_msg_free(aio->a_msg); - aio->a_msg = NULL; - nni_aio_finish(aio, 0, len); + nni_mtx_unlock(&pipe->mtx); } @@ -152,10 +157,11 @@ nni_tcp_pipe_recv_cb(void *arg) nni_aio *aio; int rv; + nni_mtx_lock(&pipe->mtx); + aio = pipe->user_rxaio; if (aio == NULL) { - // This should never ever happen. - NNI_ASSERT(aio != NULL); + nni_mtx_unlock(&pipe->mtx); return; } @@ -169,6 +175,7 @@ nni_tcp_pipe_recv_cb(void *arg) } pipe->user_rxaio = NULL; nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); return; } @@ -185,12 +192,14 @@ nni_tcp_pipe_recv_cb(void *arg) if (len > pipe->rcvmax) { pipe->user_rxaio = NULL; nni_aio_finish(aio, NNG_EMSGSIZE, 0); + nni_mtx_unlock(&pipe->mtx); return; } if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { pipe->user_rxaio = NULL; nni_aio_finish(aio, rv, 0); + nni_mtx_unlock(&pipe->mtx); return; } @@ -200,14 +209,8 @@ nni_tcp_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - rv = nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); - if (rv != 0) { - pipe->user_rxaio = NULL; - nni_msg_free(pipe->rxmsg); - pipe->rxmsg = NULL; - nni_aio_finish(aio, rv, 0); - return; - } + nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_mtx_unlock(&pipe->mtx); return; } @@ -217,6 +220,21 @@ nni_tcp_pipe_recv_cb(void *arg) aio->a_msg = pipe->rxmsg; pipe->rxmsg = NULL; nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); + nni_mtx_unlock(&pipe->mtx); +} + + +static void +nni_tcp_cancel_tx(nni_aio *aio) +{ + nni_tcp_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + pipe->user_txaio = NULL; + nni_mtx_unlock(&pipe->mtx); + + // stop the underlying aio ... we don't want a result for it. + nni_aio_stop(&pipe->txaio); } @@ -227,9 +245,17 @@ nni_tcp_pipe_aio_send(void *arg, nni_aio *aio) nni_msg *msg = aio->a_msg; uint64_t len; + len = nni_msg_len(msg) + nni_msg_header_len(msg); + + nni_mtx_lock(&pipe->mtx); + + if (nni_aio_start(aio, nni_tcp_cancel_tx, pipe) != 0) { + nni_mtx_unlock(&pipe->mtx); + return (0); + } + pipe->user_txaio = aio; - len = nni_msg_len(msg) + nni_msg_header_len(msg); NNI_PUT64(pipe->txlen, len); pipe->txaio.a_iov[0].iov_buf = pipe->txlen; @@ -240,7 +266,23 @@ nni_tcp_pipe_aio_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - return (nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio)); + nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio); + nni_mtx_unlock(&pipe->mtx); + return (0); +} + + +static void +nni_tcp_cancel_rx(nni_aio *aio) +{ + nni_tcp_pipe *pipe = aio->a_prov_data; + + nni_mtx_lock(&pipe->mtx); + pipe->user_rxaio = NULL; + nni_mtx_unlock(&pipe->mtx); + + // stop the underlying aio ... we don't want a result for it. + nni_aio_stop(&pipe->rxaio); } @@ -249,6 +291,12 @@ nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *pipe = arg; + nni_mtx_lock(&pipe->mtx); + + if (nni_aio_start(aio, nni_tcp_cancel_rx, pipe) != 0) { + nni_mtx_unlock(&pipe->mtx); + return (0); + } pipe->user_rxaio = aio; NNI_ASSERT(pipe->rxmsg == NULL); @@ -258,7 +306,9 @@ nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen); pipe->rxaio.a_niov = 1; - return (nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio)); + nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_mtx_unlock(&pipe->mtx); + return (0); } |
