diff options
Diffstat (limited to 'src/core/pipe.c')
| -rw-r--r-- | src/core/pipe.c | 207 |
1 files changed, 34 insertions, 173 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 6b9b082c..e95fe1d4 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <string.h> @@ -17,54 +18,17 @@ // Operations on pipes (to the transport) are generally blocking operations, // performed in the context of the protocol. -struct nni_pipe { - uint32_t p_id; - uint32_t p_sock_id; - uint32_t p_dialer_id; - uint32_t p_listener_id; - nni_tran_pipe_ops p_tran_ops; - nni_proto_pipe_ops p_proto_ops; - void * p_tran_data; - void * p_proto_data; - nni_list_node p_sock_node; - nni_list_node p_ep_node; - nni_sock * p_sock; - nni_listener * p_listener; - nni_dialer * p_dialer; - bool p_closed; - nni_atomic_flag p_stop; - bool p_cbs; - int p_refcnt; - nni_mtx p_mtx; - nni_cv p_cv; - nni_list_node p_reap_node; - nni_aio * p_start_aio; -}; - static nni_idhash *nni_pipes; static nni_mtx nni_pipe_lk; -static nni_list nni_pipe_reap_list; -static nni_mtx nni_pipe_reap_lk; -static nni_cv nni_pipe_reap_cv; -static nni_thr nni_pipe_reap_thr; -static int nni_pipe_reap_run; - -static void nni_pipe_reaper(void *); - int nni_pipe_sys_init(void) { int rv; - NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); nni_mtx_init(&nni_pipe_lk); - nni_mtx_init(&nni_pipe_reap_lk); - nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk); - if (((rv = nni_idhash_init(&nni_pipes)) != 0) || - ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) != - 0)) { + if ((rv = nni_idhash_init(&nni_pipes)) != 0) { return (rv); } @@ -76,25 +40,13 @@ nni_pipe_sys_init(void) nni_idhash_set_limits( nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff); - nni_pipe_reap_run = 1; - nni_thr_run(&nni_pipe_reap_thr); - return (0); } void nni_pipe_sys_fini(void) { - if (nni_pipe_reap_run) { - nni_mtx_lock(&nni_pipe_reap_lk); - nni_pipe_reap_run = 0; - nni_cv_wake(&nni_pipe_reap_cv); - nni_mtx_unlock(&nni_pipe_reap_lk); - } - - nni_thr_fini(&nni_pipe_reap_thr); - nni_cv_fini(&nni_pipe_reap_cv); - nni_mtx_fini(&nni_pipe_reap_lk); + nni_reap_drain(); nni_mtx_fini(&nni_pipe_lk); if (nni_pipes != NULL) { nni_idhash_fini(nni_pipes); @@ -109,28 +61,7 @@ nni_pipe_destroy(nni_pipe *p) return; } - if (p->p_cbs) { - nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); - } - - // Stop any pending negotiation. - nni_aio_stop(p->p_start_aio); - - if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_stop(p->p_proto_data); - } - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } - - // We have exclusive access at this point, so we can check if - // we are still on any lists. - nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL - nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL - - if (nni_list_node_active(&p->p_sock_node)) { - nni_sock_pipe_remove(p->p_sock, p); - } + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. // This happens during initialization for example. @@ -138,11 +69,24 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_id != 0) { nni_idhash_remove(nni_pipes, p->p_id); } + // This wait guarantees that all callers are done with us. while (p->p_refcnt != 0) { nni_cv_wait(&p->p_cv); } nni_mtx_unlock(&nni_pipe_lk); + // Wait for neg callbacks to finish. (Already closed). + nni_aio_stop(p->p_start_aio); + + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_stop(p->p_proto_data); + } + if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { + p->p_tran_ops.p_stop(p->p_tran_data); + } + + nni_pipe_remove(p); + if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_fini(p->p_proto_data); } @@ -229,6 +173,8 @@ nni_pipe_close(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); } + + nni_reap(&p->p_reap, (nni_cb) nni_pipe_destroy, p); } bool @@ -241,23 +187,6 @@ nni_pipe_closed(nni_pipe *p) return (rv); } -void -nni_pipe_stop(nni_pipe *p) -{ - // Guard against recursive calls. - if (nni_atomic_flag_test_and_set(&p->p_stop)) { - return; - } - - nni_pipe_close(p); - - // Put it on the reaplist for async cleanup - nni_mtx_lock(&nni_pipe_reap_lk); - nni_list_append(&nni_pipe_reap_list, p); - nni_cv_wake(&nni_pipe_reap_cv); - nni_mtx_unlock(&nni_pipe_reap_lk); -} - uint16_t nni_pipe_peer(nni_pipe *p) { @@ -270,32 +199,29 @@ nni_pipe_start_cb(void *arg) nni_pipe *p = arg; nni_sock *s = p->p_sock; nni_aio * aio = p->p_start_aio; - uint32_t id = nni_pipe_id(p); if (nni_aio_result(aio) != 0) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } - p->p_cbs = true; // We're running all cbs going forward - - nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); if (nni_pipe_closed(p)) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || nni_sock_closing(s)) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } - nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); } int -nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) +nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { nni_pipe * p; int rv; @@ -315,13 +241,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) p->p_proto_ops = *pops; p->p_proto_data = NULL; p->p_sock = sock; - p->p_sock_id = nni_sock_id(sock); p->p_closed = false; p->p_cbs = false; p->p_refcnt = 0; nni_atomic_flag_reset(&p->p_stop); - NNI_LIST_NODE_INIT(&p->p_reap_node); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); @@ -347,27 +271,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) return (0); } -void -nni_pipe_set_listener(nni_pipe *p, nni_listener *l) -{ - p->p_listener = l; - p->p_listener_id = nni_listener_id(l); -} - -void -nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d) -{ - p->p_dialer = d; - p->p_dialer_id = nni_dialer_id(d); -} - int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) { nni_tran_option *o; - nni_dialer * d; - nni_listener * l; for (o = p->p_tran_ops.p_options; o && o->o_name; o++) { if (strcmp(o->o_name, name) != 0) { @@ -376,24 +284,13 @@ nni_pipe_getopt( return (o->o_get(p->p_tran_data, val, szp, t)); } - // Maybe the endpoint knows? We look up by ID, instead of using - // the links directly, to avoid needing a hold on them. The pipe - // can wind up outliving the endpoint in certain circumstances. - // This means that getting these properties from the pipe may wind - // up being somewhat more expensive. - if ((p->p_dialer_id != 0) && - (nni_dialer_find(&d, p->p_dialer_id) == 0)) { - int rv; - rv = nni_dialer_getopt(d, name, val, szp, t); - nni_dialer_rele(d); - return (rv); + // Maybe the endpoint knows? The guarantees on pipes ensure that the + // pipe will not outlive its creating endpoint. + if (p->p_dialer != NULL) { + return (nni_dialer_getopt(p->p_dialer, name, val, szp, t)); } - if ((p->p_listener_id != 0) && - (nni_listener_find(&l, p->p_listener_id) == 0)) { - int rv; - rv = nni_listener_getopt(l, name, val, szp, t); - nni_listener_rele(l); - return (rv); + if (p->p_listener != NULL) { + return (nni_listener_getopt(p->p_listener, name, val, szp, t)); } return (NNG_ENOTSUP); } @@ -414,56 +311,20 @@ nni_pipe_get_proto_data(nni_pipe *p) return (p->p_proto_data); } -void -nni_pipe_sock_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_pipe, p_sock_node); -} - -void -nni_pipe_ep_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_pipe, p_ep_node); -} - uint32_t nni_pipe_sock_id(nni_pipe *p) { - return (p->p_sock_id); + return (nni_sock_id(p->p_sock)); } uint32_t nni_pipe_listener_id(nni_pipe *p) { - return (p->p_listener_id); + return (p->p_listener ? nni_listener_id(p->p_listener) : 0); } uint32_t nni_pipe_dialer_id(nni_pipe *p) { - return (p->p_dialer_id); -} - -static void -nni_pipe_reaper(void *notused) -{ - NNI_ARG_UNUSED(notused); - - nni_mtx_lock(&nni_pipe_reap_lk); - for (;;) { - nni_pipe *p; - if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) { - nni_list_remove(&nni_pipe_reap_list, p); - - nni_mtx_unlock(&nni_pipe_reap_lk); - nni_pipe_destroy(p); - nni_mtx_lock(&nni_pipe_reap_lk); - continue; - } - if (!nni_pipe_reap_run) { - break; - } - nni_cv_wait(&nni_pipe_reap_cv); - } - nni_mtx_unlock(&nni_pipe_reap_lk); + return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0); } |
