aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-02 22:36:08 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-03 19:00:19 -0700
commitd1a9c84a6b375cb25a8b7475957130e364b41753 (patch)
tree5444721d96a84d92e3ed258b4d51f80adf6b200c /src
parenta772bcc6ebe198f939889abbda18eded2a326941 (diff)
downloadnng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.gz
nng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.bz2
nng-d1a9c84a6b375cb25a8b7475957130e364b41753.zip
fixes #572 Several locking errors found
fixes #573 atomic flags could help This introduces a new atomic flag, and reduces some of the global locking. The lock refactoring work is not yet complete, but this is a positive step forward, and should help with certain things. While here we also fixed a compile warning due to incorrect types.
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/core/dialer.c48
-rw-r--r--src/core/listener.c48
-rw-r--r--src/core/pipe.c19
-rw-r--r--src/core/platform.h12
-rw-r--r--src/core/socket.c6
-rw-r--r--src/platform/posix/posix_atomic.c57
-rw-r--r--src/platform/posix/posix_impl.h13
-rw-r--r--src/platform/posix/posix_thread.c1
-rw-r--r--src/platform/windows/win_impl.h4
-rw-r--r--src/platform/windows/win_thread.c12
-rw-r--r--src/protocol/reqrep0/rep.c5
-rw-r--r--src/protocol/reqrep0/req.c30
-rw-r--r--src/protocol/survey0/respond.c5
-rw-r--r--src/transport/ipc/ipc.c14
-rw-r--r--src/transport/tcp/tcp.c18
-rw-r--r--src/transport/tls/tls.c28
-rw-r--r--src/transport/ws/websocket.c16
-rw-r--r--src/transport/zerotier/zerotier.c71
19 files changed, 271 insertions, 137 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 0ad53392..e9561980 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -92,6 +92,7 @@ if (NNG_PLATFORM_POSIX)
platform/posix/posix_pollq.h
platform/posix/posix_alloc.c
+ platform/posix/posix_atomic.c
platform/posix/posix_clock.c
platform/posix/posix_debug.c
platform/posix/posix_epdesc.c
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 09ecdac5..e93c893e 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -27,7 +27,7 @@ struct nni_dialer {
bool d_synch; // synchronous connect in progress?
bool d_started;
bool d_closed; // full shutdown
- bool d_closing; // close pending (waiting on refcnt)
+ nni_atomic_flag d_closing; // close pending (waiting on refcnt)
nni_mtx d_mtx;
nni_cv d_cv;
nni_list d_pipes;
@@ -96,11 +96,9 @@ dialer_destroy(nni_dialer *d)
nni_aio_fini(d->d_con_aio);
nni_aio_fini(d->d_tmo_aio);
- nni_mtx_lock(&d->d_mtx);
if (d->d_data != NULL) {
d->d_ops.d_fini(d->d_data);
}
- nni_mtx_unlock(&d->d_mtx);
nni_cv_fini(&d->d_cv);
nni_mtx_fini(&d->d_mtx);
nni_url_free(d->d_url);
@@ -130,12 +128,12 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
}
d->d_url = url;
d->d_closed = false;
- d->d_closing = false;
d->d_started = false;
d->d_data = NULL;
d->d_refcnt = 1;
d->d_sock = s;
d->d_tran = tran;
+ nni_atomic_flag_reset(&d->d_closing);
// Make a copy of the endpoint operations. This allows us to
// modify them (to override NULLs for example), and avoids an extra
@@ -205,7 +203,7 @@ nni_dialer_rele(nni_dialer *d)
{
nni_mtx_lock(&dialers_lk);
d->d_refcnt--;
- if (d->d_closing) {
+ if (d->d_refcnt == 0) {
nni_cv_wake(&d->d_cv);
}
nni_mtx_unlock(&dialers_lk);
@@ -214,13 +212,9 @@ nni_dialer_rele(nni_dialer *d)
int
nni_dialer_shutdown(nni_dialer *d)
{
- nni_mtx_lock(&d->d_mtx);
- if (d->d_closing) {
- nni_mtx_unlock(&d->d_mtx);
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
return (NNG_ECLOSED);
}
- d->d_closing = true;
- nni_mtx_unlock(&d->d_mtx);
// Abort any remaining in-flight operations.
nni_aio_close(d->d_con_aio);
@@ -269,9 +263,6 @@ dialer_timer_start(nni_dialer *d)
{
nni_duration backoff;
- if (d->d_closing) {
- return;
- }
backoff = d->d_currtime;
d->d_currtime *= 2;
if (d->d_currtime > d->d_maxrtime) {
@@ -319,11 +310,6 @@ dialer_connect_cb(void *arg)
synch = d->d_synch;
d->d_synch = false;
if (rv == 0) {
- if (d->d_closing) {
- nni_mtx_unlock(&d->d_mtx);
- nni_pipe_stop(p);
- return;
- }
nni_pipe_set_dialer(p, d);
nni_list_append(&d->d_pipes, p);
@@ -372,10 +358,6 @@ dialer_connect_start(nni_dialer *d)
nni_aio *aio = d->d_con_aio;
// Call with the Endpoint lock held.
- if (d->d_closing) {
- return;
- }
-
d->d_ops.d_connect(d->d_data, aio);
}
@@ -390,11 +372,6 @@ nni_dialer_start(nni_dialer *d, int flags)
nni_mtx_lock(&d->d_mtx);
- if (d->d_closing) {
- nni_mtx_unlock(&d->d_mtx);
- return (NNG_ECLOSED);
- }
-
if (d->d_started) {
nni_mtx_unlock(&d->d_mtx);
return (NNG_ESTATE);
@@ -411,10 +388,10 @@ nni_dialer_start(nni_dialer *d, int flags)
d->d_started = true;
dialer_connect_start(d);
- while (d->d_synch && !d->d_closing) {
+ while (d->d_synch) {
nni_cv_wait(&d->d_cv);
}
- rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv;
+ rv = d->d_lastrv;
nni_cv_wake(&d->d_cv);
if (rv != 0) {
@@ -478,8 +455,6 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
}
for (o = d->d_ops.d_options; o && o->o_name; o++) {
- int rv;
-
if (strcmp(o->o_name, name) != 0) {
continue;
}
@@ -487,10 +462,7 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
return (NNG_EREADONLY);
}
- nni_mtx_lock(&d->d_mtx);
- rv = o->o_set(d->d_data, val, sz, t);
- nni_mtx_unlock(&d->d_mtx);
- return (rv);
+ return (o->o_set(d->d_data, val, sz, t));
}
return (NNG_ENOTSUP);
@@ -518,17 +490,13 @@ nni_dialer_getopt(
}
for (o = d->d_ops.d_options; o && o->o_name; o++) {
- int rv;
if (strcmp(o->o_name, name) != 0) {
continue;
}
if (o->o_get == NULL) {
return (NNG_EWRITEONLY);
}
- nni_mtx_lock(&d->d_mtx);
- rv = o->o_get(d->d_data, valp, szp, t);
- nni_mtx_unlock(&d->d_mtx);
- return (rv);
+ return (o->o_get(d->d_data, valp, szp, t));
}
// We provide a fallback on the URL, but let the implementation
diff --git a/src/core/listener.c b/src/core/listener.c
index 31c154bc..8e06076d 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -25,7 +25,7 @@ struct nni_listener {
int l_refcnt;
bool l_started;
bool l_closed; // full shutdown
- bool l_closing; // close pending (waiting on refcnt)
+ nni_atomic_flag l_closing; // close pending
nni_mtx l_mtx;
nni_cv l_cv;
nni_list l_pipes;
@@ -89,11 +89,9 @@ listener_destroy(nni_listener *l)
nni_aio_fini(l->l_acc_aio);
- nni_mtx_lock(&l->l_mtx);
if (l->l_data != NULL) {
l->l_ops.l_fini(l->l_data);
}
- nni_mtx_unlock(&l->l_mtx);
nni_cv_fini(&l->l_cv);
nni_mtx_fini(&l->l_mtx);
nni_url_free(l->l_url);
@@ -123,12 +121,12 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
}
l->l_url = url;
l->l_closed = false;
- l->l_closing = false;
l->l_started = false;
l->l_data = NULL;
l->l_refcnt = 1;
l->l_sock = s;
l->l_tran = tran;
+ nni_atomic_flag_reset(&l->l_closing);
// Make a copy of the endpoint operations. This allows us to
// modify them (to override NULLs for example), and avoids an extra
@@ -198,7 +196,7 @@ nni_listener_rele(nni_listener *l)
{
nni_mtx_lock(&listeners_lk);
l->l_refcnt--;
- if (l->l_closing) {
+ if (l->l_refcnt == 0) {
nni_cv_wake(&l->l_cv);
}
nni_mtx_unlock(&listeners_lk);
@@ -207,13 +205,9 @@ nni_listener_rele(nni_listener *l)
int
nni_listener_shutdown(nni_listener *l)
{
- nni_mtx_lock(&l->l_mtx);
- if (l->l_closing) {
- nni_mtx_unlock(&l->l_mtx);
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
return (NNG_ECLOSED);
}
- l->l_closing = true;
- nni_mtx_unlock(&l->l_mtx);
// Abort any remaining in-flight accepts.
nni_aio_close(l->l_acc_aio);
@@ -262,11 +256,9 @@ listener_timer_cb(void *arg)
nni_listener *l = arg;
nni_aio * aio = l->l_tmo_aio;
- nni_mtx_lock(&l->l_mtx);
if (nni_aio_result(aio) == 0) {
listener_accept_start(l);
}
- nni_mtx_unlock(&l->l_mtx);
}
static void
@@ -282,16 +274,12 @@ listener_accept_cb(void *arg)
NNI_ASSERT(data != NULL);
rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data);
}
- nni_mtx_lock(&l->l_mtx);
switch (rv) {
case 0:
- if (l->l_closing) {
- nni_mtx_unlock(&l->l_mtx);
- nni_pipe_stop(p);
- return;
- }
+ nni_mtx_lock(&l->l_mtx);
nni_pipe_set_listener(p, l);
nni_list_append(&l->l_pipes, p);
+ nni_mtx_unlock(&l->l_mtx);
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
@@ -310,7 +298,6 @@ listener_accept_cb(void *arg)
nni_sleep_aio(100, l->l_tmo_aio);
break;
}
- nni_mtx_unlock(&l->l_mtx);
if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) {
nni_pipe_stop(p);
@@ -323,9 +310,6 @@ listener_accept_start(nni_listener *l)
nni_aio *aio = l->l_acc_aio;
// Call with the listener lock held.
- if (l->l_closing) {
- return;
- }
l->l_ops.l_accept(l->l_data, aio);
}
@@ -336,10 +320,6 @@ nni_listener_start(nni_listener *l, int flags)
NNI_ARG_UNUSED(flags);
nni_mtx_lock(&l->l_mtx);
- if (l->l_closing) {
- nni_mtx_unlock(&l->l_mtx);
- return (NNG_ECLOSED);
- }
if (l->l_started) {
nni_mtx_unlock(&l->l_mtx);
return (NNG_ESTATE);
@@ -351,9 +331,10 @@ nni_listener_start(nni_listener *l, int flags)
}
l->l_started = true;
- listener_accept_start(l);
nni_mtx_unlock(&l->l_mtx);
+ listener_accept_start(l);
+
return (0);
}
@@ -387,8 +368,6 @@ nni_listener_setopt(nni_listener *l, const char *name, const void *val,
}
for (o = l->l_ops.l_options; o && o->o_name; o++) {
- int rv;
-
if (strcmp(o->o_name, name) != 0) {
continue;
}
@@ -396,10 +375,7 @@ nni_listener_setopt(nni_listener *l, const char *name, const void *val,
return (NNG_EREADONLY);
}
- nni_mtx_lock(&l->l_mtx);
- rv = o->o_set(l->l_data, val, sz, t);
- nni_mtx_unlock(&l->l_mtx);
- return (rv);
+ return (o->o_set(l->l_data, val, sz, t));
}
return (NNG_ENOTSUP);
@@ -412,17 +388,13 @@ nni_listener_getopt(
nni_tran_option *o;
for (o = l->l_ops.l_options; o && o->o_name; o++) {
- int rv;
if (strcmp(o->o_name, name) != 0) {
continue;
}
if (o->o_get == NULL) {
return (NNG_EWRITEONLY);
}
- nni_mtx_lock(&l->l_mtx);
- rv = o->o_get(l->l_data, valp, szp, t);
- nni_mtx_unlock(&l->l_mtx);
- return (rv);
+ return (o->o_get(l->l_data, valp, szp, t));
}
// We provide a fallback on the URL, but let the implementation
diff --git a/src/core/pipe.c b/src/core/pipe.c
index a42cdeff..02f8ea50 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -29,7 +29,7 @@ struct nni_pipe {
nni_listener * p_listener;
nni_dialer * p_dialer;
bool p_closed;
- bool p_stop;
+ nni_atomic_flag p_stop;
bool p_cbs;
int p_refcnt;
nni_mtx p_mtx;
@@ -102,16 +102,11 @@ nni_pipe_sys_fini(void)
void
nni_pipe_destroy(nni_pipe *p)
{
- bool cbs;
if (p == NULL) {
return;
}
- nni_mtx_lock(&p->p_mtx);
- cbs = p->p_cbs;
- nni_mtx_unlock(&p->p_mtx);
-
- if (cbs) {
+ if (p->p_cbs) {
nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id);
}
@@ -247,13 +242,9 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
- nni_mtx_lock(&p->p_mtx);
- if (p->p_stop) {
- nni_mtx_unlock(&p->p_mtx);
+ if (nni_atomic_flag_test_and_set(&p->p_stop)) {
return;
}
- p->p_stop = true;
- nni_mtx_unlock(&p->p_mtx);
nni_pipe_close(p);
@@ -283,9 +274,7 @@ nni_pipe_start_cb(void *arg)
return;
}
- nni_mtx_lock(&p->p_mtx);
p->p_cbs = true; // We're running all cbs going forward
- nni_mtx_unlock(&p->p_mtx);
nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id);
if (nni_pipe_closed(p)) {
@@ -324,10 +313,10 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
p->p_proto_data = NULL;
p->p_sock = sock;
p->p_closed = false;
- p->p_stop = false;
p->p_cbs = false;
p->p_refcnt = 0;
+ nni_atomic_flag_reset(&p->p_stop);
NNI_LIST_NODE_INIT(&p->p_reap_node);
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
diff --git a/src/core/platform.h b/src/core/platform.h
index b709e3ba..607c3827 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -147,6 +147,18 @@ extern void nni_plat_thr_fini(nni_plat_thr *);
extern bool nni_plat_thr_is_self(nni_plat_thr *);
//
+// Atomics support. This will evolve over time.
+//
+
+// nni_atomic_flag supports only test-and-set and reset operations.
+// This can be implemented without locks on any reasonable system, and
+// it corresponds to C11 atomic flag.
+typedef struct nni_atomic_flag nni_atomic_flag;
+
+extern bool nni_atomic_flag_test_and_set(nni_atomic_flag *);
+extern void nni_atomic_flag_reset(nni_atomic_flag *);
+
+//
// Clock Support
//
diff --git a/src/core/socket.c b/src/core/socket.c
index 4d7bbbc1..620c5d19 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -33,8 +33,8 @@ struct nni_ctx {
};
typedef struct sock_option {
- const char *o_name;
- int o_type;
+ const char * o_name;
+ nni_opt_type o_type;
int (*o_get)(nni_sock *, void *, size_t *, nni_opt_type);
int (*o_set)(nni_sock *, const void *, size_t, nni_opt_type);
} sock_option;
@@ -42,7 +42,7 @@ typedef struct sock_option {
typedef struct nni_sockopt {
nni_list_node node;
char * name;
- int typ;
+ nni_opt_type typ;
size_t sz;
void * data;
} nni_sockopt;
diff --git a/src/platform/posix/posix_atomic.c b/src/platform/posix/posix_atomic.c
new file mode 100644
index 00000000..a8d2579d
--- /dev/null
+++ b/src/platform/posix/posix_atomic.c
@@ -0,0 +1,57 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+// POSIX atomics.
+
+#include "core/nng_impl.h"
+
+#ifdef NNG_PLATFORM_POSIX
+
+#ifdef NNG_HAVE_STDATOMIC
+
+#include <stdatomic.h>
+bool
+nni_atomic_flag_test_and_set(nni_atomic_flag *f)
+{
+ return (atomic_flag_test_and_set(&f->f));
+}
+
+void
+nni_atomic_flag_reset(nni_atomic_flag *f)
+{
+ atomic_flag_clear(&f->f);
+}
+#else
+
+#include <pthread.h>
+
+static pthread_mutex_t plat_atomic_lock = PTHREAD_MUTEX_INITIALIZER;
+
+bool
+nni_atomic_flag_test_and_set(nni_atomic_flag *f)
+{
+ bool v;
+ pthread_mutex_lock(&plat_atomic_lock);
+ v = f->f;
+ f->f = true;
+ pthread_mutex_unlock(&plat_atomic_lock);
+ return (v);
+}
+
+void
+nni_atomic_flag_reset(nni_atomic_flag *f)
+{
+ pthread_mutex_lock(&plat_atomic_lock);
+ f->f = false;
+ pthread_mutex_unlock(&plat_atomic_lock);
+}
+#endif
+
+#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 33a9c293..70a3615f 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -75,6 +75,19 @@ struct nni_plat_flock {
#define NNG_PLATFORM_DIR_SEP "/"
+#ifdef NNG_HAVE_STDATOMIC
+
+#include <stdatomic.h>
+
+struct nni_atomic_flag {
+ atomic_flag f;
+};
+#else // NNG_HAVE_C11_ATOMIC
+struct nni_atomic_flag {
+ bool f;
+};
+#endif
+
#endif
extern int nni_posix_pollq_sysinit(void);
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index df2ee9d2..cc2ade6f 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -348,4 +348,5 @@ nni_plat_ncpu(void)
return (1);
#endif
}
+
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index 0bd12b24..73dc3660 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -48,6 +48,10 @@ struct nni_plat_cv {
PSRWLOCK srl;
};
+struct nni_atomic_flag {
+ unsigned f;
+};
+
// nni_win_event is used with io completion ports. This allows us to get
// to a specific completion callback without requiring the poller (in the
// completion port) to know anything about the event itself. We also use
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index a3d932aa..243811a0 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -103,6 +103,18 @@ nni_plat_cv_fini(nni_plat_cv *cv)
NNI_ARG_UNUSED(cv);
}
+bool
+nni_atomic_flag_test_and_set(nni_atomic_flag *f)
+{
+ return (InterlockedExchange(&f->f, 1) != 0);
+}
+
+void
+nni_atomic_flag_reset(nni_atomic_flag *f)
+{
+ InterlockedExchange(&f->f, 0);
+}
+
static unsigned int __stdcall nni_plat_thr_main(void *arg)
{
nni_plat_thr *thr = arg;
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 1f4f0b33..f725cadb 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -413,15 +413,14 @@ rep0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- nni_mtx_lock(&s->lk);
- p->busy = false;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->pipe);
- nni_mtx_unlock(&s->lk);
return;
}
+ nni_mtx_lock(&s->lk);
+ p->busy = false;
if ((ctx = nni_list_first(&p->sendq)) == NULL) {
// Nothing else to send.
if (p->id == s->ctx->pipe_id) {
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 8a0dd4d8..43751d14 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -82,7 +82,8 @@ struct req0_pipe {
nni_pipe * pipe;
req0_sock * req;
nni_list_node node;
- nni_list ctxs; // ctxs with pending traffic
+ nni_list ctxs; // ctxs with pending traffic
+ bool sending; // if busy sending
nni_aio * aio_send;
nni_aio * aio_recv;
};
@@ -264,6 +265,7 @@ req0_pipe_close(void *arg)
nni_mtx_lock(&s->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
+ p->sending = false;
if (nni_list_node_active(&p->node)) {
nni_list_node_remove(&p->node);
if (s->closed) {
@@ -311,20 +313,19 @@ req0_send_cb(void *arg)
// in the ready list, and re-run the sendq.
nni_mtx_lock(&s->mtx);
- if (nni_list_active(&s->busypipes, p)) {
- nni_list_remove(&s->busypipes, p);
- nni_list_append(&s->readypipes, p);
- if (nni_list_empty(&s->sendq)) {
- nni_pollable_raise(s->sendable);
- }
- req0_run_sendq(s, &aios);
- } else {
- // We wind up here if stop was called from the reader
- // side while we were waiting to be scheduled to run for the
- // writer side. In this case we can't complete the operation,
- // and we have to abort.
- nni_pipe_stop(p->pipe);
+ if (!p->sending) {
+ // This occurs if the req0_pipe_close has been called.
+ // In that case we don't want any more processing.
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_list_remove(&s->busypipes, p);
+ nni_list_append(&s->readypipes, p);
+ p->sending = false;
+ if (nni_list_empty(&s->sendq)) {
+ nni_pollable_raise(s->sendable);
}
+ req0_run_sendq(s, &aios);
nni_mtx_unlock(&s->mtx);
while ((aio = nni_list_first(&aios)) != NULL) {
@@ -533,6 +534,7 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist)
nni_list_remove(&s->readypipes, p);
nni_list_append(&s->busypipes, p);
+ p->sending = true;
if ((aio = ctx->saio) != NULL) {
ctx->saio = NULL;
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index db18a4e8..fbdeb65a 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -402,15 +402,14 @@ resp0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- nni_mtx_lock(&s->mtx);
- p->busy = false;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
- nni_mtx_unlock(&s->mtx);
return;
}
+ nni_mtx_lock(&s->mtx);
+ p->busy = false;
if ((ctx = nni_list_first(&p->sendq)) == NULL) {
// Nothing else to send.
if (p->id == s->ctx->pipe_id) {
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index b48b82d9..7d99e507 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -827,14 +827,22 @@ static int
ipc_ep_get_recvmaxsz(void *arg, void *data, size_t *szp, nni_opt_type t)
{
ipc_ep *ep = arg;
- return (nni_copyout_size(ep->rcvmax, data, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(ep->rcvmax, data, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
ipc_ep_get_addr(void *arg, void *data, size_t *szp, nni_opt_type t)
{
ipc_ep *ep = arg;
- return (nni_copyout_sockaddr(&ep->sa, data, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_sockaddr(&ep->sa, data, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
@@ -868,7 +876,9 @@ ipc_ep_set_sec_desc(void *arg, const void *data, size_t sz, nni_opt_type t)
int rv;
if ((rv = nni_copyin_ptr(&ptr, data, sz, t)) == 0) {
+ nni_mtx_lock(&ep->mtx);
rv = nni_plat_ipc_ep_set_security_descriptor(ep->iep, ptr);
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 1a183ecd..e8aa04d0 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -882,7 +882,11 @@ static int
tcp_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tcp_ep *ep = arg;
- return (nni_copyout_bool(ep->nodelay, v, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_bool(ep->nodelay, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
@@ -903,7 +907,11 @@ static int
tcp_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tcp_ep *ep = arg;
- return (nni_copyout_bool(ep->keepalive, v, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_bool(ep->keepalive, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
@@ -931,7 +939,11 @@ static int
tcp_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tcp_ep *ep = arg;
- return (nni_copyout_size(ep->rcvmax, v, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(ep->rcvmax, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static nni_tran_option tcp_pipe_options[] = {
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index b4f555da..5e1a1e8d 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -877,7 +877,11 @@ static int
tls_ep_get_nodelay(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tls_ep *ep = arg;
- return (nni_copyout_bool(ep->nodelay, v, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_bool(ep->nodelay, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
@@ -898,7 +902,11 @@ static int
tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tls_ep *ep = arg;
- return (nni_copyout_bool(ep->keepalive, v, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_bool(ep->keepalive, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
@@ -947,7 +955,11 @@ static int
tls_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tls_ep *ep = arg;
- return (nni_copyout_size(ep->rcvmax, v, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_size(ep->rcvmax, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
@@ -974,9 +986,11 @@ tls_ep_set_config(void *arg, const void *data, size_t sz, nni_opt_type t)
if (cfg == NULL) {
return (NNG_EINVAL);
}
+ nni_mtx_lock(&ep->mtx);
old = ep->cfg;
nni_tls_config_hold(cfg);
ep->cfg = cfg;
+ nni_mtx_unlock(&ep->mtx);
if (old != NULL) {
nni_tls_config_fini(old);
}
@@ -987,7 +1001,11 @@ static int
tls_ep_get_config(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tls_ep *ep = arg;
- return (nni_copyout_ptr(ep->cfg, v, szp, t));
+ int rv;
+ nni_mtx_lock(&ep->mtx);
+ rv = nni_copyout_ptr(ep->cfg, v, szp, t);
+ nni_mtx_unlock(&ep->mtx);
+ return (rv);
}
static int
@@ -1055,7 +1073,9 @@ tls_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t)
int rv;
if ((rv = tls_ep_chk_string(v, sz, t)) == 0) {
+ nni_mtx_lock(&ep->mtx);
rv = nng_tls_config_cert_key_file(ep->cfg, v, NULL);
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 2fa0fd67..b3aef756 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -416,7 +416,11 @@ static int
ws_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
ws_dialer *d = arg;
- return (nni_copyout_size(d->rcvmax, v, szp, t));
+ int rv;
+ nni_mtx_lock(&d->mtx);
+ rv = nni_copyout_size(d->rcvmax, v, szp, t);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
}
static int
@@ -439,7 +443,11 @@ static int
ws_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
ws_listener *l = arg;
- return (nni_copyout_size(l->rcvmax, v, szp, t));
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_size(l->rcvmax, v, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
}
static int
@@ -538,7 +546,9 @@ ws_dialer_set_reqhdrs(void *arg, const void *v, size_t sz, nni_opt_type t)
}
if ((rv = ws_check_string(v, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
rv = ws_set_headers(&d->headers, v);
+ nni_mtx_unlock(&d->mtx);
}
return (rv);
}
@@ -553,7 +563,9 @@ ws_listener_set_reshdrs(void *arg, const void *v, size_t sz, nni_opt_type t)
return (NNG_EBUSY);
}
if ((rv = ws_check_string(v, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
rv = ws_set_headers(&l->headers, v);
+ nni_mtx_unlock(&l->mtx);
}
return (rv);
}
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 3535a248..a2163e3f 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -2600,7 +2600,9 @@ zt_ep_set_recvmaxsz(void *arg, const void *data, size_t sz, nni_opt_type t)
int rv;
if ((rv = nni_copyin_size(&val, data, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&zt_lk);
ep->ze_rcvmax = val;
+ nni_mtx_unlock(&zt_lk);
}
return (rv);
}
@@ -2609,7 +2611,11 @@ static int
zt_ep_get_recvmaxsz(void *arg, void *data, size_t *szp, nni_opt_type t)
{
zt_ep *ep = arg;
- return (nni_copyout_size(ep->ze_rcvmax, data, szp, t));
+ int rv;
+ nni_mtx_lock(&zt_lk);
+ rv = nni_copyout_size(ep->ze_rcvmax, data, szp, t);
+ nni_mtx_unlock(&zt_lk);
+ return (rv);
}
static int
@@ -2634,16 +2640,16 @@ zt_ep_set_home(void *arg, const void *data, size_t sz, nni_opt_type t)
zt_ep *ep = arg;
if ((rv = zt_ep_chk_string(data, sz, t)) == 0) {
+ nni_mtx_lock(&zt_lk);
if (ep->ze_running) {
rv = NNG_ESTATE;
} else {
- nni_mtx_lock(&zt_lk);
nni_strlcpy(ep->ze_home, data, sizeof(ep->ze_home));
if ((rv = zt_node_find(ep)) != 0) {
ep->ze_ztn = NULL;
}
- nni_mtx_unlock(&zt_lk);
}
+ nni_mtx_unlock(&zt_lk);
}
return (rv);
@@ -2653,7 +2659,12 @@ static int
zt_ep_get_home(void *arg, void *data, size_t *szp, nni_opt_type t)
{
zt_ep *ep = arg;
- return (nni_copyout_str(ep->ze_home, data, szp, t));
+ int rv;
+
+ nni_mtx_lock(&zt_lk);
+ rv = nni_copyout_str(ep->ze_home, data, szp, t);
+ nni_mtx_unlock(&zt_lk);
+ return (rv);
}
static int
@@ -2663,11 +2674,13 @@ zt_ep_get_url(void *arg, void *data, size_t *szp, nni_opt_type t)
zt_ep * ep = arg;
uint64_t addr;
+ nni_mtx_lock(&zt_lk);
addr = ep->ze_mode == NNI_EP_MODE_DIAL ? ep->ze_raddr : ep->ze_laddr;
snprintf(ustr, sizeof(ustr), "zt://%llx.%llx:%u",
(unsigned long long) addr >> zt_port_shift,
(unsigned long long) ep->ze_nwid,
(unsigned) (addr & zt_port_mask));
+ nni_mtx_unlock(&zt_lk);
return (nni_copyout_str(ustr, data, szp, t));
}
@@ -2750,14 +2763,24 @@ static int
zt_ep_get_node(void *arg, void *data, size_t *szp, nni_opt_type t)
{
zt_ep *ep = arg;
- return (nni_copyout_u64(ep->ze_ztn->zn_self, data, szp, t));
+ int rv;
+
+ nni_mtx_lock(&zt_lk);
+ rv = nni_copyout_u64(ep->ze_ztn->zn_self, data, szp, t);
+ nni_mtx_unlock(&zt_lk);
+ return (rv);
}
static int
zt_ep_get_nwid(void *arg, void *data, size_t *szp, nni_opt_type t)
{
zt_ep *ep = arg;
- return (nni_copyout_u64(ep->ze_nwid, data, szp, t));
+ int rv;
+
+ nni_mtx_lock(&zt_lk);
+ rv = nni_copyout_u64(ep->ze_nwid, data, szp, t);
+ nni_mtx_unlock(&zt_lk);
+ return (rv);
}
static int
@@ -2788,7 +2811,9 @@ zt_ep_set_ping_time(void *arg, const void *data, size_t sz, nni_opt_type t)
int rv;
if ((rv = nni_copyin_ms(&val, data, sz, t)) == 0) {
+ nni_mtx_lock(&zt_lk);
ep->ze_ping_time = val;
+ nni_mtx_unlock(&zt_lk);
}
return (rv);
}
@@ -2797,7 +2822,12 @@ static int
zt_ep_get_ping_time(void *arg, void *data, size_t *szp, nni_opt_type t)
{
zt_ep *ep = arg;
- return (nni_copyout_ms(ep->ze_ping_time, data, szp, t));
+ int rv;
+
+ nni_mtx_lock(&zt_lk);
+ rv = nni_copyout_ms(ep->ze_ping_time, data, szp, t);
+ nni_mtx_unlock(&zt_lk);
+ return (rv);
}
static int
@@ -2814,7 +2844,9 @@ zt_ep_set_ping_tries(void *arg, const void *data, size_t sz, nni_opt_type t)
int rv;
if ((rv = nni_copyin_int(&val, data, sz, 0, 1000000, t)) == 0) {
+ nni_mtx_lock(&zt_lk);
ep->ze_ping_tries = val;
+ nni_mtx_unlock(&zt_lk);
}
return (rv);
}
@@ -2823,7 +2855,12 @@ static int
zt_ep_get_ping_tries(void *arg, void *data, size_t *szp, nni_opt_type t)
{
zt_ep *ep = arg;
- return (nni_copyout_int(ep->ze_ping_tries, data, szp, t));
+ int rv;
+
+ nni_mtx_lock(&zt_lk);
+ rv = nni_copyout_int(ep->ze_ping_tries, data, szp, t);
+ nni_mtx_unlock(&zt_lk);
+ return (rv);
}
static int
@@ -2834,7 +2871,9 @@ zt_ep_set_conn_time(void *arg, const void *data, size_t sz, nni_opt_type t)
int rv;
if ((rv = nni_copyin_ms(&val, data, sz, t)) == 0) {
+ nni_mtx_lock(&zt_lk);
ep->ze_conn_time = val;
+ nni_mtx_unlock(&zt_lk);
}
return (rv);
}
@@ -2843,7 +2882,12 @@ static int
zt_ep_get_conn_time(void *arg, void *data, size_t *szp, nni_opt_type t)
{
zt_ep *ep = arg;
- return (nni_copyout_ms(ep->ze_conn_time, data, szp, t));
+ int rv;
+
+ nni_mtx_lock(&zt_lk);
+ rv = nni_copyout_ms(ep->ze_conn_time, data, szp, t);
+ nni_mtx_unlock(&zt_lk);
+ return (rv);
}
static int
@@ -2854,7 +2898,9 @@ zt_ep_set_conn_tries(void *arg, const void *data, size_t sz, nni_opt_type t)
int rv;
if ((rv = nni_copyin_int(&val, data, sz, 0, 1000000, t)) == 0) {
+ nni_mtx_lock(&zt_lk);
ep->ze_conn_tries = val;
+ nni_mtx_unlock(&zt_lk);
}
return (rv);
}
@@ -2863,7 +2909,12 @@ static int
zt_ep_get_conn_tries(void *arg, void *data, size_t *szp, nni_opt_type t)
{
zt_ep *ep = arg;
- return (nni_copyout_int(ep->ze_conn_tries, data, szp, t));
+ int rv;
+
+ nni_mtx_lock(&zt_lk);
+ rv = nni_copyout_int(ep->ze_conn_tries, data, szp, t);
+ nni_mtx_unlock(&zt_lk);
+ return (rv);
}
static int