aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c65
1 files changed, 16 insertions, 49 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 8cc21941..5b51a38b 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -52,7 +52,7 @@ nni_pipe_destroy(nni_pipe *p)
if (p == NULL) {
return;
}
-
+ NNI_ASSERT(p->p_refcnt != 0xDEAD);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&p->p_mtx);
@@ -60,6 +60,10 @@ nni_pipe_destroy(nni_pipe *p)
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&p->p_mtx);
+ p->p_refcnt = 0xDEAD;
+
+ nni_aio_stop(&p->p_start_aio);
+ nni_aio_fini(&p->p_start_aio);
if (p->p_proto_data != NULL) {
p->p_proto_dtor(p->p_proto_data);
@@ -107,9 +111,6 @@ nni_pipe_close(nni_pipe *p)
}
p->p_reap = 1;
- // abort any pending negotiation/start process.
- nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
-
// Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
@@ -117,8 +118,8 @@ nni_pipe_close(nni_pipe *p)
nni_mtx_unlock(&p->p_mtx);
- // Ensure that the negotiation step is aborted fully.
- nni_aio_fini(&p->p_start_aio);
+ // abort any pending negotiation/start process.
+ nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
}
// Pipe reap is called on a taskq when the pipe should be closed. No
@@ -131,6 +132,8 @@ nni_pipe_reap(nni_pipe *p)
// Transport close...
nni_pipe_close(p);
+ nni_aio_stop(&p->p_start_aio);
+
// Remove the pipe from the socket and the endpoint. Note
// that it is in theory possible for either of these to be null
// if the pipe is being torn down before it is fully initialized.
@@ -148,6 +151,7 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
+ nni_pipe_close(p);
nni_mtx_lock(&p->p_mtx);
if (p->p_stop) {
nni_mtx_unlock(&p->p_mtx);
@@ -186,25 +190,6 @@ nni_pipe_start_cb(void *arg)
}
}
-void
-nni_pipe_hold(nni_pipe *p)
-{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt++;
- nni_mtx_unlock(&p->p_mtx);
-}
-
-void
-nni_pipe_rele(nni_pipe *p)
-{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt--;
- if (p->p_refcnt == 0) {
- nni_cv_wake(&p->p_cv);
- }
- nni_mtx_unlock(&p->p_mtx);
-}
-
int
nni_pipe_create(nni_ep *ep, void *tdata)
{
@@ -215,7 +200,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
- tran->tran_pipe->p_fini(p);
+ tran->tran_pipe->p_fini(tdata);
return (NNG_ENOMEM);
}
@@ -232,38 +217,20 @@ nni_pipe_create(nni_ep *ep, void *tdata)
return (rv);
}
- nni_pipe_hold(p);
-
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
- goto fail;
- }
-
- if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
- goto fail;
+ nni_pipe_destroy(p);
+ return (rv);
}
- // Attempt to initialize sock protocol & endpoint.
- if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
- goto fail;
- }
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- goto fail;
+ if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) {
+ nni_pipe_destroy(p);
+ return (rv);
}
- // Start the pipe running.
- nni_pipe_start(p);
- nni_pipe_rele(p);
-
return (0);
-
-fail:
- nni_pipe_stop(p);
- nni_pipe_rele(p);
-
- return (rv);
}
int