aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-09-04 17:27:54 -0700
committerGitHub <noreply@github.com>2021-09-04 17:27:54 -0700
commitd137bf383892c53265593d9a5ac17e64444091c9 (patch)
treed06570c8f18d47057ff2158f5a9e5fd92e5cfeaf /src/core/socket.c
parentd4b91214c125de33d2e8d3f52fecd8eac18e476c (diff)
downloadnng-d137bf383892c53265593d9a5ac17e64444091c9.tar.gz
nng-d137bf383892c53265593d9a5ac17e64444091c9.tar.bz2
nng-d137bf383892c53265593d9a5ac17e64444091c9.zip
fixes #1498 Endpoint close/shutdown could be synchronous (#1499)
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c153
1 files changed, 40 insertions, 113 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index e170289d..e8b1211f 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -109,9 +109,6 @@ struct nni_socket {
static void nni_ctx_destroy(nni_ctx *);
-static void dialer_shutdown_locked(nni_dialer *);
-static void listener_shutdown_locked(nni_listener *);
-
#define SOCK(s) ((nni_sock *) (s))
static int
@@ -693,15 +690,21 @@ nni_sock_shutdown(nni_sock *sock)
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = true;
- // Close the EPs. This prevents new connections from forming
- // but but allows existing ones to drain.
- NNI_LIST_FOREACH (&sock->s_listeners, l) {
- listener_shutdown_locked(l);
- }
- NNI_LIST_FOREACH (&sock->s_dialers, d) {
- dialer_shutdown_locked(d);
+ while ((l = nni_list_first(&sock->s_listeners)) != NULL) {
+ nni_listener_hold(l);
+ nni_list_node_remove(&l->l_node);
+ nni_mtx_unlock(&sock->s_mx);
+ nni_listener_close(l);
+ nni_mtx_lock(&sock->s_mx);
}
+ while ((d = nni_list_first(&sock->s_dialers)) != NULL) {
+ nni_dialer_hold(d);
+ nni_list_node_remove(&d->d_node);
+ nni_mtx_unlock(&sock->s_mx);
+ nni_dialer_close(d);
+ nni_mtx_lock(&sock->s_mx);
+ }
nni_mtx_unlock(&sock->s_mx);
// We now mark any owned contexts as closing.
@@ -738,41 +741,21 @@ nni_sock_shutdown(nni_sock *sock)
// At this point, we've done everything we politely can to
// give the protocol a chance to flush its write side. Now
- // its time to be a little more insistent.
+ // it is time to be a little more insistent.
// Close the upper queues immediately. This can happen
// safely while we hold the lock.
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // Go through the dialers and listeners, attempting to close them.
- // We might already have a close in progress, in which case
- // we skip past it; it will be removed from another thread.
- NNI_LIST_FOREACH (&sock->s_listeners, l) {
- if (nni_listener_hold(l) == 0) {
- nni_listener_close_rele(l);
- }
- }
- NNI_LIST_FOREACH (&sock->s_dialers, d) {
- if (nni_dialer_hold(d) == 0) {
- nni_dialer_close_rele(d);
- }
- }
-
// For each pipe, arrange for it to teardown hard. We would
- // expect there not to be any here. However, it is possible for
- // a pipe to have been added by an endpoint due to racing conditions
- // in the shutdown. Therefore it is important that we shutdown pipes
- // *last*.
+ // expect there not to be any here.
NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
nni_pipe_close(pipe);
}
- // We have to wait for *both* endpoints and pipes to be
- // removed.
- while ((!nni_list_empty(&sock->s_pipes)) ||
- (!nni_list_empty(&sock->s_listeners)) ||
- (!nni_list_empty(&sock->s_dialers))) {
+ // We have to wait for pipes to be removed.
+ while (!nni_list_empty(&sock->s_pipes)) {
nni_cv_wait(&sock->s_cv);
}
@@ -1453,11 +1436,7 @@ static void
dialer_timer_start_locked(nni_dialer *d)
{
nni_duration back_off;
- nni_sock * sock = d->d_sock;
- if (d->d_closing || sock->s_closed) {
- return;
- }
back_off = d->d_currtime;
if (d->d_maxrtime > 0) {
d->d_currtime *= 2;
@@ -1494,11 +1473,6 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closing) {
- d->d_tran->tran_pipe->p_fini(tpipe);
- nni_mtx_unlock(&s->s_mx);
- return;
- }
if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
nni_mtx_unlock(&s->s_mx);
return;
@@ -1544,38 +1518,23 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_pipe_rele(p);
}
-static void
-dialer_shutdown_impl(nni_dialer *d)
+void
+nni_dialer_shutdown(nni_dialer *d)
{
+ nni_sock *s = d->d_sock;
nni_pipe *p;
- // Abort any remaining in-flight operations.
- nni_aio_close(&d->d_con_aio);
- nni_aio_close(&d->d_tmo_aio);
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
+ return;
+ }
- // Stop the underlying transport.
- d->d_ops.d_close(d->d_data);
+ nni_dialer_stop(d);
+ nni_mtx_lock(&s->s_mx);
NNI_LIST_FOREACH (&d->d_pipes, p) {
nni_pipe_close(p);
}
-}
-
-static void
-dialer_shutdown_locked(nni_dialer *d)
-{
- if (!d->d_closing) {
- d->d_closing = true;
- dialer_shutdown_impl(d);
- }
-}
-
-void
-nni_dialer_shutdown(nni_dialer *d)
-{
- nni_sock *s = d->d_sock;
- nni_mtx_lock(&s->s_mx);
- dialer_shutdown_locked(d);
+ nni_list_node_remove(&d->d_node);
nni_mtx_unlock(&s->s_mx);
}
@@ -1592,9 +1551,6 @@ dialer_reap(void *arg)
nni_dialer *d = arg;
nni_sock * s = d->d_sock;
- nni_aio_stop(&d->d_tmo_aio);
- nni_aio_stop(&d->d_con_aio);
-
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&d->st_root);
#endif
@@ -1612,13 +1568,12 @@ dialer_reap(void *arg)
return;
}
- nni_list_remove(&s->s_dialers, d);
- if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
- nni_cv_wake(&s->s_cv);
- }
+ nni_list_node_remove(&d->d_node);
nni_mtx_unlock(&s->s_mx);
+ nni_sock_rele(s);
+
nni_dialer_destroy(d);
}
@@ -1635,12 +1590,6 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closing) {
- l->l_tran->tran_pipe->p_fini(tpipe);
- nni_mtx_unlock(&s->s_mx);
- return;
- }
-
if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
nni_mtx_unlock(&s->s_mx);
return;
@@ -1684,39 +1633,22 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_pipe_rele(p);
}
-static void
-listener_shutdown_impl(nni_listener *l)
+void
+nni_listener_shutdown(nni_listener *l)
{
+ nni_sock *s = l->l_sock;
nni_pipe *p;
- // Abort any remaining in-flight accepts.
- nni_aio_close(&l->l_acc_aio);
- nni_aio_close(&l->l_tmo_aio);
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
+ return;
+ }
- // Stop the underlying transport.
- l->l_ops.l_close(l->l_data);
+ nni_listener_stop(l);
+ nni_mtx_lock(&s->s_mx);
NNI_LIST_FOREACH (&l->l_pipes, p) {
nni_pipe_close(p);
}
-}
-
-static void
-listener_shutdown_locked(nni_listener *l)
-{
- if (!l->l_closing) {
- l->l_closing = true;
- listener_shutdown_impl(l);
- }
-}
-
-void
-nni_listener_shutdown(nni_listener *l)
-{
- nni_sock *s = l->l_sock;
-
- nni_mtx_lock(&s->s_mx);
- listener_shutdown_locked(l);
nni_mtx_unlock(&s->s_mx);
}
@@ -1733,9 +1665,6 @@ listener_reap(void *arg)
nni_listener *l = arg;
nni_sock * s = l->l_sock;
- nni_aio_stop(&l->l_tmo_aio);
- nni_aio_stop(&l->l_acc_aio);
-
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&l->st_root);
#endif
@@ -1753,13 +1682,11 @@ listener_reap(void *arg)
return;
}
- nni_list_remove(&s->s_listeners, l);
- if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
- nni_cv_wake(&s->s_cv);
- }
-
+ nni_list_node_remove(&l->l_node);
nni_mtx_unlock(&s->s_mx);
+ nni_sock_rele(s);
+
nni_listener_destroy(l);
}