From fe3c9705072ac8cafecdf2ea6bca4c26f9464824 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 28 Jun 2017 23:07:28 -0700 Subject: Refactor stop again, closing numerous races (thanks valgrind!) --- src/core/pipe.c | 61 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 35 insertions(+), 26 deletions(-) (limited to 'src/core/pipe.c') diff --git a/src/core/pipe.c b/src/core/pipe.c index f33f21a6..3dcfe9e0 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -46,7 +46,7 @@ nni_pipe_dtor(void *ptr) { nni_pipe *p = ptr; - if (p->p_proto_dtor != NULL) { + if (p->p_proto_data != NULL) { p->p_proto_dtor(p->p_proto_data); } if (p->p_tran_data != NULL) { @@ -145,23 +145,41 @@ nni_pipe_close(nni_pipe *p) } -// nni_pipe_remove is called by protocol implementations to indicate that -// they are finished using the pipe (it should be closed already), and the -// owning socket and endpoint should de-register it. +// We have to stop asynchronously using a task, because otherwise we can +// wind up having a callback from an AIO trying to cancel itself. That +// simply will not work. void nni_pipe_remove(nni_pipe *p) { - // Make sure the pipe is closed, in case it wasn't already done. + // Transport close... nni_pipe_close(p); nni_ep_pipe_remove(p->p_ep, p); - nni_sock_pipe_remove(p->p_sock, p); + + // Tell the protocol to stop. + nni_sock_pipe_stop(p->p_sock, p); // XXX: would be simpler to just do a destroy here nni_pipe_rele(p); } +void +nni_pipe_stop(nni_pipe *p) +{ + // Guard against recursive calls. + nni_mtx_lock(&p->p_mtx); + if (p->p_stop) { + nni_mtx_unlock(&p->p_mtx); + return; + } + p->p_stop = 1; + nni_mtx_unlock(&p->p_mtx); + nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_remove, p); + nni_taskq_dispatch(NULL, &p->p_reap_tqe); +} + + uint16_t nni_pipe_peer(nni_pipe *p) { @@ -175,6 +193,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) nni_pipe *p; int rv; uint32_t id; + void *pdata; rv = nni_objhash_alloc(nni_pipes, &id, (void **) &p); if (rv != 0) { @@ -187,18 +206,24 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *tran->tran_pipe; + // Save the protocol destructor. + p->p_proto_dtor = sock->s_pipe_ops.pipe_fini; + // Initialize the transport pipe data. if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) { nni_objhash_unref(nni_pipes, p->p_id); return (rv); } - if ((rv = nni_ep_pipe_add(ep, p)) != 0) { - nni_pipe_remove(p); + // Initialize protocol pipe data. + rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data); + if (rv != 0) { + nni_objhash_unref(nni_pipes, p->p_id); return (rv); } - if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - nni_pipe_remove(p); + + if ((rv = nni_ep_pipe_add(ep, p)) != 0) { + nni_objhash_unref(nni_pipes, p->p_id); return (rv); } @@ -222,16 +247,8 @@ int nni_pipe_start(nni_pipe *p) { int rv; - nni_pipe *scratch; - - rv = nni_objhash_find(nni_pipes, p->p_id, (void **) &scratch); - if (rv != 0) { - return (rv); - } - NNI_ASSERT(p == scratch); if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { - nni_pipe_remove(p); return (rv); } @@ -241,14 +258,6 @@ nni_pipe_start(nni_pipe *p) } -void -nni_pipe_set_proto_data(nni_pipe *p, void *data, nni_cb dtor) -{ - p->p_proto_data = data; - p->p_proto_dtor = dtor; -} - - void * nni_pipe_get_proto_data(nni_pipe *p) { -- cgit v1.2.3-70-g09d2