diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-04 17:17:42 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-04 21:20:00 -0700 |
| commit | dc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch) | |
| tree | 1eebf2773745a3a25e8a071fbe4f51cd5490d4e4 /src/transport | |
| parent | 6887900ae033add30ee0151b72abe927c5239588 (diff) | |
| download | nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2 nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip | |
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak
checks. This represents a complete rethink of how the AIOs work,
and much simpler synchronization; the provider API is a bit simpler
to boot, as a number of failure modes have been simply eliminated.
While here a few other minor bugs were squashed.
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/inproc/inproc.c | 110 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 69 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 61 |
3 files changed, 136 insertions, 104 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 226a31ce..9cc43ad7 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -241,27 +241,24 @@ static void nni_inproc_conn_finish(nni_aio *aio, int rv) { nni_inproc_ep *ep = aio->a_endpt; + void * pipe; - if (rv != 0) { - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; - } - } nni_aio_list_remove(aio); - if (ep != NULL) { - if ((ep->mode != NNI_EP_MODE_LISTEN) && - nni_list_empty(&ep->aios)) { - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); - } - } + pipe = aio->a_pipe; + aio->a_pipe = NULL; + + if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && + nni_list_empty(&ep->aios)) { + nni_list_node_remove(&ep->node); } - if (nni_aio_finish(aio, rv, 0) != 0) { - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; + + if (rv == 0) { + nni_aio_finish_pipe(aio, pipe); + } else { + if (pipe != NULL) { + nni_inproc_pipe_fini(pipe); } + nni_aio_finish_error(aio, rv); } } @@ -291,29 +288,6 @@ nni_inproc_ep_close(void *arg) } static void -nni_inproc_connect_abort(nni_aio *aio) -{ - nni_inproc_ep *ep = aio->a_endpt; - - nni_mtx_lock(&nni_inproc.mx); - - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; - } - nni_aio_list_remove(aio); - if (ep != NULL) { - if ((ep->mode != NNI_EP_MODE_LISTEN) && - nni_list_empty(&ep->aios)) { - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); - } - } - } - nni_mtx_unlock(&nni_inproc.mx); -} - -static void nni_inproc_accept_clients(nni_inproc_ep *server) { nni_inproc_ep * client, *nclient; @@ -369,23 +343,24 @@ nni_inproc_accept_clients(nni_inproc_ep *server) } static void -nni_inproc_ep_cancel(nni_aio *aio) +nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep *ep = aio->a_prov_data; + nni_inproc_ep * ep = aio->a_prov_data; + nni_inproc_pipe *pipe; nni_mtx_lock(&nni_inproc.mx); - if (nni_list_active(&ep->aios, aio)) { - nni_list_remove(&ep->aios, aio); - } - // Arguably if the mode is a client... then we need to remove - // it from the server's list. Notably this isn't *our* list, - // but the offsets are the same and they're good enough using the - // global lock to make it all safe. - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_list_node_remove(&ep->node); + if ((pipe = aio->a_pipe) != NULL) { + aio->a_pipe = NULL; + nni_inproc_pipe_fini(pipe); + } + nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&nni_inproc.mx); } + static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { @@ -394,7 +369,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) int rv; if (ep->mode != NNI_EP_MODE_DIAL) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish_error(aio, NNG_EINVAL); return; } nni_mtx_lock(&nni_inproc.mx); @@ -406,24 +381,24 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) if (nni_list_active(&ep->clients, ep)) { // We already have a pending connection... - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish_error(aio, NNG_EINVAL); nni_mtx_unlock(&nni_inproc.mx); return; } if (ep->started) { - nni_aio_finish(aio, NNG_EBUSY, 0); + nni_aio_finish_error(aio, NNG_EBUSY); nni_mtx_unlock(&nni_inproc.mx); return; } if (ep->closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&nni_inproc.mx); return; } if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) { - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&nni_inproc.mx); return; } @@ -491,32 +466,33 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_inproc_ep *ep = arg; int rv; - if (ep->mode != NNI_EP_MODE_LISTEN) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_mtx_lock(&nni_inproc.mx); + + if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); return; } - nni_mtx_lock(&nni_inproc.mx); + if (ep->mode != NNI_EP_MODE_LISTEN) { + nni_aio_finish_error(aio, NNG_EINVAL); + nni_mtx_unlock(&nni_inproc.mx); + return; + } // We are already on the master list of servers, thanks to bind. if (ep->closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&nni_inproc.mx); return; } if (!ep->started) { - nni_aio_finish(aio, NNG_ESTATE, 0); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - - if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_aio_finish_error(aio, NNG_ESTATE); nni_mtx_unlock(&nni_inproc.mx); return; } if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) { - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&nni_inproc.mx); return; } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 96dae6de..e8c7968f 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -142,15 +142,20 @@ fail: } static void -nni_ipc_cancel_start(nni_aio *aio) +nni_ipc_cancel_start(nni_aio *aio, int rv) { nni_ipc_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_negaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->negaio, aio->a_result); + nni_aio_cancel(&pipe->negaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -239,10 +244,10 @@ nni_ipc_pipe_recv_cb(void *arg) nni_ipc_pipe *pipe = arg; nni_aio * aio; int rv; + nni_msg * msg; nni_mtx_lock(&pipe->mtx); - aio = pipe->user_rxaio; - if (aio == NULL) { + if ((aio = pipe->user_rxaio) == NULL) { // aio was canceled nni_mtx_unlock(&pipe->mtx); return; @@ -257,7 +262,7 @@ nni_ipc_pipe_recv_cb(void *arg) pipe->rxmsg = NULL; } pipe->user_rxaio = NULL; - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&pipe->mtx); return; } @@ -270,7 +275,7 @@ nni_ipc_pipe_recv_cb(void *arg) // Check to make sure we got msg type 1. if (pipe->rxhead[0] != 1) { - nni_aio_finish(aio, NNG_EPROTO, 0); + nni_aio_finish_error(aio, NNG_EPROTO); nni_mtx_unlock(&pipe->mtx); return; } @@ -282,7 +287,7 @@ nni_ipc_pipe_recv_cb(void *arg) // the caller will shut down the pipe. if (len > pipe->rcvmax) { pipe->user_rxaio = NULL; - nni_aio_finish(aio, NNG_EMSGSIZE, 0); + nni_aio_finish_error(aio, NNG_EMSGSIZE); nni_mtx_unlock(&pipe->mtx); return; } @@ -294,7 +299,7 @@ nni_ipc_pipe_recv_cb(void *arg) // unlikely to be much of an issue though. if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { pipe->user_rxaio = NULL; - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&pipe->mtx); return; } @@ -313,22 +318,27 @@ nni_ipc_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. pipe->user_rxaio = NULL; - aio->a_msg = pipe->rxmsg; + msg = pipe->rxmsg; pipe->rxmsg = NULL; - nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); + nni_aio_finish_msg(aio, msg); nni_mtx_unlock(&pipe->mtx); } static void -nni_ipc_cancel_tx(nni_aio *aio) +nni_ipc_cancel_tx(nni_aio *aio, int rv) { nni_ipc_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_txaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->txaio, aio->a_result); + nni_aio_cancel(&pipe->txaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -364,15 +374,20 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) } static void -nni_ipc_cancel_rx(nni_aio *aio) +nni_ipc_cancel_rx(nni_aio *aio, int rv) { nni_ipc_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_rxaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->rxaio, aio->a_result); + nni_aio_cancel(&pipe->rxaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -552,10 +567,18 @@ done: aio = ep->user_aio; ep->user_aio = NULL; - if ((aio == NULL) || (nni_aio_finish_pipe(aio, rv, pipe) != 0)) { - if (pipe != NULL) { - nni_ipc_pipe_fini(pipe); - } + if ((aio != NULL) && (rv == 0)) { + NNI_ASSERT(pipe != NULL); + nni_aio_finish_pipe(aio, pipe); + return; + } + + if (pipe != NULL) { + nni_ipc_pipe_fini(pipe); + } + if (aio != NULL) { + NNI_ASSERT(rv != 0); + nni_aio_finish_error(aio, rv); } } @@ -570,15 +593,21 @@ nni_ipc_ep_cb(void *arg) } static void -nni_ipc_cancel_ep(nni_aio *aio) +nni_ipc_cancel_ep(nni_aio *aio, int rv) { nni_ipc_ep *ep = aio->a_prov_data; + NNI_ASSERT(rv != 0); nni_mtx_lock(&ep->mtx); + if (ep->user_aio != aio) { + nni_mtx_unlock(&ep->mtx); + return; + } ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, aio->a_result); + nni_aio_cancel(&ep->aio, rv); + nni_aio_finish_error(aio, rv); } static void diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 28677f54..a42fa377 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -141,15 +141,20 @@ fail: } static void -nni_tcp_cancel_nego(nni_aio *aio) +nni_tcp_cancel_nego(nni_aio *aio, int rv) { nni_tcp_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_negaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->negaio, aio->a_result); + nni_aio_cancel(&pipe->negaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -239,6 +244,7 @@ nni_tcp_pipe_recv_cb(void *arg) nni_tcp_pipe *pipe = arg; nni_aio * aio; int rv; + nni_msg * msg; nni_mtx_lock(&pipe->mtx); @@ -257,7 +263,7 @@ nni_tcp_pipe_recv_cb(void *arg) pipe->rxmsg = NULL; } pipe->user_rxaio = NULL; - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&pipe->mtx); return; } @@ -274,14 +280,14 @@ nni_tcp_pipe_recv_cb(void *arg) // the caller will shut down the pipe. if (len > pipe->rcvmax) { pipe->user_rxaio = NULL; - nni_aio_finish(aio, NNG_EMSGSIZE, 0); + nni_aio_finish_error(aio, NNG_EMSGSIZE); nni_mtx_unlock(&pipe->mtx); return; } if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { pipe->user_rxaio = NULL; - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&pipe->mtx); return; } @@ -300,23 +306,28 @@ nni_tcp_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. pipe->user_rxaio = NULL; - aio->a_msg = pipe->rxmsg; + msg = pipe->rxmsg; pipe->rxmsg = NULL; - nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); + nni_aio_finish_msg(aio, msg); nni_mtx_unlock(&pipe->mtx); } static void -nni_tcp_cancel_tx(nni_aio *aio) +nni_tcp_cancel_tx(nni_aio *aio, int rv) { nni_tcp_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_txaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); // cancel the underlying operation. - nni_aio_cancel(&pipe->txaio, aio->a_result); + nni_aio_cancel(&pipe->txaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -352,16 +363,21 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) } static void -nni_tcp_cancel_rx(nni_aio *aio) +nni_tcp_cancel_rx(nni_aio *aio, int rv) { nni_tcp_pipe *pipe = aio->a_prov_data; nni_mtx_lock(&pipe->mtx); + if (pipe->user_rxaio != aio) { + nni_mtx_unlock(&pipe->mtx); + return; + } pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); // cancel the underlying operation. - nni_aio_cancel(&pipe->rxaio, aio->a_result); + nni_aio_cancel(&pipe->rxaio, rv); + nni_aio_finish_error(aio, rv); } static void @@ -619,10 +635,16 @@ done: aio = ep->user_aio; ep->user_aio = NULL; - if ((aio == NULL) || (nni_aio_finish_pipe(aio, rv, pipe) != 0)) { - if (pipe != NULL) { - nni_tcp_pipe_fini(pipe); - } + if ((aio != NULL) && (rv == 0)) { + nni_aio_finish_pipe(aio, pipe); + return; + } + if (pipe != NULL) { + nni_tcp_pipe_fini(pipe); + } + if (aio != NULL) { + NNI_ASSERT(rv != 0); + nni_aio_finish_error(aio, rv); } } @@ -637,15 +659,20 @@ nni_tcp_ep_cb(void *arg) } static void -nni_tcp_cancel_ep(nni_aio *aio) +nni_tcp_cancel_ep(nni_aio *aio, int rv) { nni_tcp_ep *ep = aio->a_prov_data; nni_mtx_lock(&ep->mtx); + if (ep->user_aio != aio) { + nni_mtx_unlock(&ep->mtx); + return; + } ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, aio->a_result); + nni_aio_cancel(&ep->aio, rv); + nni_aio_finish_error(aio, rv); } static void |
