aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/pipe.c65
-rw-r--r--src/core/pipe.h2
2 files changed, 51 insertions, 16 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f24ea9c3..8cc21941 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -53,6 +53,14 @@ nni_pipe_destroy(nni_pipe *p)
return;
}
+ // Make sure any unlocked holders are done with this.
+ // This happens during initialization for example.
+ nni_mtx_lock(&p->p_mtx);
+ while (p->p_refcnt != 0) {
+ nni_cv_wait(&p->p_cv);
+ }
+ nni_mtx_unlock(&p->p_mtx);
+
if (p->p_proto_data != NULL) {
p->p_proto_dtor(p->p_proto_data);
}
@@ -178,6 +186,25 @@ nni_pipe_start_cb(void *arg)
}
}
+void
+nni_pipe_hold(nni_pipe *p)
+{
+ nni_mtx_lock(&p->p_mtx);
+ p->p_refcnt++;
+ nni_mtx_unlock(&p->p_mtx);
+}
+
+void
+nni_pipe_rele(nni_pipe *p)
+{
+ nni_mtx_lock(&p->p_mtx);
+ p->p_refcnt--;
+ if (p->p_refcnt == 0) {
+ nni_cv_wake(&p->p_cv);
+ }
+ nni_mtx_unlock(&p->p_mtx);
+}
+
int
nni_pipe_create(nni_ep *ep, void *tdata)
{
@@ -197,40 +224,46 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_tran_data = tdata;
p->p_proto_data = NULL;
- if ((rv = nni_mtx_init(&p->p_mtx)) != 0) {
- nni_pipe_destroy(p);
- return (rv);
- }
- if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
- nni_pipe_destroy(p);
+ if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
+ ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) {
+ tran->tran_pipe->p_fini(p);
+ nni_mtx_fini(&p->p_mtx);
+ NNI_FREE_STRUCT(p);
return (rv);
}
+ nni_pipe_hold(p);
+
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
- if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
- nni_pipe_destroy(p);
- return (rv);
+ if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
+ goto fail;
}
- p->p_tran_ops = *tran->tran_pipe;
- p->p_tran_data = tdata;
+ if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
+ goto fail;
+ }
// Attempt to initialize sock protocol & endpoint.
if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
- nni_pipe_destroy(p);
- return (rv);
+ goto fail;
}
if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- nni_ep_pipe_remove(ep, p);
- nni_pipe_destroy(p);
- return (rv);
+ goto fail;
}
// Start the pipe running.
nni_pipe_start(p);
+ nni_pipe_rele(p);
+
return (0);
+
+fail:
+ nni_pipe_stop(p);
+ nni_pipe_rele(p);
+
+ return (rv);
}
int
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 90e9213e..e2c76d4d 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -31,7 +31,9 @@ struct nni_pipe {
nni_ep * p_ep;
int p_reap;
int p_stop;
+ int p_refcnt;
nni_mtx p_mtx;
+ nni_cv p_cv;
nni_taskq_ent p_reap_tqe;
nni_aio p_start_aio;
};