aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/dialer.c58
-rw-r--r--src/core/listener.c43
-rw-r--r--src/core/socket.c153
-rw-r--r--src/core/sockimpl.h38
-rw-r--r--src/nng.c8
5 files changed, 105 insertions, 195 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index a9674226..4c3e563d 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -225,15 +225,20 @@ nni_dialer_bump_error(nni_dialer *d, int err)
#endif
}
+// nni_dialer_create creates a dialer on the socket.
+// The caller should have a hold on the socket, and on success
+// the dialer inherits the callers hold. (If the caller wants
+// an additional hold, it should get an extra hold before calling this
+// function.)
int
-nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
+nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str)
{
- nni_sp_tran * tran;
- nni_dialer *d;
- int rv;
- nni_url * url;
+ nni_sp_tran *tran;
+ nni_dialer *d;
+ int rv;
+ nni_url *url;
- if ((rv = nni_url_parse(&url, urlstr)) != 0) {
+ if ((rv = nni_url_parse(&url, url_str)) != 0) {
return (rv);
}
if (((tran = nni_sp_tran_find(url)) == NULL) ||
@@ -246,13 +251,12 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_url_free(url);
return (NNG_ENOMEM);
}
- d->d_url = url;
- d->d_closed = false;
- d->d_closing = false;
- d->d_data = NULL;
- d->d_ref = 1;
- d->d_sock = s;
- d->d_tran = tran;
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_data = NULL;
+ d->d_ref = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
nni_atomic_flag_reset(&d->d_started);
// Make a copy of the endpoint operations. This allows us to
@@ -342,22 +346,6 @@ nni_dialer_rele(nni_dialer *d)
}
void
-nni_dialer_close_rele(nni_dialer *d)
-{
- nni_mtx_lock(&dialers_lk);
- if (d->d_closed) {
- nni_mtx_unlock(&dialers_lk);
- nni_dialer_rele(d);
- return;
- }
- d->d_closed = true;
- nni_id_remove(&dialers, d->d_id);
- nni_mtx_unlock(&dialers_lk);
-
- nni_dialer_rele(d);
-}
-
-void
nni_dialer_close(nni_dialer *d)
{
nni_mtx_lock(&dialers_lk);
@@ -389,8 +377,8 @@ static void
dialer_connect_cb(void *arg)
{
nni_dialer *d = arg;
- nni_aio * aio = &d->d_con_aio;
- nni_aio * user_aio;
+ nni_aio *aio = &d->d_con_aio;
+ nni_aio *user_aio;
int rv;
nni_mtx_lock(&d->d_mtx);
@@ -465,6 +453,14 @@ nni_dialer_start(nni_dialer *d, unsigned flags)
return (rv);
}
+void
+nni_dialer_stop(nni_dialer *d)
+{
+ nni_aio_stop(&d->d_tmo_aio);
+ nni_aio_stop(&d->d_con_aio);
+ d->d_ops.d_close(d->d_data);
+}
+
nni_sock *
nni_dialer_sock(nni_dialer *d)
{
diff --git a/src/core/listener.c b/src/core/listener.c
index c2a5863d..410988f6 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -50,9 +50,7 @@ nni_listener_id(nni_listener *l)
void
nni_listener_destroy(nni_listener *l)
{
- nni_aio_stop(&l->l_acc_aio);
- nni_aio_stop(&l->l_tmo_aio);
-
+ // NB: both these will have already been stopped.
nni_aio_fini(&l->l_acc_aio);
nni_aio_fini(&l->l_tmo_aio);
@@ -216,13 +214,18 @@ nni_listener_bump_error(nni_listener *l, int err)
#endif
}
+// nni_listener_create creates a listener on the socket.
+// The caller should have a hold on the socket, and on success
+// the listener inherits the callers hold. (If the caller wants
+// an additional hold, it should get an extra hold before calling this
+// function.)
int
nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str)
{
- nni_sp_tran * tran;
+ nni_sp_tran *tran;
nni_listener *l;
int rv;
- nni_url * url;
+ nni_url *url;
if ((rv = nni_url_parse(&url, url_str)) != 0) {
return (rv);
@@ -239,7 +242,6 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str)
}
l->l_url = url;
l->l_closed = false;
- l->l_closing = false;
l->l_data = NULL;
l->l_ref = 1;
l->l_sock = s;
@@ -344,24 +346,7 @@ nni_listener_close(nni_listener *l)
nni_listener_shutdown(l);
- nni_listener_rele(l); // This will trigger a reap if id count is zero.
-}
-
-void
-nni_listener_close_rele(nni_listener *l)
-{
- // Listener should already be shutdown. The socket lock may be held.
- nni_mtx_lock(&listeners_lk);
- if (l->l_closed) {
- nni_mtx_unlock(&listeners_lk);
- nni_listener_rele(l);
- return;
- }
- l->l_closed = true;
- nni_id_remove(&listeners, l->l_id);
- nni_mtx_unlock(&listeners_lk);
-
- nni_listener_rele(l); // This will trigger a reap if id count is zero.
+ nni_listener_rele(l); // This will reap if reference count is zero.
}
static void
@@ -378,7 +363,7 @@ static void
listener_accept_cb(void *arg)
{
nni_listener *l = arg;
- nni_aio * aio = &l->l_acc_aio;
+ nni_aio *aio = &l->l_acc_aio;
int rv;
switch ((rv = nni_aio_result(aio))) {
@@ -440,6 +425,14 @@ nni_listener_start(nni_listener *l, int flags)
return (0);
}
+void
+nni_listener_stop(nni_listener *l)
+{
+ nni_aio_stop(&l->l_tmo_aio);
+ nni_aio_stop(&l->l_acc_aio);
+ l->l_ops.l_close(l->l_data);
+}
+
nni_sock *
nni_listener_sock(nni_listener *l)
{
diff --git a/src/core/socket.c b/src/core/socket.c
index e170289d..e8b1211f 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -109,9 +109,6 @@ struct nni_socket {
static void nni_ctx_destroy(nni_ctx *);
-static void dialer_shutdown_locked(nni_dialer *);
-static void listener_shutdown_locked(nni_listener *);
-
#define SOCK(s) ((nni_sock *) (s))
static int
@@ -693,15 +690,21 @@ nni_sock_shutdown(nni_sock *sock)
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = true;
- // Close the EPs. This prevents new connections from forming
- // but but allows existing ones to drain.
- NNI_LIST_FOREACH (&sock->s_listeners, l) {
- listener_shutdown_locked(l);
- }
- NNI_LIST_FOREACH (&sock->s_dialers, d) {
- dialer_shutdown_locked(d);
+ while ((l = nni_list_first(&sock->s_listeners)) != NULL) {
+ nni_listener_hold(l);
+ nni_list_node_remove(&l->l_node);
+ nni_mtx_unlock(&sock->s_mx);
+ nni_listener_close(l);
+ nni_mtx_lock(&sock->s_mx);
}
+ while ((d = nni_list_first(&sock->s_dialers)) != NULL) {
+ nni_dialer_hold(d);
+ nni_list_node_remove(&d->d_node);
+ nni_mtx_unlock(&sock->s_mx);
+ nni_dialer_close(d);
+ nni_mtx_lock(&sock->s_mx);
+ }
nni_mtx_unlock(&sock->s_mx);
// We now mark any owned contexts as closing.
@@ -738,41 +741,21 @@ nni_sock_shutdown(nni_sock *sock)
// At this point, we've done everything we politely can to
// give the protocol a chance to flush its write side. Now
- // its time to be a little more insistent.
+ // it is time to be a little more insistent.
// Close the upper queues immediately. This can happen
// safely while we hold the lock.
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // Go through the dialers and listeners, attempting to close them.
- // We might already have a close in progress, in which case
- // we skip past it; it will be removed from another thread.
- NNI_LIST_FOREACH (&sock->s_listeners, l) {
- if (nni_listener_hold(l) == 0) {
- nni_listener_close_rele(l);
- }
- }
- NNI_LIST_FOREACH (&sock->s_dialers, d) {
- if (nni_dialer_hold(d) == 0) {
- nni_dialer_close_rele(d);
- }
- }
-
// For each pipe, arrange for it to teardown hard. We would
- // expect there not to be any here. However, it is possible for
- // a pipe to have been added by an endpoint due to racing conditions
- // in the shutdown. Therefore it is important that we shutdown pipes
- // *last*.
+ // expect there not to be any here.
NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
nni_pipe_close(pipe);
}
- // We have to wait for *both* endpoints and pipes to be
- // removed.
- while ((!nni_list_empty(&sock->s_pipes)) ||
- (!nni_list_empty(&sock->s_listeners)) ||
- (!nni_list_empty(&sock->s_dialers))) {
+ // We have to wait for pipes to be removed.
+ while (!nni_list_empty(&sock->s_pipes)) {
nni_cv_wait(&sock->s_cv);
}
@@ -1453,11 +1436,7 @@ static void
dialer_timer_start_locked(nni_dialer *d)
{
nni_duration back_off;
- nni_sock * sock = d->d_sock;
- if (d->d_closing || sock->s_closed) {
- return;
- }
back_off = d->d_currtime;
if (d->d_maxrtime > 0) {
d->d_currtime *= 2;
@@ -1494,11 +1473,6 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || d->d_closing) {
- d->d_tran->tran_pipe->p_fini(tpipe);
- nni_mtx_unlock(&s->s_mx);
- return;
- }
if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
nni_mtx_unlock(&s->s_mx);
return;
@@ -1544,38 +1518,23 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_pipe_rele(p);
}
-static void
-dialer_shutdown_impl(nni_dialer *d)
+void
+nni_dialer_shutdown(nni_dialer *d)
{
+ nni_sock *s = d->d_sock;
nni_pipe *p;
- // Abort any remaining in-flight operations.
- nni_aio_close(&d->d_con_aio);
- nni_aio_close(&d->d_tmo_aio);
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
+ return;
+ }
- // Stop the underlying transport.
- d->d_ops.d_close(d->d_data);
+ nni_dialer_stop(d);
+ nni_mtx_lock(&s->s_mx);
NNI_LIST_FOREACH (&d->d_pipes, p) {
nni_pipe_close(p);
}
-}
-
-static void
-dialer_shutdown_locked(nni_dialer *d)
-{
- if (!d->d_closing) {
- d->d_closing = true;
- dialer_shutdown_impl(d);
- }
-}
-
-void
-nni_dialer_shutdown(nni_dialer *d)
-{
- nni_sock *s = d->d_sock;
- nni_mtx_lock(&s->s_mx);
- dialer_shutdown_locked(d);
+ nni_list_node_remove(&d->d_node);
nni_mtx_unlock(&s->s_mx);
}
@@ -1592,9 +1551,6 @@ dialer_reap(void *arg)
nni_dialer *d = arg;
nni_sock * s = d->d_sock;
- nni_aio_stop(&d->d_tmo_aio);
- nni_aio_stop(&d->d_con_aio);
-
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&d->st_root);
#endif
@@ -1612,13 +1568,12 @@ dialer_reap(void *arg)
return;
}
- nni_list_remove(&s->s_dialers, d);
- if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
- nni_cv_wake(&s->s_cv);
- }
+ nni_list_node_remove(&d->d_node);
nni_mtx_unlock(&s->s_mx);
+ nni_sock_rele(s);
+
nni_dialer_destroy(d);
}
@@ -1635,12 +1590,6 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_pipe *p;
nni_mtx_lock(&s->s_mx);
- if (s->s_closed || l->l_closing) {
- l->l_tran->tran_pipe->p_fini(tpipe);
- nni_mtx_unlock(&s->s_mx);
- return;
- }
-
if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
nni_mtx_unlock(&s->s_mx);
return;
@@ -1684,39 +1633,22 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_pipe_rele(p);
}
-static void
-listener_shutdown_impl(nni_listener *l)
+void
+nni_listener_shutdown(nni_listener *l)
{
+ nni_sock *s = l->l_sock;
nni_pipe *p;
- // Abort any remaining in-flight accepts.
- nni_aio_close(&l->l_acc_aio);
- nni_aio_close(&l->l_tmo_aio);
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
+ return;
+ }
- // Stop the underlying transport.
- l->l_ops.l_close(l->l_data);
+ nni_listener_stop(l);
+ nni_mtx_lock(&s->s_mx);
NNI_LIST_FOREACH (&l->l_pipes, p) {
nni_pipe_close(p);
}
-}
-
-static void
-listener_shutdown_locked(nni_listener *l)
-{
- if (!l->l_closing) {
- l->l_closing = true;
- listener_shutdown_impl(l);
- }
-}
-
-void
-nni_listener_shutdown(nni_listener *l)
-{
- nni_sock *s = l->l_sock;
-
- nni_mtx_lock(&s->s_mx);
- listener_shutdown_locked(l);
nni_mtx_unlock(&s->s_mx);
}
@@ -1733,9 +1665,6 @@ listener_reap(void *arg)
nni_listener *l = arg;
nni_sock * s = l->l_sock;
- nni_aio_stop(&l->l_tmo_aio);
- nni_aio_stop(&l->l_acc_aio);
-
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&l->st_root);
#endif
@@ -1753,13 +1682,11 @@ listener_reap(void *arg)
return;
}
- nni_list_remove(&s->s_listeners, l);
- if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
- nni_cv_wake(&s->s_cv);
- }
-
+ nni_list_node_remove(&l->l_node);
nni_mtx_unlock(&s->s_mx);
+ nni_sock_rele(s);
+
nni_listener_destroy(l);
}
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 22e956b0..850a4d80 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -17,20 +17,20 @@
struct nni_dialer {
nni_sp_dialer_ops d_ops; // transport ops
- nni_sp_tran * d_tran; // transport pointer
- void * d_data; // transport private
+ nni_sp_tran *d_tran; // transport pointer
+ void *d_data; // transport private
uint32_t d_id; // endpoint id
nni_list_node d_node; // per socket list
- nni_sock * d_sock;
- nni_url * d_url;
- nni_pipe * d_pipe; // active pipe (for re-dialer)
+ nni_sock *d_sock;
+ nni_url *d_url;
+ nni_pipe *d_pipe; // active pipe (for re-dialer)
int d_ref;
bool d_closed; // full shutdown
- bool d_closing;
+ nni_atomic_flag d_closing;
nni_atomic_flag d_started;
nni_mtx d_mtx;
nni_list d_pipes;
- nni_aio * d_user_aio;
+ nni_aio *d_user_aio;
nni_aio d_con_aio;
nni_aio d_tmo_aio; // backoff timer
nni_duration d_maxrtime; // maximum time for reconnect
@@ -59,15 +59,15 @@ struct nni_dialer {
struct nni_listener {
nni_sp_listener_ops l_ops; // transport ops
- nni_sp_tran * l_tran; // transport pointer
- void * l_data; // transport private
+ nni_sp_tran *l_tran; // transport pointer
+ void *l_data; // transport private
uint32_t l_id; // endpoint id
nni_list_node l_node; // per socket list
- nni_sock * l_sock;
- nni_url * l_url;
+ nni_sock *l_sock;
+ nni_url *l_url;
int l_ref;
bool l_closed; // full shutdown
- bool l_closing; // close started (shutdown)
+ nni_atomic_flag l_closing; // close started (shutdown)
nni_atomic_flag l_started;
nni_list l_pipes;
nni_aio l_acc_aio;
@@ -97,13 +97,13 @@ struct nni_pipe {
nni_sp_pipe_ops p_tran_ops;
nni_proto_pipe_ops p_proto_ops;
size_t p_size;
- void * p_tran_data;
- void * p_proto_data;
+ void *p_tran_data;
+ void *p_proto_data;
nni_list_node p_sock_node;
nni_list_node p_ep_node;
- nni_sock * p_sock;
- nni_dialer * p_dialer;
- nni_listener * p_listener;
+ nni_sock *p_sock;
+ nni_dialer *p_dialer;
+ nni_listener *p_listener;
bool p_closed;
nni_atomic_flag p_stop;
bool p_cbs;
@@ -132,13 +132,13 @@ extern void nni_dialer_shutdown(nni_dialer *);
extern void nni_dialer_reap(nni_dialer *);
extern void nni_dialer_destroy(nni_dialer *);
extern void nni_dialer_timer_start(nni_dialer *);
-extern void nni_dialer_close_rele(nni_dialer *);
+extern void nni_dialer_stop(nni_dialer *);
extern void nni_listener_add_pipe(nni_listener *, void *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);
-extern void nni_listener_close_rele(nni_listener *);
+extern void nni_listener_stop(nni_listener *);
extern void nni_pipe_remove(nni_pipe *);
extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
diff --git a/src/nng.c b/src/nng.c
index 1ccc1386..b1ebbd11 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -477,7 +477,6 @@ nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags)
}
if ((rv = nni_dialer_start(d, flags)) != 0) {
nni_dialer_close(d);
- nni_sock_rele(s);
return (rv);
}
if (dp != NULL) {
@@ -486,7 +485,6 @@ nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags)
*dp = did;
}
nni_dialer_rele(d);
- nni_sock_rele(s);
return (0);
}
@@ -506,7 +504,6 @@ nng_listen(nng_socket sid, const char *addr, nng_listener *lp, int flags)
}
if ((rv = nni_listener_start(l, flags)) != 0) {
nni_listener_close(l);
- nni_sock_rele(s);
return (rv);
}
@@ -516,7 +513,6 @@ nng_listen(nng_socket sid, const char *addr, nng_listener *lp, int flags)
*lp = lid;
}
nni_listener_rele(l);
- nni_sock_rele(s);
return (rv);
}
@@ -538,7 +534,6 @@ nng_listener_create(nng_listener *lp, nng_socket sid, const char *addr)
lid.id = nni_listener_id(l);
*lp = lid;
nni_listener_rele(l);
- nni_sock_rele(s);
return (0);
}
@@ -580,7 +575,6 @@ nng_dialer_create(nng_dialer *dp, nng_socket sid, const char *addr)
did.id = nni_dialer_id(d);
*dp = did;
nni_dialer_rele(d);
- nni_sock_rele(s);
return (0);
}