From a37093079b492e966344416445aae354b147d30e Mon Sep 17 00:00:00 2001
From: Garrett D'Amore
Date: Thu, 20 Jul 2017 14:34:51 -0700
Subject: Yet more race condition fixes.

We need to remember that protocol stops can run synchronously, and
therefore we need to wait for the aio to complete.

Further, we need to break apart shutting down aio activity from
deallocation, as we need to shut down *all* async activity before
deallocating *anything*.

Noticed that we had a pipe race in the surveyor pattern too.
---
 src/core/aio.c    | 84 ++++++++++++++++++++-----------------------
 src/core/aio.h    | 10 ++++++
 src/core/endpt.c  | 20 +++++++++---
 src/core/pipe.c   | 65 ++++++++++----------------------------
 src/core/socket.c | 93 ++++++++++++++++++++++++++++++++++++++++++++++---------
 src/core/socket.h |  7 +++--
 src/core/taskq.c  |  3 +-
 7 files changed, 159 insertions(+), 123 deletions(-)

(limited to 'src/core')

diff --git a/src/core/aio.c b/src/core/aio.c
index a9fbc50d..620a865d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,10 +12,9 @@
 #include

 enum nni_aio_flags {
-    NNI_AIO_WAKE  = 0x1,
-    NNI_AIO_DONE  = 0x2,
-    NNI_AIO_FINI  = 0x4,
-    NNI_AIO_START = 0x8,
+    NNI_AIO_WAKE = 0x1,
+    NNI_AIO_DONE = 0x2,
+    NNI_AIO_FINI = 0x4,
 };

 // These are used for expiration.
@@ -49,7 +48,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
     aio->a_cb     = cb;
     aio->a_cbarg  = arg;
     aio->a_expire = NNI_TIME_NEVER;
-    aio->a_flags  = NNI_AIO_START;
+    aio->a_flags  = 0;

     nni_taskq_ent_init(&aio->a_tqe, cb, arg);
     return (0);
@@ -58,49 +57,39 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
 void
 nni_aio_fini(nni_aio *aio)
 {
-    void (*cancelfn)(nni_aio *);
+    nni_aio_stop(aio);

-    nni_mtx_lock(&aio->a_lk);
-    aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
-    if ((aio->a_flags & NNI_AIO_DONE) == 0) {
-        aio->a_flags |= NNI_AIO_DONE;
-        aio->a_result = NNG_ECANCELED;
-        cancelfn = aio->a_prov_cancel;
-        if (aio->a_flags & NNI_AIO_START) {
-            aio->a_flags &= ~NNI_AIO_START;
-            nni_taskq_dispatch(NULL, &aio->a_tqe);
-        }
+    // At this point the AIO is done.
+    nni_cv_fini(&aio->a_cv);
+    nni_mtx_fini(&aio->a_lk);

-    } else {
-        cancelfn = NULL;
+    if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
+        NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
     }
-    nni_cv_wake(&aio->a_cv);
+}

-    while (aio->a_refcnt != 0) {
-        nni_cv_wait(&aio->a_cv);
+// nni_aio_stop cancels any outstanding operation, and waits for the
+// callback to complete, if still running.  It also marks the AIO as
+// stopped, preventing further calls to nni_aio_start from succeeding.
+// To correctly tear down an AIO, call stop, and make sure any other
+// linked aios are also stopped, before calling nni_aio_fini to release
+// actual memory.
+void
+nni_aio_stop(nni_aio *aio)
+{
+    if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) {
+        // Never initialized, so nothing should have happened.
+        return;
     }
+    nni_mtx_lock(&aio->a_lk);
+    aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
     nni_mtx_unlock(&aio->a_lk);

-    // Stop any timeouts.  If one was in flight, we wait until it
-    // completes (it could fire the completion callback.)
-    nni_aio_expire_remove(aio);
-
-    // Cancel the AIO if it was scheduled.
-    if (cancelfn != NULL) {
-        cancelfn(aio);
-    }
+    nni_aio_cancel(aio, NNG_ECANCELED);

     // Wait for any outstanding task to complete.  We won't schedule
     // new stuff because nni_aio_start will fail (due to AIO_FINI).
     nni_taskq_wait(NULL, &aio->a_tqe);
-
-    // At this point the AIO is done.
-    nni_cv_fini(&aio->a_cv);
-    nni_mtx_fini(&aio->a_lk);
-
-    if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
-        NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
-    }
 }

 int
@@ -166,24 +155,23 @@ nni_aio_cancel(nni_aio *aio, int rv)
     void (*cancelfn)(nni_aio *);

     nni_mtx_lock(&aio->a_lk);
-    if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+    if (aio->a_flags & NNI_AIO_DONE) {
         // The operation already completed - so there's nothing
         // left for us to do.
         nni_mtx_unlock(&aio->a_lk);
         return;
     }
     aio->a_flags |= NNI_AIO_DONE;
-    aio->a_flags &= ~NNI_AIO_START;
     aio->a_result = rv;
     cancelfn = aio->a_prov_cancel;
     aio->a_prov_cancel = NULL;
-    // Guaraneed to just be a list operation.
-    nni_aio_expire_remove(aio);
-
     aio->a_refcnt++;
     nni_mtx_unlock(&aio->a_lk);

+    // Guaranteed to just be a list operation.
+    nni_aio_expire_remove(aio);
+
     // Stop any I/O at the provider level.
     if (cancelfn != NULL) {
         cancelfn(aio);
     }
@@ -200,10 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
     aio->a_prov_data   = NULL;
     aio->a_prov_cancel = NULL;

-    if (!(aio->a_flags & NNI_AIO_FINI)) {
-        // If we are finalizing, then we are done.
-        nni_taskq_dispatch(NULL, &aio->a_tqe);
-    }
+    nni_taskq_dispatch(NULL, &aio->a_tqe);
     nni_mtx_unlock(&aio->a_lk);
 }

@@ -213,13 +198,12 @@ int
 nni_aio_finish(nni_aio *aio, int result, size_t count)
 {
     nni_mtx_lock(&aio->a_lk);
-    if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+    if (aio->a_flags & NNI_AIO_DONE) {
         // Operation already done (canceled or timed out?)
         nni_mtx_unlock(&aio->a_lk);
         return (NNG_ESTATE);
     }
     aio->a_flags |= NNI_AIO_DONE;
-    aio->a_flags &= ~NNI_AIO_START;
     aio->a_result = result;
     aio->a_count  = count;

@@ -240,13 +224,12 @@ int
 nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
 {
     nni_mtx_lock(&aio->a_lk);
-    if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+    if (aio->a_flags & NNI_AIO_DONE) {
         // Operation already done (canceled or timed out?)
         nni_mtx_unlock(&aio->a_lk);
         return (NNG_ESTATE);
     }
     aio->a_flags |= NNI_AIO_DONE;
-    aio->a_flags &= ~NNI_AIO_START;
     aio->a_result = result;
     aio->a_count  = 0;

@@ -392,7 +375,6 @@ nni_aio_expire_loop(void *arg)
         }

         aio->a_flags |= NNI_AIO_DONE;
-        aio->a_flags &= ~NNI_AIO_START;
         aio->a_result = NNG_ETIMEDOUT;
         cancelfn = aio->a_prov_cancel;

diff --git a/src/core/aio.h b/src/core/aio.h
index ad5b560e..c698ce20 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -69,6 +69,16 @@ extern int nni_aio_init(nni_aio *, nni_cb, void *);
 // on zero'd memory.
 extern void nni_aio_fini(nni_aio *);

+// nni_aio_stop cancels any unfinished I/O, running completion callbacks,
+// but also prevents any new operations from starting (nni_aio_start will
+// return NNG_ESTATE).  This should be called before nni_aio_fini().  The
+// best pattern is to call nni_aio_stop on all linked aios, before calling
+// nni_aio_fini on any of them.  This function will block until any
+// callbacks are executed, and therefore it should never be executed
+// from a callback itself.  (To abort operations without blocking,
+// use nni_aio_cancel instead.)
+extern void nni_aio_stop(nni_aio *);
+
 // nni_aio_result returns the result code (0 on success, or an NNG errno)
 // for the operation.  It is only valid to call this when the operation is
 // complete (such as when the callback is executed or after nni_aio_wait
diff --git a/src/core/endpt.c b/src/core/endpt.c
index ca76838a..6b474698 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -64,6 +64,10 @@ nni_ep_destroy(nni_ep *ep)
     if (ep->ep_id != 0) {
         nni_idhash_remove(nni_eps, ep->ep_id);
     }
+    nni_aio_stop(&ep->ep_acc_aio);
+    nni_aio_stop(&ep->ep_con_aio);
+    nni_aio_stop(&ep->ep_con_syn);
+    nni_aio_stop(&ep->ep_backoff);

     nni_aio_fini(&ep->ep_acc_aio);
     nni_aio_fini(&ep->ep_con_aio);
@@ -179,6 +183,10 @@ nni_ep_reap(nni_ep *ep)
 {
     nni_ep_close(ep); // Extra sanity.

+    nni_aio_stop(&ep->ep_acc_aio);
+    nni_aio_stop(&ep->ep_con_aio);
+    nni_aio_stop(&ep->ep_con_syn);
+
     // Take us off the sock list.
     nni_sock_ep_remove(ep->ep_sock, ep);

@@ -188,11 +196,13 @@ nni_ep_reap(nni_ep *ep)
     // done everything we can to wake any waiter (synchronous connect)
     // gracefully.
     nni_mtx_lock(&ep->ep_mtx);
-    while (!nni_list_empty(&ep->ep_pipes)) {
-        nni_cv_wait(&ep->ep_cv);
-    }
-    while (ep->ep_refcnt != 0) {
-        nni_cv_wait(&ep->ep_cv);
+    ep->ep_closed = 1;
+    for (;;) {
+        if ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 0)) {
+            nni_cv_wait(&ep->ep_cv);
+            continue;
+        }
+        break;
     }
     nni_mtx_unlock(&ep->ep_mtx);

diff --git a/src/core/pipe.c b/src/core/pipe.c
index 8cc21941..5b51a38b 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -52,7 +52,7 @@ nni_pipe_destroy(nni_pipe *p)
     if (p == NULL) {
         return;
     }
-
+    NNI_ASSERT(p->p_refcnt != 0xDEAD);
     // Make sure any unlocked holders are done with this.
     // This happens during initialization for example.
     nni_mtx_lock(&p->p_mtx);
@@ -60,6 +60,10 @@ nni_pipe_destroy(nni_pipe *p)
         nni_cv_wait(&p->p_cv);
     }
     nni_mtx_unlock(&p->p_mtx);
+    p->p_refcnt = 0xDEAD;
+
+    nni_aio_stop(&p->p_start_aio);
+    nni_aio_fini(&p->p_start_aio);

     if (p->p_proto_data != NULL) {
         p->p_proto_dtor(p->p_proto_data);
@@ -107,9 +111,6 @@ nni_pipe_close(nni_pipe *p)
     }
     p->p_reap = 1;

-    // abort any pending negotiation/start process.
-    nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
-
     // Close the underlying transport.
     if (p->p_tran_data != NULL) {
         p->p_tran_ops.p_close(p->p_tran_data);
@@ -117,8 +118,8 @@ nni_pipe_close(nni_pipe *p)

     nni_mtx_unlock(&p->p_mtx);

-    // Ensure that the negotiation step is aborted fully.
-    nni_aio_fini(&p->p_start_aio);
+    // abort any pending negotiation/start process.
+    nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
 }

 // Pipe reap is called on a taskq when the pipe should be closed.  No
@@ -131,6 +132,8 @@ nni_pipe_reap(nni_pipe *p)
     // Transport close...
     nni_pipe_close(p);

+    nni_aio_stop(&p->p_start_aio);
+
     // Remove the pipe from the socket and the endpoint.  Note
     // that it is in theory possible for either of these to be null
     // if the pipe is being torn down before it is fully initialized.
@@ -148,6 +151,7 @@ void
 nni_pipe_stop(nni_pipe *p)
 {
     // Guard against recursive calls.
+    nni_pipe_close(p);
     nni_mtx_lock(&p->p_mtx);
     if (p->p_stop) {
         nni_mtx_unlock(&p->p_mtx);
@@ -186,25 +190,6 @@ nni_pipe_start_cb(void *arg)
     }
 }

-void
-nni_pipe_hold(nni_pipe *p)
-{
-    nni_mtx_lock(&p->p_mtx);
-    p->p_refcnt++;
-    nni_mtx_unlock(&p->p_mtx);
-}
-
-void
-nni_pipe_rele(nni_pipe *p)
-{
-    nni_mtx_lock(&p->p_mtx);
-    p->p_refcnt--;
-    if (p->p_refcnt == 0) {
-        nni_cv_wake(&p->p_cv);
-    }
-    nni_mtx_unlock(&p->p_mtx);
-}
-
 int
 nni_pipe_create(nni_ep *ep, void *tdata)
 {
@@ -215,7 +200,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)

     if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
         // In this case we just toss the pipe...
-        tran->tran_pipe->p_fini(p);
+        tran->tran_pipe->p_fini(tdata);
         return (NNG_ENOMEM);
     }

@@ -232,38 +217,20 @@ nni_pipe_create(nni_ep *ep, void *tdata)
         return (rv);
     }

-    nni_pipe_hold(p);
-
     NNI_LIST_NODE_INIT(&p->p_sock_node);
     NNI_LIST_NODE_INIT(&p->p_ep_node);

     if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
-        goto fail;
-    }
-
-    if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
-        goto fail;
+        nni_pipe_destroy(p);
+        return (rv);
     }

-    // Attempt to initialize sock protocol & endpoint.
-    if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
-        goto fail;
-    }
-    if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
-        goto fail;
+    if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) {
+        nni_pipe_destroy(p);
+        return (rv);
     }

-    // Start the pipe running.
-    nni_pipe_start(p);
-    nni_pipe_rele(p);
     return (0);
-
-fail:
-    nni_pipe_stop(p);
-    nni_pipe_rele(p);
-
-    return (rv);
 }

 int
diff --git a/src/core/socket.c b/src/core/socket.c
index 10bc1c80..23a1793a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock)
     nni_objhash_unref(nni_socks, sock->s_id);
 }

+static int
+nni_sock_pipe_start(nni_pipe *pipe)
+{
+    nni_sock *sock  = pipe->p_sock;
+    void *    pdata = nni_pipe_get_proto_data(pipe);
+    int       rv;
+
+    NNI_ASSERT(sock != NULL);
+    if (sock->s_closing) {
+        // We're closing, bail out.
+        return (NNG_ECLOSED);
+    }
+    if (nni_pipe_peer(pipe) != sock->s_peer) {
+        // Peer protocol mismatch.
+        return (NNG_EPROTO);
+    }
+    if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
+        // Protocol rejection for other reasons.
+        // E.g. pair and already have active connected partner.
+        return (rv);
+    }
+    return (0);
+}
+
+static void
+nni_sock_pipe_start_cb(void *arg)
+{
+    nni_pipe *pipe = arg;
+    nni_aio * aio  = &pipe->p_start_aio;
+
+    if (nni_aio_result(aio) != 0) {
+        // Failed I/O during start, abort everything.
+        nni_pipe_stop(pipe);
+        return;
+    }
+    if (nni_sock_pipe_start(pipe) != 0) {
+        nni_pipe_stop(pipe);
+        return;
+    }
+}
+
 int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe)
 {
     int rv;

     // Initialize protocol pipe data.
     nni_mtx_lock(&sock->s_mx);
-    if (sock->s_closing) {
+    nni_mtx_lock(&ep->ep_mtx);
+
+    if ((sock->s_closing) || (ep->ep_closed)) {
+        nni_mtx_unlock(&ep->ep_mtx);
         nni_mtx_unlock(&sock->s_mx);
         return (NNG_ECLOSED);
     }
+    rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
+    if (rv != 0) {
+        nni_mtx_unlock(&ep->ep_mtx);
+        nni_mtx_unlock(&sock->s_mx);
+        return (rv);
+    }
+
     rv = sock->s_pipe_ops.pipe_init(
         &pipe->p_proto_data, pipe, sock->s_data);
     if (rv != 0) {
+        nni_mtx_unlock(&ep->ep_mtx);
         nni_mtx_unlock(&sock->s_mx);
         return (rv);
     }
     // Save the protocol destructor.
     pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
     pipe->p_sock = sock;
+    pipe->p_ep = ep;
+    nni_list_append(&sock->s_pipes, pipe);
+    nni_list_append(&ep->ep_pipes, pipe);
+
+    // Start the initial negotiation I/O...
+    if (pipe->p_tran_ops.p_start == NULL) {
+        if (nni_sock_pipe_start(pipe) != 0) {
+            nni_pipe_stop(pipe);
+        }
+    } else {
+        pipe->p_tran_ops.p_start(
+            pipe->p_tran_data, &pipe->p_start_aio);
+    }
+
+    nni_mtx_unlock(&ep->ep_mtx);
     nni_mtx_unlock(&sock->s_mx);
     return (0);
 }
@@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)

     pdata = nni_pipe_get_proto_data(pipe);

-    nni_mtx_lock(&sock->s_mx);
+    // Stop any pending negotiation.
+    nni_aio_stop(&pipe->p_start_aio);
+    nni_mtx_lock(&sock->s_mx);
     if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
         nni_mtx_unlock(&sock->s_mx);
         return;
     }
@@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock)
     nni_msgq_close(sock->s_urq);
     nni_msgq_close(sock->s_uwq);

-    // For each pipe, arrange for it to teardown hard. (Close, etc.).
-    NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
-        nni_pipe_stop(pipe);
-    }
-
     // For each ep, arrange for it to teardown hard.
     NNI_LIST_FOREACH (&sock->s_eps, ep) {
         nni_ep_stop(ep);
     }
-
-    // Wait for the pipes to be reaped (there should not be any because
-    // we have already reaped the EPs.)
-    while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
-        nni_cv_wait(&sock->s_cv);
+    // For each pipe, arrange for it to teardown hard.
+    NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+        nni_pipe_stop(pipe);
     }

-    // Wait for the eps to be reaped.
-    while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+    // We have to wait for *both* endpoints and pipes to be removed.
+    while ((!nni_list_empty(&sock->s_pipes)) ||
+        (!nni_list_empty(&sock->s_eps))) {
         nni_cv_wait(&sock->s_cv);
     }

diff --git a/src/core/socket.h b/src/core/socket.h
index d0196eea..41dfbc33 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -87,8 +87,11 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *);
 extern void nni_sock_ep_remove(nni_sock *, nni_ep *);

 // nni_sock_pipe_add adds the pipe to the socket.  It is called by
-// the generic pipe creation code.
-extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
+// the generic pipe creation code.  It also adds the pipe to the
+// ep's pipe list, and starts the pipe.  It does all of this to ensure
+// that we have complete success or failure, and there is no point
+// where a pipe could wind up orphaned.
+extern int nni_sock_pipe_add(nni_sock *, nni_ep *, nni_pipe *);

 extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);

diff --git a/src/core/taskq.c b/src/core/taskq.c
index 36129bfd..64179790 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -1,5 +1,6 @@
 //
 // Copyright 2017 Garrett D'Amore
+// Copyright 2017 Capitar IT Group BV
 //
 // This software is supplied under the terms of the MIT License, a
 // copy of which should be located in the distribution where this
@@ -153,7 +154,7 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
         return (NNG_ECLOSED);
     }
     // It might already be scheduled... if so don't redo it.
-    if (ent->tqe_tq == NULL) {
+    if (!nni_list_active(&tq->tq_ents, ent)) {
         ent->tqe_tq = tq;
         nni_list_append(&tq->tq_ents, ent);
     }
--
cgit v1.2.3-70-g09d2
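The teardown ordering this commit enforces (and that the new nni_aio_stop comment
in aio.h spells out) can be illustrated with a small consumer of the core API.
This is only a sketch against the internal functions shown in the patch; the
"conn" structure, its two aios, and conn_fini are hypothetical names, not code
from the tree.

// Sketch: tearing down an object that owns two aios, following the
// stop-everything-then-free-anything rule from this commit.  The "conn"
// type is hypothetical; nni_aio_stop, nni_aio_fini, and NNI_FREE_STRUCT
// come from the nng core internals.
#include "core/nng_impl.h"

typedef struct conn {
    nni_aio rd_aio; // hypothetical read aio
    nni_aio wr_aio; // hypothetical write aio
} conn;

static void
conn_fini(conn *c)
{
    // First quiesce *all* asynchronous activity.  nni_aio_stop cancels any
    // outstanding operation, waits for its callback to finish, and keeps
    // nni_aio_start from scheduling anything new.  A callback on one aio
    // may still touch the other, so stop both before freeing either.
    nni_aio_stop(&c->rd_aio);
    nni_aio_stop(&c->wr_aio);

    // Only once nothing can run any longer do we release memory.
    nni_aio_fini(&c->rd_aio);
    nni_aio_fini(&c->wr_aio);
    NNI_FREE_STRUCT(c);
}

The patch applies the same ordering internally: nni_ep_destroy and
nni_pipe_destroy now call nni_aio_stop on every aio before any nni_aio_fini,
and nni_sock_shutdown waits for both endpoints and pipes to drain before
deallocation proceeds.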