diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-28 23:07:28 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-28 23:07:28 -0700 |
| commit | fe3c9705072ac8cafecdf2ea6bca4c26f9464824 (patch) | |
| tree | 07aaea70cbf8bb6af369d5efede475ed03ffdd63 /src/core | |
| parent | 10d748fa6444324878a77cc5749c93b75819ced2 (diff) | |
| download | nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.gz nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.bz2 nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.zip | |
Refactor stop again, closing numerous races (thanks valgrind!)
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 123 | ||||
| -rw-r--r-- | src/core/aio.h | 3 | ||||
| -rw-r--r-- | src/core/endpt.c | 17 | ||||
| -rw-r--r-- | src/core/init.c | 2 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 47 | ||||
| -rw-r--r-- | src/core/pipe.c | 61 | ||||
| -rw-r--r-- | src/core/pipe.h | 9 | ||||
| -rw-r--r-- | src/core/socket.c | 51 | ||||
| -rw-r--r-- | src/core/socket.h | 11 | ||||
| -rw-r--r-- | src/core/taskq.c | 4 |
10 files changed, 201 insertions, 127 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 11aadcb7..96e7c950 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -10,7 +10,10 @@ #include <string.h> #include "core/nng_impl.h" -#define NNI_AIO_WAKE (1<<0) +#define NNI_AIO_WAKE (1<<0) +#define NNI_AIO_DONE (1<<1) +#define NNI_AIO_FINI (1<<2) +#define NNI_AIO_STOP (1<<3) int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) @@ -32,6 +35,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) aio->a_cb = cb; aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; + aio->a_flags = 0; nni_taskq_ent_init(&aio->a_tqe, cb, arg); return (0); @@ -41,7 +45,24 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) void nni_aio_fini(nni_aio *aio) { + void (*cancelfn)(nni_aio *); + + nni_mtx_lock(&aio->a_lk); + aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled + cancelfn = aio->a_prov_cancel; + nni_mtx_unlock(&aio->a_lk); + + // Cancel the AIO if it was scheduled. + if (cancelfn != NULL) { + cancelfn(aio); + } + + // if the task is already dispatched, cancel it (or wait for it to + // complete). No further dispatches will happen because of the + // above logic to set NNI_AIO_FINI. nni_taskq_cancel(NULL, &aio->a_tqe); + + // At this point the AIO is done. nni_cv_fini(&aio->a_cv); nni_mtx_fini(&aio->a_lk); } @@ -82,21 +103,105 @@ nni_aio_wait(nni_aio *aio) } +int +nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) +{ + NNI_ASSERT(aio->a_prov_data == NULL); + NNI_ASSERT(aio->a_prov_cancel == NULL); + + nni_mtx_lock(&aio->a_lk); + aio->a_flags &= ~(NNI_AIO_DONE|NNI_AIO_WAKE); + if (aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP)) { + // We should not reschedule anything at this point. + nni_mtx_unlock(&aio->a_lk); + return (NNG_ECANCELED); + } + aio->a_prov_cancel = cancel; + aio->a_prov_data = data; + nni_mtx_unlock(&aio->a_lk); + return (0); +} + + +void +nni_aio_stop(nni_aio *aio) +{ + void (*cancelfn)(nni_aio *); + + nni_mtx_lock(&aio->a_lk); + aio->a_flags |= NNI_AIO_DONE|NNI_AIO_STOP; + cancelfn = aio->a_prov_cancel; + nni_mtx_unlock(&aio->a_lk); + + // This unregisters the AIO from the provider. + if (cancelfn != NULL) { + cancelfn(aio); + } + + nni_mtx_lock(&aio->a_lk); + aio->a_prov_data = NULL; + aio->a_prov_cancel = NULL; + nni_mtx_unlock(&aio->a_lk); + + // This either aborts the task, or waits for it to complete if already + // dispatched. + nni_taskq_cancel(NULL, &aio->a_tqe); +} + + +void +nni_aio_cancel(nni_aio *aio) +{ + void (*cancelfn)(nni_aio *); + + nni_mtx_lock(&aio->a_lk); + if (aio->a_flags & NNI_AIO_DONE) { + // The operation already completed - so there's nothing + // left for us to do. + nni_mtx_unlock(&aio->a_lk); + return; + } + aio->a_flags |= NNI_AIO_DONE; + aio->a_result = NNG_ECANCELED; + cancelfn = aio->a_prov_cancel; + nni_mtx_unlock(&aio->a_lk); + + // This unregisters the AIO from the provider. + if (cancelfn != NULL) { + cancelfn(aio); + } + + nni_mtx_lock(&aio->a_lk); + // These should have already been cleared by the cancel function. + aio->a_prov_data = NULL; + aio->a_prov_cancel = NULL; + + if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) { + nni_taskq_dispatch(NULL, &aio->a_tqe); + } + nni_mtx_unlock(&aio->a_lk); +} + + // I/O provider related functions. void nni_aio_finish(nni_aio *aio, int result, size_t count) { - nni_cb cb; - void *arg; - nni_mtx_lock(&aio->a_lk); + if (aio->a_flags & NNI_AIO_DONE) { + // Operation already done (canceled or timed out?) + nni_mtx_unlock(&aio->a_lk); + return; + } + aio->a_flags |= NNI_AIO_DONE; aio->a_result = result; aio->a_count = count; - cb = aio->a_cb; - arg = aio->a_cbarg; - nni_cv_wake(&aio->a_cv); - nni_mtx_unlock(&aio->a_lk); + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; - nni_taskq_dispatch(NULL, &aio->a_tqe); + if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) { + nni_taskq_dispatch(NULL, &aio->a_tqe); + } + nni_mtx_unlock(&aio->a_lk); } diff --git a/src/core/aio.h b/src/core/aio.h index a5f78b3f..a290281f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -87,4 +87,7 @@ extern void nni_aio_wait(nni_aio *); // and the amount of data transferred (if any). extern void nni_aio_finish(nni_aio *, int, size_t); +extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *); +extern void nni_aio_stop(nni_aio *); + #endif // CORE_AIO_H diff --git a/src/core/endpt.c b/src/core/endpt.c index 74bd0314..e3f78ecd 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -203,14 +203,7 @@ nni_ep_stop(nni_ep *ep) void nni_ep_close(nni_ep *ep) { - nni_pipe *pipe; - nni_ep_stop(ep); - nni_mtx_lock(&ep->ep_mtx); - NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { - nni_pipe_close(pipe); - } - nni_mtx_unlock(&ep->ep_mtx); } @@ -220,7 +213,15 @@ nni_ep_remove(nni_ep *ep) nni_pipe *pipe; nni_sock *sock = ep->ep_sock; - nni_ep_close(ep); + nni_ep_stop(ep); + + nni_thr_wait(&ep->ep_thr); + + nni_mtx_lock(&ep->ep_mtx); + NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_lock(&ep->ep_mtx); while (nni_list_first(&ep->ep_pipes) != NULL) { diff --git a/src/core/init.c b/src/core/init.c index ca7c214f..98b187d8 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -66,12 +66,12 @@ nni_fini(void) { // XXX: We should make sure that underlying sockets and // file descriptors are closed. Details TBD. + nni_taskq_sys_fini(); nni_tran_sys_fini(); nni_pipe_sys_fini(); nni_ep_sys_fini(); nni_sock_sys_fini(); nni_random_sys_fini(); nni_timer_sys_fini(); - nni_taskq_sys_fini(); nni_plat_fini(); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 99b53274..3d373e2b 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -137,8 +137,6 @@ nni_msgq_finish(nni_aio *aio, int rv) { nni_msgq *mq = aio->a_prov_data; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) { nni_list_remove(&mq->mq_aio_putq, aio); } @@ -339,8 +337,6 @@ nni_msgq_cancel(nni_aio *aio) if (nni_list_active(&mq->mq_aio_getq, aio)) { nni_list_remove(&mq->mq_aio_getq, aio); } - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_mtx_unlock(&mq->mq_lock); } @@ -349,8 +345,10 @@ void nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); - aio->a_prov_data = mq; - aio->a_prov_cancel = nni_msgq_cancel; + if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return; + } if (nni_list_active(&mq->mq_aio_notify_put, aio)) { nni_list_remove(&mq->mq_aio_notify_put, aio); } @@ -363,8 +361,10 @@ void nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); - aio->a_prov_data = mq; - aio->a_prov_cancel = nni_msgq_cancel; + if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return; + } if (nni_list_active(&mq->mq_aio_notify_get, aio)) { nni_list_remove(&mq->mq_aio_notify_get, aio); } @@ -379,7 +379,10 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); - + if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return; + } if (mq->mq_closed) { nni_aio_finish(aio, NNG_ECLOSED, 0); nni_mtx_unlock(&mq->mq_lock); @@ -391,9 +394,6 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) return; } - aio->a_prov_data = mq; - aio->a_prov_cancel = nni_msgq_cancel; - nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -413,6 +413,10 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); + if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return; + } if (mq->mq_closed) { nni_aio_finish(aio, NNG_ECLOSED, 0); nni_mtx_unlock(&mq->mq_lock); @@ -424,9 +428,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) return; } - aio->a_prov_data = mq; - aio->a_prov_cancel = nni_msgq_cancel; - nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); @@ -507,8 +508,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) nni_list_remove(&mq->mq_aio_getq, raio); raio->a_msg = msg; - raio->a_prov_cancel = NULL; - raio->a_prov_data = NULL; nni_aio_finish(raio, 0, len); nni_mtx_unlock(&mq->mq_lock); @@ -550,13 +549,9 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); if (aio->a_expire == NNI_TIME_ZERO) { - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { @@ -568,13 +563,9 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); if (aio->a_expire == NNI_TIME_ZERO) { - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { @@ -662,8 +653,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { nni_list_remove(&mq->mq_aio_putq, aio); - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_aio_finish(aio, NNG_ECLOSED, 0); } while (mq->mq_len > 0) { @@ -701,8 +690,6 @@ nni_msgq_close(nni_msgq *mq) naio = nni_list_first(&mq->mq_aio_getq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } @@ -710,8 +697,6 @@ nni_msgq_close(nni_msgq *mq) naio = nni_list_first(&mq->mq_aio_putq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } diff --git a/src/core/pipe.c b/src/core/pipe.c index f33f21a6..3dcfe9e0 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -46,7 +46,7 @@ nni_pipe_dtor(void *ptr) { nni_pipe *p = ptr; - if (p->p_proto_dtor != NULL) { + if (p->p_proto_data != NULL) { p->p_proto_dtor(p->p_proto_data); } if (p->p_tran_data != NULL) { @@ -145,23 +145,41 @@ nni_pipe_close(nni_pipe *p) } -// nni_pipe_remove is called by protocol implementations to indicate that -// they are finished using the pipe (it should be closed already), and the -// owning socket and endpoint should de-register it. +// We have to stop asynchronously using a task, because otherwise we can +// wind up having a callback from an AIO trying to cancel itself. That +// simply will not work. void nni_pipe_remove(nni_pipe *p) { - // Make sure the pipe is closed, in case it wasn't already done. + // Transport close... nni_pipe_close(p); nni_ep_pipe_remove(p->p_ep, p); - nni_sock_pipe_remove(p->p_sock, p); + + // Tell the protocol to stop. + nni_sock_pipe_stop(p->p_sock, p); // XXX: would be simpler to just do a destroy here nni_pipe_rele(p); } +void +nni_pipe_stop(nni_pipe *p) +{ + // Guard against recursive calls. + nni_mtx_lock(&p->p_mtx); + if (p->p_stop) { + nni_mtx_unlock(&p->p_mtx); + return; + } + p->p_stop = 1; + nni_mtx_unlock(&p->p_mtx); + nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_remove, p); + nni_taskq_dispatch(NULL, &p->p_reap_tqe); +} + + uint16_t nni_pipe_peer(nni_pipe *p) { @@ -175,6 +193,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) nni_pipe *p; int rv; uint32_t id; + void *pdata; rv = nni_objhash_alloc(nni_pipes, &id, (void **) &p); if (rv != 0) { @@ -187,18 +206,24 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *tran->tran_pipe; + // Save the protocol destructor. + p->p_proto_dtor = sock->s_pipe_ops.pipe_fini; + // Initialize the transport pipe data. if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) { nni_objhash_unref(nni_pipes, p->p_id); return (rv); } - if ((rv = nni_ep_pipe_add(ep, p)) != 0) { - nni_pipe_remove(p); + // Initialize protocol pipe data. + rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data); + if (rv != 0) { + nni_objhash_unref(nni_pipes, p->p_id); return (rv); } - if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - nni_pipe_remove(p); + + if ((rv = nni_ep_pipe_add(ep, p)) != 0) { + nni_objhash_unref(nni_pipes, p->p_id); return (rv); } @@ -222,16 +247,8 @@ int nni_pipe_start(nni_pipe *p) { int rv; - nni_pipe *scratch; - - rv = nni_objhash_find(nni_pipes, p->p_id, (void **) &scratch); - if (rv != 0) { - return (rv); - } - NNI_ASSERT(p == scratch); if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { - nni_pipe_remove(p); return (rv); } @@ -241,14 +258,6 @@ nni_pipe_start(nni_pipe *p) } -void -nni_pipe_set_proto_data(nni_pipe *p, void *data, nni_cb dtor) -{ - p->p_proto_data = data; - p->p_proto_dtor = dtor; -} - - void * nni_pipe_get_proto_data(nni_pipe *p) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 1f911480..80560e0e 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -29,7 +29,9 @@ struct nni_pipe { nni_sock * p_sock; nni_ep * p_ep; int p_reap; + int p_stop; nni_mtx p_mtx; + nni_taskq_ent p_reap_tqe; }; extern int nni_pipe_sys_init(void); @@ -54,6 +56,7 @@ extern void nni_pipe_rele(nni_pipe *); // resources released back to the system. The protocol MUST not reference // the pipe after this. extern void nni_pipe_remove(nni_pipe *); +extern void nni_pipe_stop(nni_pipe *); // Used only by the socket core - as we don't wish to expose the details // of the pipe structure outside of pipe.c. @@ -64,12 +67,6 @@ extern uint16_t nni_pipe_peer(nni_pipe *); extern int nni_pipe_start(nni_pipe *); extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); -// nni_pipe_set_proto_data sets the protocol private data. No locking is -// performed, and this routine should only be called once per pipe at -// initialization. The third argument is called to destroy the data, -// at termination. -extern void nni_pipe_set_proto_data(nni_pipe *, void *, nni_cb); - // nni_pipe_get_proto_data gets the protocol private data set with the // nni_pipe_set_proto_data function. No locking is performed. extern void *nni_pipe_get_proto_data(nni_pipe *); diff --git a/src/core/socket.c b/src/core/socket.c index 85b97363..4535d2f4 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -83,32 +83,6 @@ nni_sock_rele(nni_sock *sock) int -nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) -{ - int rv; - void *pdata; - - rv = sock->s_pipe_ops.pipe_init(&pdata, pipe, sock->s_data); - if (rv != 0) { - return (rv); - } - - // XXX: place a hold on the socket. - - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - sock->s_pipe_ops.pipe_fini(pdata); - return (NNG_ECLOSED); - } - nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini); - nni_list_append(&sock->s_pipes, pipe); - nni_mtx_unlock(&sock->s_mx); - return (0); -} - - -int nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) { int rv; @@ -130,6 +104,9 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) return (rv); } + // We have claimed ownership of the pipe, so add it to the list. + // Up until this point, the caller could destroy the pipe. + nni_list_append(&sock->s_pipes, pipe); nni_mtx_unlock(&sock->s_mx); return (0); @@ -137,14 +114,19 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) void -nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe) { void *pdata; - if (sock == NULL) { + pdata = nni_pipe_get_proto_data(pipe); + + nni_mtx_lock(&sock->s_mx); + + if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) { + nni_mtx_unlock(&sock->s_mx); return; } - nni_mtx_lock(&sock->s_mx); + sock->s_pipe_ops.pipe_stop(pdata); if (nni_list_active(&sock->s_pipes, pipe)) { nni_list_remove(&sock->s_pipes, pipe); if (sock->s_closing) { @@ -552,18 +534,17 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); + // For each pipe, close the underlying transport. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_stop(pipe); + } + // For each ep, close it; this will also tell it to force any // of its pipes to close. NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_close(ep); } - // For each pipe, close the underlying transport. Also move it - // to the idle list so we won't keep looping. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); - } - // Wait for the eps to be reaped. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { nni_list_remove(&sock->s_eps, ep); diff --git a/src/core/socket.h b/src/core/socket.h index 7d5e0f20..928264d9 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -88,16 +88,7 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *); extern int nni_sock_ep_add(nni_sock *, nni_ep *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); -// nni_sock_pipe_add is called by the pipe to register the pipe with -// with the socket. The pipe is added to the idle list. The protocol -// private pipe data is initialized as well. -extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); - -// nni_sock_pipe_remove is called by the pipe when the protocol is -// done with it. This is the sockets indication that it should be -// removed, and freed. The protocol MUST guarantee that the pipe is -// no longer in use when this function is called. -extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); +extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *); // nni_sock_pipe_ready lets the socket know the pipe is ready for // business. This also calls the socket/protocol specific add function, diff --git a/src/core/taskq.c b/src/core/taskq.c index f33a68bf..f4a0beee 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -184,7 +184,9 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) nni_mtx_unlock(&tq->tq_mtx); return (NNG_ENOENT); } - nni_list_remove(&tq->tq_ents, ent); + if (nni_list_active(&tq->tq_ents, ent)) { + nni_list_remove(&tq->tq_ents, ent); + } nni_mtx_unlock(&tq->tq_mtx); return (0); } |
