summaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-06 14:42:53 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-06 18:57:29 -0700
commit953ca274ae57f8edd12536a3dd15d134aa6e5576 (patch)
tree7a0e889fbae7b525befefedcb5cb8f10820e7a47 /src/core/socket.c
parent89cba92d13fbc5e059336fd054be30e50d8a2621 (diff)
downloadnng-953ca274ae57f8edd12536a3dd15d134aa6e5576.tar.gz
nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.tar.bz2
nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.zip
fixes #568 Want a single reader/write lock on socket child objects
fixes #170 Make more use of reaper This is a complete restructure/rethink of how child objects interact with the socket. (This also backs out #576 as it turns out not to be needed.) While 568 says reader/writer lock, for now we have settled for a single writer lock. Its likely that this is sufficient. Essentially we use the single socket lock to guard lists of the socket children. We also use deferred deletion in the idhash to facilitate teardown, which means endpoint closes are no longer synchronous. We use the reaper to clean up objects when the reference count drops to zero. We make a special exception for pipes, since they really are not reference counted by their parents, and they are leaf objects anyway. We believe this addresses the main outstanding race conditions in a much more correct and holistic way. Note that endpoint shutdown is a little tricky, as it makes use of atomic flags to guard against double entry, and against recursive lock entry. This is something that would be nice to make a bit more obvious, but what we have is safe, and the complexity is at least confined to one place.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c369
1 files changed, 284 insertions, 85 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 620c5d19..640e0db6 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <stdio.h>
#include <string.h>
@@ -58,7 +59,7 @@ struct nni_socket {
nni_cv s_cv;
nni_cv s_close_cv;
- uint64_t s_id;
+ uint32_t s_id;
uint32_t s_flags;
unsigned s_refcnt; // protected by global lock
void * s_data; // Protocol private
@@ -97,6 +98,9 @@ struct nni_socket {
static void nni_ctx_destroy(nni_ctx *);
+static void dialer_shutdown_locked(nni_dialer *);
+static void listener_shutdown_locked(nni_listener *);
+
static int
sock_get_fd(nni_sock *s, int flag, int *fdp)
{
@@ -340,7 +344,7 @@ nni_free_opt(nni_sockopt *opt)
uint32_t
nni_sock_id(nni_sock *s)
{
- return ((uint32_t) s->s_id);
+ return (s->s_id);
}
// nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
@@ -405,58 +409,6 @@ nni_sock_closing(nni_sock *s)
return (rv);
}
-void
-nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id)
-{
- if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
- nng_pipe_cb cb;
- void * arg;
-
- nni_mtx_lock(&s->s_pipe_cbs_mtx);
- cb = s->s_pipe_cbs[ev].cb_fn;
- arg = s->s_pipe_cbs[ev].cb_arg;
- nni_mtx_unlock(&s->s_pipe_cbs_mtx);
-
- if (cb != NULL) {
- nng_pipe p;
- p.id = id;
- cb(p, ev, arg);
- }
- }
-}
-
-int
-nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
-{
- // Initialize protocol pipe data.
- nni_mtx_lock(&s->s_mx);
- if (s->s_closing) {
- nni_mtx_unlock(&s->s_mx);
- return (NNG_ECLOSED);
- }
-
- nni_list_append(&s->s_pipes, p);
-
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
-
- nni_mtx_unlock(&s->s_mx);
- return (0);
-}
-
-void
-nni_sock_pipe_remove(nni_sock *s, nni_pipe *p)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_pipes, p)) {
- nni_list_remove(&s->s_pipes, p);
- }
- if (s->s_closing && nni_list_empty(&s->s_pipes)) {
- nni_cv_wake(&s->s_cv);
- }
- nni_mtx_unlock(&s->s_mx);
-}
-
static void
sock_destroy(nni_sock *s)
{
@@ -521,10 +473,9 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_NODE_INIT(&s->s_node);
NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
-
- nni_pipe_sock_list_init(&s->s_pipes);
- nni_listener_list_init(&s->s_listeners);
- nni_dialer_list_init(&s->s_dialers);
+ NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node);
+ NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node);
+ NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node);
nni_mtx_init(&s->s_mx);
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
@@ -615,7 +566,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
}
nni_mtx_lock(&sock_lk);
- if ((rv = nni_idhash_alloc(sock_hash, &s->s_id, s)) != 0) {
+ if ((rv = nni_idhash_alloc32(sock_hash, &s->s_id, s)) != 0) {
sock_destroy(s);
} else {
nni_list_append(&sock_list, s);
@@ -625,8 +576,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
nni_mtx_unlock(&sock_lk);
// Set the sockname.
- (void) snprintf(
- s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id);
+ (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
return (rv);
}
@@ -640,9 +590,7 @@ nni_sock_shutdown(nni_sock *sock)
{
nni_pipe * pipe;
nni_dialer * d;
- nni_dialer * nd;
nni_listener *l;
- nni_listener *nl;
nni_ctx * ctx;
nni_ctx * nctx;
@@ -657,10 +605,10 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming
// but but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_listeners, l) {
- nni_listener_shutdown(l);
+ listener_shutdown_locked(l);
}
NNI_LIST_FOREACH (&sock->s_dialers, d) {
- nni_dialer_shutdown(d);
+ dialer_shutdown_locked(d);
}
nni_mtx_unlock(&sock->s_mx);
@@ -706,35 +654,26 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
+ // For each pipe, arrange for it to teardown hard. We would
+ // expect there not to be any here.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
// Go through the dialers and listeners, attempting to close them.
// We might already have a close in progress, in which case
// we skip past it; it will be removed from another thread.
- nl = nni_list_first(&sock->s_listeners);
- while ((l = nl) != NULL) {
- nl = nni_list_next(&sock->s_listeners, nl);
-
+ NNI_LIST_FOREACH (&sock->s_listeners, l) {
if (nni_listener_hold(l) == 0) {
- nni_mtx_unlock(&sock->s_mx);
nni_listener_close(l);
- nni_mtx_lock(&sock->s_mx);
}
}
- nd = nni_list_first(&sock->s_dialers);
- while ((d = nd) != NULL) {
- nd = nni_list_next(&sock->s_dialers, nd);
-
+ NNI_LIST_FOREACH (&sock->s_dialers, d) {
if (nni_dialer_hold(d) == 0) {
- nni_mtx_unlock(&sock->s_mx);
nni_dialer_close(d);
- nni_mtx_lock(&sock->s_mx);
}
}
- // For each pipe, arrange for it to teardown hard.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
-
// We have to wait for *both* endpoints and pipes to be
// removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
@@ -1271,7 +1210,6 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
{
nni_ctx *ctx;
int rv;
- uint64_t id;
if (sock->s_ctx_ops.ctx_init == NULL) {
return (NNG_ENOTSUP);
@@ -1286,12 +1224,11 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
NNI_FREE_STRUCT(ctx);
return (NNG_ECLOSED);
}
- if ((rv = nni_idhash_alloc(ctx_hash, &id, ctx)) != 0) {
+ if ((rv = nni_idhash_alloc32(ctx_hash, &ctx->c_id, ctx)) != 0) {
nni_mtx_unlock(&sock_lk);
NNI_FREE_STRUCT(ctx);
return (rv);
}
- ctx->c_id = (uint32_t) id;
if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) {
nni_idhash_remove(ctx_hash, ctx->c_id);
@@ -1415,3 +1352,265 @@ nni_ctx_setopt(
nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+
+static void
+dialer_timer_start_locked(nni_dialer *d)
+{
+ nni_duration backoff;
+
+ backoff = d->d_currtime;
+ d->d_currtime *= 2;
+ if (d->d_currtime > d->d_maxrtime) {
+ d->d_currtime = d->d_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is
+ // pretty similar to 802 style backoff, except that we have a
+ // nearly uniform time period instead of discrete slot times.
+ // This algorithm may lead to slight biases because we don't
+ // have a statistically perfect distribution with the modulo of
+ // the random number, but this really doesn't matter.
+ nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+}
+
+void
+nni_dialer_timer_start(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+ nni_mtx_lock(&s->s_mx);
+ dialer_timer_start_locked(d);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
+{
+ nni_sock *s = d->d_sock;
+
+ nni_mtx_lock(&s->s_mx);
+
+ if (s->s_closed || d->d_closed) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_destroy(p);
+ return;
+ }
+
+ p->p_dialer = d;
+ nni_list_append(&d->d_pipes, p);
+ nni_list_append(&s->s_pipes, p);
+ d->d_pipe = p;
+ d->d_currtime = d->d_inirtime;
+ nni_mtx_unlock(&s->s_mx);
+
+ // Start the initial negotiation I/O...
+ nni_pipe_start(p);
+}
+
+static void
+dialer_shutdown_impl(nni_dialer *d)
+{
+ nni_pipe *p;
+
+ // Abort any remaining in-flight operations.
+ nni_aio_close(d->d_con_aio);
+ nni_aio_close(d->d_tmo_aio);
+
+ // Stop the underlying transport.
+ d->d_ops.d_close(d->d_data);
+
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_close(p);
+ }
+}
+
+static void
+dialer_shutdown_locked(nni_dialer *d)
+{
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
+ return;
+ }
+ dialer_shutdown_impl(d);
+}
+
+void
+nni_dialer_shutdown(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
+ return;
+ }
+ nni_mtx_lock(&s->s_mx);
+ dialer_shutdown_impl(d);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_dialer_reap(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+
+ nni_aio_stop(d->d_tmo_aio);
+ nni_aio_stop(d->d_con_aio);
+
+ nni_mtx_lock(&s->s_mx);
+ if (!nni_list_empty(&d->d_pipes)) {
+ nni_pipe *p;
+ // This should already have been done, but be certain!
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_close(p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+ // Go back to the end of reap list.
+ nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
+ return;
+ }
+
+ nni_list_remove(&s->s_dialers, d);
+ if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
+ nni_cv_wake(&s->s_cv);
+ }
+
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_dialer_destroy(d);
+}
+
+void
+nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
+{
+ nni_sock *s = l->l_sock;
+
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closed || l->l_closed) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_destroy(p);
+ return;
+ }
+ p->p_listener = l;
+ nni_list_append(&l->l_pipes, p);
+ nni_list_append(&s->s_pipes, p);
+ nni_mtx_unlock(&s->s_mx);
+
+ // Start the initial negotiation I/O...
+ nni_pipe_start(p);
+}
+
+static void
+listener_shutdown_impl(nni_listener *l)
+{
+ nni_pipe *p;
+
+ // Abort any remaining in-flight accepts.
+ nni_aio_close(l->l_acc_aio);
+ nni_aio_close(l->l_tmo_aio);
+
+ // Stop the underlying transport.
+ l->l_ops.l_close(l->l_data);
+
+ NNI_LIST_FOREACH (&l->l_pipes, p) {
+ nni_pipe_close(p);
+ }
+}
+
+static void
+listener_shutdown_locked(nni_listener *l)
+{
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
+ return;
+ }
+ listener_shutdown_impl(l);
+}
+
+void
+nni_listener_shutdown(nni_listener *l)
+{
+ nni_sock *s = l->l_sock;
+
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
+ return;
+ }
+
+ nni_mtx_lock(&s->s_mx);
+ listener_shutdown_impl(l);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_listener_reap(nni_listener *l)
+{
+ nni_sock *s = l->l_sock;
+
+ nni_aio_stop(l->l_tmo_aio);
+ nni_aio_stop(l->l_acc_aio);
+
+ nni_mtx_lock(&s->s_mx);
+ if (!nni_list_empty(&l->l_pipes)) {
+ nni_pipe *p;
+ // This should already have been done, but be certain!
+ NNI_LIST_FOREACH (&l->l_pipes, p) {
+ nni_pipe_close(p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+ // Go back to the end of reap list.
+ nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
+ return;
+ }
+
+ nni_list_remove(&s->s_listeners, l);
+ if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
+ nni_cv_wake(&s->s_cv);
+ }
+
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_listener_destroy(l);
+}
+
+void
+nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
+{
+ nni_sock * s = p->p_sock;
+ nng_pipe_cb cb;
+ void * arg;
+
+ if (!p->p_cbs) {
+ if (ev == NNG_PIPE_EV_ADD_PRE) {
+ // First event, after this we want all other events.
+ p->p_cbs = true;
+ } else {
+ return;
+ }
+ }
+ nni_mtx_lock(&s->s_pipe_cbs_mtx);
+ cb = s->s_pipe_cbs[ev].cb_fn;
+ arg = s->s_pipe_cbs[ev].cb_arg;
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+
+ if (cb != NULL) {
+ nng_pipe pid;
+ pid.id = p->p_id;
+ cb(pid, ev, arg);
+ }
+}
+
+void
+nni_pipe_remove(nni_pipe *p)
+{
+ nni_sock * s = p->p_sock;
+ nni_dialer *d = p->p_dialer;
+
+ nni_mtx_lock(&s->s_mx);
+ nni_list_node_remove(&p->p_sock_node);
+ nni_list_node_remove(&p->p_ep_node);
+ p->p_listener = NULL;
+ p->p_dialer = NULL;
+ if ((d != NULL) && (d->d_pipe == p)) {
+ d->d_pipe = NULL;
+ dialer_timer_start_locked(d); // Kick the timer to redial.
+ }
+ if (s->s_closing) {
+ nni_cv_wake(&s->s_cv);
+ }
+ nni_mtx_unlock(&s->s_mx);
+}