diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 93 |
1 files changed, 54 insertions, 39 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 010d306d..ba3027cf 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -18,21 +18,22 @@ // performed in the context of the protocol. struct nni_pipe { - uint32_t p_id; - nni_tran_pipe p_tran_ops; - void * p_tran_data; - void * p_proto_data; - nni_list_node p_sock_node; - nni_list_node p_ep_node; - nni_sock * p_sock; - nni_ep * p_ep; - bool p_closed; - bool p_stop; - int p_refcnt; - nni_mtx p_mtx; - nni_cv p_cv; - nni_list_node p_reap_node; - nni_aio * p_start_aio; + uint32_t p_id; + nni_tran_pipe_ops p_tran_ops; + nni_proto_pipe_ops p_proto_ops; + void * p_tran_data; + void * p_proto_data; + nni_list_node p_sock_node; + nni_list_node p_ep_node; + nni_sock * p_sock; + nni_ep * p_ep; + bool p_closed; + bool p_stop; + int p_refcnt; + nni_mtx p_mtx; + nni_cv p_cv; + nni_list_node p_reap_node; + nni_aio * p_start_aio; }; static nni_idhash *nni_pipes; @@ -106,6 +107,10 @@ nni_pipe_destroy(nni_pipe *p) // Stop any pending negotiation. nni_aio_stop(p->p_start_aio); + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_stop(p->p_proto_data); + } + // We have exclusive access at this point, so we can check if // we are still on any lists. if (nni_list_node_active(&p->p_ep_node)) { @@ -126,6 +131,9 @@ nni_pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_fini(p->p_proto_data); + } if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -189,6 +197,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { + // abort any pending negotiation/start process. + nni_aio_close(p->p_start_aio); + nni_mtx_lock(&p->p_mtx); if (p->p_closed) { // We already did a close. @@ -196,16 +207,16 @@ nni_pipe_close(nni_pipe *p) return; } p->p_closed = true; + nni_mtx_unlock(&p->p_mtx); + + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_close(p->p_proto_data); + } // Close the underlying transport. if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); } - - nni_mtx_unlock(&p->p_mtx); - - // abort any pending negotiation/start process. - nni_aio_abort(p->p_start_aio, NNG_ECLOSED); } bool @@ -222,7 +233,6 @@ void nni_pipe_stop(nni_pipe *p) { // Guard against recursive calls. - nni_pipe_close(p); nni_mtx_lock(&p->p_mtx); if (p->p_stop) { nni_mtx_unlock(&p->p_mtx); @@ -231,6 +241,8 @@ nni_pipe_stop(nni_pipe *p) p->p_stop = true; nni_mtx_unlock(&p->p_mtx); + nni_pipe_close(p); + // Put it on the reaplist for async cleanup nni_mtx_lock(&nni_pipe_reap_lk); nni_list_append(&nni_pipe_reap_list, p); @@ -250,12 +262,9 @@ nni_pipe_start_cb(void *arg) nni_pipe *p = arg; nni_aio * aio = p->p_start_aio; - if (nni_aio_result(aio) != 0) { - nni_pipe_stop(p); - return; - } - - if (nni_sock_pipe_start(p->p_sock, p) != 0) { + if ((nni_aio_result(aio) != 0) || + (nni_sock_pipe_start(p->p_sock, p) != 0) || + (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { nni_pipe_stop(p); } } @@ -263,10 +272,12 @@ nni_pipe_start_cb(void *arg) int nni_pipe_create(nni_ep *ep, void *tdata) { - nni_pipe *p; - int rv; - nni_tran *tran = nni_ep_tran(ep); - nni_sock *sock = nni_ep_sock(ep); + nni_pipe * p; + int rv; + nni_tran * tran = nni_ep_tran(ep); + nni_sock * sock = nni_ep_sock(ep); + void * sdata = nni_sock_proto_data(sock); + nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -277,6 +288,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) // Make a private copy of the transport ops. p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; + p->p_proto_ops = *pops; p->p_proto_data = NULL; p->p_ep = ep; p->p_sock = sock; @@ -290,6 +302,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); + if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { uint64_t id; nni_mtx_lock(&nni_pipe_lk); @@ -299,11 +312,19 @@ nni_pipe_create(nni_ep *ep, void *tdata) nni_mtx_unlock(&nni_pipe_lk); } - if ((rv != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) || - ((rv = nni_sock_pipe_add(sock, p)) != 0)) { + if ((rv != 0) || + ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) || + ((rv = nni_ep_pipe_add(ep, p)) != 0)) { nni_pipe_destroy(p); + return (rv); } + // At this point the endpoint knows about it, and the protocol + // might too, so on failure we have to tear it down fully as if done + // after a successful result. + if ((rv = nni_sock_pipe_add(sock, p)) != 0) { + nni_pipe_stop(p); + } return (rv); } @@ -339,12 +360,6 @@ nni_pipe_get_proto_data(nni_pipe *p) } void -nni_pipe_set_proto_data(nni_pipe *p, void *data) -{ - p->p_proto_data = data; -} - -void nni_pipe_sock_list_init(nni_list *list) { NNI_LIST_INIT(list, nni_pipe, p_sock_node); |
