diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 153 |
1 files changed, 40 insertions, 113 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index e170289d..e8b1211f 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -109,9 +109,6 @@ struct nni_socket { static void nni_ctx_destroy(nni_ctx *); -static void dialer_shutdown_locked(nni_dialer *); -static void listener_shutdown_locked(nni_listener *); - #define SOCK(s) ((nni_sock *) (s)) static int @@ -693,15 +690,21 @@ nni_sock_shutdown(nni_sock *sock) // Mark us closing, so no more EPs or changes can occur. sock->s_closing = true; - // Close the EPs. This prevents new connections from forming - // but but allows existing ones to drain. - NNI_LIST_FOREACH (&sock->s_listeners, l) { - listener_shutdown_locked(l); - } - NNI_LIST_FOREACH (&sock->s_dialers, d) { - dialer_shutdown_locked(d); + while ((l = nni_list_first(&sock->s_listeners)) != NULL) { + nni_listener_hold(l); + nni_list_node_remove(&l->l_node); + nni_mtx_unlock(&sock->s_mx); + nni_listener_close(l); + nni_mtx_lock(&sock->s_mx); } + while ((d = nni_list_first(&sock->s_dialers)) != NULL) { + nni_dialer_hold(d); + nni_list_node_remove(&d->d_node); + nni_mtx_unlock(&sock->s_mx); + nni_dialer_close(d); + nni_mtx_lock(&sock->s_mx); + } nni_mtx_unlock(&sock->s_mx); // We now mark any owned contexts as closing. @@ -738,41 +741,21 @@ nni_sock_shutdown(nni_sock *sock) // At this point, we've done everything we politely can to // give the protocol a chance to flush its write side. Now - // its time to be a little more insistent. + // it is time to be a little more insistent. // Close the upper queues immediately. This can happen // safely while we hold the lock. nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Go through the dialers and listeners, attempting to close them. - // We might already have a close in progress, in which case - // we skip past it; it will be removed from another thread. - NNI_LIST_FOREACH (&sock->s_listeners, l) { - if (nni_listener_hold(l) == 0) { - nni_listener_close_rele(l); - } - } - NNI_LIST_FOREACH (&sock->s_dialers, d) { - if (nni_dialer_hold(d) == 0) { - nni_dialer_close_rele(d); - } - } - // For each pipe, arrange for it to teardown hard. We would - // expect there not to be any here. However, it is possible for - // a pipe to have been added by an endpoint due to racing conditions - // in the shutdown. Therefore it is important that we shutdown pipes - // *last*. + // expect there not to be any here. NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_close(pipe); } - // We have to wait for *both* endpoints and pipes to be - // removed. - while ((!nni_list_empty(&sock->s_pipes)) || - (!nni_list_empty(&sock->s_listeners)) || - (!nni_list_empty(&sock->s_dialers))) { + // We have to wait for pipes to be removed. + while (!nni_list_empty(&sock->s_pipes)) { nni_cv_wait(&sock->s_cv); } @@ -1453,11 +1436,7 @@ static void dialer_timer_start_locked(nni_dialer *d) { nni_duration back_off; - nni_sock * sock = d->d_sock; - if (d->d_closing || sock->s_closed) { - return; - } back_off = d->d_currtime; if (d->d_maxrtime > 0) { d->d_currtime *= 2; @@ -1494,11 +1473,6 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_mtx_lock(&s->s_mx); - if (s->s_closed || d->d_closing) { - d->d_tran->tran_pipe->p_fini(tpipe); - nni_mtx_unlock(&s->s_mx); - return; - } if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { nni_mtx_unlock(&s->s_mx); return; @@ -1544,38 +1518,23 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_pipe_rele(p); } -static void -dialer_shutdown_impl(nni_dialer *d) +void +nni_dialer_shutdown(nni_dialer *d) { + nni_sock *s = d->d_sock; nni_pipe *p; - // Abort any remaining in-flight operations. - nni_aio_close(&d->d_con_aio); - nni_aio_close(&d->d_tmo_aio); + if (nni_atomic_flag_test_and_set(&d->d_closing)) { + return; + } - // Stop the underlying transport. - d->d_ops.d_close(d->d_data); + nni_dialer_stop(d); + nni_mtx_lock(&s->s_mx); NNI_LIST_FOREACH (&d->d_pipes, p) { nni_pipe_close(p); } -} - -static void -dialer_shutdown_locked(nni_dialer *d) -{ - if (!d->d_closing) { - d->d_closing = true; - dialer_shutdown_impl(d); - } -} - -void -nni_dialer_shutdown(nni_dialer *d) -{ - nni_sock *s = d->d_sock; - nni_mtx_lock(&s->s_mx); - dialer_shutdown_locked(d); + nni_list_node_remove(&d->d_node); nni_mtx_unlock(&s->s_mx); } @@ -1592,9 +1551,6 @@ dialer_reap(void *arg) nni_dialer *d = arg; nni_sock * s = d->d_sock; - nni_aio_stop(&d->d_tmo_aio); - nni_aio_stop(&d->d_con_aio); - #ifdef NNG_ENABLE_STATS nni_stat_unregister(&d->st_root); #endif @@ -1612,13 +1568,12 @@ dialer_reap(void *arg) return; } - nni_list_remove(&s->s_dialers, d); - if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { - nni_cv_wake(&s->s_cv); - } + nni_list_node_remove(&d->d_node); nni_mtx_unlock(&s->s_mx); + nni_sock_rele(s); + nni_dialer_destroy(d); } @@ -1635,12 +1590,6 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || l->l_closing) { - l->l_tran->tran_pipe->p_fini(tpipe); - nni_mtx_unlock(&s->s_mx); - return; - } - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { nni_mtx_unlock(&s->s_mx); return; @@ -1684,39 +1633,22 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe_rele(p); } -static void -listener_shutdown_impl(nni_listener *l) +void +nni_listener_shutdown(nni_listener *l) { + nni_sock *s = l->l_sock; nni_pipe *p; - // Abort any remaining in-flight accepts. - nni_aio_close(&l->l_acc_aio); - nni_aio_close(&l->l_tmo_aio); + if (nni_atomic_flag_test_and_set(&l->l_closing)) { + return; + } - // Stop the underlying transport. - l->l_ops.l_close(l->l_data); + nni_listener_stop(l); + nni_mtx_lock(&s->s_mx); NNI_LIST_FOREACH (&l->l_pipes, p) { nni_pipe_close(p); } -} - -static void -listener_shutdown_locked(nni_listener *l) -{ - if (!l->l_closing) { - l->l_closing = true; - listener_shutdown_impl(l); - } -} - -void -nni_listener_shutdown(nni_listener *l) -{ - nni_sock *s = l->l_sock; - - nni_mtx_lock(&s->s_mx); - listener_shutdown_locked(l); nni_mtx_unlock(&s->s_mx); } @@ -1733,9 +1665,6 @@ listener_reap(void *arg) nni_listener *l = arg; nni_sock * s = l->l_sock; - nni_aio_stop(&l->l_tmo_aio); - nni_aio_stop(&l->l_acc_aio); - #ifdef NNG_ENABLE_STATS nni_stat_unregister(&l->st_root); #endif @@ -1753,13 +1682,11 @@ listener_reap(void *arg) return; } - nni_list_remove(&s->s_listeners, l); - if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { - nni_cv_wake(&s->s_cv); - } - + nni_list_node_remove(&l->l_node); nni_mtx_unlock(&s->s_mx); + nni_sock_rele(s); + nni_listener_destroy(l); } |
