diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-04 17:17:42 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-04 21:20:00 -0700 |
| commit | dc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch) | |
| tree | 1eebf2773745a3a25e8a071fbe4f51cd5490d4e4 /src/transport/inproc/inproc.c | |
| parent | 6887900ae033add30ee0151b72abe927c5239588 (diff) | |
| download | nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2 nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip | |
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak
checks. This represents a complete rethink of how the AIOs work,
and much simpler synchronization; the provider API is a bit simpler
to boot, as a number of failure modes have been simply eliminated.
While here a few other minor bugs were squashed.
Diffstat (limited to 'src/transport/inproc/inproc.c')
| -rw-r--r-- | src/transport/inproc/inproc.c | 110 |
1 files changed, 43 insertions, 67 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 226a31ce..9cc43ad7 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -241,27 +241,24 @@ static void nni_inproc_conn_finish(nni_aio *aio, int rv) { nni_inproc_ep *ep = aio->a_endpt; + void * pipe; - if (rv != 0) { - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; - } - } nni_aio_list_remove(aio); - if (ep != NULL) { - if ((ep->mode != NNI_EP_MODE_LISTEN) && - nni_list_empty(&ep->aios)) { - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); - } - } + pipe = aio->a_pipe; + aio->a_pipe = NULL; + + if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && + nni_list_empty(&ep->aios)) { + nni_list_node_remove(&ep->node); } - if (nni_aio_finish(aio, rv, 0) != 0) { - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; + + if (rv == 0) { + nni_aio_finish_pipe(aio, pipe); + } else { + if (pipe != NULL) { + nni_inproc_pipe_fini(pipe); } + nni_aio_finish_error(aio, rv); } } @@ -291,29 +288,6 @@ nni_inproc_ep_close(void *arg) } static void -nni_inproc_connect_abort(nni_aio *aio) -{ - nni_inproc_ep *ep = aio->a_endpt; - - nni_mtx_lock(&nni_inproc.mx); - - if (aio->a_pipe != NULL) { - nni_inproc_pipe_fini(aio->a_pipe); - aio->a_pipe = NULL; - } - nni_aio_list_remove(aio); - if (ep != NULL) { - if ((ep->mode != NNI_EP_MODE_LISTEN) && - nni_list_empty(&ep->aios)) { - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); - } - } - } - nni_mtx_unlock(&nni_inproc.mx); -} - -static void nni_inproc_accept_clients(nni_inproc_ep *server) { nni_inproc_ep * client, *nclient; @@ -369,23 +343,24 @@ nni_inproc_accept_clients(nni_inproc_ep *server) } static void -nni_inproc_ep_cancel(nni_aio *aio) +nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep *ep = aio->a_prov_data; + nni_inproc_ep * ep = aio->a_prov_data; + nni_inproc_pipe *pipe; nni_mtx_lock(&nni_inproc.mx); - if (nni_list_active(&ep->aios, aio)) { - nni_list_remove(&ep->aios, aio); - } - // Arguably if the mode is a client... then we need to remove - // it from the server's list. Notably this isn't *our* list, - // but the offsets are the same and they're good enough using the - // global lock to make it all safe. - if (nni_list_active(&ep->clients, ep)) { - nni_list_remove(&ep->clients, ep); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_list_node_remove(&ep->node); + if ((pipe = aio->a_pipe) != NULL) { + aio->a_pipe = NULL; + nni_inproc_pipe_fini(pipe); + } + nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&nni_inproc.mx); } + static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { @@ -394,7 +369,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) int rv; if (ep->mode != NNI_EP_MODE_DIAL) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish_error(aio, NNG_EINVAL); return; } nni_mtx_lock(&nni_inproc.mx); @@ -406,24 +381,24 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) if (nni_list_active(&ep->clients, ep)) { // We already have a pending connection... - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish_error(aio, NNG_EINVAL); nni_mtx_unlock(&nni_inproc.mx); return; } if (ep->started) { - nni_aio_finish(aio, NNG_EBUSY, 0); + nni_aio_finish_error(aio, NNG_EBUSY); nni_mtx_unlock(&nni_inproc.mx); return; } if (ep->closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&nni_inproc.mx); return; } if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) { - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&nni_inproc.mx); return; } @@ -491,32 +466,33 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_inproc_ep *ep = arg; int rv; - if (ep->mode != NNI_EP_MODE_LISTEN) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_mtx_lock(&nni_inproc.mx); + + if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); return; } - nni_mtx_lock(&nni_inproc.mx); + if (ep->mode != NNI_EP_MODE_LISTEN) { + nni_aio_finish_error(aio, NNG_EINVAL); + nni_mtx_unlock(&nni_inproc.mx); + return; + } // We are already on the master list of servers, thanks to bind. if (ep->closed) { - nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&nni_inproc.mx); return; } if (!ep->started) { - nni_aio_finish(aio, NNG_ESTATE, 0); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - - if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_aio_finish_error(aio, NNG_ESTATE); nni_mtx_unlock(&nni_inproc.mx); return; } if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) { - nni_aio_finish(aio, rv, 0); + nni_aio_finish_error(aio, rv); nni_mtx_unlock(&nni_inproc.mx); return; } |
