From 953ca274ae57f8edd12536a3dd15d134aa6e5576 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 6 Jul 2018 14:42:53 -0700 Subject: fixes #568 Want a single reader/write lock on socket child objects fixes #170 Make more use of reaper This is a complete restructure/rethink of how child objects interact with the socket. (This also backs out #576 as it turns out not to be needed.) While 568 says reader/writer lock, for now we have settled for a single writer lock. Its likely that this is sufficient. Essentially we use the single socket lock to guard lists of the socket children. We also use deferred deletion in the idhash to facilitate teardown, which means endpoint closes are no longer synchronous. We use the reaper to clean up objects when the reference count drops to zero. We make a special exception for pipes, since they really are not reference counted by their parents, and they are leaf objects anyway. We believe this addresses the main outstanding race conditions in a much more correct and holistic way. Note that endpoint shutdown is a little tricky, as it makes use of atomic flags to guard against double entry, and against recursive lock entry. This is something that would be nice to make a bit more obvious, but what we have is safe, and the complexity is at least confined to one place. --- src/core/pipe.c | 207 ++++++++++---------------------------------------------- 1 file changed, 34 insertions(+), 173 deletions(-) (limited to 'src/core/pipe.c') diff --git a/src/core/pipe.c b/src/core/pipe.c index 6b9b082c..e95fe1d4 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include @@ -17,54 +18,17 @@ // Operations on pipes (to the transport) are generally blocking operations, // performed in the context of the protocol. -struct nni_pipe { - uint32_t p_id; - uint32_t p_sock_id; - uint32_t p_dialer_id; - uint32_t p_listener_id; - nni_tran_pipe_ops p_tran_ops; - nni_proto_pipe_ops p_proto_ops; - void * p_tran_data; - void * p_proto_data; - nni_list_node p_sock_node; - nni_list_node p_ep_node; - nni_sock * p_sock; - nni_listener * p_listener; - nni_dialer * p_dialer; - bool p_closed; - nni_atomic_flag p_stop; - bool p_cbs; - int p_refcnt; - nni_mtx p_mtx; - nni_cv p_cv; - nni_list_node p_reap_node; - nni_aio * p_start_aio; -}; - static nni_idhash *nni_pipes; static nni_mtx nni_pipe_lk; -static nni_list nni_pipe_reap_list; -static nni_mtx nni_pipe_reap_lk; -static nni_cv nni_pipe_reap_cv; -static nni_thr nni_pipe_reap_thr; -static int nni_pipe_reap_run; - -static void nni_pipe_reaper(void *); - int nni_pipe_sys_init(void) { int rv; - NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); nni_mtx_init(&nni_pipe_lk); - nni_mtx_init(&nni_pipe_reap_lk); - nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk); - if (((rv = nni_idhash_init(&nni_pipes)) != 0) || - ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) != - 0)) { + if ((rv = nni_idhash_init(&nni_pipes)) != 0) { return (rv); } @@ -76,25 +40,13 @@ nni_pipe_sys_init(void) nni_idhash_set_limits( nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff); - nni_pipe_reap_run = 1; - nni_thr_run(&nni_pipe_reap_thr); - return (0); } void nni_pipe_sys_fini(void) { - if (nni_pipe_reap_run) { - nni_mtx_lock(&nni_pipe_reap_lk); - nni_pipe_reap_run = 0; - nni_cv_wake(&nni_pipe_reap_cv); - nni_mtx_unlock(&nni_pipe_reap_lk); - } - - nni_thr_fini(&nni_pipe_reap_thr); - nni_cv_fini(&nni_pipe_reap_cv); - nni_mtx_fini(&nni_pipe_reap_lk); + nni_reap_drain(); nni_mtx_fini(&nni_pipe_lk); if (nni_pipes != NULL) { nni_idhash_fini(nni_pipes); @@ -109,28 +61,7 @@ nni_pipe_destroy(nni_pipe *p) return; } - if (p->p_cbs) { - nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); - } - - // Stop any pending negotiation. - nni_aio_stop(p->p_start_aio); - - if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_stop(p->p_proto_data); - } - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } - - // We have exclusive access at this point, so we can check if - // we are still on any lists. - nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL - nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL - - if (nni_list_node_active(&p->p_sock_node)) { - nni_sock_pipe_remove(p->p_sock, p); - } + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. // This happens during initialization for example. @@ -138,11 +69,24 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_id != 0) { nni_idhash_remove(nni_pipes, p->p_id); } + // This wait guarantees that all callers are done with us. while (p->p_refcnt != 0) { nni_cv_wait(&p->p_cv); } nni_mtx_unlock(&nni_pipe_lk); + // Wait for neg callbacks to finish. (Already closed). + nni_aio_stop(p->p_start_aio); + + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_stop(p->p_proto_data); + } + if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { + p->p_tran_ops.p_stop(p->p_tran_data); + } + + nni_pipe_remove(p); + if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_fini(p->p_proto_data); } @@ -229,6 +173,8 @@ nni_pipe_close(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); } + + nni_reap(&p->p_reap, (nni_cb) nni_pipe_destroy, p); } bool @@ -241,23 +187,6 @@ nni_pipe_closed(nni_pipe *p) return (rv); } -void -nni_pipe_stop(nni_pipe *p) -{ - // Guard against recursive calls. - if (nni_atomic_flag_test_and_set(&p->p_stop)) { - return; - } - - nni_pipe_close(p); - - // Put it on the reaplist for async cleanup - nni_mtx_lock(&nni_pipe_reap_lk); - nni_list_append(&nni_pipe_reap_list, p); - nni_cv_wake(&nni_pipe_reap_cv); - nni_mtx_unlock(&nni_pipe_reap_lk); -} - uint16_t nni_pipe_peer(nni_pipe *p) { @@ -270,32 +199,29 @@ nni_pipe_start_cb(void *arg) nni_pipe *p = arg; nni_sock *s = p->p_sock; nni_aio * aio = p->p_start_aio; - uint32_t id = nni_pipe_id(p); if (nni_aio_result(aio) != 0) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } - p->p_cbs = true; // We're running all cbs going forward - - nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); if (nni_pipe_closed(p)) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || nni_sock_closing(s)) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } - nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); } int -nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) +nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { nni_pipe * p; int rv; @@ -315,13 +241,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) p->p_proto_ops = *pops; p->p_proto_data = NULL; p->p_sock = sock; - p->p_sock_id = nni_sock_id(sock); p->p_closed = false; p->p_cbs = false; p->p_refcnt = 0; nni_atomic_flag_reset(&p->p_stop); - NNI_LIST_NODE_INIT(&p->p_reap_node); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); @@ -347,27 +271,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) return (0); } -void -nni_pipe_set_listener(nni_pipe *p, nni_listener *l) -{ - p->p_listener = l; - p->p_listener_id = nni_listener_id(l); -} - -void -nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d) -{ - p->p_dialer = d; - p->p_dialer_id = nni_dialer_id(d); -} - int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) { nni_tran_option *o; - nni_dialer * d; - nni_listener * l; for (o = p->p_tran_ops.p_options; o && o->o_name; o++) { if (strcmp(o->o_name, name) != 0) { @@ -376,24 +284,13 @@ nni_pipe_getopt( return (o->o_get(p->p_tran_data, val, szp, t)); } - // Maybe the endpoint knows? We look up by ID, instead of using - // the links directly, to avoid needing a hold on them. The pipe - // can wind up outliving the endpoint in certain circumstances. - // This means that getting these properties from the pipe may wind - // up being somewhat more expensive. - if ((p->p_dialer_id != 0) && - (nni_dialer_find(&d, p->p_dialer_id) == 0)) { - int rv; - rv = nni_dialer_getopt(d, name, val, szp, t); - nni_dialer_rele(d); - return (rv); + // Maybe the endpoint knows? The guarantees on pipes ensure that the + // pipe will not outlive its creating endpoint. + if (p->p_dialer != NULL) { + return (nni_dialer_getopt(p->p_dialer, name, val, szp, t)); } - if ((p->p_listener_id != 0) && - (nni_listener_find(&l, p->p_listener_id) == 0)) { - int rv; - rv = nni_listener_getopt(l, name, val, szp, t); - nni_listener_rele(l); - return (rv); + if (p->p_listener != NULL) { + return (nni_listener_getopt(p->p_listener, name, val, szp, t)); } return (NNG_ENOTSUP); } @@ -414,56 +311,20 @@ nni_pipe_get_proto_data(nni_pipe *p) return (p->p_proto_data); } -void -nni_pipe_sock_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_pipe, p_sock_node); -} - -void -nni_pipe_ep_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_pipe, p_ep_node); -} - uint32_t nni_pipe_sock_id(nni_pipe *p) { - return (p->p_sock_id); + return (nni_sock_id(p->p_sock)); } uint32_t nni_pipe_listener_id(nni_pipe *p) { - return (p->p_listener_id); + return (p->p_listener ? nni_listener_id(p->p_listener) : 0); } uint32_t nni_pipe_dialer_id(nni_pipe *p) { - return (p->p_dialer_id); -} - -static void -nni_pipe_reaper(void *notused) -{ - NNI_ARG_UNUSED(notused); - - nni_mtx_lock(&nni_pipe_reap_lk); - for (;;) { - nni_pipe *p; - if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) { - nni_list_remove(&nni_pipe_reap_list, p); - - nni_mtx_unlock(&nni_pipe_reap_lk); - nni_pipe_destroy(p); - nni_mtx_lock(&nni_pipe_reap_lk); - continue; - } - if (!nni_pipe_reap_run) { - break; - } - nni_cv_wait(&nni_pipe_reap_cv); - } - nni_mtx_unlock(&nni_pipe_reap_lk); + return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0); } -- cgit v1.2.3-70-g09d2