diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 114 |
1 files changed, 48 insertions, 66 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 1658aabc..75c4c8d6 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -15,6 +15,24 @@ // Operations on pipes (to the transport) are generally blocking operations, // performed in the context of the protocol. +struct nni_pipe { + uint32_t p_id; + nni_tran_pipe p_tran_ops; + void * p_tran_data; + void * p_proto_data; + nni_list_node p_sock_node; + nni_list_node p_ep_node; + nni_sock * p_sock; + nni_ep * p_ep; + int p_reap; + int p_stop; + int p_refcnt; + nni_mtx p_mtx; + nni_cv p_cv; + nni_list_node p_reap_node; + nni_aio p_start_aio; +}; + static nni_idhash *nni_pipes; static nni_list nni_pipe_reap_list; @@ -78,7 +96,10 @@ nni_pipe_destroy(nni_pipe *p) if (p == NULL) { return; } - NNI_ASSERT(p->p_refcnt != 0xDEAD); + + // Stop any pending negotiation. + nni_aio_stop(&p->p_start_aio); + // Make sure any unlocked holders are done with this. // This happens during initialization for example. nni_mtx_lock(&p->p_mtx); @@ -86,14 +107,17 @@ nni_pipe_destroy(nni_pipe *p) nni_cv_wait(&p->p_cv); } nni_mtx_unlock(&p->p_mtx); - p->p_refcnt = 0xDEAD; - nni_aio_stop(&p->p_start_aio); + // We have exclusive access at this point, so we can check if + // we are still on any lists. + nni_aio_fini(&p->p_start_aio); - if (p->p_proto_data != NULL) { - p->p_proto_dtor(p->p_proto_data); + if (nni_list_node_active(&p->p_ep_node)) { + nni_ep_pipe_remove(p->p_ep, p); } + nni_sock_pipe_remove(p->p_sock, p); + if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -148,31 +172,6 @@ nni_pipe_close(nni_pipe *p) nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); } -// Pipe reap is called on a taskq when the pipe should be closed. No -// locks are held. This routine must take care to synchronously ensure -// that no further references to the pipe are possible, then it may -// destroy the pipe. -static void -nni_pipe_reap(nni_pipe *p) -{ - // Transport close... - nni_pipe_close(p); - - nni_aio_stop(&p->p_start_aio); - - // Remove the pipe from the socket and the endpoint. Note - // that it is in theory possible for either of these to be null - // if the pipe is being torn down before it is fully initialized. - if (p->p_ep != NULL) { - nni_ep_pipe_remove(p->p_ep, p); - } - if (p->p_sock != NULL) { - nni_sock_pipe_remove(p->p_sock, p); - } - - nni_pipe_destroy(p); -} - void nni_pipe_stop(nni_pipe *p) { @@ -206,16 +205,12 @@ nni_pipe_start_cb(void *arg) nni_aio * aio = &p->p_start_aio; int rv; - nni_mtx_lock(&p->p_mtx); if ((rv = nni_aio_result(aio)) != 0) { - nni_mtx_unlock(&p->p_mtx); nni_pipe_stop(p); return; } - nni_mtx_unlock(&p->p_mtx); - - if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { + if ((rv = nni_sock_pipe_start(p->p_sock, p)) != 0) { nni_pipe_stop(p); } } @@ -225,8 +220,8 @@ nni_pipe_create(nni_ep *ep, void *tdata) { nni_pipe *p; int rv; - nni_tran *tran = ep->ep_tran; - nni_sock *sock = ep->ep_sock; + nni_tran *tran = nni_ep_tran(ep); + nni_sock *sock = nni_ep_sock(ep); if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -238,31 +233,24 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_data = NULL; + p->p_ep = ep; + p->p_sock = sock; NNI_LIST_NODE_INIT(&p->p_reap_node); - - if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || - ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) { - tran->tran_pipe->p_fini(p); - nni_mtx_fini(&p->p_mtx); - NNI_FREE_STRUCT(p); - return (rv); - } - NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); - if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) { - nni_pipe_destroy(p); - return (rv); - } - - if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) { + if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || + ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0) || + ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != + 0) || + ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) || + ((rv = nni_ep_pipe_add(ep, p)) != 0) || + ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); - return (rv); } - return (0); + return (rv); } int @@ -292,6 +280,12 @@ nni_pipe_get_proto_data(nni_pipe *p) } void +nni_pipe_set_proto_data(nni_pipe *p, void *data) +{ + p->p_proto_data = data; +} + +void nni_pipe_sock_list_init(nni_list *list) { NNI_LIST_INIT(list, nni_pipe, p_sock_node); @@ -318,18 +312,6 @@ nni_pipe_reaper(void *notused) // Transport close... nni_pipe_close(p); - nni_aio_stop(&p->p_start_aio); - - // Remove the pipe from the socket and the endpoint. - // Note that it is in theory possible for either of - // these to be null if the pipe is being torn down - // before it is fully initialized. - if (p->p_ep != NULL) { - nni_ep_pipe_remove(p->p_ep, p); - } - if (p->p_sock != NULL) { - nni_sock_pipe_remove(p->p_sock, p); - } nni_pipe_destroy(p); nni_mtx_lock(&nni_pipe_reap_lk); continue; |
