diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-02 22:36:08 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-07-03 19:00:19 -0700 |
| commit | d1a9c84a6b375cb25a8b7475957130e364b41753 (patch) | |
| tree | 5444721d96a84d92e3ed258b4d51f80adf6b200c /src/core | |
| parent | a772bcc6ebe198f939889abbda18eded2a326941 (diff) | |
| download | nng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.gz nng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.bz2 nng-d1a9c84a6b375cb25a8b7475957130e364b41753.zip | |
fixes #572 Several locking errors found
fixes #573 atomic flags could help
This introduces a new atomic flag, and reduces some of the global
locking. The lock refactoring work is not yet complete, but this is
a positive step forward, and should help with certain things.
While here we also fixed a compile warning due to incorrect types.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/dialer.c | 48 | ||||
| -rw-r--r-- | src/core/listener.c | 48 | ||||
| -rw-r--r-- | src/core/pipe.c | 19 | ||||
| -rw-r--r-- | src/core/platform.h | 12 | ||||
| -rw-r--r-- | src/core/socket.c | 6 |
5 files changed, 37 insertions, 96 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index 09ecdac5..e93c893e 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -27,7 +27,7 @@ struct nni_dialer { bool d_synch; // synchronous connect in progress? bool d_started; bool d_closed; // full shutdown - bool d_closing; // close pending (waiting on refcnt) + nni_atomic_flag d_closing; // close pending (waiting on refcnt) nni_mtx d_mtx; nni_cv d_cv; nni_list d_pipes; @@ -96,11 +96,9 @@ dialer_destroy(nni_dialer *d) nni_aio_fini(d->d_con_aio); nni_aio_fini(d->d_tmo_aio); - nni_mtx_lock(&d->d_mtx); if (d->d_data != NULL) { d->d_ops.d_fini(d->d_data); } - nni_mtx_unlock(&d->d_mtx); nni_cv_fini(&d->d_cv); nni_mtx_fini(&d->d_mtx); nni_url_free(d->d_url); @@ -130,12 +128,12 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) } d->d_url = url; d->d_closed = false; - d->d_closing = false; d->d_started = false; d->d_data = NULL; d->d_refcnt = 1; d->d_sock = s; d->d_tran = tran; + nni_atomic_flag_reset(&d->d_closing); // Make a copy of the endpoint operations. This allows us to // modify them (to override NULLs for example), and avoids an extra @@ -205,7 +203,7 @@ nni_dialer_rele(nni_dialer *d) { nni_mtx_lock(&dialers_lk); d->d_refcnt--; - if (d->d_closing) { + if (d->d_refcnt == 0) { nni_cv_wake(&d->d_cv); } nni_mtx_unlock(&dialers_lk); @@ -214,13 +212,9 @@ nni_dialer_rele(nni_dialer *d) int nni_dialer_shutdown(nni_dialer *d) { - nni_mtx_lock(&d->d_mtx); - if (d->d_closing) { - nni_mtx_unlock(&d->d_mtx); + if (nni_atomic_flag_test_and_set(&d->d_closing)) { return (NNG_ECLOSED); } - d->d_closing = true; - nni_mtx_unlock(&d->d_mtx); // Abort any remaining in-flight operations. nni_aio_close(d->d_con_aio); @@ -269,9 +263,6 @@ dialer_timer_start(nni_dialer *d) { nni_duration backoff; - if (d->d_closing) { - return; - } backoff = d->d_currtime; d->d_currtime *= 2; if (d->d_currtime > d->d_maxrtime) { @@ -319,11 +310,6 @@ dialer_connect_cb(void *arg) synch = d->d_synch; d->d_synch = false; if (rv == 0) { - if (d->d_closing) { - nni_mtx_unlock(&d->d_mtx); - nni_pipe_stop(p); - return; - } nni_pipe_set_dialer(p, d); nni_list_append(&d->d_pipes, p); @@ -372,10 +358,6 @@ dialer_connect_start(nni_dialer *d) nni_aio *aio = d->d_con_aio; // Call with the Endpoint lock held. - if (d->d_closing) { - return; - } - d->d_ops.d_connect(d->d_data, aio); } @@ -390,11 +372,6 @@ nni_dialer_start(nni_dialer *d, int flags) nni_mtx_lock(&d->d_mtx); - if (d->d_closing) { - nni_mtx_unlock(&d->d_mtx); - return (NNG_ECLOSED); - } - if (d->d_started) { nni_mtx_unlock(&d->d_mtx); return (NNG_ESTATE); @@ -411,10 +388,10 @@ nni_dialer_start(nni_dialer *d, int flags) d->d_started = true; dialer_connect_start(d); - while (d->d_synch && !d->d_closing) { + while (d->d_synch) { nni_cv_wait(&d->d_cv); } - rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv; + rv = d->d_lastrv; nni_cv_wake(&d->d_cv); if (rv != 0) { @@ -478,8 +455,6 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, } for (o = d->d_ops.d_options; o && o->o_name; o++) { - int rv; - if (strcmp(o->o_name, name) != 0) { continue; } @@ -487,10 +462,7 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, return (NNG_EREADONLY); } - nni_mtx_lock(&d->d_mtx); - rv = o->o_set(d->d_data, val, sz, t); - nni_mtx_unlock(&d->d_mtx); - return (rv); + return (o->o_set(d->d_data, val, sz, t)); } return (NNG_ENOTSUP); @@ -518,17 +490,13 @@ nni_dialer_getopt( } for (o = d->d_ops.d_options; o && o->o_name; o++) { - int rv; if (strcmp(o->o_name, name) != 0) { continue; } if (o->o_get == NULL) { return (NNG_EWRITEONLY); } - nni_mtx_lock(&d->d_mtx); - rv = o->o_get(d->d_data, valp, szp, t); - nni_mtx_unlock(&d->d_mtx); - return (rv); + return (o->o_get(d->d_data, valp, szp, t)); } // We provide a fallback on the URL, but let the implementation diff --git a/src/core/listener.c b/src/core/listener.c index 31c154bc..8e06076d 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -25,7 +25,7 @@ struct nni_listener { int l_refcnt; bool l_started; bool l_closed; // full shutdown - bool l_closing; // close pending (waiting on refcnt) + nni_atomic_flag l_closing; // close pending nni_mtx l_mtx; nni_cv l_cv; nni_list l_pipes; @@ -89,11 +89,9 @@ listener_destroy(nni_listener *l) nni_aio_fini(l->l_acc_aio); - nni_mtx_lock(&l->l_mtx); if (l->l_data != NULL) { l->l_ops.l_fini(l->l_data); } - nni_mtx_unlock(&l->l_mtx); nni_cv_fini(&l->l_cv); nni_mtx_fini(&l->l_mtx); nni_url_free(l->l_url); @@ -123,12 +121,12 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) } l->l_url = url; l->l_closed = false; - l->l_closing = false; l->l_started = false; l->l_data = NULL; l->l_refcnt = 1; l->l_sock = s; l->l_tran = tran; + nni_atomic_flag_reset(&l->l_closing); // Make a copy of the endpoint operations. This allows us to // modify them (to override NULLs for example), and avoids an extra @@ -198,7 +196,7 @@ nni_listener_rele(nni_listener *l) { nni_mtx_lock(&listeners_lk); l->l_refcnt--; - if (l->l_closing) { + if (l->l_refcnt == 0) { nni_cv_wake(&l->l_cv); } nni_mtx_unlock(&listeners_lk); @@ -207,13 +205,9 @@ nni_listener_rele(nni_listener *l) int nni_listener_shutdown(nni_listener *l) { - nni_mtx_lock(&l->l_mtx); - if (l->l_closing) { - nni_mtx_unlock(&l->l_mtx); + if (nni_atomic_flag_test_and_set(&l->l_closing)) { return (NNG_ECLOSED); } - l->l_closing = true; - nni_mtx_unlock(&l->l_mtx); // Abort any remaining in-flight accepts. nni_aio_close(l->l_acc_aio); @@ -262,11 +256,9 @@ listener_timer_cb(void *arg) nni_listener *l = arg; nni_aio * aio = l->l_tmo_aio; - nni_mtx_lock(&l->l_mtx); if (nni_aio_result(aio) == 0) { listener_accept_start(l); } - nni_mtx_unlock(&l->l_mtx); } static void @@ -282,16 +274,12 @@ listener_accept_cb(void *arg) NNI_ASSERT(data != NULL); rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data); } - nni_mtx_lock(&l->l_mtx); switch (rv) { case 0: - if (l->l_closing) { - nni_mtx_unlock(&l->l_mtx); - nni_pipe_stop(p); - return; - } + nni_mtx_lock(&l->l_mtx); nni_pipe_set_listener(p, l); nni_list_append(&l->l_pipes, p); + nni_mtx_unlock(&l->l_mtx); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown @@ -310,7 +298,6 @@ listener_accept_cb(void *arg) nni_sleep_aio(100, l->l_tmo_aio); break; } - nni_mtx_unlock(&l->l_mtx); if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { nni_pipe_stop(p); @@ -323,9 +310,6 @@ listener_accept_start(nni_listener *l) nni_aio *aio = l->l_acc_aio; // Call with the listener lock held. - if (l->l_closing) { - return; - } l->l_ops.l_accept(l->l_data, aio); } @@ -336,10 +320,6 @@ nni_listener_start(nni_listener *l, int flags) NNI_ARG_UNUSED(flags); nni_mtx_lock(&l->l_mtx); - if (l->l_closing) { - nni_mtx_unlock(&l->l_mtx); - return (NNG_ECLOSED); - } if (l->l_started) { nni_mtx_unlock(&l->l_mtx); return (NNG_ESTATE); @@ -351,9 +331,10 @@ nni_listener_start(nni_listener *l, int flags) } l->l_started = true; - listener_accept_start(l); nni_mtx_unlock(&l->l_mtx); + listener_accept_start(l); + return (0); } @@ -387,8 +368,6 @@ nni_listener_setopt(nni_listener *l, const char *name, const void *val, } for (o = l->l_ops.l_options; o && o->o_name; o++) { - int rv; - if (strcmp(o->o_name, name) != 0) { continue; } @@ -396,10 +375,7 @@ nni_listener_setopt(nni_listener *l, const char *name, const void *val, return (NNG_EREADONLY); } - nni_mtx_lock(&l->l_mtx); - rv = o->o_set(l->l_data, val, sz, t); - nni_mtx_unlock(&l->l_mtx); - return (rv); + return (o->o_set(l->l_data, val, sz, t)); } return (NNG_ENOTSUP); @@ -412,17 +388,13 @@ nni_listener_getopt( nni_tran_option *o; for (o = l->l_ops.l_options; o && o->o_name; o++) { - int rv; if (strcmp(o->o_name, name) != 0) { continue; } if (o->o_get == NULL) { return (NNG_EWRITEONLY); } - nni_mtx_lock(&l->l_mtx); - rv = o->o_get(l->l_data, valp, szp, t); - nni_mtx_unlock(&l->l_mtx); - return (rv); + return (o->o_get(l->l_data, valp, szp, t)); } // We provide a fallback on the URL, but let the implementation diff --git a/src/core/pipe.c b/src/core/pipe.c index a42cdeff..02f8ea50 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -29,7 +29,7 @@ struct nni_pipe { nni_listener * p_listener; nni_dialer * p_dialer; bool p_closed; - bool p_stop; + nni_atomic_flag p_stop; bool p_cbs; int p_refcnt; nni_mtx p_mtx; @@ -102,16 +102,11 @@ nni_pipe_sys_fini(void) void nni_pipe_destroy(nni_pipe *p) { - bool cbs; if (p == NULL) { return; } - nni_mtx_lock(&p->p_mtx); - cbs = p->p_cbs; - nni_mtx_unlock(&p->p_mtx); - - if (cbs) { + if (p->p_cbs) { nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); } @@ -247,13 +242,9 @@ void nni_pipe_stop(nni_pipe *p) { // Guard against recursive calls. - nni_mtx_lock(&p->p_mtx); - if (p->p_stop) { - nni_mtx_unlock(&p->p_mtx); + if (nni_atomic_flag_test_and_set(&p->p_stop)) { return; } - p->p_stop = true; - nni_mtx_unlock(&p->p_mtx); nni_pipe_close(p); @@ -283,9 +274,7 @@ nni_pipe_start_cb(void *arg) return; } - nni_mtx_lock(&p->p_mtx); p->p_cbs = true; // We're running all cbs going forward - nni_mtx_unlock(&p->p_mtx); nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); if (nni_pipe_closed(p)) { @@ -324,10 +313,10 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) p->p_proto_data = NULL; p->p_sock = sock; p->p_closed = false; - p->p_stop = false; p->p_cbs = false; p->p_refcnt = 0; + nni_atomic_flag_reset(&p->p_stop); NNI_LIST_NODE_INIT(&p->p_reap_node); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); diff --git a/src/core/platform.h b/src/core/platform.h index b709e3ba..607c3827 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -147,6 +147,18 @@ extern void nni_plat_thr_fini(nni_plat_thr *); extern bool nni_plat_thr_is_self(nni_plat_thr *); // +// Atomics support. This will evolve over time. +// + +// nni_atomic_flag supports only test-and-set and reset operations. +// This can be implemented without locks on any reasonable system, and +// it corresponds to C11 atomic flag. +typedef struct nni_atomic_flag nni_atomic_flag; + +extern bool nni_atomic_flag_test_and_set(nni_atomic_flag *); +extern void nni_atomic_flag_reset(nni_atomic_flag *); + +// // Clock Support // diff --git a/src/core/socket.c b/src/core/socket.c index 4d7bbbc1..620c5d19 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -33,8 +33,8 @@ struct nni_ctx { }; typedef struct sock_option { - const char *o_name; - int o_type; + const char * o_name; + nni_opt_type o_type; int (*o_get)(nni_sock *, void *, size_t *, nni_opt_type); int (*o_set)(nni_sock *, const void *, size_t, nni_opt_type); } sock_option; @@ -42,7 +42,7 @@ typedef struct sock_option { typedef struct nni_sockopt { nni_list_node node; char * name; - int typ; + nni_opt_type typ; size_t sz; void * data; } nni_sockopt; |
