diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 84 | ||||
| -rw-r--r-- | src/core/aio.h | 10 | ||||
| -rw-r--r-- | src/core/endpt.c | 20 | ||||
| -rw-r--r-- | src/core/pipe.c | 65 | ||||
| -rw-r--r-- | src/core/socket.c | 93 | ||||
| -rw-r--r-- | src/core/socket.h | 7 | ||||
| -rw-r--r-- | src/core/taskq.c | 3 |
7 files changed, 159 insertions, 123 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index a9fbc50d..620a865d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -12,10 +12,9 @@ #include <string.h> enum nni_aio_flags { - NNI_AIO_WAKE = 0x1, - NNI_AIO_DONE = 0x2, - NNI_AIO_FINI = 0x4, - NNI_AIO_START = 0x8, + NNI_AIO_WAKE = 0x1, + NNI_AIO_DONE = 0x2, + NNI_AIO_FINI = 0x4, }; // These are used for expiration. @@ -49,7 +48,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) aio->a_cb = cb; aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; - aio->a_flags = NNI_AIO_START; + aio->a_flags = 0; nni_taskq_ent_init(&aio->a_tqe, cb, arg); return (0); @@ -58,49 +57,39 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) void nni_aio_fini(nni_aio *aio) { - void (*cancelfn)(nni_aio *); + nni_aio_stop(aio); - nni_mtx_lock(&aio->a_lk); - aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled - if ((aio->a_flags & NNI_AIO_DONE) == 0) { - aio->a_flags |= NNI_AIO_DONE; - aio->a_result = NNG_ECANCELED; - cancelfn = aio->a_prov_cancel; - if (aio->a_flags & NNI_AIO_START) { - aio->a_flags &= ~NNI_AIO_START; - nni_taskq_dispatch(NULL, &aio->a_tqe); - } + // At this point the AIO is done. + nni_cv_fini(&aio->a_cv); + nni_mtx_fini(&aio->a_lk); - } else { - cancelfn = NULL; + if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) { + NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs); } - nni_cv_wake(&aio->a_cv); +} - while (aio->a_refcnt != 0) { - nni_cv_wait(&aio->a_cv); +// nni_aio_stop cancels any oustanding operation, and waits for the +// callback to complete, if still running. It also marks the AIO as +// stopped, preventing further calls to nni_aio_start from succeeding. +// To correctly tear down an AIO, call stop, and make sure any other +// calles are not also stopped, before calling nni_aio_fini to release +// actual memory. +void +nni_aio_stop(nni_aio *aio) +{ + if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) { + // Never initialized, so nothing should have happened. + return; } + nni_mtx_lock(&aio->a_lk); + aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled nni_mtx_unlock(&aio->a_lk); - // Stop any timeouts. If one was in flight, we wait until it - // completes (it could fire the completion callback.) - nni_aio_expire_remove(aio); - - // Cancel the AIO if it was scheduled. - if (cancelfn != NULL) { - cancelfn(aio); - } + nni_aio_cancel(aio, NNG_ECANCELED); // Wait for any outstanding task to complete. We won't schedule // new stuff because nni_aio_start will fail (due to AIO_FINI). nni_taskq_wait(NULL, &aio->a_tqe); - - // At this point the AIO is done. - nni_cv_fini(&aio->a_cv); - nni_mtx_fini(&aio->a_lk); - - if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) { - NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs); - } } int @@ -166,24 +155,23 @@ nni_aio_cancel(nni_aio *aio, int rv) void (*cancelfn)(nni_aio *); nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { + if (aio->a_flags & NNI_AIO_DONE) { // The operation already completed - so there's nothing // left for us to do. nni_mtx_unlock(&aio->a_lk); return; } aio->a_flags |= NNI_AIO_DONE; - aio->a_flags &= ~NNI_AIO_START; aio->a_result = rv; cancelfn = aio->a_prov_cancel; aio->a_prov_cancel = NULL; - // Guaraneed to just be a list operation. - nni_aio_expire_remove(aio); - aio->a_refcnt++; nni_mtx_unlock(&aio->a_lk); + // Guaraneed to just be a list operation. + nni_aio_expire_remove(aio); + // Stop any I/O at the provider level. if (cancelfn != NULL) { cancelfn(aio); @@ -200,10 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv) aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; - if (!(aio->a_flags & NNI_AIO_FINI)) { - // If we are finalizing, then we are done. - nni_taskq_dispatch(NULL, &aio->a_tqe); - } + nni_taskq_dispatch(NULL, &aio->a_tqe); nni_mtx_unlock(&aio->a_lk); } @@ -213,13 +198,12 @@ int nni_aio_finish(nni_aio *aio, int result, size_t count) { nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { + if (aio->a_flags & NNI_AIO_DONE) { // Operation already done (canceled or timed out?) nni_mtx_unlock(&aio->a_lk); return (NNG_ESTATE); } aio->a_flags |= NNI_AIO_DONE; - aio->a_flags &= ~NNI_AIO_START; aio->a_result = result; aio->a_count = count; @@ -240,13 +224,12 @@ int nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe) { nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { + if (aio->a_flags & NNI_AIO_DONE) { // Operation already done (canceled or timed out?) nni_mtx_unlock(&aio->a_lk); return (NNG_ESTATE); } aio->a_flags |= NNI_AIO_DONE; - aio->a_flags &= ~NNI_AIO_START; aio->a_result = result; aio->a_count = 0; @@ -392,7 +375,6 @@ nni_aio_expire_loop(void *arg) } aio->a_flags |= NNI_AIO_DONE; - aio->a_flags &= ~NNI_AIO_START; aio->a_result = NNG_ETIMEDOUT; cancelfn = aio->a_prov_cancel; diff --git a/src/core/aio.h b/src/core/aio.h index ad5b560e..c698ce20 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -69,6 +69,16 @@ extern int nni_aio_init(nni_aio *, nni_cb, void *); // on zero'd memory. extern void nni_aio_fini(nni_aio *); +// nni_aio_stop cancels any unfinished I/O, running completion callbacks, +// but also prevents any new operations from starting (nni_aio_start will +// return NNG_ESTATE). This should be called before nni_aio_fini(). The +// best pattern is to call nni_aio_stop on all linked aios, before calling +// nni_aio_fini on any of them. This function will block until any +// callbacks are executed, and therefore it should never be executed +// from a callback itself. (To abort operations without blocking +// use nni_aio_cancel instead.) +extern void nni_aio_stop(nni_aio *); + // nni_aio_result returns the result code (0 on success, or an NNG errno) // for the operation. It is only valid to call this when the operation is // complete (such as when the callback is executed or after nni_aio_wait diff --git a/src/core/endpt.c b/src/core/endpt.c index ca76838a..6b474698 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -64,6 +64,10 @@ nni_ep_destroy(nni_ep *ep) if (ep->ep_id != 0) { nni_idhash_remove(nni_eps, ep->ep_id); } + nni_aio_stop(&ep->ep_acc_aio); + nni_aio_stop(&ep->ep_con_aio); + nni_aio_stop(&ep->ep_con_syn); + nni_aio_stop(&ep->ep_backoff); nni_aio_fini(&ep->ep_acc_aio); nni_aio_fini(&ep->ep_con_aio); @@ -179,6 +183,10 @@ nni_ep_reap(nni_ep *ep) { nni_ep_close(ep); // Extra sanity. + nni_aio_stop(&ep->ep_acc_aio); + nni_aio_stop(&ep->ep_con_aio); + nni_aio_stop(&ep->ep_con_syn); + // Take us off the sock list. nni_sock_ep_remove(ep->ep_sock, ep); @@ -188,11 +196,13 @@ nni_ep_reap(nni_ep *ep) // done everything we can to wake any waiter (synchronous connect) // gracefully. nni_mtx_lock(&ep->ep_mtx); - while (!nni_list_empty(&ep->ep_pipes)) { - nni_cv_wait(&ep->ep_cv); - } - while (ep->ep_refcnt != 0) { - nni_cv_wait(&ep->ep_cv); + ep->ep_closed = 1; + for (;;) { + if ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 0)) { + nni_cv_wait(&ep->ep_cv); + continue; + } + break; } nni_mtx_unlock(&ep->ep_mtx); diff --git a/src/core/pipe.c b/src/core/pipe.c index 8cc21941..5b51a38b 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -52,7 +52,7 @@ nni_pipe_destroy(nni_pipe *p) if (p == NULL) { return; } - + NNI_ASSERT(p->p_refcnt != 0xDEAD); // Make sure any unlocked holders are done with this. // This happens during initialization for example. nni_mtx_lock(&p->p_mtx); @@ -60,6 +60,10 @@ nni_pipe_destroy(nni_pipe *p) nni_cv_wait(&p->p_cv); } nni_mtx_unlock(&p->p_mtx); + p->p_refcnt = 0xDEAD; + + nni_aio_stop(&p->p_start_aio); + nni_aio_fini(&p->p_start_aio); if (p->p_proto_data != NULL) { p->p_proto_dtor(p->p_proto_data); @@ -107,9 +111,6 @@ nni_pipe_close(nni_pipe *p) } p->p_reap = 1; - // abort any pending negotiation/start process. - nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); - // Close the underlying transport. if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); @@ -117,8 +118,8 @@ nni_pipe_close(nni_pipe *p) nni_mtx_unlock(&p->p_mtx); - // Ensure that the negotiation step is aborted fully. - nni_aio_fini(&p->p_start_aio); + // abort any pending negotiation/start process. + nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); } // Pipe reap is called on a taskq when the pipe should be closed. No @@ -131,6 +132,8 @@ nni_pipe_reap(nni_pipe *p) // Transport close... nni_pipe_close(p); + nni_aio_stop(&p->p_start_aio); + // Remove the pipe from the socket and the endpoint. Note // that it is in theory possible for either of these to be null // if the pipe is being torn down before it is fully initialized. @@ -148,6 +151,7 @@ void nni_pipe_stop(nni_pipe *p) { // Guard against recursive calls. + nni_pipe_close(p); nni_mtx_lock(&p->p_mtx); if (p->p_stop) { nni_mtx_unlock(&p->p_mtx); @@ -186,25 +190,6 @@ nni_pipe_start_cb(void *arg) } } -void -nni_pipe_hold(nni_pipe *p) -{ - nni_mtx_lock(&p->p_mtx); - p->p_refcnt++; - nni_mtx_unlock(&p->p_mtx); -} - -void -nni_pipe_rele(nni_pipe *p) -{ - nni_mtx_lock(&p->p_mtx); - p->p_refcnt--; - if (p->p_refcnt == 0) { - nni_cv_wake(&p->p_cv); - } - nni_mtx_unlock(&p->p_mtx); -} - int nni_pipe_create(nni_ep *ep, void *tdata) { @@ -215,7 +200,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... - tran->tran_pipe->p_fini(p); + tran->tran_pipe->p_fini(tdata); return (NNG_ENOMEM); } @@ -232,38 +217,20 @@ nni_pipe_create(nni_ep *ep, void *tdata) return (rv); } - nni_pipe_hold(p); - NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) { - goto fail; - } - - if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) { - goto fail; + nni_pipe_destroy(p); + return (rv); } - // Attempt to initialize sock protocol & endpoint. - if ((rv = nni_ep_pipe_add(ep, p)) != 0) { - goto fail; - } - if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - goto fail; + if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) { + nni_pipe_destroy(p); + return (rv); } - // Start the pipe running. - nni_pipe_start(p); - nni_pipe_rele(p); - return (0); - -fail: - nni_pipe_stop(p); - nni_pipe_rele(p); - - return (rv); } int diff --git a/src/core/socket.c b/src/core/socket.c index 10bc1c80..23a1793a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock) nni_objhash_unref(nni_socks, sock->s_id); } +static int +nni_sock_pipe_start(nni_pipe *pipe) +{ + nni_sock *sock = pipe->p_sock; + void * pdata = nni_pipe_get_proto_data(pipe); + int rv; + + NNI_ASSERT(sock != NULL); + if (sock->s_closing) { + // We're closing, bail out. + return (NNG_ECLOSED); + } + if (nni_pipe_peer(pipe) != sock->s_peer) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) { + // Protocol rejection for other reasons. + // E.g. pair and already have active connected partner. + return (rv); + } + return (0); +} + +static void +nni_sock_pipe_start_cb(void *arg) +{ + nni_pipe *pipe = arg; + nni_aio * aio = &pipe->p_start_aio; + + if (nni_aio_result(aio) != 0) { + // Failed I/O during start, abort everything. + nni_pipe_stop(pipe); + return; + } + if (nni_sock_pipe_start(pipe) != 0) { + nni_pipe_stop(pipe); + return; + } +} + int -nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe) { int rv; // Initialize protocol pipe data. nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { + nni_mtx_lock(&ep->ep_mtx); + + if ((sock->s_closing) || (ep->ep_closed)) { + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } + rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe); + if (rv != 0) { + nni_mtx_unlock(&ep->ep_mtx); + nni_mtx_unlock(&sock->s_mx); + return (rv); + } + rv = sock->s_pipe_ops.pipe_init( &pipe->p_proto_data, pipe, sock->s_data); if (rv != 0) { + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_lock(&sock->s_mx); return (rv); } // Save the protocol destructor. pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini; pipe->p_sock = sock; + pipe->p_ep = ep; + nni_list_append(&sock->s_pipes, pipe); + nni_list_append(&ep->ep_pipes, pipe); + + // Start the initial negotiation I/O... + if (pipe->p_tran_ops.p_start == NULL) { + if (nni_sock_pipe_start(pipe) != 0) { + nni_pipe_stop(pipe); + } + } else { + pipe->p_tran_ops.p_start( + pipe->p_tran_data, &pipe->p_start_aio); + } + + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); return (0); } @@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) pdata = nni_pipe_get_proto_data(pipe); - nni_mtx_lock(&sock->s_mx); + // Stop any pending negotiation. + nni_aio_stop(&pipe->p_start_aio); + nni_mtx_lock(&sock->s_mx); if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) { nni_mtx_unlock(&sock->s_mx); return; @@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each pipe, arrange for it to teardown hard. (Close, etc.). - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_stop(pipe); - } - // For each ep, arrange for it to teardown hard. NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_stop(ep); } - - // Wait for the pipes to be reaped (there should not be any because - // we have already reaped the EPs.) - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_cv_wait(&sock->s_cv); + // For each pipe, arrange for it to teardown hard. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_stop(pipe); } - // Wait for the eps to be reaped. - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + // We have to wait for *both* endpoints and pipes to be removed. + while ((!nni_list_empty(&sock->s_pipes)) || + (!nni_list_empty(&sock->s_eps))) { nni_cv_wait(&sock->s_cv); } diff --git a/src/core/socket.h b/src/core/socket.h index d0196eea..41dfbc33 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -87,8 +87,11 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); // nni_sock_pipe_add adds the pipe to the socket. It is called by -// the generic pipe creation code. -extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); +// the generic pipe creation code. It also adds the socket to the +// ep list, and starts the pipe. It does all these to ensure that +// we have complete success or failure, and there is no point where +// a pipe could wind up orphaned. +extern int nni_sock_pipe_add(nni_sock *, nni_ep *, nni_pipe *); extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); diff --git a/src/core/taskq.c b/src/core/taskq.c index 36129bfd..64179790 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -153,7 +154,7 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) return (NNG_ECLOSED); } // It might already be scheduled... if so don't redo it. - if (ent->tqe_tq == NULL) { + if (!nni_list_active(&tq->tq_ents, ent)) { ent->tqe_tq = tq; nni_list_append(&tq->tq_ents, ent); } |
