aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-18 13:23:58 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-18 15:54:32 -0700
commitd4d49cef92356778215c757e8f6d1d0d35662a64 (patch)
treef28de7321f0b301faea84cd38ce137db2b126dbc /src
parentb310f712828962bf3187caf3bfe064c3531c5628 (diff)
downloadnng-d4d49cef92356778215c757e8f6d1d0d35662a64.tar.gz
nng-d4d49cef92356778215c757e8f6d1d0d35662a64.tar.bz2
nng-d4d49cef92356778215c757e8f6d1d0d35662a64.zip
fixes #601 pipe destroy can fail to close pipe
Diffstat (limited to 'src')
-rw-r--r--src/core/dialer.c35
-rw-r--r--src/core/listener.c35
-rw-r--r--src/core/pipe.c8
-rw-r--r--src/core/socket.c36
-rw-r--r--src/core/sockimpl.h9
5 files changed, 79 insertions, 44 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 3144d673..34e90891 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -92,14 +92,14 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_url_free(url);
return (NNG_ENOMEM);
}
- d->d_url = url;
- d->d_closed = false;
- d->d_data = NULL;
- d->d_refcnt = 1;
- d->d_sock = s;
- d->d_tran = tran;
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_closing = false;
+ d->d_data = NULL;
+ d->d_refcnt = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
nni_atomic_flag_reset(&d->d_started);
- nni_atomic_flag_reset(&d->d_closing);
// Make a copy of the endpoint operations. This allows us to
// modify them (to override NULLs for example), and avoids an extra
@@ -175,6 +175,27 @@ nni_dialer_rele(nni_dialer *d)
}
void
+nni_dialer_close_rele(nni_dialer *d)
+{
+ nni_mtx_lock(&dialers_lk);
+ if (d->d_closed) {
+ nni_mtx_unlock(&dialers_lk);
+ nni_dialer_rele(d);
+ return;
+ }
+ d->d_closed = true;
+ nni_mtx_unlock(&dialers_lk);
+
+ // Remove us from the table so we cannot be found.
+ // This is done fairly early in the teardown process.
+ // If we're here, either the socket or the listener has been
+ // closed at the user request, so there would be a race anyway.
+ nni_idhash_remove(dialers, d->d_id);
+
+ nni_dialer_rele(d);
+}
+
+void
nni_dialer_close(nni_dialer *d)
{
nni_mtx_lock(&dialers_lk);
diff --git a/src/core/listener.c b/src/core/listener.c
index 1bfa657d..debfa5f1 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -91,13 +91,13 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
nni_url_free(url);
return (NNG_ENOMEM);
}
- l->l_url = url;
- l->l_closed = false;
- l->l_data = NULL;
- l->l_refcnt = 1;
- l->l_sock = s;
- l->l_tran = tran;
- nni_atomic_flag_reset(&l->l_closing);
+ l->l_url = url;
+ l->l_closed = false;
+ l->l_closing = false;
+ l->l_data = NULL;
+ l->l_refcnt = 1;
+ l->l_sock = s;
+ l->l_tran = tran;
nni_atomic_flag_reset(&l->l_started);
// Make a copy of the endpoint operations. This allows us to
@@ -193,6 +193,27 @@ nni_listener_close(nni_listener *l)
nni_listener_rele(l); // This will trigger a reap if id count is zero.
}
+void
+nni_listener_close_rele(nni_listener *l)
+{
+ // Listener should already be shutdown. The socket lock may be held.
+ nni_mtx_lock(&listeners_lk);
+ if (l->l_closed) {
+ nni_mtx_unlock(&listeners_lk);
+ nni_listener_rele(l);
+ return;
+ }
+ l->l_closed = true;
+ nni_mtx_unlock(&listeners_lk);
+
+ // Remove us from the table so we cannot be found.
+ // This is done fairly early in the teardown process.
+ // If we're here, either the socket or the listener has been
+ // closed at the user request, so there would be a race anyway.
+ nni_idhash_remove(listeners, l->l_id);
+ nni_listener_rele(l); // This will trigger a reap if id count is zero.
+}
+
static void
listener_timer_cb(void *arg)
{
diff --git a/src/core/pipe.c b/src/core/pipe.c
index e95fe1d4..bac9203b 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -54,8 +54,8 @@ nni_pipe_sys_fini(void)
}
}
-void
-nni_pipe_destroy(nni_pipe *p)
+static void
+pipe_destroy(nni_pipe *p)
{
if (p == NULL) {
return;
@@ -174,7 +174,7 @@ nni_pipe_close(nni_pipe *p)
p->p_tran_ops.p_close(p->p_tran_data);
}
- nni_reap(&p->p_reap, (nni_cb) nni_pipe_destroy, p);
+ nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p);
}
bool
@@ -263,7 +263,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
- nni_pipe_destroy(p);
+ nni_pipe_close(p);
return (rv);
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 0ca5e22d..f4e59af5 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -449,7 +449,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
}
s->s_sndtimeo = -1;
s->s_rcvtimeo = -1;
- s->s_closing = 0;
s->s_reconn = NNI_SECOND;
s->s_reconnmax = 0;
s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
@@ -665,12 +664,12 @@ nni_sock_shutdown(nni_sock *sock)
// we skip past it; it will be removed from another thread.
NNI_LIST_FOREACH (&sock->s_listeners, l) {
if (nni_listener_hold(l) == 0) {
- nni_listener_close(l);
+ nni_listener_close_rele(l);
}
}
NNI_LIST_FOREACH (&sock->s_dialers, d) {
if (nni_dialer_hold(d) == 0) {
- nni_dialer_close(d);
+ nni_dialer_close_rele(d);
}
}
@@ -1390,9 +1389,9 @@ nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closed) {
+ if (s->s_closed || d->d_closing) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_destroy(p);
+ nni_pipe_close(p);
return;
}
@@ -1427,21 +1426,18 @@ dialer_shutdown_impl(nni_dialer *d)
static void
dialer_shutdown_locked(nni_dialer *d)
{
- if (nni_atomic_flag_test_and_set(&d->d_closing)) {
- return;
+ if (!d->d_closing) {
+ d->d_closing = true;
+ dialer_shutdown_impl(d);
}
- dialer_shutdown_impl(d);
}
void
nni_dialer_shutdown(nni_dialer *d)
{
nni_sock *s = d->d_sock;
- if (nni_atomic_flag_test_and_set(&d->d_closing)) {
- return;
- }
nni_mtx_lock(&s->s_mx);
- dialer_shutdown_impl(d);
+ dialer_shutdown_locked(d);
nni_mtx_unlock(&s->s_mx);
}
@@ -1482,9 +1478,9 @@ nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
nni_sock *s = l->l_sock;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closed) {
+ if (s->s_closed || l->l_closing) {
nni_mtx_unlock(&s->s_mx);
- nni_pipe_destroy(p);
+ nni_pipe_close(p);
return;
}
p->p_listener = l;
@@ -1516,10 +1512,10 @@ listener_shutdown_impl(nni_listener *l)
static void
listener_shutdown_locked(nni_listener *l)
{
- if (nni_atomic_flag_test_and_set(&l->l_closing)) {
- return;
+ if (!l->l_closing) {
+ l->l_closing = true;
+ listener_shutdown_impl(l);
}
- listener_shutdown_impl(l);
}
void
@@ -1527,12 +1523,8 @@ nni_listener_shutdown(nni_listener *l)
{
nni_sock *s = l->l_sock;
- if (nni_atomic_flag_test_and_set(&l->l_closing)) {
- return;
- }
-
nni_mtx_lock(&s->s_mx);
- listener_shutdown_impl(l);
+ listener_shutdown_locked(l);
nni_mtx_unlock(&s->s_mx);
}
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 207a83b3..569e5cae 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -28,8 +28,8 @@ struct nni_dialer {
int d_lastrv; // last result from synchronous
bool d_synch; // synchronous connect in progress?
bool d_closed; // full shutdown
+ bool d_closing;
nni_atomic_flag d_started;
- nni_atomic_flag d_closing; // close pending (waiting on refcnt)
nni_mtx d_mtx;
nni_cv d_cv;
nni_list d_pipes;
@@ -51,9 +51,9 @@ struct nni_listener {
nni_sock * l_sock;
nni_url * l_url;
int l_refcnt;
- bool l_closed; // full shutdown
+ bool l_closed; // full shutdown
+ bool l_closing; // close started (shutdown)
nni_atomic_flag l_started;
- nni_atomic_flag l_closing; // close started (shutdown)
nni_list l_pipes;
nni_aio * l_acc_aio;
nni_aio * l_tmo_aio;
@@ -92,15 +92,16 @@ extern void nni_dialer_shutdown(nni_dialer *);
extern void nni_dialer_reap(nni_dialer *);
extern void nni_dialer_destroy(nni_dialer *);
extern void nni_dialer_timer_start(nni_dialer *);
+extern void nni_dialer_close_rele(nni_dialer *);
extern void nni_listener_add_pipe(nni_listener *, nni_pipe *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);
+extern void nni_listener_close_rele(nni_listener *);
extern void nni_pipe_remove(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
-extern void nni_pipe_destroy(nni_pipe *);
extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *, void *);
extern void nni_pipe_start(nni_pipe *);