aboutsummaryrefslogtreecommitdiff
path: root/src/core/dialer.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-06 14:42:53 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-06 18:57:29 -0700
commit953ca274ae57f8edd12536a3dd15d134aa6e5576 (patch)
tree7a0e889fbae7b525befefedcb5cb8f10820e7a47 /src/core/dialer.c
parent89cba92d13fbc5e059336fd054be30e50d8a2621 (diff)
downloadnng-953ca274ae57f8edd12536a3dd15d134aa6e5576.tar.gz
nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.tar.bz2
nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.zip
fixes #568 Want a single reader/write lock on socket child objects
fixes #170 Make more use of reaper. This is a complete restructure/rethink of how child objects interact with the socket. (This also backs out #576 as it turns out not to be needed.) While #568 says reader/writer lock, for now we have settled for a single writer lock. It's likely that this is sufficient. Essentially we use the single socket lock to guard lists of the socket children. We also use deferred deletion in the idhash to facilitate teardown, which means endpoint closes are no longer synchronous. We use the reaper to clean up objects when the reference count drops to zero. We make a special exception for pipes, since they really are not reference counted by their parents, and they are leaf objects anyway. We believe this addresses the main outstanding race conditions in a much more correct and holistic way. Note that endpoint shutdown is a little tricky, as it makes use of atomic flags to guard against double entry, and against recursive lock entry. This is something that would be nice to make a bit more obvious, but what we have is safe, and the complexity is at least confined to one place.
Diffstat (limited to 'src/core/dialer.c')
-rw-r--r--src/core/dialer.c237
1 files changed, 47 insertions, 190 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index e93c893e..3144d673 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -9,36 +9,12 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-struct nni_dialer {
- nni_tran_dialer_ops d_ops; // transport ops
- nni_tran * d_tran; // transport pointer
- void * d_data; // transport private
- uint64_t d_id; // endpoint id
- nni_list_node d_node; // per socket list
- nni_sock * d_sock;
- nni_url * d_url;
- int d_refcnt;
- int d_lastrv; // last result from synchronous
- bool d_synch; // synchronous connect in progress?
- bool d_started;
- bool d_closed; // full shutdown
- nni_atomic_flag d_closing; // close pending (waiting on refcnt)
- nni_mtx d_mtx;
- nni_cv d_cv;
- nni_list d_pipes;
- nni_aio * d_con_aio;
- nni_aio * d_tmo_aio; // backoff timer
- nni_duration d_maxrtime; // maximum time for reconnect
- nni_duration d_currtime; // current time for reconnect
- nni_duration d_inirtime; // initial time for reconnect
- nni_time d_conntime; // time of last good connect
-};
-
// Functionality related to dialers.
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
@@ -65,6 +41,7 @@ nni_dialer_sys_init(void)
void
nni_dialer_sys_fini(void)
{
+ nni_reap_drain();
nni_mtx_fini(&dialers_lk);
nni_idhash_fini(dialers);
dialers = NULL;
@@ -73,26 +50,15 @@ nni_dialer_sys_fini(void)
uint32_t
nni_dialer_id(nni_dialer *d)
{
- return ((uint32_t) d->d_id);
+ return (d->d_id);
}
-static void
-dialer_destroy(nni_dialer *d)
+void
+nni_dialer_destroy(nni_dialer *d)
{
- if (d == NULL) {
- return;
- }
-
- // Remove us from the table so we cannot be found.
- if (d->d_id != 0) {
- nni_idhash_remove(dialers, d->d_id);
- }
-
nni_aio_stop(d->d_con_aio);
nni_aio_stop(d->d_tmo_aio);
- nni_sock_remove_dialer(d->d_sock, d);
-
nni_aio_fini(d->d_con_aio);
nni_aio_fini(d->d_tmo_aio);
@@ -126,13 +92,13 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_url_free(url);
return (NNG_ENOMEM);
}
- d->d_url = url;
- d->d_closed = false;
- d->d_started = false;
- d->d_data = NULL;
- d->d_refcnt = 1;
- d->d_sock = s;
- d->d_tran = tran;
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_data = NULL;
+ d->d_refcnt = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
+ nni_atomic_flag_reset(&d->d_started);
nni_atomic_flag_reset(&d->d_closing);
// Make a copy of the endpoint operations. This allows us to
@@ -141,8 +107,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
d->d_ops = *tran->tran_dialer;
NNI_LIST_NODE_INIT(&d->d_node);
-
- nni_pipe_ep_list_init(&d->d_pipes);
+ NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node);
nni_mtx_init(&d->d_mtx);
nni_cv_init(&d->d_cv, &d->d_mtx);
@@ -150,9 +115,9 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) ||
- ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) ||
+ ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) ||
((rv = nni_sock_add_dialer(s, d)) != 0)) {
- dialer_destroy(d);
+ nni_dialer_destroy(d);
return (rv);
}
@@ -203,80 +168,33 @@ nni_dialer_rele(nni_dialer *d)
{
nni_mtx_lock(&dialers_lk);
d->d_refcnt--;
- if (d->d_refcnt == 0) {
- nni_cv_wake(&d->d_cv);
+ if ((d->d_refcnt == 0) && (d->d_closed)) {
+ nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
}
nni_mtx_unlock(&dialers_lk);
}
-int
-nni_dialer_shutdown(nni_dialer *d)
-{
- if (nni_atomic_flag_test_and_set(&d->d_closing)) {
- return (NNG_ECLOSED);
- }
-
- // Abort any remaining in-flight operations.
- nni_aio_close(d->d_con_aio);
- nni_aio_close(d->d_tmo_aio);
-
- // Stop the underlying transport.
- d->d_ops.d_close(d->d_data);
-
- return (0);
-}
-
void
nni_dialer_close(nni_dialer *d)
{
- nni_pipe *p;
-
- nni_mtx_lock(&d->d_mtx);
+ nni_mtx_lock(&dialers_lk);
if (d->d_closed) {
- nni_mtx_unlock(&d->d_mtx);
+ nni_mtx_unlock(&dialers_lk);
nni_dialer_rele(d);
return;
}
d->d_closed = true;
- nni_mtx_unlock(&d->d_mtx);
-
- nni_dialer_shutdown(d);
-
- nni_aio_stop(d->d_con_aio);
- nni_aio_stop(d->d_tmo_aio);
-
- nni_mtx_lock(&d->d_mtx);
- NNI_LIST_FOREACH (&d->d_pipes, p) {
- nni_pipe_stop(p);
- }
- while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) {
- nni_cv_wait(&d->d_cv);
- }
- nni_mtx_unlock(&d->d_mtx);
-
- dialer_destroy(d);
-}
+ nni_mtx_unlock(&dialers_lk);
-// This function starts an exponential backoff timer for reconnecting.
-static void
-dialer_timer_start(nni_dialer *d)
-{
- nni_duration backoff;
+ // Remove us from the table so we cannot be found.
+ // This is done fairly early in the teardown process.
+ // If we're here, either the socket or the dialer has been
+ // closed at the user request, so there would be a race anyway.
+ nni_idhash_remove(dialers, d->d_id);
- backoff = d->d_currtime;
- d->d_currtime *= 2;
- if (d->d_currtime > d->d_maxrtime) {
- d->d_currtime = d->d_maxrtime;
- }
+ nni_dialer_shutdown(d);
- // To minimize damage from storms, etc., we select a backoff
- // value randomly, in the range of [0, backoff-1]; this is
- // pretty similar to 802 style backoff, except that we have a
- // nearly uniform time period instead of discrete slot times.
- // This algorithm may lead to slight biases because we don't
- // have a statistically perfect distribution with the modulo of
- // the random number, but this really doesn't matter.
- nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+ nni_dialer_rele(d);
}
static void
@@ -285,11 +203,9 @@ dialer_timer_cb(void *arg)
nni_dialer *d = arg;
nni_aio * aio = d->d_tmo_aio;
- nni_mtx_lock(&d->d_mtx);
if (nni_aio_result(aio) == 0) {
dialer_connect_start(d);
}
- nni_mtx_unlock(&d->d_mtx);
}
static void
@@ -304,52 +220,32 @@ dialer_connect_cb(void *arg)
if ((rv = nni_aio_result(aio)) == 0) {
void *data = nni_aio_get_output(aio, 0);
NNI_ASSERT(data != NULL);
- rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data);
+ rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data);
}
nni_mtx_lock(&d->d_mtx);
- synch = d->d_synch;
- d->d_synch = false;
- if (rv == 0) {
- nni_pipe_set_dialer(p, d);
- nni_list_append(&d->d_pipes, p);
-
- // Good connect, so reset the backoff timer.
- // Note that a host that accepts the connect, but drops
- // us immediately, is going to get hit pretty hard
- // (depending on the initial backoff) with no
- // exponential backoff. This can happen if we wind up
- // trying to connect to some port that does not speak
- // SP for example.
- d->d_currtime = d->d_inirtime;
- }
+ synch = d->d_synch;
nni_mtx_unlock(&d->d_mtx);
- if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
- nni_pipe_stop(p);
- }
-
- nni_mtx_lock(&d->d_mtx);
switch (rv) {
case 0:
- // No further outgoing connects -- we will restart a
- // connection from the pipe when the pipe is removed.
+ nni_dialer_add_pipe(d, p);
break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled/closed -- stop everything.
+ case NNG_ECLOSED: // No further action.
+ case NNG_ECANCELED: // No further action.
break;
default:
- // redial, but only if we are not synchronous
if (!synch) {
- dialer_timer_start(d);
+ nni_dialer_timer_start(d);
}
break;
}
if (synch) {
+ nni_mtx_lock(&d->d_mtx);
+ d->d_synch = false;
d->d_lastrv = rv;
nni_cv_wake(&d->d_cv);
+ nni_mtx_unlock(&d->d_mtx);
}
- nni_mtx_unlock(&d->d_mtx);
}
static void
@@ -366,70 +262,37 @@ nni_dialer_start(nni_dialer *d, int flags)
{
int rv = 0;
- // nni_sock_reconntimes(d->d_sock, &d->d_inirtime,
- //&d->d_maxrtime);
- d->d_currtime = d->d_inirtime;
-
- nni_mtx_lock(&d->d_mtx);
-
- if (d->d_started) {
- nni_mtx_unlock(&d->d_mtx);
+ if (nni_atomic_flag_test_and_set(&d->d_started)) {
return (NNG_ESTATE);
}
if ((flags & NNG_FLAG_NONBLOCK) != 0) {
- d->d_started = true;
- dialer_connect_start(d);
+ nni_mtx_lock(&d->d_mtx);
+ d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&d->d_mtx);
+ dialer_connect_start(d);
return (0);
}
- d->d_synch = true;
- d->d_started = true;
+ nni_mtx_lock(&d->d_mtx);
+ d->d_synch = true;
+ nni_mtx_unlock(&d->d_mtx);
+
dialer_connect_start(d);
+ nni_mtx_lock(&d->d_mtx);
while (d->d_synch) {
nni_cv_wait(&d->d_cv);
}
rv = d->d_lastrv;
- nni_cv_wake(&d->d_cv);
+ nni_mtx_unlock(&d->d_mtx);
if (rv != 0) {
- d->d_started = false;
+ nni_atomic_flag_reset(&d->d_started);
}
- nni_mtx_unlock(&d->d_mtx);
return (rv);
}
-void
-nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p)
-{
- if (d == NULL) {
- return;
- }
-
- // Break up the relationship between the dialer and the pipe.
- nni_mtx_lock(&d->d_mtx);
- // During early init, the pipe might not have this set.
- if (nni_list_active(&d->d_pipes, p)) {
- nni_list_remove(&d->d_pipes, p);
- }
- // Wake up the close thread if it is waiting.
- if (d->d_closed) {
- if (nni_list_empty(&d->d_pipes)) {
- nni_cv_wake(&d->d_cv);
- }
- } else {
- // If this pipe closed, then lets restart the dial operation.
- // Since the remote side seems to have closed, lets start with
- // a backoff. This keeps us from pounding the crap out of the
- // thing if a remote server accepts but then disconnects
- // immediately.
- dialer_timer_start(d);
- }
- nni_mtx_unlock(&d->d_mtx);
-}
-
int
nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
nni_opt_type t)
@@ -508,9 +371,3 @@ nni_dialer_getopt(
return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
}
-
-void
-nni_dialer_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_dialer, d_node);
-}