diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 49 |
1 files changed, 38 insertions, 11 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 321e1a09..ccd0dbe7 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -29,6 +29,7 @@ struct nni_pipe { nni_ep * p_ep; bool p_closed; bool p_stop; + bool p_cbs; int p_refcnt; nni_mtx p_mtx; nni_cv p_cv; @@ -100,10 +101,19 @@ nni_pipe_sys_fini(void) static void nni_pipe_destroy(nni_pipe *p) { + bool cbs; if (p == NULL) { return; } + nni_mtx_lock(&p->p_mtx); + cbs = p->p_cbs; + nni_mtx_unlock(&p->p_mtx); + + if (cbs) { + nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); + } + // Stop any pending negotiation. nni_aio_stop(p->p_start_aio); @@ -119,6 +129,7 @@ nni_pipe_destroy(nni_pipe *p) if (nni_list_node_active(&p->p_ep_node)) { nni_ep_pipe_remove(p->p_ep, p); } + if (nni_list_node_active(&p->p_sock_node)) { nni_sock_pipe_remove(p->p_sock, p); } @@ -263,13 +274,32 @@ static void nni_pipe_start_cb(void *arg) { nni_pipe *p = arg; + nni_sock *s = p->p_sock; nni_aio * aio = p->p_start_aio; + uint32_t id = nni_pipe_id(p); - if ((nni_aio_result(aio) != 0) || - (nni_sock_pipe_start(p->p_sock, p) != 0) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (nni_aio_result(aio) != 0) { nni_pipe_stop(p); + return; } + + nni_mtx_lock(&p->p_mtx); + p->p_cbs = true; // We're running all cbs going forward + nni_mtx_unlock(&p->p_mtx); + + nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); + if (nni_pipe_closed(p)) { + nni_pipe_stop(p); + return; + } + + if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || + nni_sock_closing(s)) { + nni_pipe_stop(p); + return; + } + + nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id); } int @@ -289,6 +319,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) } // Make a private copy of the transport ops. + p->p_start_aio = NULL; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_ops = *pops; @@ -297,6 +328,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_sock = sock; p->p_closed = false; p->p_stop = false; + p->p_cbs = false; p->p_refcnt = 0; NNI_LIST_NODE_INIT(&p->p_reap_node); @@ -317,18 +349,13 @@ nni_pipe_create(nni_ep *ep, void *tdata) if ((rv != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) || - ((rv = nni_ep_pipe_add(ep, p)) != 0)) { + ((rv = nni_ep_pipe_add(ep, p)) != 0) || + ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); return (rv); } - // At this point the endpoint knows about it, and the protocol - // might too, so on failure we have to tear it down fully as if done - // after a successful result. - if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - nni_pipe_stop(p); - } - return (rv); + return (0); } int |
