From 159dc95695515db767e208c41d5f44b334b71adc Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 2 May 2018 16:01:22 -0700 Subject: fixes #389 Need pipe notification callbacks This adds a new pipe event notification API (callbacks called on either pipe add or remove), including both tests and docs. Also supporting APIs to get the socket or endpoint associated with a pipe are included (tested and documented as well.) --- src/core/pipe.c | 79 +++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 55 insertions(+), 24 deletions(-) (limited to 'src/core/pipe.c') diff --git a/src/core/pipe.c b/src/core/pipe.c index f849e00e..010d306d 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -18,7 +18,7 @@ // performed in the context of the protocol. struct nni_pipe { - uint64_t p_id; + uint32_t p_id; nni_tran_pipe p_tran_ops; void * p_tran_data; void * p_proto_data; @@ -26,7 +26,7 @@ struct nni_pipe { nni_list_node p_ep_node; nni_sock * p_sock; nni_ep * p_ep; - bool p_reap; + bool p_closed; bool p_stop; int p_refcnt; nni_mtx p_mtx; @@ -106,6 +106,15 @@ nni_pipe_destroy(nni_pipe *p) // Stop any pending negotiation. nni_aio_stop(p->p_start_aio); + // We have exclusive access at this point, so we can check if + // we are still on any lists. + if (nni_list_node_active(&p->p_ep_node)) { + nni_ep_pipe_remove(p->p_ep, p); + } + if (nni_list_node_active(&p->p_sock_node)) { + nni_sock_pipe_remove(p->p_sock, p); + } + // Make sure any unlocked holders are done with this. // This happens during initialization for example. nni_mtx_lock(&nni_pipe_lk); @@ -117,16 +126,6 @@ nni_pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); - // We have exclusive access at this point, so we can check if - // we are still on any lists. - - if (nni_list_node_active(&p->p_ep_node)) { - nni_ep_pipe_remove(p->p_ep, p); - } - if (nni_list_node_active(&p->p_sock_node)) { - nni_sock_pipe_remove(p->p_sock, p); - } - if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -141,13 +140,14 @@ nni_pipe_find(nni_pipe **pp, uint32_t id) int rv; nni_pipe *p; nni_mtx_lock(&nni_pipe_lk); + + // We don't care if the pipe is "closed". End users only have + // access to the pipe in order to obtain properties (which may + // be retried during the post-close notification callback) or to + // close the pipe. if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) { - if (p->p_reap) { - rv = NNG_ECLOSED; - } else { - p->p_refcnt++; - *pp = p; - } + p->p_refcnt++; + *pp = p; } nni_mtx_unlock(&nni_pipe_lk); return (rv); @@ -168,7 +168,7 @@ nni_pipe_rele(nni_pipe *p) uint32_t nni_pipe_id(nni_pipe *p) { - return ((uint32_t) p->p_id); + return (p->p_id); } void @@ -184,18 +184,18 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) } // nni_pipe_close closes the underlying connection. It is expected that -// subsequent attempts receive or send (including any waiting receive) will +// subsequent attempts to receive or send (including any waiting receive) will // simply return NNG_ECLOSED. void nni_pipe_close(nni_pipe *p) { nni_mtx_lock(&p->p_mtx); - if (p->p_reap) { + if (p->p_closed) { // We already did a close. nni_mtx_unlock(&p->p_mtx); return; } - p->p_reap = true; + p->p_closed = true; // Close the underlying transport. if (p->p_tran_data != NULL) { @@ -208,6 +208,16 @@ nni_pipe_close(nni_pipe *p) nni_aio_abort(p->p_start_aio, NNG_ECLOSED); } +bool +nni_pipe_closed(nni_pipe *p) +{ + bool rv; + nni_mtx_lock(&p->p_mtx); + rv = p->p_closed; + nni_mtx_unlock(&p->p_mtx); + return (rv); +} + void nni_pipe_stop(nni_pipe *p) { @@ -270,7 +280,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_proto_data = NULL; p->p_ep = ep; p->p_sock = sock; - p->p_reap = false; + p->p_closed = false; p->p_stop = false; p->p_refcnt = 0; @@ -281,8 +291,11 @@ nni_pipe_create(nni_ep *ep, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { + uint64_t id; nni_mtx_lock(&nni_pipe_lk); - rv = nni_idhash_alloc(nni_pipes, &p->p_id, p); + if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { + p->p_id = (uint32_t) id; + } nni_mtx_unlock(&nni_pipe_lk); } @@ -343,6 +356,24 @@ nni_pipe_ep_list_init(nni_list *list) NNI_LIST_INIT(list, nni_pipe, p_ep_node); } +uint32_t +nni_pipe_sock_id(nni_pipe *p) +{ + return (nni_sock_id(p->p_sock)); +} + +uint32_t +nni_pipe_ep_id(nni_pipe *p) +{ + return (nni_ep_id(p->p_ep)); +} + +int +nni_pipe_ep_mode(nni_pipe *p) +{ + return (nni_ep_mode(p->p_ep)); +} + static void nni_pipe_reaper(void *notused) { -- cgit v1.2.3-70-g09d2