aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-02 22:36:08 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-03 19:00:19 -0700
commitd1a9c84a6b375cb25a8b7475957130e364b41753 (patch)
tree5444721d96a84d92e3ed258b4d51f80adf6b200c /src/core
parenta772bcc6ebe198f939889abbda18eded2a326941 (diff)
downloadnng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.gz
nng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.bz2
nng-d1a9c84a6b375cb25a8b7475957130e364b41753.zip
fixes #572 Several locking errors found
fixes #573 atomic flags could help This introduces a new atomic flag, and reduces some of the global locking. The lock refactoring work is not yet complete, but this is a positive step forward, and should help with certain things. While here we also fixed a compile warning due to incorrect types.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/dialer.c48
-rw-r--r--src/core/listener.c48
-rw-r--r--src/core/pipe.c19
-rw-r--r--src/core/platform.h12
-rw-r--r--src/core/socket.c6
5 files changed, 37 insertions, 96 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 09ecdac5..e93c893e 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -27,7 +27,7 @@ struct nni_dialer {
bool d_synch; // synchronous connect in progress?
bool d_started;
bool d_closed; // full shutdown
- bool d_closing; // close pending (waiting on refcnt)
+ nni_atomic_flag d_closing; // close pending (waiting on refcnt)
nni_mtx d_mtx;
nni_cv d_cv;
nni_list d_pipes;
@@ -96,11 +96,9 @@ dialer_destroy(nni_dialer *d)
nni_aio_fini(d->d_con_aio);
nni_aio_fini(d->d_tmo_aio);
- nni_mtx_lock(&d->d_mtx);
if (d->d_data != NULL) {
d->d_ops.d_fini(d->d_data);
}
- nni_mtx_unlock(&d->d_mtx);
nni_cv_fini(&d->d_cv);
nni_mtx_fini(&d->d_mtx);
nni_url_free(d->d_url);
@@ -130,12 +128,12 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
}
d->d_url = url;
d->d_closed = false;
- d->d_closing = false;
d->d_started = false;
d->d_data = NULL;
d->d_refcnt = 1;
d->d_sock = s;
d->d_tran = tran;
+ nni_atomic_flag_reset(&d->d_closing);
// Make a copy of the endpoint operations. This allows us to
// modify them (to override NULLs for example), and avoids an extra
@@ -205,7 +203,7 @@ nni_dialer_rele(nni_dialer *d)
{
nni_mtx_lock(&dialers_lk);
d->d_refcnt--;
- if (d->d_closing) {
+ if (d->d_refcnt == 0) {
nni_cv_wake(&d->d_cv);
}
nni_mtx_unlock(&dialers_lk);
@@ -214,13 +212,9 @@ nni_dialer_rele(nni_dialer *d)
int
nni_dialer_shutdown(nni_dialer *d)
{
- nni_mtx_lock(&d->d_mtx);
- if (d->d_closing) {
- nni_mtx_unlock(&d->d_mtx);
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
return (NNG_ECLOSED);
}
- d->d_closing = true;
- nni_mtx_unlock(&d->d_mtx);
// Abort any remaining in-flight operations.
nni_aio_close(d->d_con_aio);
@@ -269,9 +263,6 @@ dialer_timer_start(nni_dialer *d)
{
nni_duration backoff;
- if (d->d_closing) {
- return;
- }
backoff = d->d_currtime;
d->d_currtime *= 2;
if (d->d_currtime > d->d_maxrtime) {
@@ -319,11 +310,6 @@ dialer_connect_cb(void *arg)
synch = d->d_synch;
d->d_synch = false;
if (rv == 0) {
- if (d->d_closing) {
- nni_mtx_unlock(&d->d_mtx);
- nni_pipe_stop(p);
- return;
- }
nni_pipe_set_dialer(p, d);
nni_list_append(&d->d_pipes, p);
@@ -372,10 +358,6 @@ dialer_connect_start(nni_dialer *d)
nni_aio *aio = d->d_con_aio;
// Call with the Endpoint lock held.
- if (d->d_closing) {
- return;
- }
-
d->d_ops.d_connect(d->d_data, aio);
}
@@ -390,11 +372,6 @@ nni_dialer_start(nni_dialer *d, int flags)
nni_mtx_lock(&d->d_mtx);
- if (d->d_closing) {
- nni_mtx_unlock(&d->d_mtx);
- return (NNG_ECLOSED);
- }
-
if (d->d_started) {
nni_mtx_unlock(&d->d_mtx);
return (NNG_ESTATE);
@@ -411,10 +388,10 @@ nni_dialer_start(nni_dialer *d, int flags)
d->d_started = true;
dialer_connect_start(d);
- while (d->d_synch && !d->d_closing) {
+ while (d->d_synch) {
nni_cv_wait(&d->d_cv);
}
- rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv;
+ rv = d->d_lastrv;
nni_cv_wake(&d->d_cv);
if (rv != 0) {
@@ -478,8 +455,6 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
}
for (o = d->d_ops.d_options; o && o->o_name; o++) {
- int rv;
-
if (strcmp(o->o_name, name) != 0) {
continue;
}
@@ -487,10 +462,7 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
return (NNG_EREADONLY);
}
- nni_mtx_lock(&d->d_mtx);
- rv = o->o_set(d->d_data, val, sz, t);
- nni_mtx_unlock(&d->d_mtx);
- return (rv);
+ return (o->o_set(d->d_data, val, sz, t));
}
return (NNG_ENOTSUP);
@@ -518,17 +490,13 @@ nni_dialer_getopt(
}
for (o = d->d_ops.d_options; o && o->o_name; o++) {
- int rv;
if (strcmp(o->o_name, name) != 0) {
continue;
}
if (o->o_get == NULL) {
return (NNG_EWRITEONLY);
}
- nni_mtx_lock(&d->d_mtx);
- rv = o->o_get(d->d_data, valp, szp, t);
- nni_mtx_unlock(&d->d_mtx);
- return (rv);
+ return (o->o_get(d->d_data, valp, szp, t));
}
// We provide a fallback on the URL, but let the implementation
diff --git a/src/core/listener.c b/src/core/listener.c
index 31c154bc..8e06076d 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -25,7 +25,7 @@ struct nni_listener {
int l_refcnt;
bool l_started;
bool l_closed; // full shutdown
- bool l_closing; // close pending (waiting on refcnt)
+ nni_atomic_flag l_closing; // close pending
nni_mtx l_mtx;
nni_cv l_cv;
nni_list l_pipes;
@@ -89,11 +89,9 @@ listener_destroy(nni_listener *l)
nni_aio_fini(l->l_acc_aio);
- nni_mtx_lock(&l->l_mtx);
if (l->l_data != NULL) {
l->l_ops.l_fini(l->l_data);
}
- nni_mtx_unlock(&l->l_mtx);
nni_cv_fini(&l->l_cv);
nni_mtx_fini(&l->l_mtx);
nni_url_free(l->l_url);
@@ -123,12 +121,12 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
}
l->l_url = url;
l->l_closed = false;
- l->l_closing = false;
l->l_started = false;
l->l_data = NULL;
l->l_refcnt = 1;
l->l_sock = s;
l->l_tran = tran;
+ nni_atomic_flag_reset(&l->l_closing);
// Make a copy of the endpoint operations. This allows us to
// modify them (to override NULLs for example), and avoids an extra
@@ -198,7 +196,7 @@ nni_listener_rele(nni_listener *l)
{
nni_mtx_lock(&listeners_lk);
l->l_refcnt--;
- if (l->l_closing) {
+ if (l->l_refcnt == 0) {
nni_cv_wake(&l->l_cv);
}
nni_mtx_unlock(&listeners_lk);
@@ -207,13 +205,9 @@ nni_listener_rele(nni_listener *l)
int
nni_listener_shutdown(nni_listener *l)
{
- nni_mtx_lock(&l->l_mtx);
- if (l->l_closing) {
- nni_mtx_unlock(&l->l_mtx);
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
return (NNG_ECLOSED);
}
- l->l_closing = true;
- nni_mtx_unlock(&l->l_mtx);
// Abort any remaining in-flight accepts.
nni_aio_close(l->l_acc_aio);
@@ -262,11 +256,9 @@ listener_timer_cb(void *arg)
nni_listener *l = arg;
nni_aio * aio = l->l_tmo_aio;
- nni_mtx_lock(&l->l_mtx);
if (nni_aio_result(aio) == 0) {
listener_accept_start(l);
}
- nni_mtx_unlock(&l->l_mtx);
}
static void
@@ -282,16 +274,12 @@ listener_accept_cb(void *arg)
NNI_ASSERT(data != NULL);
rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data);
}
- nni_mtx_lock(&l->l_mtx);
switch (rv) {
case 0:
- if (l->l_closing) {
- nni_mtx_unlock(&l->l_mtx);
- nni_pipe_stop(p);
- return;
- }
+ nni_mtx_lock(&l->l_mtx);
nni_pipe_set_listener(p, l);
nni_list_append(&l->l_pipes, p);
+ nni_mtx_unlock(&l->l_mtx);
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
@@ -310,7 +298,6 @@ listener_accept_cb(void *arg)
nni_sleep_aio(100, l->l_tmo_aio);
break;
}
- nni_mtx_unlock(&l->l_mtx);
if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) {
nni_pipe_stop(p);
@@ -323,9 +310,6 @@ listener_accept_start(nni_listener *l)
nni_aio *aio = l->l_acc_aio;
// Call with the listener lock held.
- if (l->l_closing) {
- return;
- }
l->l_ops.l_accept(l->l_data, aio);
}
@@ -336,10 +320,6 @@ nni_listener_start(nni_listener *l, int flags)
NNI_ARG_UNUSED(flags);
nni_mtx_lock(&l->l_mtx);
- if (l->l_closing) {
- nni_mtx_unlock(&l->l_mtx);
- return (NNG_ECLOSED);
- }
if (l->l_started) {
nni_mtx_unlock(&l->l_mtx);
return (NNG_ESTATE);
@@ -351,9 +331,10 @@ nni_listener_start(nni_listener *l, int flags)
}
l->l_started = true;
- listener_accept_start(l);
nni_mtx_unlock(&l->l_mtx);
+ listener_accept_start(l);
+
return (0);
}
@@ -387,8 +368,6 @@ nni_listener_setopt(nni_listener *l, const char *name, const void *val,
}
for (o = l->l_ops.l_options; o && o->o_name; o++) {
- int rv;
-
if (strcmp(o->o_name, name) != 0) {
continue;
}
@@ -396,10 +375,7 @@ nni_listener_setopt(nni_listener *l, const char *name, const void *val,
return (NNG_EREADONLY);
}
- nni_mtx_lock(&l->l_mtx);
- rv = o->o_set(l->l_data, val, sz, t);
- nni_mtx_unlock(&l->l_mtx);
- return (rv);
+ return (o->o_set(l->l_data, val, sz, t));
}
return (NNG_ENOTSUP);
@@ -412,17 +388,13 @@ nni_listener_getopt(
nni_tran_option *o;
for (o = l->l_ops.l_options; o && o->o_name; o++) {
- int rv;
if (strcmp(o->o_name, name) != 0) {
continue;
}
if (o->o_get == NULL) {
return (NNG_EWRITEONLY);
}
- nni_mtx_lock(&l->l_mtx);
- rv = o->o_get(l->l_data, valp, szp, t);
- nni_mtx_unlock(&l->l_mtx);
- return (rv);
+ return (o->o_get(l->l_data, valp, szp, t));
}
// We provide a fallback on the URL, but let the implementation
diff --git a/src/core/pipe.c b/src/core/pipe.c
index a42cdeff..02f8ea50 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -29,7 +29,7 @@ struct nni_pipe {
nni_listener * p_listener;
nni_dialer * p_dialer;
bool p_closed;
- bool p_stop;
+ nni_atomic_flag p_stop;
bool p_cbs;
int p_refcnt;
nni_mtx p_mtx;
@@ -102,16 +102,11 @@ nni_pipe_sys_fini(void)
void
nni_pipe_destroy(nni_pipe *p)
{
- bool cbs;
if (p == NULL) {
return;
}
- nni_mtx_lock(&p->p_mtx);
- cbs = p->p_cbs;
- nni_mtx_unlock(&p->p_mtx);
-
- if (cbs) {
+ if (p->p_cbs) {
nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id);
}
@@ -247,13 +242,9 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
- nni_mtx_lock(&p->p_mtx);
- if (p->p_stop) {
- nni_mtx_unlock(&p->p_mtx);
+ if (nni_atomic_flag_test_and_set(&p->p_stop)) {
return;
}
- p->p_stop = true;
- nni_mtx_unlock(&p->p_mtx);
nni_pipe_close(p);
@@ -283,9 +274,7 @@ nni_pipe_start_cb(void *arg)
return;
}
- nni_mtx_lock(&p->p_mtx);
p->p_cbs = true; // We're running all cbs going forward
- nni_mtx_unlock(&p->p_mtx);
nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id);
if (nni_pipe_closed(p)) {
@@ -324,10 +313,10 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
p->p_proto_data = NULL;
p->p_sock = sock;
p->p_closed = false;
- p->p_stop = false;
p->p_cbs = false;
p->p_refcnt = 0;
+ nni_atomic_flag_reset(&p->p_stop);
NNI_LIST_NODE_INIT(&p->p_reap_node);
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
diff --git a/src/core/platform.h b/src/core/platform.h
index b709e3ba..607c3827 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -147,6 +147,18 @@ extern void nni_plat_thr_fini(nni_plat_thr *);
extern bool nni_plat_thr_is_self(nni_plat_thr *);
//
+// Atomics support. This will evolve over time.
+//
+
+// nni_atomic_flag supports only test-and-set and reset operations.
+// This can be implemented without locks on any reasonable system, and
+// it corresponds to C11 atomic flag.
+typedef struct nni_atomic_flag nni_atomic_flag;
+
+extern bool nni_atomic_flag_test_and_set(nni_atomic_flag *);
+extern void nni_atomic_flag_reset(nni_atomic_flag *);
+
+//
// Clock Support
//
diff --git a/src/core/socket.c b/src/core/socket.c
index 4d7bbbc1..620c5d19 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -33,8 +33,8 @@ struct nni_ctx {
};
typedef struct sock_option {
- const char *o_name;
- int o_type;
+ const char * o_name;
+ nni_opt_type o_type;
int (*o_get)(nni_sock *, void *, size_t *, nni_opt_type);
int (*o_set)(nni_sock *, const void *, size_t, nni_opt_type);
} sock_option;
@@ -42,7 +42,7 @@ typedef struct sock_option {
typedef struct nni_sockopt {
nni_list_node node;
char * name;
- int typ;
+ nni_opt_type typ;
size_t sz;
void * data;
} nni_sockopt;