aboutsummaryrefslogtreecommitdiff
path: root/src/core/dialer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/dialer.c')
-rw-r--r--src/core/dialer.c512
1 files changed, 512 insertions, 0 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
new file mode 100644
index 00000000..ee0d2916
--- /dev/null
+++ b/src/core/dialer.c
@@ -0,0 +1,512 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct nni_dialer {
+ nni_tran_dialer_ops d_ops; // transport ops
+ nni_tran * d_tran; // transport pointer
+ void * d_data; // transport private
+ uint64_t d_id; // endpoint id
+ nni_list_node d_node; // per socket list
+ nni_sock * d_sock;
+ nni_url * d_url;
+ int d_refcnt;
+ int d_lastrv; // last result from synchronous
+ bool d_synch; // synchronous connect in progress?
+ bool d_started;
+ bool d_closed; // full shutdown
+ bool d_closing; // close pending (waiting on refcnt)
+ nni_mtx d_mtx;
+ nni_cv d_cv;
+ nni_list d_pipes;
+ nni_aio * d_con_aio;
+ nni_aio * d_tmo_aio; // backoff timer
+ nni_duration d_maxrtime; // maximum time for reconnect
+ nni_duration d_currtime; // current time for reconnect
+ nni_duration d_inirtime; // initial time for reconnect
+ nni_time d_conntime; // time of last good connect
+};
+
+// Functionality related to dialers.
+static void dialer_connect_start(nni_dialer *);
+static void dialer_connect_cb(void *);
+static void dialer_timer_cb(void *);
+
+static nni_idhash *dialers;
+static nni_mtx dialers_lk;
+
+int
+nni_dialer_sys_init(void)
+{
+ int rv;
+
+ if ((rv = nni_idhash_init(&dialers)) != 0) {
+ return (rv);
+ }
+ nni_mtx_init(&dialers_lk);
+ nni_idhash_set_limits(
+ dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+
+ return (0);
+}
+
+void
+nni_dialer_sys_fini(void)
+{
+ nni_mtx_fini(&dialers_lk);
+ nni_idhash_fini(dialers);
+ dialers = NULL;
+}
+
+uint32_t
+nni_dialer_id(nni_dialer *d)
+{
+ return ((uint32_t) d->d_id);
+}
+
+static void
+dialer_destroy(nni_dialer *d)
+{
+ if (d == NULL) {
+ return;
+ }
+
+ // Remove us from the table so we cannot be found.
+ if (d->d_id != 0) {
+ nni_idhash_remove(dialers, d->d_id);
+ }
+
+ nni_aio_stop(d->d_con_aio);
+ nni_aio_stop(d->d_tmo_aio);
+
+ nni_sock_remove_dialer(d->d_sock, d);
+
+ nni_aio_fini(d->d_con_aio);
+ nni_aio_fini(d->d_tmo_aio);
+
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_data != NULL) {
+ d->d_ops.d_fini(d->d_data);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+ nni_cv_fini(&d->d_cv);
+ nni_mtx_fini(&d->d_mtx);
+ nni_url_free(d->d_url);
+ NNI_FREE_STRUCT(d);
+}
+
+int
+nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
+{
+ nni_tran * tran;
+ nni_dialer *d;
+ int rv;
+ nni_url * url;
+
+ if ((rv = nni_url_parse(&url, urlstr)) != 0) {
+ return (rv);
+ }
+ if (((tran = nni_tran_find(url)) == NULL) ||
+ (tran->tran_dialer == NULL)) {
+ nni_url_free(url);
+ return (NNG_ENOTSUP);
+ }
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ nni_url_free(url);
+ return (NNG_ENOMEM);
+ }
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_closing = false;
+ d->d_started = false;
+ d->d_data = NULL;
+ d->d_refcnt = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
+
+ // Make a copy of the endpoint operations. This allows us to
+ // modify them (to override NULLs for example), and avoids an extra
+ // dereference on hot paths.
+ d->d_ops = *tran->tran_dialer;
+
+ NNI_LIST_NODE_INIT(&d->d_node);
+
+ nni_pipe_ep_list_init(&d->d_pipes);
+
+ nni_mtx_init(&d->d_mtx);
+ nni_cv_init(&d->d_cv, &d->d_mtx);
+
+ if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
+ ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
+ ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) ||
+ ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) ||
+ ((rv = nni_sock_add_dialer(s, d)) != 0)) {
+ dialer_destroy(d);
+ return (rv);
+ }
+
+ *dp = d;
+ return (0);
+}
+
+int
+nni_dialer_find(nni_dialer **dp, uint32_t id)
+{
+ int rv;
+ nni_dialer *d;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&dialers_lk);
+ if ((rv = nni_idhash_find(dialers, id, (void **) &d)) == 0) {
+ if (d->d_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ d->d_refcnt++;
+ *dp = d;
+ }
+ }
+ nni_mtx_unlock(&dialers_lk);
+ return (rv);
+}
+
+int
+nni_dialer_hold(nni_dialer *d)
+{
+ int rv;
+ nni_mtx_lock(&dialers_lk);
+ if (d->d_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ d->d_refcnt++;
+ rv = 0;
+ }
+ nni_mtx_unlock(&dialers_lk);
+ return (rv);
+}
+
+void
+nni_dialer_rele(nni_dialer *d)
+{
+ nni_mtx_lock(&dialers_lk);
+ d->d_refcnt--;
+ if (d->d_closing) {
+ nni_cv_wake(&d->d_cv);
+ }
+ nni_mtx_unlock(&dialers_lk);
+}
+
+int
+nni_dialer_shutdown(nni_dialer *d)
+{
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ECLOSED);
+ }
+ d->d_closing = true;
+ nni_mtx_unlock(&d->d_mtx);
+
+ // Abort any remaining in-flight operations.
+ nni_aio_close(d->d_con_aio);
+ nni_aio_close(d->d_tmo_aio);
+
+ // Stop the underlying transport.
+ d->d_ops.d_close(d->d_data);
+
+ return (0);
+}
+
+void
+nni_dialer_close(nni_dialer *d)
+{
+ nni_pipe *p;
+
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_closed) {
+ nni_mtx_unlock(&d->d_mtx);
+ nni_dialer_rele(d);
+ return;
+ }
+ d->d_closed = true;
+ nni_mtx_unlock(&d->d_mtx);
+
+ nni_dialer_shutdown(d);
+
+ nni_aio_stop(d->d_con_aio);
+ nni_aio_stop(d->d_tmo_aio);
+
+ nni_mtx_lock(&d->d_mtx);
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_stop(p);
+ }
+ while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) {
+ nni_cv_wait(&d->d_cv);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+
+ dialer_destroy(d);
+}
+
+// This function starts an exponential backoff timer for reconnecting.
+static void
+dialer_timer_start(nni_dialer *d)
+{
+ nni_duration backoff;
+
+ if (d->d_closing) {
+ return;
+ }
+ backoff = d->d_currtime;
+ d->d_currtime *= 2;
+ if (d->d_currtime > d->d_maxrtime) {
+ d->d_currtime = d->d_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is
+ // pretty similar to 802 style backoff, except that we have a
+ // nearly uniform time period instead of discrete slot times.
+ // This algorithm may lead to slight biases because we don't
+ // have a statistically perfect distribution with the modulo of
+ // the random number, but this really doesn't matter.
+ nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+}
+
+static void
+dialer_timer_cb(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_aio * aio = d->d_tmo_aio;
+
+ nni_mtx_lock(&d->d_mtx);
+ if (nni_aio_result(aio) == 0) {
+ dialer_connect_start(d);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+static void
+dialer_connect_cb(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_pipe * p;
+ nni_aio * aio = d->d_con_aio;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) == 0) {
+ void *data = nni_aio_get_output(aio, 0);
+ NNI_ASSERT(data != NULL);
+ rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data);
+ }
+ if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
+ nni_pipe_stop(p);
+ }
+
+ nni_mtx_lock(&d->d_mtx);
+ switch (rv) {
+ case 0:
+ nni_pipe_set_dialer(p, d);
+ nni_list_append(&d->d_pipes, p);
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ nni_pipe_stop(p);
+ return;
+ }
+
+ // Good connect, so reset the backoff timer.
+ // Note that a host that accepts the connect, but drops
+ // us immediately, is going to get hit pretty hard
+ // (depending on the initial backoff) with no
+ // exponential backoff. This can happen if we wind up
+ // trying to connect to some port that does not speak
+ // SP for example.
+ d->d_currtime = d->d_inirtime;
+
+ // No further outgoing connects -- we will restart a
+ // connection from the pipe when the pipe is removed.
+ break;
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ // Canceled/closed -- stop everything.
+ break;
+ default:
+ // redial, but only if we are not synchronous
+ if (!d->d_synch) {
+ dialer_timer_start(d);
+ }
+ break;
+ }
+ if (d->d_synch) {
+ if (rv != 0) {
+ d->d_started = false;
+ }
+ d->d_lastrv = rv;
+ d->d_synch = false;
+ nni_cv_wake(&d->d_cv);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+static void
+dialer_connect_start(nni_dialer *d)
+{
+ nni_aio *aio = d->d_con_aio;
+
+ // Call with the Endpoint lock held.
+ if (d->d_closing) {
+ return;
+ }
+
+ d->d_ops.d_connect(d->d_data, aio);
+}
+
+int
+nni_dialer_start(nni_dialer *d, int flags)
+{
+ int rv = 0;
+
+ nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime);
+ d->d_currtime = d->d_inirtime;
+
+ nni_mtx_lock(&d->d_mtx);
+
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ECLOSED);
+ }
+
+ if (d->d_started) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((flags & NNG_FLAG_NONBLOCK) != 0) {
+ d->d_started = true;
+ dialer_connect_start(d);
+ nni_mtx_unlock(&d->d_mtx);
+ return (0);
+ }
+
+ d->d_synch = true;
+ d->d_started = true;
+ dialer_connect_start(d);
+
+ while (d->d_synch && !d->d_closing) {
+ nni_cv_wait(&d->d_cv);
+ }
+ rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv;
+ nni_cv_wake(&d->d_cv);
+
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+}
+
+void
+nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p)
+{
+ if (d == NULL) {
+ return;
+ }
+
+ // Break up the relationship between the dialer and the pipe.
+ nni_mtx_lock(&d->d_mtx);
+ // During early init, the pipe might not have this set.
+ if (nni_list_active(&d->d_pipes, p)) {
+ nni_list_remove(&d->d_pipes, p);
+ }
+ // Wake up the close thread if it is waiting.
+ if (d->d_closed) {
+ if (nni_list_empty(&d->d_pipes)) {
+ nni_cv_wake(&d->d_cv);
+ }
+ } else {
+ // If this pipe closed, then lets restart the dial operation.
+ // Since the remote side seems to have closed, lets start with
+ // a backoff. This keeps us from pounding the crap out of the
+ // thing if a remote server accepts but then disconnects
+ // immediately.
+ dialer_timer_start(d);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+int
+nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
+ nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (NNG_EREADONLY);
+ }
+
+ for (o = d->d_ops.d_options; o && o->o_name; o++) {
+ int rv;
+
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_set == NULL) {
+ return (NNG_EREADONLY);
+ }
+
+ nni_mtx_lock(&d->d_mtx);
+ rv = o->o_set(d->d_data, val, sz, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_dialer_getopt(
+ nni_dialer *d, const char *name, void *valp, size_t *szp, nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ for (o = d->d_ops.d_options; o && o->o_name; o++) {
+ int rv;
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_get == NULL) {
+ return (NNG_EWRITEONLY);
+ }
+ nni_mtx_lock(&d->d_mtx);
+ rv = o->o_get(d->d_data, valp, szp, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
+ // We provide a fallback on the URL, but let the implementation
+ // override. This allows the URL to be created with wildcards,
+ // that are resolved later.
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (nni_copyout_str(d->d_url->u_rawurl, valp, szp, t));
+ }
+
+ return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
+}
+
+void
+nni_dialer_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_dialer, d_node);
+}