summaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-28 23:07:28 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-28 23:07:28 -0700
commitfe3c9705072ac8cafecdf2ea6bca4c26f9464824 (patch)
tree07aaea70cbf8bb6af369d5efede475ed03ffdd63 /src/core
parent10d748fa6444324878a77cc5749c93b75819ced2 (diff)
downloadnng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.gz
nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.bz2
nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.zip
Refactor stop again, closing numerous races (thanks valgrind!)
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c123
-rw-r--r--src/core/aio.h3
-rw-r--r--src/core/endpt.c17
-rw-r--r--src/core/init.c2
-rw-r--r--src/core/msgqueue.c47
-rw-r--r--src/core/pipe.c61
-rw-r--r--src/core/pipe.h9
-rw-r--r--src/core/socket.c51
-rw-r--r--src/core/socket.h11
-rw-r--r--src/core/taskq.c4
10 files changed, 201 insertions, 127 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 11aadcb7..96e7c950 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -10,7 +10,10 @@
#include <string.h>
#include "core/nng_impl.h"
-#define NNI_AIO_WAKE (1<<0)
+#define NNI_AIO_WAKE (1<<0)
+#define NNI_AIO_DONE (1<<1)
+#define NNI_AIO_FINI (1<<2)
+#define NNI_AIO_STOP (1<<3)
int
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
@@ -32,6 +35,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
+ aio->a_flags = 0;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -41,7 +45,24 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // Cancel the AIO if it was scheduled.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ // if the task is already dispatched, cancel it (or wait for it to
+ // complete). No further dispatches will happen because of the
+ // above logic to set NNI_AIO_FINI.
nni_taskq_cancel(NULL, &aio->a_tqe);
+
+ // At this point the AIO is done.
nni_cv_fini(&aio->a_cv);
nni_mtx_fini(&aio->a_lk);
}
@@ -82,21 +103,105 @@ nni_aio_wait(nni_aio *aio)
}
+int
+nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
+{
+ NNI_ASSERT(aio->a_prov_data == NULL);
+ NNI_ASSERT(aio->a_prov_cancel == NULL);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags &= ~(NNI_AIO_DONE|NNI_AIO_WAKE);
+ if (aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP)) {
+ // We should not reschedule anything at this point.
+ nni_mtx_unlock(&aio->a_lk);
+ return (NNG_ECANCELED);
+ }
+ aio->a_prov_cancel = cancel;
+ aio->a_prov_data = data;
+ nni_mtx_unlock(&aio->a_lk);
+ return (0);
+}
+
+
+void
+nni_aio_stop(nni_aio *aio)
+{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_DONE|NNI_AIO_STOP;
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This unregisters the AIO from the provider.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_prov_data = NULL;
+ aio->a_prov_cancel = NULL;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This either aborts the task, or waits for it to complete if already
+ // dispatched.
+ nni_taskq_cancel(NULL, &aio->a_tqe);
+}
+
+
+void
+nni_aio_cancel(nni_aio *aio)
+{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ if (aio->a_flags & NNI_AIO_DONE) {
+ // The operation already completed - so there's nothing
+ // left for us to do.
+ nni_mtx_unlock(&aio->a_lk);
+ return;
+ }
+ aio->a_flags |= NNI_AIO_DONE;
+ aio->a_result = NNG_ECANCELED;
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This unregisters the AIO from the provider.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ nni_mtx_lock(&aio->a_lk);
+ // These should have already been cleared by the cancel function.
+ aio->a_prov_data = NULL;
+ aio->a_prov_cancel = NULL;
+
+ if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) {
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
+ nni_mtx_unlock(&aio->a_lk);
+}
+
+
// I/O provider related functions.
void
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
- nni_cb cb;
- void *arg;
-
nni_mtx_lock(&aio->a_lk);
+ if (aio->a_flags & NNI_AIO_DONE) {
+ // Operation already done (canceled or timed out?)
+ nni_mtx_unlock(&aio->a_lk);
+ return;
+ }
+ aio->a_flags |= NNI_AIO_DONE;
aio->a_result = result;
aio->a_count = count;
- cb = aio->a_cb;
- arg = aio->a_cbarg;
- nni_cv_wake(&aio->a_cv);
- nni_mtx_unlock(&aio->a_lk);
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
+ if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) {
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
+ nni_mtx_unlock(&aio->a_lk);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index a5f78b3f..a290281f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -87,4 +87,7 @@ extern void nni_aio_wait(nni_aio *);
// and the amount of data transferred (if any).
extern void nni_aio_finish(nni_aio *, int, size_t);
+extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *);
+extern void nni_aio_stop(nni_aio *);
+
#endif // CORE_AIO_H
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 74bd0314..e3f78ecd 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -203,14 +203,7 @@ nni_ep_stop(nni_ep *ep)
void
nni_ep_close(nni_ep *ep)
{
- nni_pipe *pipe;
-
nni_ep_stop(ep);
- nni_mtx_lock(&ep->ep_mtx);
- NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
- nni_pipe_close(pipe);
- }
- nni_mtx_unlock(&ep->ep_mtx);
}
@@ -220,7 +213,15 @@ nni_ep_remove(nni_ep *ep)
nni_pipe *pipe;
nni_sock *sock = ep->ep_sock;
- nni_ep_close(ep);
+ nni_ep_stop(ep);
+
+ nni_thr_wait(&ep->ep_thr);
+
+ nni_mtx_lock(&ep->ep_mtx);
+ NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_lock(&ep->ep_mtx);
while (nni_list_first(&ep->ep_pipes) != NULL) {
diff --git a/src/core/init.c b/src/core/init.c
index ca7c214f..98b187d8 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -66,12 +66,12 @@ nni_fini(void)
{
// XXX: We should make sure that underlying sockets and
// file descriptors are closed. Details TBD.
+ nni_taskq_sys_fini();
nni_tran_sys_fini();
nni_pipe_sys_fini();
nni_ep_sys_fini();
nni_sock_sys_fini();
nni_random_sys_fini();
nni_timer_sys_fini();
- nni_taskq_sys_fini();
nni_plat_fini();
}
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 99b53274..3d373e2b 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -137,8 +137,6 @@ nni_msgq_finish(nni_aio *aio, int rv)
{
nni_msgq *mq = aio->a_prov_data;
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) {
nni_list_remove(&mq->mq_aio_putq, aio);
}
@@ -339,8 +337,6 @@ nni_msgq_cancel(nni_aio *aio)
if (nni_list_active(&mq->mq_aio_getq, aio)) {
nni_list_remove(&mq->mq_aio_getq, aio);
}
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_mtx_unlock(&mq->mq_lock);
}
@@ -349,8 +345,10 @@ void
nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (nni_list_active(&mq->mq_aio_notify_put, aio)) {
nni_list_remove(&mq->mq_aio_notify_put, aio);
}
@@ -363,8 +361,10 @@ void
nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (nni_list_active(&mq->mq_aio_notify_get, aio)) {
nni_list_remove(&mq->mq_aio_notify_get, aio);
}
@@ -379,7 +379,10 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
-
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (mq->mq_closed) {
nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&mq->mq_lock);
@@ -391,9 +394,6 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
return;
}
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
-
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -413,6 +413,10 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (mq->mq_closed) {
nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&mq->mq_lock);
@@ -424,9 +428,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
return;
}
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
-
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -507,8 +508,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
nni_list_remove(&mq->mq_aio_getq, raio);
raio->a_msg = msg;
- raio->a_prov_cancel = NULL;
- raio->a_prov_data = NULL;
nni_aio_finish(raio, 0, len);
nni_mtx_unlock(&mq->mq_lock);
@@ -550,13 +549,9 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
@@ -568,13 +563,9 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
@@ -662,8 +653,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_list_remove(&mq->mq_aio_putq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
while (mq->mq_len > 0) {
@@ -701,8 +690,6 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_getq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
@@ -710,8 +697,6 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_putq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f33f21a6..3dcfe9e0 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -46,7 +46,7 @@ nni_pipe_dtor(void *ptr)
{
nni_pipe *p = ptr;
- if (p->p_proto_dtor != NULL) {
+ if (p->p_proto_data != NULL) {
p->p_proto_dtor(p->p_proto_data);
}
if (p->p_tran_data != NULL) {
@@ -145,23 +145,41 @@ nni_pipe_close(nni_pipe *p)
}
-// nni_pipe_remove is called by protocol implementations to indicate that
-// they are finished using the pipe (it should be closed already), and the
-// owning socket and endpoint should de-register it.
+// We have to stop asynchronously using a task, because otherwise we can
+// wind up having a callback from an AIO trying to cancel itself. That
+// simply will not work.
void
nni_pipe_remove(nni_pipe *p)
{
- // Make sure the pipe is closed, in case it wasn't already done.
+ // Transport close...
nni_pipe_close(p);
nni_ep_pipe_remove(p->p_ep, p);
- nni_sock_pipe_remove(p->p_sock, p);
+
+ // Tell the protocol to stop.
+ nni_sock_pipe_stop(p->p_sock, p);
// XXX: would be simpler to just do a destroy here
nni_pipe_rele(p);
}
+void
+nni_pipe_stop(nni_pipe *p)
+{
+ // Guard against recursive calls.
+ nni_mtx_lock(&p->p_mtx);
+ if (p->p_stop) {
+ nni_mtx_unlock(&p->p_mtx);
+ return;
+ }
+ p->p_stop = 1;
+ nni_mtx_unlock(&p->p_mtx);
+ nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_remove, p);
+ nni_taskq_dispatch(NULL, &p->p_reap_tqe);
+}
+
+
uint16_t
nni_pipe_peer(nni_pipe *p)
{
@@ -175,6 +193,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
nni_pipe *p;
int rv;
uint32_t id;
+ void *pdata;
rv = nni_objhash_alloc(nni_pipes, &id, (void **) &p);
if (rv != 0) {
@@ -187,18 +206,24 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *tran->tran_pipe;
+ // Save the protocol destructor.
+ p->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
+
// Initialize the transport pipe data.
if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) {
nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
- if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
- nni_pipe_remove(p);
+ // Initialize protocol pipe data.
+ rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data);
+ if (rv != 0) {
+ nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- nni_pipe_remove(p);
+
+ if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
+ nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
@@ -222,16 +247,8 @@ int
nni_pipe_start(nni_pipe *p)
{
int rv;
- nni_pipe *scratch;
-
- rv = nni_objhash_find(nni_pipes, p->p_id, (void **) &scratch);
- if (rv != 0) {
- return (rv);
- }
- NNI_ASSERT(p == scratch);
if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
- nni_pipe_remove(p);
return (rv);
}
@@ -241,14 +258,6 @@ nni_pipe_start(nni_pipe *p)
}
-void
-nni_pipe_set_proto_data(nni_pipe *p, void *data, nni_cb dtor)
-{
- p->p_proto_data = data;
- p->p_proto_dtor = dtor;
-}
-
-
void *
nni_pipe_get_proto_data(nni_pipe *p)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 1f911480..80560e0e 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -29,7 +29,9 @@ struct nni_pipe {
nni_sock * p_sock;
nni_ep * p_ep;
int p_reap;
+ int p_stop;
nni_mtx p_mtx;
+ nni_taskq_ent p_reap_tqe;
};
extern int nni_pipe_sys_init(void);
@@ -54,6 +56,7 @@ extern void nni_pipe_rele(nni_pipe *);
// resources released back to the system. The protocol MUST not reference
// the pipe after this.
extern void nni_pipe_remove(nni_pipe *);
+extern void nni_pipe_stop(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
@@ -64,12 +67,6 @@ extern uint16_t nni_pipe_peer(nni_pipe *);
extern int nni_pipe_start(nni_pipe *);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
-// nni_pipe_set_proto_data sets the protocol private data. No locking is
-// performed, and this routine should only be called once per pipe at
-// initialization. The third argument is called to destroy the data,
-// at termination.
-extern void nni_pipe_set_proto_data(nni_pipe *, void *, nni_cb);
-
// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
extern void *nni_pipe_get_proto_data(nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index 85b97363..4535d2f4 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -83,32 +83,6 @@ nni_sock_rele(nni_sock *sock)
int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
-{
- int rv;
- void *pdata;
-
- rv = sock->s_pipe_ops.pipe_init(&pdata, pipe, sock->s_data);
- if (rv != 0) {
- return (rv);
- }
-
- // XXX: place a hold on the socket.
-
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- sock->s_pipe_ops.pipe_fini(pdata);
- return (NNG_ECLOSED);
- }
- nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini);
- nni_list_append(&sock->s_pipes, pipe);
- nni_mtx_unlock(&sock->s_mx);
- return (0);
-}
-
-
-int
nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
{
int rv;
@@ -130,6 +104,9 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
return (rv);
}
+ // We have claimed ownership of the pipe, so add it to the list.
+ // Up until this point, the caller could destroy the pipe.
+ nni_list_append(&sock->s_pipes, pipe);
nni_mtx_unlock(&sock->s_mx);
return (0);
@@ -137,14 +114,19 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
void
-nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe)
{
void *pdata;
- if (sock == NULL) {
+ pdata = nni_pipe_get_proto_data(pipe);
+
+ nni_mtx_lock(&sock->s_mx);
+
+ if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
+ nni_mtx_unlock(&sock->s_mx);
return;
}
- nni_mtx_lock(&sock->s_mx);
+ sock->s_pipe_ops.pipe_stop(pdata);
if (nni_list_active(&sock->s_pipes, pipe)) {
nni_list_remove(&sock->s_pipes, pipe);
if (sock->s_closing) {
@@ -552,18 +534,17 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
+ // For each pipe, close the underlying transport.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_stop(pipe);
+ }
+
// For each ep, close it; this will also tell it to force any
// of its pipes to close.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_close(ep);
}
- // For each pipe, close the underlying transport. Also move it
- // to the idle list so we won't keep looping.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
- }
-
// Wait for the eps to be reaped.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
nni_list_remove(&sock->s_eps, ep);
diff --git a/src/core/socket.h b/src/core/socket.h
index 7d5e0f20..928264d9 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -88,16 +88,7 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *);
extern int nni_sock_ep_add(nni_sock *, nni_ep *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
-// nni_sock_pipe_add is called by the pipe to register the pipe with
-// with the socket. The pipe is added to the idle list. The protocol
-// private pipe data is initialized as well.
-extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
-
-// nni_sock_pipe_remove is called by the pipe when the protocol is
-// done with it. This is the sockets indication that it should be
-// removed, and freed. The protocol MUST guarantee that the pipe is
-// no longer in use when this function is called.
-extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
+extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *);
// nni_sock_pipe_ready lets the socket know the pipe is ready for
// business. This also calls the socket/protocol specific add function,
diff --git a/src/core/taskq.c b/src/core/taskq.c
index f33a68bf..f4a0beee 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -184,7 +184,9 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
nni_mtx_unlock(&tq->tq_mtx);
return (NNG_ENOENT);
}
- nni_list_remove(&tq->tq_ents, ent);
+ if (nni_list_active(&tq->tq_ents, ent)) {
+ nni_list_remove(&tq->tq_ents, ent);
+ }
nni_mtx_unlock(&tq->tq_mtx);
return (0);
}