aboutsummaryrefslogtreecommitdiff
path: root/src/core/dialer.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-06-26 17:39:17 -0700
committerGarrett D'Amore <garrett@damore.org>2018-06-27 17:28:05 -0700
commit251553b13e6bc8019914b9edd1292f97e856dd43 (patch)
tree9193b8b4d4df86253f0a469cd96d8bb304a64c82 /src/core/dialer.c
parent91f9061ad9289afffb0111c03a8390d0f82d7114 (diff)
downloadnng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.gz
nng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.bz2
nng-251553b13e6bc8019914b9edd1292f97e856dd43.zip
fixes #522 Separate out the endpoint plumbing
This separates the plumbing for endpoints into distinct dialer and listeners. Some of the transports could benefit from further separation, but we've done some rather larger separation e.g. for the websocket transport. IPC would be a good one to update later, when we start looking at exposing a more natural underlying API.
Diffstat (limited to 'src/core/dialer.c')
-rw-r--r--src/core/dialer.c512
1 files changed, 512 insertions, 0 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
new file mode 100644
index 00000000..ee0d2916
--- /dev/null
+++ b/src/core/dialer.c
@@ -0,0 +1,512 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct nni_dialer {
+ nni_tran_dialer_ops d_ops; // transport ops
+ nni_tran * d_tran; // transport pointer
+ void * d_data; // transport private
+ uint64_t d_id; // endpoint id
+ nni_list_node d_node; // per socket list
+ nni_sock * d_sock;
+ nni_url * d_url;
+ int d_refcnt;
+ int d_lastrv; // last result from synchronous
+ bool d_synch; // synchronous connect in progress?
+ bool d_started;
+ bool d_closed; // full shutdown
+ bool d_closing; // close pending (waiting on refcnt)
+ nni_mtx d_mtx;
+ nni_cv d_cv;
+ nni_list d_pipes;
+ nni_aio * d_con_aio;
+ nni_aio * d_tmo_aio; // backoff timer
+ nni_duration d_maxrtime; // maximum time for reconnect
+ nni_duration d_currtime; // current time for reconnect
+ nni_duration d_inirtime; // initial time for reconnect
+ nni_time d_conntime; // time of last good connect
+};
+
+// Functionality related to dialers.
+static void dialer_connect_start(nni_dialer *);
+static void dialer_connect_cb(void *);
+static void dialer_timer_cb(void *);
+
+static nni_idhash *dialers;
+static nni_mtx dialers_lk;
+
+int
+nni_dialer_sys_init(void)
+{
+ int rv;
+
+ if ((rv = nni_idhash_init(&dialers)) != 0) {
+ return (rv);
+ }
+ nni_mtx_init(&dialers_lk);
+ nni_idhash_set_limits(
+ dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+
+ return (0);
+}
+
+void
+nni_dialer_sys_fini(void)
+{
+ nni_mtx_fini(&dialers_lk);
+ nni_idhash_fini(dialers);
+ dialers = NULL;
+}
+
+uint32_t
+nni_dialer_id(nni_dialer *d)
+{
+ return ((uint32_t) d->d_id);
+}
+
+static void
+dialer_destroy(nni_dialer *d)
+{
+ if (d == NULL) {
+ return;
+ }
+
+ // Remove us from the table so we cannot be found.
+ if (d->d_id != 0) {
+ nni_idhash_remove(dialers, d->d_id);
+ }
+
+ nni_aio_stop(d->d_con_aio);
+ nni_aio_stop(d->d_tmo_aio);
+
+ nni_sock_remove_dialer(d->d_sock, d);
+
+ nni_aio_fini(d->d_con_aio);
+ nni_aio_fini(d->d_tmo_aio);
+
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_data != NULL) {
+ d->d_ops.d_fini(d->d_data);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+ nni_cv_fini(&d->d_cv);
+ nni_mtx_fini(&d->d_mtx);
+ nni_url_free(d->d_url);
+ NNI_FREE_STRUCT(d);
+}
+
+int
+nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
+{
+ nni_tran * tran;
+ nni_dialer *d;
+ int rv;
+ nni_url * url;
+
+ if ((rv = nni_url_parse(&url, urlstr)) != 0) {
+ return (rv);
+ }
+ if (((tran = nni_tran_find(url)) == NULL) ||
+ (tran->tran_dialer == NULL)) {
+ nni_url_free(url);
+ return (NNG_ENOTSUP);
+ }
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ nni_url_free(url);
+ return (NNG_ENOMEM);
+ }
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_closing = false;
+ d->d_started = false;
+ d->d_data = NULL;
+ d->d_refcnt = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
+
+ // Make a copy of the endpoint operations. This allows us to
+ // modify them (to override NULLs for example), and avoids an extra
+ // dereference on hot paths.
+ d->d_ops = *tran->tran_dialer;
+
+ NNI_LIST_NODE_INIT(&d->d_node);
+
+ nni_pipe_ep_list_init(&d->d_pipes);
+
+ nni_mtx_init(&d->d_mtx);
+ nni_cv_init(&d->d_cv, &d->d_mtx);
+
+ if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
+ ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
+ ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) ||
+ ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) ||
+ ((rv = nni_sock_add_dialer(s, d)) != 0)) {
+ dialer_destroy(d);
+ return (rv);
+ }
+
+ *dp = d;
+ return (0);
+}
+
+int
+nni_dialer_find(nni_dialer **dp, uint32_t id)
+{
+ int rv;
+ nni_dialer *d;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&dialers_lk);
+ if ((rv = nni_idhash_find(dialers, id, (void **) &d)) == 0) {
+ if (d->d_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ d->d_refcnt++;
+ *dp = d;
+ }
+ }
+ nni_mtx_unlock(&dialers_lk);
+ return (rv);
+}
+
+int
+nni_dialer_hold(nni_dialer *d)
+{
+ int rv;
+ nni_mtx_lock(&dialers_lk);
+ if (d->d_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ d->d_refcnt++;
+ rv = 0;
+ }
+ nni_mtx_unlock(&dialers_lk);
+ return (rv);
+}
+
+void
+nni_dialer_rele(nni_dialer *d)
+{
+ nni_mtx_lock(&dialers_lk);
+ d->d_refcnt--;
+ if (d->d_closing) {
+ nni_cv_wake(&d->d_cv);
+ }
+ nni_mtx_unlock(&dialers_lk);
+}
+
+int
+nni_dialer_shutdown(nni_dialer *d)
+{
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ECLOSED);
+ }
+ d->d_closing = true;
+ nni_mtx_unlock(&d->d_mtx);
+
+ // Abort any remaining in-flight operations.
+ nni_aio_close(d->d_con_aio);
+ nni_aio_close(d->d_tmo_aio);
+
+ // Stop the underlying transport.
+ d->d_ops.d_close(d->d_data);
+
+ return (0);
+}
+
+void
+nni_dialer_close(nni_dialer *d)
+{
+ nni_pipe *p;
+
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_closed) {
+ nni_mtx_unlock(&d->d_mtx);
+ nni_dialer_rele(d);
+ return;
+ }
+ d->d_closed = true;
+ nni_mtx_unlock(&d->d_mtx);
+
+ nni_dialer_shutdown(d);
+
+ nni_aio_stop(d->d_con_aio);
+ nni_aio_stop(d->d_tmo_aio);
+
+ nni_mtx_lock(&d->d_mtx);
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_stop(p);
+ }
+ while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) {
+ nni_cv_wait(&d->d_cv);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+
+ dialer_destroy(d);
+}
+
+// This function starts an exponential backoff timer for reconnecting.
+static void
+dialer_timer_start(nni_dialer *d)
+{
+ nni_duration backoff;
+
+ if (d->d_closing) {
+ return;
+ }
+ backoff = d->d_currtime;
+ d->d_currtime *= 2;
+ if (d->d_currtime > d->d_maxrtime) {
+ d->d_currtime = d->d_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is
+ // pretty similar to 802 style backoff, except that we have a
+ // nearly uniform time period instead of discrete slot times.
+ // This algorithm may lead to slight biases because we don't
+ // have a statistically perfect distribution with the modulo of
+ // the random number, but this really doesn't matter.
+ nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+}
+
+static void
+dialer_timer_cb(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_aio * aio = d->d_tmo_aio;
+
+ nni_mtx_lock(&d->d_mtx);
+ if (nni_aio_result(aio) == 0) {
+ dialer_connect_start(d);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+static void
+dialer_connect_cb(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_pipe * p;
+ nni_aio * aio = d->d_con_aio;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) == 0) {
+ void *data = nni_aio_get_output(aio, 0);
+ NNI_ASSERT(data != NULL);
+ rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data);
+ }
+ if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
+ nni_pipe_stop(p);
+ }
+
+ nni_mtx_lock(&d->d_mtx);
+ switch (rv) {
+ case 0:
+ nni_pipe_set_dialer(p, d);
+ nni_list_append(&d->d_pipes, p);
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ nni_pipe_stop(p);
+ return;
+ }
+
+ // Good connect, so reset the backoff timer.
+ // Note that a host that accepts the connect, but drops
+ // us immediately, is going to get hit pretty hard
+ // (depending on the initial backoff) with no
+ // exponential backoff. This can happen if we wind up
+ // trying to connect to some port that does not speak
+ // SP for example.
+ d->d_currtime = d->d_inirtime;
+
+ // No further outgoing connects -- we will restart a
+ // connection from the pipe when the pipe is removed.
+ break;
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ // Canceled/closed -- stop everything.
+ break;
+ default:
+ // redial, but only if we are not synchronous
+ if (!d->d_synch) {
+ dialer_timer_start(d);
+ }
+ break;
+ }
+ if (d->d_synch) {
+ if (rv != 0) {
+ d->d_started = false;
+ }
+ d->d_lastrv = rv;
+ d->d_synch = false;
+ nni_cv_wake(&d->d_cv);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+static void
+dialer_connect_start(nni_dialer *d)
+{
+ nni_aio *aio = d->d_con_aio;
+
+ // Call with the Endpoint lock held.
+ if (d->d_closing) {
+ return;
+ }
+
+ d->d_ops.d_connect(d->d_data, aio);
+}
+
+int
+nni_dialer_start(nni_dialer *d, int flags)
+{
+ int rv = 0;
+
+ nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime);
+ d->d_currtime = d->d_inirtime;
+
+ nni_mtx_lock(&d->d_mtx);
+
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ECLOSED);
+ }
+
+ if (d->d_started) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((flags & NNG_FLAG_NONBLOCK) != 0) {
+ d->d_started = true;
+ dialer_connect_start(d);
+ nni_mtx_unlock(&d->d_mtx);
+ return (0);
+ }
+
+ d->d_synch = true;
+ d->d_started = true;
+ dialer_connect_start(d);
+
+ while (d->d_synch && !d->d_closing) {
+ nni_cv_wait(&d->d_cv);
+ }
+ rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv;
+ nni_cv_wake(&d->d_cv);
+
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+}
+
+void
+nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p)
+{
+ if (d == NULL) {
+ return;
+ }
+
+ // Break up the relationship between the dialer and the pipe.
+ nni_mtx_lock(&d->d_mtx);
+ // During early init, the pipe might not have this set.
+ if (nni_list_active(&d->d_pipes, p)) {
+ nni_list_remove(&d->d_pipes, p);
+ }
+ // Wake up the close thread if it is waiting.
+ if (d->d_closed) {
+ if (nni_list_empty(&d->d_pipes)) {
+ nni_cv_wake(&d->d_cv);
+ }
+ } else {
+ // If this pipe closed, then lets restart the dial operation.
+ // Since the remote side seems to have closed, lets start with
+ // a backoff. This keeps us from pounding the crap out of the
+ // thing if a remote server accepts but then disconnects
+ // immediately.
+ dialer_timer_start(d);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+int
+nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
+ nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (NNG_EREADONLY);
+ }
+
+ for (o = d->d_ops.d_options; o && o->o_name; o++) {
+ int rv;
+
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_set == NULL) {
+ return (NNG_EREADONLY);
+ }
+
+ nni_mtx_lock(&d->d_mtx);
+ rv = o->o_set(d->d_data, val, sz, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_dialer_getopt(
+ nni_dialer *d, const char *name, void *valp, size_t *szp, nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ for (o = d->d_ops.d_options; o && o->o_name; o++) {
+ int rv;
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_get == NULL) {
+ return (NNG_EWRITEONLY);
+ }
+ nni_mtx_lock(&d->d_mtx);
+ rv = o->o_get(d->d_data, valp, szp, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
+ // We provide a fallback on the URL, but let the implementation
+ // override. This allows the URL to be created with wildcards,
+ // that are resolved later.
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (nni_copyout_str(d->d_url->u_rawurl, valp, szp, t));
+ }
+
+ return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
+}
+
+void
+nni_dialer_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_dialer, d_node);
+}