aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c49
1 files changed, 38 insertions, 11 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 321e1a09..ccd0dbe7 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -29,6 +29,7 @@ struct nni_pipe {
nni_ep * p_ep;
bool p_closed;
bool p_stop;
+ bool p_cbs;
int p_refcnt;
nni_mtx p_mtx;
nni_cv p_cv;
@@ -100,10 +101,19 @@ nni_pipe_sys_fini(void)
static void
nni_pipe_destroy(nni_pipe *p)
{
+ bool cbs;
if (p == NULL) {
return;
}
+ nni_mtx_lock(&p->p_mtx);
+ cbs = p->p_cbs;
+ nni_mtx_unlock(&p->p_mtx);
+
+ if (cbs) {
+ nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id);
+ }
+
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
@@ -119,6 +129,7 @@ nni_pipe_destroy(nni_pipe *p)
if (nni_list_node_active(&p->p_ep_node)) {
nni_ep_pipe_remove(p->p_ep, p);
}
+
if (nni_list_node_active(&p->p_sock_node)) {
nni_sock_pipe_remove(p->p_sock, p);
}
@@ -263,13 +274,32 @@ static void
nni_pipe_start_cb(void *arg)
{
nni_pipe *p = arg;
+ nni_sock *s = p->p_sock;
nni_aio * aio = p->p_start_aio;
+ uint32_t id = nni_pipe_id(p);
- if ((nni_aio_result(aio) != 0) ||
- (nni_sock_pipe_start(p->p_sock, p) != 0) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (nni_aio_result(aio) != 0) {
nni_pipe_stop(p);
+ return;
}
+
+ nni_mtx_lock(&p->p_mtx);
+ p->p_cbs = true; // We're running all cbs going forward
+ nni_mtx_unlock(&p->p_mtx);
+
+ nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id);
+ if (nni_pipe_closed(p)) {
+ nni_pipe_stop(p);
+ return;
+ }
+
+ if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
+ nni_sock_closing(s)) {
+ nni_pipe_stop(p);
+ return;
+ }
+
+ nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id);
}
int
@@ -289,6 +319,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
}
// Make a private copy of the transport ops.
+ p->p_start_aio = NULL;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -297,6 +328,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_sock = sock;
p->p_closed = false;
p->p_stop = false;
+ p->p_cbs = false;
p->p_refcnt = 0;
NNI_LIST_NODE_INIT(&p->p_reap_node);
@@ -317,18 +349,13 @@ nni_pipe_create(nni_ep *ep, void *tdata)
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) ||
- ((rv = nni_ep_pipe_add(ep, p)) != 0)) {
+ ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
+ ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
return (rv);
}
- // At this point the endpoint knows about it, and the protocol
- // might too, so on failure we have to tear it down fully as if done
- // after a successful result.
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- nni_pipe_stop(p);
- }
- return (rv);
+ return (0);
}
int