aboutsummaryrefslogtreecommitdiff
path: root/src/core/dialer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/dialer.c')
-rw-r--r--src/core/dialer.c237
1 files changed, 47 insertions, 190 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index e93c893e..3144d673 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -9,36 +9,12 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-struct nni_dialer {
- nni_tran_dialer_ops d_ops; // transport ops
- nni_tran * d_tran; // transport pointer
- void * d_data; // transport private
- uint64_t d_id; // endpoint id
- nni_list_node d_node; // per socket list
- nni_sock * d_sock;
- nni_url * d_url;
- int d_refcnt;
- int d_lastrv; // last result from synchronous
- bool d_synch; // synchronous connect in progress?
- bool d_started;
- bool d_closed; // full shutdown
- nni_atomic_flag d_closing; // close pending (waiting on refcnt)
- nni_mtx d_mtx;
- nni_cv d_cv;
- nni_list d_pipes;
- nni_aio * d_con_aio;
- nni_aio * d_tmo_aio; // backoff timer
- nni_duration d_maxrtime; // maximum time for reconnect
- nni_duration d_currtime; // current time for reconnect
- nni_duration d_inirtime; // initial time for reconnect
- nni_time d_conntime; // time of last good connect
-};
-
// Functionality related to dialers.
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
@@ -65,6 +41,7 @@ nni_dialer_sys_init(void)
void
nni_dialer_sys_fini(void)
{
+ nni_reap_drain();
nni_mtx_fini(&dialers_lk);
nni_idhash_fini(dialers);
dialers = NULL;
@@ -73,26 +50,15 @@ nni_dialer_sys_fini(void)
uint32_t
nni_dialer_id(nni_dialer *d)
{
- return ((uint32_t) d->d_id);
+ return (d->d_id);
}
-static void
-dialer_destroy(nni_dialer *d)
+void
+nni_dialer_destroy(nni_dialer *d)
{
- if (d == NULL) {
- return;
- }
-
- // Remove us from the table so we cannot be found.
- if (d->d_id != 0) {
- nni_idhash_remove(dialers, d->d_id);
- }
-
nni_aio_stop(d->d_con_aio);
nni_aio_stop(d->d_tmo_aio);
- nni_sock_remove_dialer(d->d_sock, d);
-
nni_aio_fini(d->d_con_aio);
nni_aio_fini(d->d_tmo_aio);
@@ -126,13 +92,13 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_url_free(url);
return (NNG_ENOMEM);
}
- d->d_url = url;
- d->d_closed = false;
- d->d_started = false;
- d->d_data = NULL;
- d->d_refcnt = 1;
- d->d_sock = s;
- d->d_tran = tran;
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_data = NULL;
+ d->d_refcnt = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
+ nni_atomic_flag_reset(&d->d_started);
nni_atomic_flag_reset(&d->d_closing);
// Make a copy of the endpoint operations. This allows us to
@@ -141,8 +107,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
d->d_ops = *tran->tran_dialer;
NNI_LIST_NODE_INIT(&d->d_node);
-
- nni_pipe_ep_list_init(&d->d_pipes);
+ NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node);
nni_mtx_init(&d->d_mtx);
nni_cv_init(&d->d_cv, &d->d_mtx);
@@ -150,9 +115,9 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) ||
- ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) ||
+ ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) ||
((rv = nni_sock_add_dialer(s, d)) != 0)) {
- dialer_destroy(d);
+ nni_dialer_destroy(d);
return (rv);
}
@@ -203,80 +168,33 @@ nni_dialer_rele(nni_dialer *d)
{
nni_mtx_lock(&dialers_lk);
d->d_refcnt--;
- if (d->d_refcnt == 0) {
- nni_cv_wake(&d->d_cv);
+ if ((d->d_refcnt == 0) && (d->d_closed)) {
+ nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
}
nni_mtx_unlock(&dialers_lk);
}
-int
-nni_dialer_shutdown(nni_dialer *d)
-{
- if (nni_atomic_flag_test_and_set(&d->d_closing)) {
- return (NNG_ECLOSED);
- }
-
- // Abort any remaining in-flight operations.
- nni_aio_close(d->d_con_aio);
- nni_aio_close(d->d_tmo_aio);
-
- // Stop the underlying transport.
- d->d_ops.d_close(d->d_data);
-
- return (0);
-}
-
void
nni_dialer_close(nni_dialer *d)
{
- nni_pipe *p;
-
- nni_mtx_lock(&d->d_mtx);
+ nni_mtx_lock(&dialers_lk);
if (d->d_closed) {
- nni_mtx_unlock(&d->d_mtx);
+ nni_mtx_unlock(&dialers_lk);
nni_dialer_rele(d);
return;
}
d->d_closed = true;
- nni_mtx_unlock(&d->d_mtx);
-
- nni_dialer_shutdown(d);
-
- nni_aio_stop(d->d_con_aio);
- nni_aio_stop(d->d_tmo_aio);
-
- nni_mtx_lock(&d->d_mtx);
- NNI_LIST_FOREACH (&d->d_pipes, p) {
- nni_pipe_stop(p);
- }
- while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) {
- nni_cv_wait(&d->d_cv);
- }
- nni_mtx_unlock(&d->d_mtx);
-
- dialer_destroy(d);
-}
+ nni_mtx_unlock(&dialers_lk);
-// This function starts an exponential backoff timer for reconnecting.
-static void
-dialer_timer_start(nni_dialer *d)
-{
- nni_duration backoff;
+ // Remove us from the table so we cannot be found.
+ // This is done fairly early in the teardown process.
+ // If we're here, either the socket or the listener has been
+ // closed at the user request, so there would be a race anyway.
+ nni_idhash_remove(dialers, d->d_id);
- backoff = d->d_currtime;
- d->d_currtime *= 2;
- if (d->d_currtime > d->d_maxrtime) {
- d->d_currtime = d->d_maxrtime;
- }
+ nni_dialer_shutdown(d);
- // To minimize damage from storms, etc., we select a backoff
- // value randomly, in the range of [0, backoff-1]; this is
- // pretty similar to 802 style backoff, except that we have a
- // nearly uniform time period instead of discrete slot times.
- // This algorithm may lead to slight biases because we don't
- // have a statistically perfect distribution with the modulo of
- // the random number, but this really doesn't matter.
- nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+ nni_dialer_rele(d);
}
static void
@@ -285,11 +203,9 @@ dialer_timer_cb(void *arg)
nni_dialer *d = arg;
nni_aio * aio = d->d_tmo_aio;
- nni_mtx_lock(&d->d_mtx);
if (nni_aio_result(aio) == 0) {
dialer_connect_start(d);
}
- nni_mtx_unlock(&d->d_mtx);
}
static void
@@ -304,52 +220,32 @@ dialer_connect_cb(void *arg)
if ((rv = nni_aio_result(aio)) == 0) {
void *data = nni_aio_get_output(aio, 0);
NNI_ASSERT(data != NULL);
- rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data);
+ rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data);
}
nni_mtx_lock(&d->d_mtx);
- synch = d->d_synch;
- d->d_synch = false;
- if (rv == 0) {
- nni_pipe_set_dialer(p, d);
- nni_list_append(&d->d_pipes, p);
-
- // Good connect, so reset the backoff timer.
- // Note that a host that accepts the connect, but drops
- // us immediately, is going to get hit pretty hard
- // (depending on the initial backoff) with no
- // exponential backoff. This can happen if we wind up
- // trying to connect to some port that does not speak
- // SP for example.
- d->d_currtime = d->d_inirtime;
- }
+ synch = d->d_synch;
nni_mtx_unlock(&d->d_mtx);
- if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
- nni_pipe_stop(p);
- }
-
- nni_mtx_lock(&d->d_mtx);
switch (rv) {
case 0:
- // No further outgoing connects -- we will restart a
- // connection from the pipe when the pipe is removed.
+ nni_dialer_add_pipe(d, p);
break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled/closed -- stop everything.
+ case NNG_ECLOSED: // No further action.
+ case NNG_ECANCELED: // No further action.
break;
default:
- // redial, but only if we are not synchronous
if (!synch) {
- dialer_timer_start(d);
+ nni_dialer_timer_start(d);
}
break;
}
if (synch) {
+ nni_mtx_lock(&d->d_mtx);
+ d->d_synch = false;
d->d_lastrv = rv;
nni_cv_wake(&d->d_cv);
+ nni_mtx_unlock(&d->d_mtx);
}
- nni_mtx_unlock(&d->d_mtx);
}
static void
@@ -366,70 +262,37 @@ nni_dialer_start(nni_dialer *d, int flags)
{
int rv = 0;
- // nni_sock_reconntimes(d->d_sock, &d->d_inirtime,
- //&d->d_maxrtime);
- d->d_currtime = d->d_inirtime;
-
- nni_mtx_lock(&d->d_mtx);
-
- if (d->d_started) {
- nni_mtx_unlock(&d->d_mtx);
+ if (nni_atomic_flag_test_and_set(&d->d_started)) {
return (NNG_ESTATE);
}
if ((flags & NNG_FLAG_NONBLOCK) != 0) {
- d->d_started = true;
- dialer_connect_start(d);
+ nni_mtx_lock(&d->d_mtx);
+ d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&d->d_mtx);
+ dialer_connect_start(d);
return (0);
}
- d->d_synch = true;
- d->d_started = true;
+ nni_mtx_lock(&d->d_mtx);
+ d->d_synch = true;
+ nni_mtx_unlock(&d->d_mtx);
+
dialer_connect_start(d);
+ nni_mtx_lock(&d->d_mtx);
while (d->d_synch) {
nni_cv_wait(&d->d_cv);
}
rv = d->d_lastrv;
- nni_cv_wake(&d->d_cv);
+ nni_mtx_unlock(&d->d_mtx);
if (rv != 0) {
- d->d_started = false;
+ nni_atomic_flag_reset(&d->d_started);
}
- nni_mtx_unlock(&d->d_mtx);
return (rv);
}
-void
-nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p)
-{
- if (d == NULL) {
- return;
- }
-
- // Break up the relationship between the dialer and the pipe.
- nni_mtx_lock(&d->d_mtx);
- // During early init, the pipe might not have this set.
- if (nni_list_active(&d->d_pipes, p)) {
- nni_list_remove(&d->d_pipes, p);
- }
- // Wake up the close thread if it is waiting.
- if (d->d_closed) {
- if (nni_list_empty(&d->d_pipes)) {
- nni_cv_wake(&d->d_cv);
- }
- } else {
- // If this pipe closed, then lets restart the dial operation.
- // Since the remote side seems to have closed, lets start with
- // a backoff. This keeps us from pounding the crap out of the
- // thing if a remote server accepts but then disconnects
- // immediately.
- dialer_timer_start(d);
- }
- nni_mtx_unlock(&d->d_mtx);
-}
-
int
nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
nni_opt_type t)
@@ -508,9 +371,3 @@ nni_dialer_getopt(
return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
}
-
-void
-nni_dialer_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_dialer, d_node);
-}