diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-02 22:36:08 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-07-03 19:00:19 -0700 |
| commit | d1a9c84a6b375cb25a8b7475957130e364b41753 (patch) | |
| tree | 5444721d96a84d92e3ed258b4d51f80adf6b200c /src | |
| parent | a772bcc6ebe198f939889abbda18eded2a326941 (diff) | |
| download | nng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.gz nng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.bz2 nng-d1a9c84a6b375cb25a8b7475957130e364b41753.zip | |
fixes #572 Several locking errors found
fixes #573 atomic flags could help
This introduces a new atomic flag, and reduces some of the global
locking. The lock refactoring work is not yet complete, but this is
a positive step forward, and should help with certain things.
While here we also fixed a compile warning due to incorrect types.
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/core/dialer.c | 48 | ||||
| -rw-r--r-- | src/core/listener.c | 48 | ||||
| -rw-r--r-- | src/core/pipe.c | 19 | ||||
| -rw-r--r-- | src/core/platform.h | 12 | ||||
| -rw-r--r-- | src/core/socket.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_atomic.c | 57 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 13 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 1 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 4 | ||||
| -rw-r--r-- | src/platform/windows/win_thread.c | 12 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 5 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 30 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 5 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 14 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 18 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 28 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 16 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 71 |
19 files changed, 271 insertions, 137 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0ad53392..e9561980 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -92,6 +92,7 @@ if (NNG_PLATFORM_POSIX) platform/posix/posix_pollq.h platform/posix/posix_alloc.c + platform/posix/posix_atomic.c platform/posix/posix_clock.c platform/posix/posix_debug.c platform/posix/posix_epdesc.c diff --git a/src/core/dialer.c b/src/core/dialer.c index 09ecdac5..e93c893e 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -27,7 +27,7 @@ struct nni_dialer { bool d_synch; // synchronous connect in progress? bool d_started; bool d_closed; // full shutdown - bool d_closing; // close pending (waiting on refcnt) + nni_atomic_flag d_closing; // close pending (waiting on refcnt) nni_mtx d_mtx; nni_cv d_cv; nni_list d_pipes; @@ -96,11 +96,9 @@ dialer_destroy(nni_dialer *d) nni_aio_fini(d->d_con_aio); nni_aio_fini(d->d_tmo_aio); - nni_mtx_lock(&d->d_mtx); if (d->d_data != NULL) { d->d_ops.d_fini(d->d_data); } - nni_mtx_unlock(&d->d_mtx); nni_cv_fini(&d->d_cv); nni_mtx_fini(&d->d_mtx); nni_url_free(d->d_url); @@ -130,12 +128,12 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) } d->d_url = url; d->d_closed = false; - d->d_closing = false; d->d_started = false; d->d_data = NULL; d->d_refcnt = 1; d->d_sock = s; d->d_tran = tran; + nni_atomic_flag_reset(&d->d_closing); // Make a copy of the endpoint operations. This allows us to // modify them (to override NULLs for example), and avoids an extra @@ -205,7 +203,7 @@ nni_dialer_rele(nni_dialer *d) { nni_mtx_lock(&dialers_lk); d->d_refcnt--; - if (d->d_closing) { + if (d->d_refcnt == 0) { nni_cv_wake(&d->d_cv); } nni_mtx_unlock(&dialers_lk); @@ -214,13 +212,9 @@ nni_dialer_rele(nni_dialer *d) int nni_dialer_shutdown(nni_dialer *d) { - nni_mtx_lock(&d->d_mtx); - if (d->d_closing) { - nni_mtx_unlock(&d->d_mtx); + if (nni_atomic_flag_test_and_set(&d->d_closing)) { return (NNG_ECLOSED); } - d->d_closing = true; - nni_mtx_unlock(&d->d_mtx); // Abort any remaining in-flight operations. nni_aio_close(d->d_con_aio); @@ -269,9 +263,6 @@ dialer_timer_start(nni_dialer *d) { nni_duration backoff; - if (d->d_closing) { - return; - } backoff = d->d_currtime; d->d_currtime *= 2; if (d->d_currtime > d->d_maxrtime) { @@ -319,11 +310,6 @@ dialer_connect_cb(void *arg) synch = d->d_synch; d->d_synch = false; if (rv == 0) { - if (d->d_closing) { - nni_mtx_unlock(&d->d_mtx); - nni_pipe_stop(p); - return; - } nni_pipe_set_dialer(p, d); nni_list_append(&d->d_pipes, p); @@ -372,10 +358,6 @@ dialer_connect_start(nni_dialer *d) nni_aio *aio = d->d_con_aio; // Call with the Endpoint lock held. - if (d->d_closing) { - return; - } - d->d_ops.d_connect(d->d_data, aio); } @@ -390,11 +372,6 @@ nni_dialer_start(nni_dialer *d, int flags) nni_mtx_lock(&d->d_mtx); - if (d->d_closing) { - nni_mtx_unlock(&d->d_mtx); - return (NNG_ECLOSED); - } - if (d->d_started) { nni_mtx_unlock(&d->d_mtx); return (NNG_ESTATE); @@ -411,10 +388,10 @@ nni_dialer_start(nni_dialer *d, int flags) d->d_started = true; dialer_connect_start(d); - while (d->d_synch && !d->d_closing) { + while (d->d_synch) { nni_cv_wait(&d->d_cv); } - rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv; + rv = d->d_lastrv; nni_cv_wake(&d->d_cv); if (rv != 0) { @@ -478,8 +455,6 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, } for (o = d->d_ops.d_options; o && o->o_name; o++) { - int rv; - if (strcmp(o->o_name, name) != 0) { continue; } @@ -487,10 +462,7 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, return (NNG_EREADONLY); } - nni_mtx_lock(&d->d_mtx); - rv = o->o_set(d->d_data, val, sz, t); - nni_mtx_unlock(&d->d_mtx); - return (rv); + return (o->o_set(d->d_data, val, sz, t)); } return (NNG_ENOTSUP); @@ -518,17 +490,13 @@ nni_dialer_getopt( } for (o = d->d_ops.d_options; o && o->o_name; o++) { - int rv; if (strcmp(o->o_name, name) != 0) { continue; } if (o->o_get == NULL) { return (NNG_EWRITEONLY); } - nni_mtx_lock(&d->d_mtx); - rv = o->o_get(d->d_data, valp, szp, t); - nni_mtx_unlock(&d->d_mtx); - return (rv); + return (o->o_get(d->d_data, valp, szp, t)); } // We provide a fallback on the URL, but let the implementation diff --git a/src/core/listener.c b/src/core/listener.c index 31c154bc..8e06076d 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -25,7 +25,7 @@ struct nni_listener { int l_refcnt; bool l_started; bool l_closed; // full shutdown - bool l_closing; // close pending (waiting on refcnt) + nni_atomic_flag l_closing; // close pending nni_mtx l_mtx; nni_cv l_cv; nni_list l_pipes; @@ -89,11 +89,9 @@ listener_destroy(nni_listener *l) nni_aio_fini(l->l_acc_aio); - nni_mtx_lock(&l->l_mtx); if (l->l_data != NULL) { l->l_ops.l_fini(l->l_data); } - nni_mtx_unlock(&l->l_mtx); nni_cv_fini(&l->l_cv); nni_mtx_fini(&l->l_mtx); nni_url_free(l->l_url); @@ -123,12 +121,12 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) } l->l_url = url; l->l_closed = false; - l->l_closing = false; l->l_started = false; l->l_data = NULL; l->l_refcnt = 1; l->l_sock = s; l->l_tran = tran; + nni_atomic_flag_reset(&l->l_closing); // Make a copy of the endpoint operations. This allows us to // modify them (to override NULLs for example), and avoids an extra @@ -198,7 +196,7 @@ nni_listener_rele(nni_listener *l) { nni_mtx_lock(&listeners_lk); l->l_refcnt--; - if (l->l_closing) { + if (l->l_refcnt == 0) { nni_cv_wake(&l->l_cv); } nni_mtx_unlock(&listeners_lk); @@ -207,13 +205,9 @@ nni_listener_rele(nni_listener *l) int nni_listener_shutdown(nni_listener *l) { - nni_mtx_lock(&l->l_mtx); - if (l->l_closing) { - nni_mtx_unlock(&l->l_mtx); + if (nni_atomic_flag_test_and_set(&l->l_closing)) { return (NNG_ECLOSED); } - l->l_closing = true; - nni_mtx_unlock(&l->l_mtx); // Abort any remaining in-flight accepts. nni_aio_close(l->l_acc_aio); @@ -262,11 +256,9 @@ listener_timer_cb(void *arg) nni_listener *l = arg; nni_aio * aio = l->l_tmo_aio; - nni_mtx_lock(&l->l_mtx); if (nni_aio_result(aio) == 0) { listener_accept_start(l); } - nni_mtx_unlock(&l->l_mtx); } static void @@ -282,16 +274,12 @@ listener_accept_cb(void *arg) NNI_ASSERT(data != NULL); rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data); } - nni_mtx_lock(&l->l_mtx); switch (rv) { case 0: - if (l->l_closing) { - nni_mtx_unlock(&l->l_mtx); - nni_pipe_stop(p); - return; - } + nni_mtx_lock(&l->l_mtx); nni_pipe_set_listener(p, l); nni_list_append(&l->l_pipes, p); + nni_mtx_unlock(&l->l_mtx); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown @@ -310,7 +298,6 @@ listener_accept_cb(void *arg) nni_sleep_aio(100, l->l_tmo_aio); break; } - nni_mtx_unlock(&l->l_mtx); if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { nni_pipe_stop(p); @@ -323,9 +310,6 @@ listener_accept_start(nni_listener *l) nni_aio *aio = l->l_acc_aio; // Call with the listener lock held. - if (l->l_closing) { - return; - } l->l_ops.l_accept(l->l_data, aio); } @@ -336,10 +320,6 @@ nni_listener_start(nni_listener *l, int flags) NNI_ARG_UNUSED(flags); nni_mtx_lock(&l->l_mtx); - if (l->l_closing) { - nni_mtx_unlock(&l->l_mtx); - return (NNG_ECLOSED); - } if (l->l_started) { nni_mtx_unlock(&l->l_mtx); return (NNG_ESTATE); @@ -351,9 +331,10 @@ nni_listener_start(nni_listener *l, int flags) } l->l_started = true; - listener_accept_start(l); nni_mtx_unlock(&l->l_mtx); + listener_accept_start(l); + return (0); } @@ -387,8 +368,6 @@ nni_listener_setopt(nni_listener *l, const char *name, const void *val, } for (o = l->l_ops.l_options; o && o->o_name; o++) { - int rv; - if (strcmp(o->o_name, name) != 0) { continue; } @@ -396,10 +375,7 @@ nni_listener_setopt(nni_listener *l, const char *name, const void *val, return (NNG_EREADONLY); } - nni_mtx_lock(&l->l_mtx); - rv = o->o_set(l->l_data, val, sz, t); - nni_mtx_unlock(&l->l_mtx); - return (rv); + return (o->o_set(l->l_data, val, sz, t)); } return (NNG_ENOTSUP); @@ -412,17 +388,13 @@ nni_listener_getopt( nni_tran_option *o; for (o = l->l_ops.l_options; o && o->o_name; o++) { - int rv; if (strcmp(o->o_name, name) != 0) { continue; } if (o->o_get == NULL) { return (NNG_EWRITEONLY); } - nni_mtx_lock(&l->l_mtx); - rv = o->o_get(l->l_data, valp, szp, t); - nni_mtx_unlock(&l->l_mtx); - return (rv); + return (o->o_get(l->l_data, valp, szp, t)); } // We provide a fallback on the URL, but let the implementation diff --git a/src/core/pipe.c b/src/core/pipe.c index a42cdeff..02f8ea50 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -29,7 +29,7 @@ struct nni_pipe { nni_listener * p_listener; nni_dialer * p_dialer; bool p_closed; - bool p_stop; + nni_atomic_flag p_stop; bool p_cbs; int p_refcnt; nni_mtx p_mtx; @@ -102,16 +102,11 @@ nni_pipe_sys_fini(void) void nni_pipe_destroy(nni_pipe *p) { - bool cbs; if (p == NULL) { return; } - nni_mtx_lock(&p->p_mtx); - cbs = p->p_cbs; - nni_mtx_unlock(&p->p_mtx); - - if (cbs) { + if (p->p_cbs) { nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); } @@ -247,13 +242,9 @@ void nni_pipe_stop(nni_pipe *p) { // Guard against recursive calls. - nni_mtx_lock(&p->p_mtx); - if (p->p_stop) { - nni_mtx_unlock(&p->p_mtx); + if (nni_atomic_flag_test_and_set(&p->p_stop)) { return; } - p->p_stop = true; - nni_mtx_unlock(&p->p_mtx); nni_pipe_close(p); @@ -283,9 +274,7 @@ nni_pipe_start_cb(void *arg) return; } - nni_mtx_lock(&p->p_mtx); p->p_cbs = true; // We're running all cbs going forward - nni_mtx_unlock(&p->p_mtx); nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); if (nni_pipe_closed(p)) { @@ -324,10 +313,10 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) p->p_proto_data = NULL; p->p_sock = sock; p->p_closed = false; - p->p_stop = false; p->p_cbs = false; p->p_refcnt = 0; + nni_atomic_flag_reset(&p->p_stop); NNI_LIST_NODE_INIT(&p->p_reap_node); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); diff --git a/src/core/platform.h b/src/core/platform.h index b709e3ba..607c3827 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -147,6 +147,18 @@ extern void nni_plat_thr_fini(nni_plat_thr *); extern bool nni_plat_thr_is_self(nni_plat_thr *); // +// Atomics support. This will evolve over time. +// + +// nni_atomic_flag supports only test-and-set and reset operations. +// This can be implemented without locks on any reasonable system, and +// it corresponds to C11 atomic flag. +typedef struct nni_atomic_flag nni_atomic_flag; + +extern bool nni_atomic_flag_test_and_set(nni_atomic_flag *); +extern void nni_atomic_flag_reset(nni_atomic_flag *); + +// // Clock Support // diff --git a/src/core/socket.c b/src/core/socket.c index 4d7bbbc1..620c5d19 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -33,8 +33,8 @@ struct nni_ctx { }; typedef struct sock_option { - const char *o_name; - int o_type; + const char * o_name; + nni_opt_type o_type; int (*o_get)(nni_sock *, void *, size_t *, nni_opt_type); int (*o_set)(nni_sock *, const void *, size_t, nni_opt_type); } sock_option; @@ -42,7 +42,7 @@ typedef struct sock_option { typedef struct nni_sockopt { nni_list_node node; char * name; - int typ; + nni_opt_type typ; size_t sz; void * data; } nni_sockopt; diff --git a/src/platform/posix/posix_atomic.c b/src/platform/posix/posix_atomic.c new file mode 100644 index 00000000..a8d2579d --- /dev/null +++ b/src/platform/posix/posix_atomic.c @@ -0,0 +1,57 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +// POSIX atomics. + +#include "core/nng_impl.h" + +#ifdef NNG_PLATFORM_POSIX + +#ifdef NNG_HAVE_STDATOMIC + +#include <stdatomic.h> +bool +nni_atomic_flag_test_and_set(nni_atomic_flag *f) +{ + return (atomic_flag_test_and_set(&f->f)); +} + +void +nni_atomic_flag_reset(nni_atomic_flag *f) +{ + atomic_flag_clear(&f->f); +} +#else + +#include <pthread.h> + +static pthread_mutex_t plat_atomic_lock = PTHREAD_MUTEX_INITIALIZER; + +bool +nni_atomic_flag_test_and_set(nni_atomic_flag *f) +{ + bool v; + pthread_mutex_lock(&plat_atomic_lock); + v = f->f; + f->f = true; + pthread_mutex_unlock(&plat_atomic_lock); + return (v); +} + +void +nni_atomic_flag_reset(nni_atomic_flag *f) +{ + pthread_mutex_lock(&plat_atomic_lock); + f->f = false; + pthread_mutex_unlock(&plat_atomic_lock); +} +#endif + +#endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 33a9c293..70a3615f 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -75,6 +75,19 @@ struct nni_plat_flock { #define NNG_PLATFORM_DIR_SEP "/" +#ifdef NNG_HAVE_STDATOMIC + +#include <stdatomic.h> + +struct nni_atomic_flag { + atomic_flag f; +}; +#else // NNG_HAVE_C11_ATOMIC +struct nni_atomic_flag { + bool f; +}; +#endif + #endif extern int nni_posix_pollq_sysinit(void); diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index df2ee9d2..cc2ade6f 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -348,4 +348,5 @@ nni_plat_ncpu(void) return (1); #endif } + #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 0bd12b24..73dc3660 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -48,6 +48,10 @@ struct nni_plat_cv { PSRWLOCK srl; }; +struct nni_atomic_flag { + unsigned f; +}; + // nni_win_event is used with io completion ports. This allows us to get // to a specific completion callback without requiring the poller (in the // completion port) to know anything about the event itself. We also use diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index a3d932aa..243811a0 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -103,6 +103,18 @@ nni_plat_cv_fini(nni_plat_cv *cv) NNI_ARG_UNUSED(cv); } +bool +nni_atomic_flag_test_and_set(nni_atomic_flag *f) +{ + return (InterlockedExchange(&f->f, 1) != 0); +} + +void +nni_atomic_flag_reset(nni_atomic_flag *f) +{ + InterlockedExchange(&f->f, 0); +} + static unsigned int __stdcall nni_plat_thr_main(void *arg) { nni_plat_thr *thr = arg; diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 1f4f0b33..f725cadb 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -413,15 +413,14 @@ rep0_pipe_send_cb(void *arg) nni_msg * msg; size_t len; - nni_mtx_lock(&s->lk); - p->busy = false; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->pipe); - nni_mtx_unlock(&s->lk); return; } + nni_mtx_lock(&s->lk); + p->busy = false; if ((ctx = nni_list_first(&p->sendq)) == NULL) { // Nothing else to send. if (p->id == s->ctx->pipe_id) { diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 8a0dd4d8..43751d14 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -82,7 +82,8 @@ struct req0_pipe { nni_pipe * pipe; req0_sock * req; nni_list_node node; - nni_list ctxs; // ctxs with pending traffic + nni_list ctxs; // ctxs with pending traffic + bool sending; // if busy sending nni_aio * aio_send; nni_aio * aio_recv; }; @@ -264,6 +265,7 @@ req0_pipe_close(void *arg) nni_mtx_lock(&s->mtx); // This removes the node from either busypipes or readypipes. // It doesn't much matter which. + p->sending = false; if (nni_list_node_active(&p->node)) { nni_list_node_remove(&p->node); if (s->closed) { @@ -311,20 +313,19 @@ req0_send_cb(void *arg) // in the ready list, and re-run the sendq. nni_mtx_lock(&s->mtx); - if (nni_list_active(&s->busypipes, p)) { - nni_list_remove(&s->busypipes, p); - nni_list_append(&s->readypipes, p); - if (nni_list_empty(&s->sendq)) { - nni_pollable_raise(s->sendable); - } - req0_run_sendq(s, &aios); - } else { - // We wind up here if stop was called from the reader - // side while we were waiting to be scheduled to run for the - // writer side. In this case we can't complete the operation, - // and we have to abort. - nni_pipe_stop(p->pipe); + if (!p->sending) { + // This occurs if the req0_pipe_close has been called. + // In that case we don't want any more processing. + nni_mtx_unlock(&s->mtx); + return; + } + nni_list_remove(&s->busypipes, p); + nni_list_append(&s->readypipes, p); + p->sending = false; + if (nni_list_empty(&s->sendq)) { + nni_pollable_raise(s->sendable); } + req0_run_sendq(s, &aios); nni_mtx_unlock(&s->mtx); while ((aio = nni_list_first(&aios)) != NULL) { @@ -533,6 +534,7 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist) nni_list_remove(&s->readypipes, p); nni_list_append(&s->busypipes, p); + p->sending = true; if ((aio = ctx->saio) != NULL) { ctx->saio = NULL; diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index db18a4e8..fbdeb65a 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -402,15 +402,14 @@ resp0_pipe_send_cb(void *arg) nni_msg * msg; size_t len; - nni_mtx_lock(&s->mtx); - p->busy = false; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->npipe); - nni_mtx_unlock(&s->mtx); return; } + nni_mtx_lock(&s->mtx); + p->busy = false; if ((ctx = nni_list_first(&p->sendq)) == NULL) { // Nothing else to send. if (p->id == s->ctx->pipe_id) { diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index b48b82d9..7d99e507 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -827,14 +827,22 @@ static int ipc_ep_get_recvmaxsz(void *arg, void *data, size_t *szp, nni_opt_type t) { ipc_ep *ep = arg; - return (nni_copyout_size(ep->rcvmax, data, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, data, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int ipc_ep_get_addr(void *arg, void *data, size_t *szp, nni_opt_type t) { ipc_ep *ep = arg; - return (nni_copyout_sockaddr(&ep->sa, data, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_sockaddr(&ep->sa, data, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int @@ -868,7 +876,9 @@ ipc_ep_set_sec_desc(void *arg, const void *data, size_t sz, nni_opt_type t) int rv; if ((rv = nni_copyin_ptr(&ptr, data, sz, t)) == 0) { + nni_mtx_lock(&ep->mtx); rv = nni_plat_ipc_ep_set_security_descriptor(ep->iep, ptr); + nni_mtx_unlock(&ep->mtx); } return (rv); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 1a183ecd..e8aa04d0 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -882,7 +882,11 @@ static int tcp_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { tcp_ep *ep = arg; - return (nni_copyout_bool(ep->nodelay, v, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_bool(ep->nodelay, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int @@ -903,7 +907,11 @@ static int tcp_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { tcp_ep *ep = arg; - return (nni_copyout_bool(ep->keepalive, v, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_bool(ep->keepalive, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int @@ -931,7 +939,11 @@ static int tcp_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { tcp_ep *ep = arg; - return (nni_copyout_size(ep->rcvmax, v, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static nni_tran_option tcp_pipe_options[] = { diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index b4f555da..5e1a1e8d 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -877,7 +877,11 @@ static int tls_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t) { tls_ep *ep = arg; - return (nni_copyout_bool(ep->nodelay, v, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_bool(ep->nodelay, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int @@ -898,7 +902,11 @@ static int tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) { tls_ep *ep = arg; - return (nni_copyout_bool(ep->keepalive, v, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_bool(ep->keepalive, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int @@ -947,7 +955,11 @@ static int tls_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { tls_ep *ep = arg; - return (nni_copyout_size(ep->rcvmax, v, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int @@ -974,9 +986,11 @@ tls_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t) if (cfg == NULL) { return (NNG_EINVAL); } + nni_mtx_lock(&ep->mtx); old = ep->cfg; nni_tls_config_hold(cfg); ep->cfg = cfg; + nni_mtx_unlock(&ep->mtx); if (old != NULL) { nni_tls_config_fini(old); } @@ -987,7 +1001,11 @@ static int tls_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t) { tls_ep *ep = arg; - return (nni_copyout_ptr(ep->cfg, v, szp, t)); + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_ptr(ep->cfg, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); } static int @@ -1055,7 +1073,9 @@ tls_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t) int rv; if ((rv = tls_ep_chk_string(v, sz, t)) == 0) { + nni_mtx_lock(&ep->mtx); rv = nng_tls_config_cert_key_file(ep->cfg, v, NULL); + nni_mtx_unlock(&ep->mtx); } return (rv); } diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 2fa0fd67..b3aef756 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -416,7 +416,11 @@ static int ws_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { ws_dialer *d = arg; - return (nni_copyout_size(d->rcvmax, v, szp, t)); + int rv; + nni_mtx_lock(&d->mtx); + rv = nni_copyout_size(d->rcvmax, v, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); } static int @@ -439,7 +443,11 @@ static int ws_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { ws_listener *l = arg; - return (nni_copyout_size(l->rcvmax, v, szp, t)); + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_size(l->rcvmax, v, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); } static int @@ -538,7 +546,9 @@ ws_dialer_set_reqhdrs(void *arg, const void *v, size_t sz, nni_opt_type t) } if ((rv = ws_check_string(v, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); rv = ws_set_headers(&d->headers, v); + nni_mtx_unlock(&d->mtx); } return (rv); } @@ -553,7 +563,9 @@ ws_listener_set_reshdrs(void *arg, const void *v, size_t sz, nni_opt_type t) return (NNG_EBUSY); } if ((rv = ws_check_string(v, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); rv = ws_set_headers(&l->headers, v); + nni_mtx_unlock(&l->mtx); } return (rv); } diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 3535a248..a2163e3f 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -2600,7 +2600,9 @@ zt_ep_set_recvmaxsz(void *arg, const void *data, size_t sz, nni_opt_type t) int rv; if ((rv = nni_copyin_size(&val, data, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&zt_lk); ep->ze_rcvmax = val; + nni_mtx_unlock(&zt_lk); } return (rv); } @@ -2609,7 +2611,11 @@ static int zt_ep_get_recvmaxsz(void *arg, void *data, size_t *szp, nni_opt_type t) { zt_ep *ep = arg; - return (nni_copyout_size(ep->ze_rcvmax, data, szp, t)); + int rv; + nni_mtx_lock(&zt_lk); + rv = nni_copyout_size(ep->ze_rcvmax, data, szp, t); + nni_mtx_unlock(&zt_lk); + return (rv); } static int @@ -2634,16 +2640,16 @@ zt_ep_set_home(void *arg, const void *data, size_t sz, nni_opt_type t) zt_ep *ep = arg; if ((rv = zt_ep_chk_string(data, sz, t)) == 0) { + nni_mtx_lock(&zt_lk); if (ep->ze_running) { rv = NNG_ESTATE; } else { - nni_mtx_lock(&zt_lk); nni_strlcpy(ep->ze_home, data, sizeof(ep->ze_home)); if ((rv = zt_node_find(ep)) != 0) { ep->ze_ztn = NULL; } - nni_mtx_unlock(&zt_lk); } + nni_mtx_unlock(&zt_lk); } return (rv); @@ -2653,7 +2659,12 @@ static int zt_ep_get_home(void *arg, void *data, size_t *szp, nni_opt_type t) { zt_ep *ep = arg; - return (nni_copyout_str(ep->ze_home, data, szp, t)); + int rv; + + nni_mtx_lock(&zt_lk); + rv = nni_copyout_str(ep->ze_home, data, szp, t); + nni_mtx_unlock(&zt_lk); + return (rv); } static int @@ -2663,11 +2674,13 @@ zt_ep_get_url(void *arg, void *data, size_t *szp, nni_opt_type t) zt_ep * ep = arg; uint64_t addr; + nni_mtx_lock(&zt_lk); addr = ep->ze_mode == NNI_EP_MODE_DIAL ? ep->ze_raddr : ep->ze_laddr; snprintf(ustr, sizeof(ustr), "zt://%llx.%llx:%u", (unsigned long long) addr >> zt_port_shift, (unsigned long long) ep->ze_nwid, (unsigned) (addr & zt_port_mask)); + nni_mtx_unlock(&zt_lk); return (nni_copyout_str(ustr, data, szp, t)); } @@ -2750,14 +2763,24 @@ static int zt_ep_get_node(void *arg, void *data, size_t *szp, nni_opt_type t) { zt_ep *ep = arg; - return (nni_copyout_u64(ep->ze_ztn->zn_self, data, szp, t)); + int rv; + + nni_mtx_lock(&zt_lk); + rv = nni_copyout_u64(ep->ze_ztn->zn_self, data, szp, t); + nni_mtx_unlock(&zt_lk); + return (rv); } static int zt_ep_get_nwid(void *arg, void *data, size_t *szp, nni_opt_type t) { zt_ep *ep = arg; - return (nni_copyout_u64(ep->ze_nwid, data, szp, t)); + int rv; + + nni_mtx_lock(&zt_lk); + rv = nni_copyout_u64(ep->ze_nwid, data, szp, t); + nni_mtx_unlock(&zt_lk); + return (rv); } static int @@ -2788,7 +2811,9 @@ zt_ep_set_ping_time(void *arg, const void *data, size_t sz, nni_opt_type t) int rv; if ((rv = nni_copyin_ms(&val, data, sz, t)) == 0) { + nni_mtx_lock(&zt_lk); ep->ze_ping_time = val; + nni_mtx_unlock(&zt_lk); } return (rv); } @@ -2797,7 +2822,12 @@ static int zt_ep_get_ping_time(void *arg, void *data, size_t *szp, nni_opt_type t) { zt_ep *ep = arg; - return (nni_copyout_ms(ep->ze_ping_time, data, szp, t)); + int rv; + + nni_mtx_lock(&zt_lk); + rv = nni_copyout_ms(ep->ze_ping_time, data, szp, t); + nni_mtx_unlock(&zt_lk); + return (rv); } static int @@ -2814,7 +2844,9 @@ zt_ep_set_ping_tries(void *arg, const void *data, size_t sz, nni_opt_type t) int rv; if ((rv = nni_copyin_int(&val, data, sz, 0, 1000000, t)) == 0) { + nni_mtx_lock(&zt_lk); ep->ze_ping_tries = val; + nni_mtx_unlock(&zt_lk); } return (rv); } @@ -2823,7 +2855,12 @@ static int zt_ep_get_ping_tries(void *arg, void *data, size_t *szp, nni_opt_type t) { zt_ep *ep = arg; - return (nni_copyout_int(ep->ze_ping_tries, data, szp, t)); + int rv; + + nni_mtx_lock(&zt_lk); + rv = nni_copyout_int(ep->ze_ping_tries, data, szp, t); + nni_mtx_unlock(&zt_lk); + return (rv); } static int @@ -2834,7 +2871,9 @@ zt_ep_set_conn_time(void *arg, const void *data, size_t sz, nni_opt_type t) int rv; if ((rv = nni_copyin_ms(&val, data, sz, t)) == 0) { + nni_mtx_lock(&zt_lk); ep->ze_conn_time = val; + nni_mtx_unlock(&zt_lk); } return (rv); } @@ -2843,7 +2882,12 @@ static int zt_ep_get_conn_time(void *arg, void *data, size_t *szp, nni_opt_type t) { zt_ep *ep = arg; - return (nni_copyout_ms(ep->ze_conn_time, data, szp, t)); + int rv; + + nni_mtx_lock(&zt_lk); + rv = nni_copyout_ms(ep->ze_conn_time, data, szp, t); + nni_mtx_unlock(&zt_lk); + return (rv); } static int @@ -2854,7 +2898,9 @@ zt_ep_set_conn_tries(void *arg, const void *data, size_t sz, nni_opt_type t) int rv; if ((rv = nni_copyin_int(&val, data, sz, 0, 1000000, t)) == 0) { + nni_mtx_lock(&zt_lk); ep->ze_conn_tries = val; + nni_mtx_unlock(&zt_lk); } return (rv); } @@ -2863,7 +2909,12 @@ static int zt_ep_get_conn_tries(void *arg, void *data, size_t *szp, nni_opt_type t) { zt_ep *ep = arg; - return (nni_copyout_int(ep->ze_conn_tries, data, szp, t)); + int rv; + + nni_mtx_lock(&zt_lk); + rv = nni_copyout_int(ep->ze_conn_tries, data, szp, t); + nni_mtx_unlock(&zt_lk); + return (rv); } static int |
