diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/dialer.c | 58 | ||||
| -rw-r--r-- | src/core/listener.c | 43 | ||||
| -rw-r--r-- | src/core/socket.c | 153 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 38 |
4 files changed, 104 insertions, 188 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index a9674226..4c3e563d 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -225,15 +225,20 @@ nni_dialer_bump_error(nni_dialer *d, int err) #endif } +// nni_dialer_create creates a dialer on the socket. +// The caller should have a hold on the socket, and on success +// the dialer inherits the callers hold. (If the caller wants +// an additional hold, it should get an extra hold before calling this +// function.) int -nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) +nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str) { - nni_sp_tran * tran; - nni_dialer *d; - int rv; - nni_url * url; + nni_sp_tran *tran; + nni_dialer *d; + int rv; + nni_url *url; - if ((rv = nni_url_parse(&url, urlstr)) != 0) { + if ((rv = nni_url_parse(&url, url_str)) != 0) { return (rv); } if (((tran = nni_sp_tran_find(url)) == NULL) || @@ -246,13 +251,12 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_url_free(url); return (NNG_ENOMEM); } - d->d_url = url; - d->d_closed = false; - d->d_closing = false; - d->d_data = NULL; - d->d_ref = 1; - d->d_sock = s; - d->d_tran = tran; + d->d_url = url; + d->d_closed = false; + d->d_data = NULL; + d->d_ref = 1; + d->d_sock = s; + d->d_tran = tran; nni_atomic_flag_reset(&d->d_started); // Make a copy of the endpoint operations. This allows us to @@ -342,22 +346,6 @@ nni_dialer_rele(nni_dialer *d) } void -nni_dialer_close_rele(nni_dialer *d) -{ - nni_mtx_lock(&dialers_lk); - if (d->d_closed) { - nni_mtx_unlock(&dialers_lk); - nni_dialer_rele(d); - return; - } - d->d_closed = true; - nni_id_remove(&dialers, d->d_id); - nni_mtx_unlock(&dialers_lk); - - nni_dialer_rele(d); -} - -void nni_dialer_close(nni_dialer *d) { nni_mtx_lock(&dialers_lk); @@ -389,8 +377,8 @@ static void dialer_connect_cb(void *arg) { nni_dialer *d = arg; - nni_aio * aio = &d->d_con_aio; - nni_aio * user_aio; + nni_aio *aio = &d->d_con_aio; + nni_aio *user_aio; int rv; nni_mtx_lock(&d->d_mtx); @@ -465,6 +453,14 @@ nni_dialer_start(nni_dialer *d, unsigned flags) return (rv); } +void +nni_dialer_stop(nni_dialer *d) +{ + nni_aio_stop(&d->d_tmo_aio); + nni_aio_stop(&d->d_con_aio); + d->d_ops.d_close(d->d_data); +} + nni_sock * nni_dialer_sock(nni_dialer *d) { diff --git a/src/core/listener.c b/src/core/listener.c index c2a5863d..410988f6 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -50,9 +50,7 @@ nni_listener_id(nni_listener *l) void nni_listener_destroy(nni_listener *l) { - nni_aio_stop(&l->l_acc_aio); - nni_aio_stop(&l->l_tmo_aio); - + // NB: both these will have already been stopped. nni_aio_fini(&l->l_acc_aio); nni_aio_fini(&l->l_tmo_aio); @@ -216,13 +214,18 @@ nni_listener_bump_error(nni_listener *l, int err) #endif } +// nni_listener_create creates a listener on the socket. +// The caller should have a hold on the socket, and on success +// the listener inherits the callers hold. (If the caller wants +// an additional hold, it should get an extra hold before calling this +// function.) int nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) { - nni_sp_tran * tran; + nni_sp_tran *tran; nni_listener *l; int rv; - nni_url * url; + nni_url *url; if ((rv = nni_url_parse(&url, url_str)) != 0) { return (rv); @@ -239,7 +242,6 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) } l->l_url = url; l->l_closed = false; - l->l_closing = false; l->l_data = NULL; l->l_ref = 1; l->l_sock = s; @@ -344,24 +346,7 @@ nni_listener_close(nni_listener *l) nni_listener_shutdown(l); - nni_listener_rele(l); // This will trigger a reap if id count is zero. -} - -void -nni_listener_close_rele(nni_listener *l) -{ - // Listener should already be shutdown. The socket lock may be held. - nni_mtx_lock(&listeners_lk); - if (l->l_closed) { - nni_mtx_unlock(&listeners_lk); - nni_listener_rele(l); - return; - } - l->l_closed = true; - nni_id_remove(&listeners, l->l_id); - nni_mtx_unlock(&listeners_lk); - - nni_listener_rele(l); // This will trigger a reap if id count is zero. + nni_listener_rele(l); // This will reap if reference count is zero. } static void @@ -378,7 +363,7 @@ static void listener_accept_cb(void *arg) { nni_listener *l = arg; - nni_aio * aio = &l->l_acc_aio; + nni_aio *aio = &l->l_acc_aio; int rv; switch ((rv = nni_aio_result(aio))) { @@ -440,6 +425,14 @@ nni_listener_start(nni_listener *l, int flags) return (0); } +void +nni_listener_stop(nni_listener *l) +{ + nni_aio_stop(&l->l_tmo_aio); + nni_aio_stop(&l->l_acc_aio); + l->l_ops.l_close(l->l_data); +} + nni_sock * nni_listener_sock(nni_listener *l) { diff --git a/src/core/socket.c b/src/core/socket.c index e170289d..e8b1211f 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -109,9 +109,6 @@ struct nni_socket { static void nni_ctx_destroy(nni_ctx *); -static void dialer_shutdown_locked(nni_dialer *); -static void listener_shutdown_locked(nni_listener *); - #define SOCK(s) ((nni_sock *) (s)) static int @@ -693,15 +690,21 @@ nni_sock_shutdown(nni_sock *sock) // Mark us closing, so no more EPs or changes can occur. sock->s_closing = true; - // Close the EPs. This prevents new connections from forming - // but but allows existing ones to drain. - NNI_LIST_FOREACH (&sock->s_listeners, l) { - listener_shutdown_locked(l); - } - NNI_LIST_FOREACH (&sock->s_dialers, d) { - dialer_shutdown_locked(d); + while ((l = nni_list_first(&sock->s_listeners)) != NULL) { + nni_listener_hold(l); + nni_list_node_remove(&l->l_node); + nni_mtx_unlock(&sock->s_mx); + nni_listener_close(l); + nni_mtx_lock(&sock->s_mx); } + while ((d = nni_list_first(&sock->s_dialers)) != NULL) { + nni_dialer_hold(d); + nni_list_node_remove(&d->d_node); + nni_mtx_unlock(&sock->s_mx); + nni_dialer_close(d); + nni_mtx_lock(&sock->s_mx); + } nni_mtx_unlock(&sock->s_mx); // We now mark any owned contexts as closing. @@ -738,41 +741,21 @@ nni_sock_shutdown(nni_sock *sock) // At this point, we've done everything we politely can to // give the protocol a chance to flush its write side. Now - // its time to be a little more insistent. + // it is time to be a little more insistent. // Close the upper queues immediately. This can happen // safely while we hold the lock. nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Go through the dialers and listeners, attempting to close them. - // We might already have a close in progress, in which case - // we skip past it; it will be removed from another thread. - NNI_LIST_FOREACH (&sock->s_listeners, l) { - if (nni_listener_hold(l) == 0) { - nni_listener_close_rele(l); - } - } - NNI_LIST_FOREACH (&sock->s_dialers, d) { - if (nni_dialer_hold(d) == 0) { - nni_dialer_close_rele(d); - } - } - // For each pipe, arrange for it to teardown hard. We would - // expect there not to be any here. However, it is possible for - // a pipe to have been added by an endpoint due to racing conditions - // in the shutdown. Therefore it is important that we shutdown pipes - // *last*. + // expect there not to be any here. NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_close(pipe); } - // We have to wait for *both* endpoints and pipes to be - // removed. - while ((!nni_list_empty(&sock->s_pipes)) || - (!nni_list_empty(&sock->s_listeners)) || - (!nni_list_empty(&sock->s_dialers))) { + // We have to wait for pipes to be removed. + while (!nni_list_empty(&sock->s_pipes)) { nni_cv_wait(&sock->s_cv); } @@ -1453,11 +1436,7 @@ static void dialer_timer_start_locked(nni_dialer *d) { nni_duration back_off; - nni_sock * sock = d->d_sock; - if (d->d_closing || sock->s_closed) { - return; - } back_off = d->d_currtime; if (d->d_maxrtime > 0) { d->d_currtime *= 2; @@ -1494,11 +1473,6 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_mtx_lock(&s->s_mx); - if (s->s_closed || d->d_closing) { - d->d_tran->tran_pipe->p_fini(tpipe); - nni_mtx_unlock(&s->s_mx); - return; - } if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { nni_mtx_unlock(&s->s_mx); return; @@ -1544,38 +1518,23 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) nni_pipe_rele(p); } -static void -dialer_shutdown_impl(nni_dialer *d) +void +nni_dialer_shutdown(nni_dialer *d) { + nni_sock *s = d->d_sock; nni_pipe *p; - // Abort any remaining in-flight operations. - nni_aio_close(&d->d_con_aio); - nni_aio_close(&d->d_tmo_aio); + if (nni_atomic_flag_test_and_set(&d->d_closing)) { + return; + } - // Stop the underlying transport. - d->d_ops.d_close(d->d_data); + nni_dialer_stop(d); + nni_mtx_lock(&s->s_mx); NNI_LIST_FOREACH (&d->d_pipes, p) { nni_pipe_close(p); } -} - -static void -dialer_shutdown_locked(nni_dialer *d) -{ - if (!d->d_closing) { - d->d_closing = true; - dialer_shutdown_impl(d); - } -} - -void -nni_dialer_shutdown(nni_dialer *d) -{ - nni_sock *s = d->d_sock; - nni_mtx_lock(&s->s_mx); - dialer_shutdown_locked(d); + nni_list_node_remove(&d->d_node); nni_mtx_unlock(&s->s_mx); } @@ -1592,9 +1551,6 @@ dialer_reap(void *arg) nni_dialer *d = arg; nni_sock * s = d->d_sock; - nni_aio_stop(&d->d_tmo_aio); - nni_aio_stop(&d->d_con_aio); - #ifdef NNG_ENABLE_STATS nni_stat_unregister(&d->st_root); #endif @@ -1612,13 +1568,12 @@ dialer_reap(void *arg) return; } - nni_list_remove(&s->s_dialers, d); - if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { - nni_cv_wake(&s->s_cv); - } + nni_list_node_remove(&d->d_node); nni_mtx_unlock(&s->s_mx); + nni_sock_rele(s); + nni_dialer_destroy(d); } @@ -1635,12 +1590,6 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (s->s_closed || l->l_closing) { - l->l_tran->tran_pipe->p_fini(tpipe); - nni_mtx_unlock(&s->s_mx); - return; - } - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { nni_mtx_unlock(&s->s_mx); return; @@ -1684,39 +1633,22 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe_rele(p); } -static void -listener_shutdown_impl(nni_listener *l) +void +nni_listener_shutdown(nni_listener *l) { + nni_sock *s = l->l_sock; nni_pipe *p; - // Abort any remaining in-flight accepts. - nni_aio_close(&l->l_acc_aio); - nni_aio_close(&l->l_tmo_aio); + if (nni_atomic_flag_test_and_set(&l->l_closing)) { + return; + } - // Stop the underlying transport. - l->l_ops.l_close(l->l_data); + nni_listener_stop(l); + nni_mtx_lock(&s->s_mx); NNI_LIST_FOREACH (&l->l_pipes, p) { nni_pipe_close(p); } -} - -static void -listener_shutdown_locked(nni_listener *l) -{ - if (!l->l_closing) { - l->l_closing = true; - listener_shutdown_impl(l); - } -} - -void -nni_listener_shutdown(nni_listener *l) -{ - nni_sock *s = l->l_sock; - - nni_mtx_lock(&s->s_mx); - listener_shutdown_locked(l); nni_mtx_unlock(&s->s_mx); } @@ -1733,9 +1665,6 @@ listener_reap(void *arg) nni_listener *l = arg; nni_sock * s = l->l_sock; - nni_aio_stop(&l->l_tmo_aio); - nni_aio_stop(&l->l_acc_aio); - #ifdef NNG_ENABLE_STATS nni_stat_unregister(&l->st_root); #endif @@ -1753,13 +1682,11 @@ listener_reap(void *arg) return; } - nni_list_remove(&s->s_listeners, l); - if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { - nni_cv_wake(&s->s_cv); - } - + nni_list_node_remove(&l->l_node); nni_mtx_unlock(&s->s_mx); + nni_sock_rele(s); + nni_listener_destroy(l); } diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 22e956b0..850a4d80 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -17,20 +17,20 @@ struct nni_dialer { nni_sp_dialer_ops d_ops; // transport ops - nni_sp_tran * d_tran; // transport pointer - void * d_data; // transport private + nni_sp_tran *d_tran; // transport pointer + void *d_data; // transport private uint32_t d_id; // endpoint id nni_list_node d_node; // per socket list - nni_sock * d_sock; - nni_url * d_url; - nni_pipe * d_pipe; // active pipe (for re-dialer) + nni_sock *d_sock; + nni_url *d_url; + nni_pipe *d_pipe; // active pipe (for re-dialer) int d_ref; bool d_closed; // full shutdown - bool d_closing; + nni_atomic_flag d_closing; nni_atomic_flag d_started; nni_mtx d_mtx; nni_list d_pipes; - nni_aio * d_user_aio; + nni_aio *d_user_aio; nni_aio d_con_aio; nni_aio d_tmo_aio; // backoff timer nni_duration d_maxrtime; // maximum time for reconnect @@ -59,15 +59,15 @@ struct nni_dialer { struct nni_listener { nni_sp_listener_ops l_ops; // transport ops - nni_sp_tran * l_tran; // transport pointer - void * l_data; // transport private + nni_sp_tran *l_tran; // transport pointer + void *l_data; // transport private uint32_t l_id; // endpoint id nni_list_node l_node; // per socket list - nni_sock * l_sock; - nni_url * l_url; + nni_sock *l_sock; + nni_url *l_url; int l_ref; bool l_closed; // full shutdown - bool l_closing; // close started (shutdown) + nni_atomic_flag l_closing; // close started (shutdown) nni_atomic_flag l_started; nni_list l_pipes; nni_aio l_acc_aio; @@ -97,13 +97,13 @@ struct nni_pipe { nni_sp_pipe_ops p_tran_ops; nni_proto_pipe_ops p_proto_ops; size_t p_size; - void * p_tran_data; - void * p_proto_data; + void *p_tran_data; + void *p_proto_data; nni_list_node p_sock_node; nni_list_node p_ep_node; - nni_sock * p_sock; - nni_dialer * p_dialer; - nni_listener * p_listener; + nni_sock *p_sock; + nni_dialer *p_dialer; + nni_listener *p_listener; bool p_closed; nni_atomic_flag p_stop; bool p_cbs; @@ -132,13 +132,13 @@ extern void nni_dialer_shutdown(nni_dialer *); extern void nni_dialer_reap(nni_dialer *); extern void nni_dialer_destroy(nni_dialer *); extern void nni_dialer_timer_start(nni_dialer *); -extern void nni_dialer_close_rele(nni_dialer *); +extern void nni_dialer_stop(nni_dialer *); extern void nni_listener_add_pipe(nni_listener *, void *); extern void nni_listener_shutdown(nni_listener *); extern void nni_listener_reap(nni_listener *); extern void nni_listener_destroy(nni_listener *); -extern void nni_listener_close_rele(nni_listener *); +extern void nni_listener_stop(nni_listener *); extern void nni_pipe_remove(nni_pipe *); extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev); |
