diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 65 |
1 files changed, 16 insertions, 49 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 8cc21941..5b51a38b 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -52,7 +52,7 @@ nni_pipe_destroy(nni_pipe *p) if (p == NULL) { return; } - + NNI_ASSERT(p->p_refcnt != 0xDEAD); // Make sure any unlocked holders are done with this. // This happens during initialization for example. nni_mtx_lock(&p->p_mtx); @@ -60,6 +60,10 @@ nni_pipe_destroy(nni_pipe *p) nni_cv_wait(&p->p_cv); } nni_mtx_unlock(&p->p_mtx); + p->p_refcnt = 0xDEAD; + + nni_aio_stop(&p->p_start_aio); + nni_aio_fini(&p->p_start_aio); if (p->p_proto_data != NULL) { p->p_proto_dtor(p->p_proto_data); @@ -107,9 +111,6 @@ nni_pipe_close(nni_pipe *p) } p->p_reap = 1; - // abort any pending negotiation/start process. - nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); - // Close the underlying transport. if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); @@ -117,8 +118,8 @@ nni_pipe_close(nni_pipe *p) nni_mtx_unlock(&p->p_mtx); - // Ensure that the negotiation step is aborted fully. - nni_aio_fini(&p->p_start_aio); + // abort any pending negotiation/start process. + nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); } // Pipe reap is called on a taskq when the pipe should be closed. No @@ -131,6 +132,8 @@ nni_pipe_reap(nni_pipe *p) // Transport close... nni_pipe_close(p); + nni_aio_stop(&p->p_start_aio); + // Remove the pipe from the socket and the endpoint. Note // that it is in theory possible for either of these to be null // if the pipe is being torn down before it is fully initialized. @@ -148,6 +151,7 @@ void nni_pipe_stop(nni_pipe *p) { // Guard against recursive calls. + nni_pipe_close(p); nni_mtx_lock(&p->p_mtx); if (p->p_stop) { nni_mtx_unlock(&p->p_mtx); @@ -186,25 +190,6 @@ nni_pipe_start_cb(void *arg) } } -void -nni_pipe_hold(nni_pipe *p) -{ - nni_mtx_lock(&p->p_mtx); - p->p_refcnt++; - nni_mtx_unlock(&p->p_mtx); -} - -void -nni_pipe_rele(nni_pipe *p) -{ - nni_mtx_lock(&p->p_mtx); - p->p_refcnt--; - if (p->p_refcnt == 0) { - nni_cv_wake(&p->p_cv); - } - nni_mtx_unlock(&p->p_mtx); -} - int nni_pipe_create(nni_ep *ep, void *tdata) { @@ -215,7 +200,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... - tran->tran_pipe->p_fini(p); + tran->tran_pipe->p_fini(tdata); return (NNG_ENOMEM); } @@ -232,38 +217,20 @@ nni_pipe_create(nni_ep *ep, void *tdata) return (rv); } - nni_pipe_hold(p); - NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) { - goto fail; - } - - if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) { - goto fail; + nni_pipe_destroy(p); + return (rv); } - // Attempt to initialize sock protocol & endpoint. - if ((rv = nni_ep_pipe_add(ep, p)) != 0) { - goto fail; - } - if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - goto fail; + if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) { + nni_pipe_destroy(p); + return (rv); } - // Start the pipe running. - nni_pipe_start(p); - nni_pipe_rele(p); - return (0); - -fail: - nni_pipe_stop(p); - nni_pipe_rele(p); - - return (rv); } int |
