aboutsummaryrefslogtreecommitdiff
path: root/src/core/endpt.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-06-26 17:39:17 -0700
committerGarrett D'Amore <garrett@damore.org>2018-06-27 17:28:05 -0700
commit251553b13e6bc8019914b9edd1292f97e856dd43 (patch)
tree9193b8b4d4df86253f0a469cd96d8bb304a64c82 /src/core/endpt.c
parent91f9061ad9289afffb0111c03a8390d0f82d7114 (diff)
downloadnng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.gz
nng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.bz2
nng-251553b13e6bc8019914b9edd1292f97e856dd43.zip
fixes #522 Separate out the endpoint plumbing
This separates the plumbing for endpoints into distinct dialer and listeners. Some of the transports could benefit from further separation, but we've done some rather larger separation e.g. for the websocket transport. IPC would be a good one to update later, when we start looking at exposing a more natural underlying API.
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--src/core/endpt.c665
1 files changed, 0 insertions, 665 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
deleted file mode 100644
index 8e678fb0..00000000
--- a/src/core/endpt.c
+++ /dev/null
@@ -1,665 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-struct nni_ep {
- nni_tran_ep_ops ep_ops; // transport ops
- nni_tran * ep_tran; // transport pointer
- void * ep_data; // transport private
- uint64_t ep_id; // endpoint id
- nni_list_node ep_node; // per socket list
- nni_sock * ep_sock;
- nni_url * ep_url;
- int ep_mode;
- int ep_refcnt;
- bool ep_started;
- bool ep_closed; // full shutdown
- bool ep_closing; // close pending (waiting on refcnt)
- bool ep_tmo_run;
- nni_mtx ep_mtx;
- nni_cv ep_cv;
- nni_list ep_pipes;
- nni_aio * ep_acc_aio;
- nni_aio * ep_con_aio;
- nni_aio * ep_con_syn; // used for sync connect
- nni_aio * ep_tmo_aio; // backoff timer
- nni_duration ep_maxrtime; // maximum time for reconnect
- nni_duration ep_currtime; // current time for reconnect
- nni_duration ep_inirtime; // initial time for reconnect
- nni_time ep_conntime; // time of last good connect
-};
-
-// Functionality related to end points.
-
-static void nni_ep_acc_start(nni_ep *);
-static void nni_ep_acc_cb(void *);
-static void nni_ep_con_start(nni_ep *);
-static void nni_ep_con_cb(void *);
-static void nni_ep_tmo_start(nni_ep *);
-static void nni_ep_tmo_cb(void *);
-
-static nni_idhash *nni_eps;
-static nni_mtx nni_ep_lk;
-
-int
-nni_ep_sys_init(void)
-{
- int rv;
-
- if ((rv = nni_idhash_init(&nni_eps)) != 0) {
- return (rv);
- }
- nni_mtx_init(&nni_ep_lk);
- nni_idhash_set_limits(
- nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff);
-
- return (0);
-}
-
-void
-nni_ep_sys_fini(void)
-{
- nni_mtx_fini(&nni_ep_lk);
- nni_idhash_fini(nni_eps);
- nni_eps = NULL;
-}
-
-uint32_t
-nni_ep_id(nni_ep *ep)
-{
- return ((uint32_t) ep->ep_id);
-}
-
-static void
-nni_ep_destroy(nni_ep *ep)
-{
- if (ep == NULL) {
- return;
- }
-
- // Remove us from the table so we cannot be found.
- if (ep->ep_id != 0) {
- nni_idhash_remove(nni_eps, ep->ep_id);
- }
-
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- nni_sock_ep_remove(ep->ep_sock, ep);
-
- nni_aio_fini(ep->ep_acc_aio);
- nni_aio_fini(ep->ep_con_aio);
- nni_aio_fini(ep->ep_con_syn);
- nni_aio_fini(ep->ep_tmo_aio);
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_data != NULL) {
- ep->ep_ops.ep_fini(ep->ep_data);
- }
- nni_mtx_unlock(&ep->ep_mtx);
- nni_cv_fini(&ep->ep_cv);
- nni_mtx_fini(&ep->ep_mtx);
- nni_url_free(ep->ep_url);
- NNI_FREE_STRUCT(ep);
-}
-
-static int
-nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode)
-{
- nni_tran *tran;
- nni_ep * ep;
- int rv;
- nni_url * url;
-
- if ((rv = nni_url_parse(&url, urlstr)) != 0) {
- return (rv);
- }
- if ((tran = nni_tran_find(url)) == NULL) {
- nni_url_free(url);
- return (NNG_ENOTSUP);
- }
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- nni_url_free(url);
- return (NNG_ENOMEM);
- }
- ep->ep_url = url;
- ep->ep_closed = false;
- ep->ep_closing = false;
- ep->ep_started = false;
- ep->ep_data = NULL;
- ep->ep_refcnt = 1;
- ep->ep_sock = s;
- ep->ep_tran = tran;
- ep->ep_mode = mode;
-
- // Make a copy of the endpoint operations. This allows us to
- // modify them (to override NULLs for example), and avoids an extra
- // dereference on hot paths.
- ep->ep_ops = *tran->tran_ep;
-
- NNI_LIST_NODE_INIT(&ep->ep_node);
-
- nni_pipe_ep_list_init(&ep->ep_pipes);
-
- nni_mtx_init(&ep->ep_mtx);
- nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
-
- if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
- ((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) ||
- ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
- ((rv = nni_sock_ep_add(s, ep)) != 0)) {
- nni_ep_destroy(ep);
- return (rv);
- }
-
- *epp = ep;
- return (0);
-}
-
-int
-nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *urlstr)
-{
- return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_DIAL));
-}
-
-int
-nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *urlstr)
-{
- return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_LISTEN));
-}
-
-int
-nni_ep_find(nni_ep **epp, uint32_t id)
-{
- int rv;
- nni_ep *ep;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
- nni_mtx_lock(&nni_ep_lk);
- if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) {
- if (ep->ep_closed) {
- rv = NNG_ECLOSED;
- } else {
- ep->ep_refcnt++;
- *epp = ep;
- }
- }
- nni_mtx_unlock(&nni_ep_lk);
- return (rv);
-}
-
-int
-nni_ep_hold(nni_ep *ep)
-{
- int rv;
- nni_mtx_lock(&nni_ep_lk);
- if (ep->ep_closed) {
- rv = NNG_ECLOSED;
- } else {
- ep->ep_refcnt++;
- rv = 0;
- }
- nni_mtx_unlock(&nni_ep_lk);
- return (rv);
-}
-
-void
-nni_ep_rele(nni_ep *ep)
-{
- nni_mtx_lock(&nni_ep_lk);
- ep->ep_refcnt--;
- if (ep->ep_closing) {
- nni_cv_wake(&ep->ep_cv);
- }
- nni_mtx_unlock(&nni_ep_lk);
-}
-
-int
-nni_ep_shutdown(nni_ep *ep)
-{
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- ep->ep_closing = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- // Abort any remaining in-flight operations.
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- // Stop the underlying transport.
- ep->ep_ops.ep_close(ep->ep_data);
-
- return (0);
-}
-
-void
-nni_ep_close(nni_ep *ep)
-{
- nni_pipe *p;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- nni_ep_rele(ep);
- return;
- }
- ep->ep_closed = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_ep_shutdown(ep);
-
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- nni_mtx_lock(&ep->ep_mtx);
- NNI_LIST_FOREACH (&ep->ep_pipes, p) {
- nni_pipe_stop(p);
- }
- while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) {
- nni_cv_wait(&ep->ep_cv);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_ep_destroy(ep);
-}
-
-static void
-nni_ep_tmo_cancel(nni_aio *aio, int rv)
-{
- nni_ep *ep = nni_aio_get_prov_data(aio);
- // The only way this ever gets "finished", is via cancellation.
- if (ep != NULL) {
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_tmo_run) {
- nni_aio_finish_error(aio, rv);
- }
- ep->ep_tmo_run = false;
- nni_mtx_unlock(&ep->ep_mtx);
- }
-}
-
-static void
-nni_ep_tmo_start(nni_ep *ep)
-{
- nni_duration backoff;
- int rv;
-
- if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
- return;
- }
- backoff = ep->ep_currtime;
- ep->ep_currtime *= 2;
- if (ep->ep_currtime > ep->ep_maxrtime) {
- ep->ep_currtime = ep->ep_maxrtime;
- }
-
- // To minimize damage from storms, etc., we select a backoff
- // value randomly, in the range of [0, backoff-1]; this is
- // pretty similar to 802 style backoff, except that we have a
- // nearly uniform time period instead of discrete slot times.
- // This algorithm may lead to slight biases because we don't
- // have a statistically perfect distribution with the modulo of
- // the random number, but this really doesn't matter.
-
- nni_aio_set_timeout(
- ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
-
- if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) !=
- 0) {
- nni_aio_finish_error(ep->ep_tmo_aio, rv);
- }
-
- ep->ep_tmo_run = true;
-}
-
-static void
-nni_ep_tmo_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_tmo_aio;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
- if (ep->ep_mode == NNI_EP_MODE_DIAL) {
- nni_ep_con_start(ep);
- } else {
- nni_ep_acc_start(ep);
- }
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_con_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_con_aio;
- int rv;
-
- if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0));
- }
- nni_mtx_lock(&ep->ep_mtx);
- switch (rv) {
- case 0:
- // Good connect, so reset the backoff timer.
- // Note that a host that accepts the connect, but drops
- // us immediately, is going to get hit pretty hard
- // (depending on the initial backoff) with no
- // exponential backoff. This can happen if we wind up
- // trying to connect to some port that does not speak
- // SP for example.
- ep->ep_currtime = ep->ep_inirtime;
-
- // No further outgoing connects -- we will restart a
- // connection from the pipe when the pipe is removed.
- break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled/closed -- stop everything.
- break;
- default:
- // Other errors involve the use of the backoff timer.
- nni_ep_tmo_start(ep);
- break;
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_con_start(nni_ep *ep)
-{
- nni_aio *aio = ep->ep_con_aio;
-
- // Call with the Endpoint lock held.
- if (ep->ep_closing) {
- return;
- }
-
- ep->ep_ops.ep_connect(ep->ep_data, aio);
-}
-
-int
-nni_ep_dial(nni_ep *ep, int flags)
-{
- int rv = 0;
- nni_aio *aio;
-
- nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
- ep->ep_currtime = ep->ep_inirtime;
-
- nni_mtx_lock(&ep->ep_mtx);
-
- if (ep->ep_mode != NNI_EP_MODE_DIAL) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ENOTSUP);
- }
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
-
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ESTATE);
- }
-
- if ((flags & NNG_FLAG_NONBLOCK) != 0) {
- ep->ep_started = true;
- nni_ep_con_start(ep);
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
- }
-
- // Synchronous mode: so we have to wait for it to complete.
- aio = ep->ep_con_syn;
- ep->ep_ops.ep_connect(ep->ep_data, aio);
- ep->ep_started = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_aio_wait(aio);
-
- // As we're synchronous, we also have to handle the completion.
- if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) {
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_started = false;
- nni_mtx_unlock(&ep->ep_mtx);
- }
- return (rv);
-}
-
-static void
-nni_ep_acc_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_acc_aio;
- int rv;
-
- if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL);
- rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0));
- }
-
- nni_mtx_lock(&ep->ep_mtx);
- switch (rv) {
- case 0:
- nni_ep_acc_start(ep);
- break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled or closed, no further action.
- break;
- case NNG_ECONNABORTED:
- case NNG_ECONNRESET:
- // These are remote conditions, no cool down.
- nni_ep_acc_start(ep);
- break;
- default:
- // We don't really know why we failed, but we backoff
- // here. This is because errors here are probably due
- // to system failures (resource exhaustion) and we hope
- // by not thrashing we give the system a chance to
- // recover.
- nni_ep_tmo_start(ep);
- break;
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_acc_start(nni_ep *ep)
-{
- nni_aio *aio = ep->ep_acc_aio;
-
- // Call with the Endpoint lock held.
- if (ep->ep_closing) {
- return;
- }
- ep->ep_ops.ep_accept(ep->ep_data, aio);
-}
-
-int
-nni_ep_listen(nni_ep *ep, int flags)
-{
- int rv = 0;
- NNI_ARG_UNUSED(flags);
-
- nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
- ep->ep_currtime = ep->ep_inirtime;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ENOTSUP);
- }
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ESTATE);
- }
-
- rv = ep->ep_ops.ep_bind(ep->ep_data);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- ep->ep_started = true;
- nni_ep_acc_start(ep);
- nni_mtx_unlock(&ep->ep_mtx);
-
- return (0);
-}
-
-int
-nni_ep_pipe_add(nni_ep *ep, nni_pipe *p)
-{
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- nni_list_append(&ep->ep_pipes, p);
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
-}
-
-void
-nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
-{
- // Break up the relationship between the EP and the pipe.
- nni_mtx_lock(&ep->ep_mtx);
- // During early init, the pipe might not have this set.
- if (nni_list_active(&ep->ep_pipes, pipe)) {
- nni_list_remove(&ep->ep_pipes, pipe);
- }
- // Wake up the close thread if it is waiting.
- if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) {
- nni_cv_wake(&ep->ep_cv);
- }
-
- // If this pipe closed, then lets restart the dial operation.
- // Since the remote side seems to have closed, lets start with
- // a backoff. This keeps us from pounding the crap out of the
- // thing if a remote server accepts but then disconnects
- // immediately.
- if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) {
- nni_ep_tmo_start(ep);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-int
-nni_ep_setopt(
- nni_ep *ep, const char *name, const void *val, size_t sz, nni_opt_type t)
-{
- nni_tran_option *o;
-
- if (strcmp(name, NNG_OPT_URL) == 0) {
- return (NNG_EREADONLY);
- }
-
- for (o = ep->ep_ops.ep_options; o && o->o_name; o++) {
- int rv;
-
- if (strcmp(o->o_name, name) != 0) {
- continue;
- }
- if (o->o_set == NULL) {
- return (NNG_EREADONLY);
- }
-
- nni_mtx_lock(&ep->ep_mtx);
- rv = o->o_set(ep->ep_data, val, sz, t);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- return (NNG_ENOTSUP);
-}
-
-int
-nni_ep_mode(nni_ep *ep)
-{
- return (ep->ep_mode);
-}
-
-int
-nni_ep_getopt(
- nni_ep *ep, const char *name, void *valp, size_t *szp, nni_opt_type t)
-{
- nni_tran_option *o;
-
- for (o = ep->ep_ops.ep_options; o && o->o_name; o++) {
- int rv;
- if (strcmp(o->o_name, name) != 0) {
- continue;
- }
- if (o->o_get == NULL) {
- return (NNG_EWRITEONLY);
- }
- nni_mtx_lock(&ep->ep_mtx);
- rv = o->o_get(ep->ep_data, valp, szp, t);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- // We provide a fallback on the URL, but let the implementation
- // override. This allows the URL to be created with wildcards,
- // that are resolved later.
- if (strcmp(name, NNG_OPT_URL) == 0) {
- return (nni_copyout_str(ep->ep_url->u_rawurl, valp, szp, t));
- }
-
- return (nni_sock_getopt(ep->ep_sock, name, valp, szp, t));
-}
-
-void
-nni_ep_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_ep, ep_node);
-}
-
-nni_tran *
-nni_ep_tran(nni_ep *ep)
-{
- return (ep->ep_tran);
-}
-
-nni_sock *
-nni_ep_sock(nni_ep *ep)
-{
- return (ep->ep_sock);
-}