aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/defs.h19
-rw-r--r--src/core/dialer.c512
-rw-r--r--src/core/dialer.h32
-rw-r--r--src/core/endpt.c665
-rw-r--r--src/core/endpt.h45
-rw-r--r--src/core/init.c6
-rw-r--r--src/core/listener.c443
-rw-r--r--src/core/listener.h33
-rw-r--r--src/core/nng_impl.h3
-rw-r--r--src/core/pipe.c57
-rw-r--r--src/core/pipe.h16
-rw-r--r--src/core/socket.c139
-rw-r--r--src/core/socket.h7
-rw-r--r--src/core/transport.c26
-rw-r--r--src/core/transport.h119
15 files changed, 1292 insertions, 830 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 77078a7a..a0cca368 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -40,14 +40,17 @@ typedef struct nng_event nni_event;
typedef struct nng_notify nni_notify;
// These are our own names.
-typedef struct nni_socket nni_sock;
-typedef struct nni_ctx nni_ctx;
-typedef struct nni_ep nni_ep;
-typedef struct nni_pipe nni_pipe;
-typedef struct nni_tran nni_tran;
-typedef struct nni_tran_option nni_tran_option;
-typedef struct nni_tran_ep_ops nni_tran_ep_ops;
-typedef struct nni_tran_pipe_ops nni_tran_pipe_ops;
+typedef struct nni_socket nni_sock;
+typedef struct nni_ctx nni_ctx;
+typedef struct nni_dialer nni_dialer;
+typedef struct nni_listener nni_listener;
+typedef struct nni_pipe nni_pipe;
+
+typedef struct nni_tran nni_tran;
+typedef struct nni_tran_option nni_tran_option;
+typedef struct nni_tran_dialer_ops nni_tran_dialer_ops;
+typedef struct nni_tran_listener_ops nni_tran_listener_ops;
+typedef struct nni_tran_pipe_ops nni_tran_pipe_ops;
typedef struct nni_proto_option nni_proto_option;
typedef struct nni_proto_ctx_ops nni_proto_ctx_ops;
diff --git a/src/core/dialer.c b/src/core/dialer.c
new file mode 100644
index 00000000..ee0d2916
--- /dev/null
+++ b/src/core/dialer.c
@@ -0,0 +1,512 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct nni_dialer {
+ nni_tran_dialer_ops d_ops; // transport ops
+ nni_tran * d_tran; // transport pointer
+ void * d_data; // transport private
+ uint64_t d_id; // endpoint id
+ nni_list_node d_node; // per socket list
+ nni_sock * d_sock;
+ nni_url * d_url;
+ int d_refcnt;
+ int d_lastrv; // last result from synchronous
+ bool d_synch; // synchronous connect in progress?
+ bool d_started;
+ bool d_closed; // full shutdown
+ bool d_closing; // close pending (waiting on refcnt)
+ nni_mtx d_mtx;
+ nni_cv d_cv;
+ nni_list d_pipes;
+ nni_aio * d_con_aio;
+ nni_aio * d_tmo_aio; // backoff timer
+ nni_duration d_maxrtime; // maximum time for reconnect
+ nni_duration d_currtime; // current time for reconnect
+ nni_duration d_inirtime; // initial time for reconnect
+ nni_time d_conntime; // time of last good connect
+};
+
+// Functionality related to dialers.
+static void dialer_connect_start(nni_dialer *);
+static void dialer_connect_cb(void *);
+static void dialer_timer_cb(void *);
+
+static nni_idhash *dialers;
+static nni_mtx dialers_lk;
+
+int
+nni_dialer_sys_init(void)
+{
+ int rv;
+
+ if ((rv = nni_idhash_init(&dialers)) != 0) {
+ return (rv);
+ }
+ nni_mtx_init(&dialers_lk);
+ nni_idhash_set_limits(
+ dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+
+ return (0);
+}
+
+void
+nni_dialer_sys_fini(void)
+{
+ nni_mtx_fini(&dialers_lk);
+ nni_idhash_fini(dialers);
+ dialers = NULL;
+}
+
+uint32_t
+nni_dialer_id(nni_dialer *d)
+{
+ return ((uint32_t) d->d_id);
+}
+
+static void
+dialer_destroy(nni_dialer *d)
+{
+ if (d == NULL) {
+ return;
+ }
+
+ // Remove us from the table so we cannot be found.
+ if (d->d_id != 0) {
+ nni_idhash_remove(dialers, d->d_id);
+ }
+
+ nni_aio_stop(d->d_con_aio);
+ nni_aio_stop(d->d_tmo_aio);
+
+ nni_sock_remove_dialer(d->d_sock, d);
+
+ nni_aio_fini(d->d_con_aio);
+ nni_aio_fini(d->d_tmo_aio);
+
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_data != NULL) {
+ d->d_ops.d_fini(d->d_data);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+ nni_cv_fini(&d->d_cv);
+ nni_mtx_fini(&d->d_mtx);
+ nni_url_free(d->d_url);
+ NNI_FREE_STRUCT(d);
+}
+
+int
+nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
+{
+ nni_tran * tran;
+ nni_dialer *d;
+ int rv;
+ nni_url * url;
+
+ if ((rv = nni_url_parse(&url, urlstr)) != 0) {
+ return (rv);
+ }
+ if (((tran = nni_tran_find(url)) == NULL) ||
+ (tran->tran_dialer == NULL)) {
+ nni_url_free(url);
+ return (NNG_ENOTSUP);
+ }
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ nni_url_free(url);
+ return (NNG_ENOMEM);
+ }
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_closing = false;
+ d->d_started = false;
+ d->d_data = NULL;
+ d->d_refcnt = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
+
+ // Make a copy of the endpoint operations. This allows us to
+ // modify them (to override NULLs for example), and avoids an extra
+ // dereference on hot paths.
+ d->d_ops = *tran->tran_dialer;
+
+ NNI_LIST_NODE_INIT(&d->d_node);
+
+ nni_pipe_ep_list_init(&d->d_pipes);
+
+ nni_mtx_init(&d->d_mtx);
+ nni_cv_init(&d->d_cv, &d->d_mtx);
+
+ if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
+ ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
+ ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) ||
+ ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) ||
+ ((rv = nni_sock_add_dialer(s, d)) != 0)) {
+ dialer_destroy(d);
+ return (rv);
+ }
+
+ *dp = d;
+ return (0);
+}
+
+int
+nni_dialer_find(nni_dialer **dp, uint32_t id)
+{
+ int rv;
+ nni_dialer *d;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&dialers_lk);
+ if ((rv = nni_idhash_find(dialers, id, (void **) &d)) == 0) {
+ if (d->d_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ d->d_refcnt++;
+ *dp = d;
+ }
+ }
+ nni_mtx_unlock(&dialers_lk);
+ return (rv);
+}
+
+int
+nni_dialer_hold(nni_dialer *d)
+{
+ int rv;
+ nni_mtx_lock(&dialers_lk);
+ if (d->d_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ d->d_refcnt++;
+ rv = 0;
+ }
+ nni_mtx_unlock(&dialers_lk);
+ return (rv);
+}
+
+void
+nni_dialer_rele(nni_dialer *d)
+{
+ nni_mtx_lock(&dialers_lk);
+ d->d_refcnt--;
+ if (d->d_closing) {
+ nni_cv_wake(&d->d_cv);
+ }
+ nni_mtx_unlock(&dialers_lk);
+}
+
+int
+nni_dialer_shutdown(nni_dialer *d)
+{
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ECLOSED);
+ }
+ d->d_closing = true;
+ nni_mtx_unlock(&d->d_mtx);
+
+ // Abort any remaining in-flight operations.
+ nni_aio_close(d->d_con_aio);
+ nni_aio_close(d->d_tmo_aio);
+
+ // Stop the underlying transport.
+ d->d_ops.d_close(d->d_data);
+
+ return (0);
+}
+
+void
+nni_dialer_close(nni_dialer *d)
+{
+ nni_pipe *p;
+
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_closed) {
+ nni_mtx_unlock(&d->d_mtx);
+ nni_dialer_rele(d);
+ return;
+ }
+ d->d_closed = true;
+ nni_mtx_unlock(&d->d_mtx);
+
+ nni_dialer_shutdown(d);
+
+ nni_aio_stop(d->d_con_aio);
+ nni_aio_stop(d->d_tmo_aio);
+
+ nni_mtx_lock(&d->d_mtx);
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_stop(p);
+ }
+ while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) {
+ nni_cv_wait(&d->d_cv);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+
+ dialer_destroy(d);
+}
+
+// This function starts an exponential backoff timer for reconnecting.
+static void
+dialer_timer_start(nni_dialer *d)
+{
+ nni_duration backoff;
+
+ if (d->d_closing) {
+ return;
+ }
+ backoff = d->d_currtime;
+ d->d_currtime *= 2;
+ if (d->d_currtime > d->d_maxrtime) {
+ d->d_currtime = d->d_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is
+ // pretty similar to 802 style backoff, except that we have a
+ // nearly uniform time period instead of discrete slot times.
+ // This algorithm may lead to slight biases because we don't
+ // have a statistically perfect distribution with the modulo of
+ // the random number, but this really doesn't matter.
+ nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+}
+
+static void
+dialer_timer_cb(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_aio * aio = d->d_tmo_aio;
+
+ nni_mtx_lock(&d->d_mtx);
+ if (nni_aio_result(aio) == 0) {
+ dialer_connect_start(d);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+static void
+dialer_connect_cb(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_pipe * p;
+ nni_aio * aio = d->d_con_aio;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) == 0) {
+ void *data = nni_aio_get_output(aio, 0);
+ NNI_ASSERT(data != NULL);
+ rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data);
+ }
+ if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
+ nni_pipe_stop(p);
+ }
+
+ nni_mtx_lock(&d->d_mtx);
+ switch (rv) {
+ case 0:
+ nni_pipe_set_dialer(p, d);
+ nni_list_append(&d->d_pipes, p);
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ nni_pipe_stop(p);
+ return;
+ }
+
+ // Good connect, so reset the backoff timer.
+ // Note that a host that accepts the connect, but drops
+ // us immediately, is going to get hit pretty hard
+ // (depending on the initial backoff) with no
+ // exponential backoff. This can happen if we wind up
+ // trying to connect to some port that does not speak
+ // SP for example.
+ d->d_currtime = d->d_inirtime;
+
+ // No further outgoing connects -- we will restart a
+ // connection from the pipe when the pipe is removed.
+ break;
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ // Canceled/closed -- stop everything.
+ break;
+ default:
+ // redial, but only if we are not synchronous
+ if (!d->d_synch) {
+ dialer_timer_start(d);
+ }
+ break;
+ }
+ if (d->d_synch) {
+ if (rv != 0) {
+ d->d_started = false;
+ }
+ d->d_lastrv = rv;
+ d->d_synch = false;
+ nni_cv_wake(&d->d_cv);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+static void
+dialer_connect_start(nni_dialer *d)
+{
+ nni_aio *aio = d->d_con_aio;
+
+ // Call with the Endpoint lock held.
+ if (d->d_closing) {
+ return;
+ }
+
+ d->d_ops.d_connect(d->d_data, aio);
+}
+
+int
+nni_dialer_start(nni_dialer *d, int flags)
+{
+ int rv = 0;
+
+ nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime);
+ d->d_currtime = d->d_inirtime;
+
+ nni_mtx_lock(&d->d_mtx);
+
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ECLOSED);
+ }
+
+ if (d->d_started) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((flags & NNG_FLAG_NONBLOCK) != 0) {
+ d->d_started = true;
+ dialer_connect_start(d);
+ nni_mtx_unlock(&d->d_mtx);
+ return (0);
+ }
+
+ d->d_synch = true;
+ d->d_started = true;
+ dialer_connect_start(d);
+
+ while (d->d_synch && !d->d_closing) {
+ nni_cv_wait(&d->d_cv);
+ }
+ rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv;
+ nni_cv_wake(&d->d_cv);
+
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+}
+
+void
+nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p)
+{
+ if (d == NULL) {
+ return;
+ }
+
+ // Break up the relationship between the dialer and the pipe.
+ nni_mtx_lock(&d->d_mtx);
+ // During early init, the pipe might not have this set.
+ if (nni_list_active(&d->d_pipes, p)) {
+ nni_list_remove(&d->d_pipes, p);
+ }
+ // Wake up the close thread if it is waiting.
+ if (d->d_closed) {
+ if (nni_list_empty(&d->d_pipes)) {
+ nni_cv_wake(&d->d_cv);
+ }
+ } else {
+ // If this pipe closed, then lets restart the dial operation.
+ // Since the remote side seems to have closed, lets start with
+ // a backoff. This keeps us from pounding the crap out of the
+ // thing if a remote server accepts but then disconnects
+ // immediately.
+ dialer_timer_start(d);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+int
+nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
+ nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (NNG_EREADONLY);
+ }
+
+ for (o = d->d_ops.d_options; o && o->o_name; o++) {
+ int rv;
+
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_set == NULL) {
+ return (NNG_EREADONLY);
+ }
+
+ nni_mtx_lock(&d->d_mtx);
+ rv = o->o_set(d->d_data, val, sz, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_dialer_getopt(
+ nni_dialer *d, const char *name, void *valp, size_t *szp, nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ for (o = d->d_ops.d_options; o && o->o_name; o++) {
+ int rv;
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_get == NULL) {
+ return (NNG_EWRITEONLY);
+ }
+ nni_mtx_lock(&d->d_mtx);
+ rv = o->o_get(d->d_data, valp, szp, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
+ // We provide a fallback on the URL, but let the implementation
+ // override. This allows the URL to be created with wildcards,
+ // that are resolved later.
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (nni_copyout_str(d->d_url->u_rawurl, valp, szp, t));
+ }
+
+ return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
+}
+
+void
+nni_dialer_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_dialer, d_node);
+}
diff --git a/src/core/dialer.h b/src/core/dialer.h
new file mode 100644
index 00000000..56b0fb1b
--- /dev/null
+++ b/src/core/dialer.h
@@ -0,0 +1,32 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_DIALER_H
+#define CORE_DIALER_H
+
+extern int nni_dialer_sys_init(void);
+extern void nni_dialer_sys_fini(void);
+extern int nni_dialer_find(nni_dialer **, uint32_t);
+extern int nni_dialer_hold(nni_dialer *);
+extern void nni_dialer_rele(nni_dialer *);
+extern uint32_t nni_dialer_id(nni_dialer *);
+extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *);
+extern int nni_dialer_shutdown(nni_dialer *);
+extern void nni_dialer_close(nni_dialer *);
+extern int nni_dialer_start(nni_dialer *, int);
+extern void nni_dialer_list_init(nni_list *);
+extern void nni_dialer_remove_pipe(nni_dialer *, nni_pipe *);
+
+extern int nni_dialer_setopt(
+ nni_dialer *, const char *, const void *, size_t, nni_opt_type);
+extern int nni_dialer_getopt(
+ nni_dialer *, const char *, void *, size_t *, nni_opt_type);
+
+#endif // CORE_DIALER_H
diff --git a/src/core/endpt.c b/src/core/endpt.c
deleted file mode 100644
index 8e678fb0..00000000
--- a/src/core/endpt.c
+++ /dev/null
@@ -1,665 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-struct nni_ep {
- nni_tran_ep_ops ep_ops; // transport ops
- nni_tran * ep_tran; // transport pointer
- void * ep_data; // transport private
- uint64_t ep_id; // endpoint id
- nni_list_node ep_node; // per socket list
- nni_sock * ep_sock;
- nni_url * ep_url;
- int ep_mode;
- int ep_refcnt;
- bool ep_started;
- bool ep_closed; // full shutdown
- bool ep_closing; // close pending (waiting on refcnt)
- bool ep_tmo_run;
- nni_mtx ep_mtx;
- nni_cv ep_cv;
- nni_list ep_pipes;
- nni_aio * ep_acc_aio;
- nni_aio * ep_con_aio;
- nni_aio * ep_con_syn; // used for sync connect
- nni_aio * ep_tmo_aio; // backoff timer
- nni_duration ep_maxrtime; // maximum time for reconnect
- nni_duration ep_currtime; // current time for reconnect
- nni_duration ep_inirtime; // initial time for reconnect
- nni_time ep_conntime; // time of last good connect
-};
-
-// Functionality related to end points.
-
-static void nni_ep_acc_start(nni_ep *);
-static void nni_ep_acc_cb(void *);
-static void nni_ep_con_start(nni_ep *);
-static void nni_ep_con_cb(void *);
-static void nni_ep_tmo_start(nni_ep *);
-static void nni_ep_tmo_cb(void *);
-
-static nni_idhash *nni_eps;
-static nni_mtx nni_ep_lk;
-
-int
-nni_ep_sys_init(void)
-{
- int rv;
-
- if ((rv = nni_idhash_init(&nni_eps)) != 0) {
- return (rv);
- }
- nni_mtx_init(&nni_ep_lk);
- nni_idhash_set_limits(
- nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff);
-
- return (0);
-}
-
-void
-nni_ep_sys_fini(void)
-{
- nni_mtx_fini(&nni_ep_lk);
- nni_idhash_fini(nni_eps);
- nni_eps = NULL;
-}
-
-uint32_t
-nni_ep_id(nni_ep *ep)
-{
- return ((uint32_t) ep->ep_id);
-}
-
-static void
-nni_ep_destroy(nni_ep *ep)
-{
- if (ep == NULL) {
- return;
- }
-
- // Remove us from the table so we cannot be found.
- if (ep->ep_id != 0) {
- nni_idhash_remove(nni_eps, ep->ep_id);
- }
-
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- nni_sock_ep_remove(ep->ep_sock, ep);
-
- nni_aio_fini(ep->ep_acc_aio);
- nni_aio_fini(ep->ep_con_aio);
- nni_aio_fini(ep->ep_con_syn);
- nni_aio_fini(ep->ep_tmo_aio);
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_data != NULL) {
- ep->ep_ops.ep_fini(ep->ep_data);
- }
- nni_mtx_unlock(&ep->ep_mtx);
- nni_cv_fini(&ep->ep_cv);
- nni_mtx_fini(&ep->ep_mtx);
- nni_url_free(ep->ep_url);
- NNI_FREE_STRUCT(ep);
-}
-
-static int
-nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode)
-{
- nni_tran *tran;
- nni_ep * ep;
- int rv;
- nni_url * url;
-
- if ((rv = nni_url_parse(&url, urlstr)) != 0) {
- return (rv);
- }
- if ((tran = nni_tran_find(url)) == NULL) {
- nni_url_free(url);
- return (NNG_ENOTSUP);
- }
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- nni_url_free(url);
- return (NNG_ENOMEM);
- }
- ep->ep_url = url;
- ep->ep_closed = false;
- ep->ep_closing = false;
- ep->ep_started = false;
- ep->ep_data = NULL;
- ep->ep_refcnt = 1;
- ep->ep_sock = s;
- ep->ep_tran = tran;
- ep->ep_mode = mode;
-
- // Make a copy of the endpoint operations. This allows us to
- // modify them (to override NULLs for example), and avoids an extra
- // dereference on hot paths.
- ep->ep_ops = *tran->tran_ep;
-
- NNI_LIST_NODE_INIT(&ep->ep_node);
-
- nni_pipe_ep_list_init(&ep->ep_pipes);
-
- nni_mtx_init(&ep->ep_mtx);
- nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
-
- if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
- ((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) ||
- ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
- ((rv = nni_sock_ep_add(s, ep)) != 0)) {
- nni_ep_destroy(ep);
- return (rv);
- }
-
- *epp = ep;
- return (0);
-}
-
-int
-nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *urlstr)
-{
- return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_DIAL));
-}
-
-int
-nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *urlstr)
-{
- return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_LISTEN));
-}
-
-int
-nni_ep_find(nni_ep **epp, uint32_t id)
-{
- int rv;
- nni_ep *ep;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
- nni_mtx_lock(&nni_ep_lk);
- if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) {
- if (ep->ep_closed) {
- rv = NNG_ECLOSED;
- } else {
- ep->ep_refcnt++;
- *epp = ep;
- }
- }
- nni_mtx_unlock(&nni_ep_lk);
- return (rv);
-}
-
-int
-nni_ep_hold(nni_ep *ep)
-{
- int rv;
- nni_mtx_lock(&nni_ep_lk);
- if (ep->ep_closed) {
- rv = NNG_ECLOSED;
- } else {
- ep->ep_refcnt++;
- rv = 0;
- }
- nni_mtx_unlock(&nni_ep_lk);
- return (rv);
-}
-
-void
-nni_ep_rele(nni_ep *ep)
-{
- nni_mtx_lock(&nni_ep_lk);
- ep->ep_refcnt--;
- if (ep->ep_closing) {
- nni_cv_wake(&ep->ep_cv);
- }
- nni_mtx_unlock(&nni_ep_lk);
-}
-
-int
-nni_ep_shutdown(nni_ep *ep)
-{
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- ep->ep_closing = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- // Abort any remaining in-flight operations.
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- // Stop the underlying transport.
- ep->ep_ops.ep_close(ep->ep_data);
-
- return (0);
-}
-
-void
-nni_ep_close(nni_ep *ep)
-{
- nni_pipe *p;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- nni_ep_rele(ep);
- return;
- }
- ep->ep_closed = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_ep_shutdown(ep);
-
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- nni_mtx_lock(&ep->ep_mtx);
- NNI_LIST_FOREACH (&ep->ep_pipes, p) {
- nni_pipe_stop(p);
- }
- while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) {
- nni_cv_wait(&ep->ep_cv);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_ep_destroy(ep);
-}
-
-static void
-nni_ep_tmo_cancel(nni_aio *aio, int rv)
-{
- nni_ep *ep = nni_aio_get_prov_data(aio);
- // The only way this ever gets "finished", is via cancellation.
- if (ep != NULL) {
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_tmo_run) {
- nni_aio_finish_error(aio, rv);
- }
- ep->ep_tmo_run = false;
- nni_mtx_unlock(&ep->ep_mtx);
- }
-}
-
-static void
-nni_ep_tmo_start(nni_ep *ep)
-{
- nni_duration backoff;
- int rv;
-
- if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
- return;
- }
- backoff = ep->ep_currtime;
- ep->ep_currtime *= 2;
- if (ep->ep_currtime > ep->ep_maxrtime) {
- ep->ep_currtime = ep->ep_maxrtime;
- }
-
- // To minimize damage from storms, etc., we select a backoff
- // value randomly, in the range of [0, backoff-1]; this is
- // pretty similar to 802 style backoff, except that we have a
- // nearly uniform time period instead of discrete slot times.
- // This algorithm may lead to slight biases because we don't
- // have a statistically perfect distribution with the modulo of
- // the random number, but this really doesn't matter.
-
- nni_aio_set_timeout(
- ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
-
- if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) !=
- 0) {
- nni_aio_finish_error(ep->ep_tmo_aio, rv);
- }
-
- ep->ep_tmo_run = true;
-}
-
-static void
-nni_ep_tmo_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_tmo_aio;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
- if (ep->ep_mode == NNI_EP_MODE_DIAL) {
- nni_ep_con_start(ep);
- } else {
- nni_ep_acc_start(ep);
- }
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_con_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_con_aio;
- int rv;
-
- if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0));
- }
- nni_mtx_lock(&ep->ep_mtx);
- switch (rv) {
- case 0:
- // Good connect, so reset the backoff timer.
- // Note that a host that accepts the connect, but drops
- // us immediately, is going to get hit pretty hard
- // (depending on the initial backoff) with no
- // exponential backoff. This can happen if we wind up
- // trying to connect to some port that does not speak
- // SP for example.
- ep->ep_currtime = ep->ep_inirtime;
-
- // No further outgoing connects -- we will restart a
- // connection from the pipe when the pipe is removed.
- break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled/closed -- stop everything.
- break;
- default:
- // Other errors involve the use of the backoff timer.
- nni_ep_tmo_start(ep);
- break;
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_con_start(nni_ep *ep)
-{
- nni_aio *aio = ep->ep_con_aio;
-
- // Call with the Endpoint lock held.
- if (ep->ep_closing) {
- return;
- }
-
- ep->ep_ops.ep_connect(ep->ep_data, aio);
-}
-
-int
-nni_ep_dial(nni_ep *ep, int flags)
-{
- int rv = 0;
- nni_aio *aio;
-
- nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
- ep->ep_currtime = ep->ep_inirtime;
-
- nni_mtx_lock(&ep->ep_mtx);
-
- if (ep->ep_mode != NNI_EP_MODE_DIAL) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ENOTSUP);
- }
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
-
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ESTATE);
- }
-
- if ((flags & NNG_FLAG_NONBLOCK) != 0) {
- ep->ep_started = true;
- nni_ep_con_start(ep);
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
- }
-
- // Synchronous mode: so we have to wait for it to complete.
- aio = ep->ep_con_syn;
- ep->ep_ops.ep_connect(ep->ep_data, aio);
- ep->ep_started = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_aio_wait(aio);
-
- // As we're synchronous, we also have to handle the completion.
- if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) {
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_started = false;
- nni_mtx_unlock(&ep->ep_mtx);
- }
- return (rv);
-}
-
-static void
-nni_ep_acc_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_acc_aio;
- int rv;
-
- if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL);
- rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0));
- }
-
- nni_mtx_lock(&ep->ep_mtx);
- switch (rv) {
- case 0:
- nni_ep_acc_start(ep);
- break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled or closed, no further action.
- break;
- case NNG_ECONNABORTED:
- case NNG_ECONNRESET:
- // These are remote conditions, no cool down.
- nni_ep_acc_start(ep);
- break;
- default:
- // We don't really know why we failed, but we backoff
- // here. This is because errors here are probably due
- // to system failures (resource exhaustion) and we hope
- // by not thrashing we give the system a chance to
- // recover.
- nni_ep_tmo_start(ep);
- break;
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_acc_start(nni_ep *ep)
-{
- nni_aio *aio = ep->ep_acc_aio;
-
- // Call with the Endpoint lock held.
- if (ep->ep_closing) {
- return;
- }
- ep->ep_ops.ep_accept(ep->ep_data, aio);
-}
-
-int
-nni_ep_listen(nni_ep *ep, int flags)
-{
- int rv = 0;
- NNI_ARG_UNUSED(flags);
-
- nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
- ep->ep_currtime = ep->ep_inirtime;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ENOTSUP);
- }
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ESTATE);
- }
-
- rv = ep->ep_ops.ep_bind(ep->ep_data);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- ep->ep_started = true;
- nni_ep_acc_start(ep);
- nni_mtx_unlock(&ep->ep_mtx);
-
- return (0);
-}
-
-int
-nni_ep_pipe_add(nni_ep *ep, nni_pipe *p)
-{
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- nni_list_append(&ep->ep_pipes, p);
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
-}
-
-void
-nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
-{
- // Break up the relationship between the EP and the pipe.
- nni_mtx_lock(&ep->ep_mtx);
- // During early init, the pipe might not have this set.
- if (nni_list_active(&ep->ep_pipes, pipe)) {
- nni_list_remove(&ep->ep_pipes, pipe);
- }
- // Wake up the close thread if it is waiting.
- if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) {
- nni_cv_wake(&ep->ep_cv);
- }
-
- // If this pipe closed, then lets restart the dial operation.
- // Since the remote side seems to have closed, lets start with
- // a backoff. This keeps us from pounding the crap out of the
- // thing if a remote server accepts but then disconnects
- // immediately.
- if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) {
- nni_ep_tmo_start(ep);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-int
-nni_ep_setopt(
- nni_ep *ep, const char *name, const void *val, size_t sz, nni_opt_type t)
-{
- nni_tran_option *o;
-
- if (strcmp(name, NNG_OPT_URL) == 0) {
- return (NNG_EREADONLY);
- }
-
- for (o = ep->ep_ops.ep_options; o && o->o_name; o++) {
- int rv;
-
- if (strcmp(o->o_name, name) != 0) {
- continue;
- }
- if (o->o_set == NULL) {
- return (NNG_EREADONLY);
- }
-
- nni_mtx_lock(&ep->ep_mtx);
- rv = o->o_set(ep->ep_data, val, sz, t);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- return (NNG_ENOTSUP);
-}
-
-int
-nni_ep_mode(nni_ep *ep)
-{
- return (ep->ep_mode);
-}
-
-int
-nni_ep_getopt(
- nni_ep *ep, const char *name, void *valp, size_t *szp, nni_opt_type t)
-{
- nni_tran_option *o;
-
- for (o = ep->ep_ops.ep_options; o && o->o_name; o++) {
- int rv;
- if (strcmp(o->o_name, name) != 0) {
- continue;
- }
- if (o->o_get == NULL) {
- return (NNG_EWRITEONLY);
- }
- nni_mtx_lock(&ep->ep_mtx);
- rv = o->o_get(ep->ep_data, valp, szp, t);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- // We provide a fallback on the URL, but let the implementation
- // override. This allows the URL to be created with wildcards,
- // that are resolved later.
- if (strcmp(name, NNG_OPT_URL) == 0) {
- return (nni_copyout_str(ep->ep_url->u_rawurl, valp, szp, t));
- }
-
- return (nni_sock_getopt(ep->ep_sock, name, valp, szp, t));
-}
-
-void
-nni_ep_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_ep, ep_node);
-}
-
-nni_tran *
-nni_ep_tran(nni_ep *ep)
-{
- return (ep->ep_tran);
-}
-
-nni_sock *
-nni_ep_sock(nni_ep *ep)
-{
- return (ep->ep_sock);
-}
diff --git a/src/core/endpt.h b/src/core/endpt.h
deleted file mode 100644
index bf251d41..00000000
--- a/src/core/endpt.h
+++ /dev/null
@@ -1,45 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#ifndef CORE_ENDPT_H
-#define CORE_ENDPT_H
-
-extern int nni_ep_sys_init(void);
-extern void nni_ep_sys_fini(void);
-extern nni_tran *nni_ep_tran(nni_ep *);
-extern nni_sock *nni_ep_sock(nni_ep *);
-extern int nni_ep_find(nni_ep **, uint32_t);
-extern int nni_ep_hold(nni_ep *);
-extern void nni_ep_rele(nni_ep *);
-extern uint32_t nni_ep_id(nni_ep *);
-extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
-extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
-extern int nni_ep_shutdown(nni_ep *);
-extern void nni_ep_close(nni_ep *);
-extern int nni_ep_dial(nni_ep *, int);
-extern int nni_ep_listen(nni_ep *, int);
-extern void nni_ep_list_init(nni_list *);
-extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
-extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
-extern int nni_ep_mode(nni_ep *);
-
-extern int nni_ep_setopt(
- nni_ep *, const char *, const void *, size_t, nni_opt_type);
-extern int nni_ep_getopt(
- nni_ep *, const char *, void *, size_t *, nni_opt_type);
-
-// Endpoint modes. Currently used by transports. Remove this when we make
-// transport dialers and listeners explicit.
-enum nni_ep_mode {
- NNI_EP_MODE_DIAL = 1,
- NNI_EP_MODE_LISTEN = 2,
-};
-
-#endif // CORE_ENDPT_H
diff --git a/src/core/init.c b/src/core/init.c
index c1b7bbac..30a7a547 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -33,7 +33,8 @@ nni_init_helper(void)
((rv = nni_aio_sys_init()) != 0) ||
((rv = nni_random_sys_init()) != 0) ||
((rv = nni_sock_sys_init()) != 0) ||
- ((rv = nni_ep_sys_init()) != 0) ||
+ ((rv = nni_listener_sys_init()) != 0) ||
+ ((rv = nni_dialer_sys_init()) != 0) ||
((rv = nni_pipe_sys_init()) != 0) ||
((rv = nni_proto_sys_init()) != 0) ||
((rv = nni_tran_sys_init()) != 0)) {
@@ -71,7 +72,8 @@ nni_fini(void)
nni_tran_sys_fini();
nni_proto_sys_fini();
nni_pipe_sys_fini();
- nni_ep_sys_fini();
+ nni_dialer_sys_fini();
+ nni_listener_sys_fini();
nni_sock_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_random_sys_fini();
diff --git a/src/core/listener.c b/src/core/listener.c
new file mode 100644
index 00000000..6d580cd8
--- /dev/null
+++ b/src/core/listener.c
@@ -0,0 +1,443 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct nni_listener {
+ nni_tran_listener_ops l_ops; // transport ops
+ nni_tran * l_tran; // transport pointer
+ void * l_data; // transport private
+ uint64_t l_id; // endpoint id
+ nni_list_node l_node; // per socket list
+ nni_sock * l_sock;
+ nni_url * l_url;
+ int l_refcnt;
+ bool l_started;
+ bool l_closed; // full shutdown
+ bool l_closing; // close pending (waiting on refcnt)
+ nni_mtx l_mtx;
+ nni_cv l_cv;
+ nni_list l_pipes;
+ nni_aio * l_acc_aio;
+ nni_aio * l_tmo_aio;
+};
+
+// Functionality related to listeners.
+
+static void listener_accept_start(nni_listener *);
+static void listener_accept_cb(void *);
+static void listener_timer_cb(void *);
+
+static nni_idhash *listeners;
+static nni_mtx listeners_lk;
+
+int
+nni_listener_sys_init(void)
+{
+ int rv;
+
+ if ((rv = nni_idhash_init(&listeners)) != 0) {
+ return (rv);
+ }
+ nni_mtx_init(&listeners_lk);
+ nni_idhash_set_limits(
+ listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+
+ return (0);
+}
+
+void
+nni_listener_sys_fini(void)
+{
+ nni_mtx_fini(&listeners_lk);
+ nni_idhash_fini(listeners);
+ listeners = NULL;
+}
+
+uint32_t
+nni_listener_id(nni_listener *l)
+{
+ return ((uint32_t) l->l_id);
+}
+
+static void
+listener_destroy(nni_listener *l)
+{
+ if (l == NULL) {
+ return;
+ }
+
+ // Remove us from the table so we cannot be found.
+ if (l->l_id != 0) {
+ nni_idhash_remove(listeners, l->l_id);
+ }
+
+ nni_aio_stop(l->l_acc_aio);
+
+ nni_sock_remove_listener(l->l_sock, l);
+
+ nni_aio_fini(l->l_acc_aio);
+
+ nni_mtx_lock(&l->l_mtx);
+ if (l->l_data != NULL) {
+ l->l_ops.l_fini(l->l_data);
+ }
+ nni_mtx_unlock(&l->l_mtx);
+ nni_cv_fini(&l->l_cv);
+ nni_mtx_fini(&l->l_mtx);
+ nni_url_free(l->l_url);
+ NNI_FREE_STRUCT(l);
+}
+
+int
+nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
+{
+ nni_tran * tran;
+ nni_listener *l;
+ int rv;
+ nni_url * url;
+
+ if ((rv = nni_url_parse(&url, urlstr)) != 0) {
+ return (rv);
+ }
+ if (((tran = nni_tran_find(url)) == NULL) ||
+ (tran->tran_listener == NULL)) {
+ nni_url_free(url);
+ return (NNG_ENOTSUP);
+ }
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ nni_url_free(url);
+ return (NNG_ENOMEM);
+ }
+ l->l_url = url;
+ l->l_closed = false;
+ l->l_closing = false;
+ l->l_started = false;
+ l->l_data = NULL;
+ l->l_refcnt = 1;
+ l->l_sock = s;
+ l->l_tran = tran;
+
+ // Make a copy of the endpoint operations. This allows us to
+ // modify them (to override NULLs for example), and avoids an extra
+ // dereference on hot paths.
+ l->l_ops = *tran->tran_listener;
+
+ NNI_LIST_NODE_INIT(&l->l_node);
+
+ nni_pipe_ep_list_init(&l->l_pipes);
+
+ nni_mtx_init(&l->l_mtx);
+ nni_cv_init(&l->l_cv, &l->l_mtx);
+
+ if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
+ ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
+ ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) ||
+ ((rv = nni_idhash_alloc(listeners, &l->l_id, l)) != 0) ||
+ ((rv = nni_sock_add_listener(s, l)) != 0)) {
+ listener_destroy(l);
+ return (rv);
+ }
+
+ *lp = l;
+ return (0);
+}
+
+int
+nni_listener_find(nni_listener **lp, uint32_t id)
+{
+ int rv;
+ nni_listener *l;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&listeners_lk);
+ if ((rv = nni_idhash_find(listeners, id, (void **) &l)) == 0) {
+ if (l->l_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ l->l_refcnt++;
+ *lp = l;
+ }
+ }
+ nni_mtx_unlock(&listeners_lk);
+ return (rv);
+}
+
+int
+nni_listener_hold(nni_listener *l)
+{
+ int rv;
+ nni_mtx_lock(&listeners_lk);
+ if (l->l_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ l->l_refcnt++;
+ rv = 0;
+ }
+ nni_mtx_unlock(&listeners_lk);
+ return (rv);
+}
+
+void
+nni_listener_rele(nni_listener *l)
+{
+ nni_mtx_lock(&listeners_lk);
+ l->l_refcnt--;
+ if (l->l_closing) {
+ nni_cv_wake(&l->l_cv);
+ }
+ nni_mtx_unlock(&listeners_lk);
+}
+
+int
+nni_listener_shutdown(nni_listener *l)
+{
+ nni_mtx_lock(&l->l_mtx);
+ if (l->l_closing) {
+ nni_mtx_unlock(&l->l_mtx);
+ return (NNG_ECLOSED);
+ }
+ l->l_closing = true;
+ nni_mtx_unlock(&l->l_mtx);
+
+ // Abort any remaining in-flight accepts.
+ nni_aio_close(l->l_acc_aio);
+ nni_aio_close(l->l_tmo_aio);
+
+ // Stop the underlying transport.
+ l->l_ops.l_close(l->l_data);
+
+ return (0);
+}
+
+void
+nni_listener_close(nni_listener *l)
+{
+ nni_pipe *p;
+
+ nni_mtx_lock(&l->l_mtx);
+ if (l->l_closed) {
+ nni_mtx_unlock(&l->l_mtx);
+ nni_listener_rele(l);
+ return;
+ }
+ l->l_closed = true;
+ nni_mtx_unlock(&l->l_mtx);
+
+ nni_listener_shutdown(l);
+
+ nni_aio_stop(l->l_acc_aio);
+ nni_aio_stop(l->l_tmo_aio);
+
+ nni_mtx_lock(&l->l_mtx);
+ NNI_LIST_FOREACH (&l->l_pipes, p) {
+ nni_pipe_stop(p);
+ }
+ while ((!nni_list_empty(&l->l_pipes)) || (l->l_refcnt != 1)) {
+ nni_cv_wait(&l->l_cv);
+ }
+ nni_mtx_unlock(&l->l_mtx);
+
+ listener_destroy(l);
+}
+
+static void
+listener_timer_cb(void *arg)
+{
+ nni_listener *l = arg;
+ nni_aio * aio = l->l_tmo_aio;
+
+ nni_mtx_lock(&l->l_mtx);
+ if (nni_aio_result(aio) == 0) {
+ listener_accept_start(l);
+ }
+ nni_mtx_unlock(&l->l_mtx);
+}
+
+static void
+listener_accept_cb(void *arg)
+{
+ nni_listener *l = arg;
+ nni_pipe * p;
+ nni_aio * aio = l->l_acc_aio;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) == 0) {
+ void *data = nni_aio_get_output(aio, 0);
+ NNI_ASSERT(data != NULL);
+ rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data);
+ }
+
+ if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) {
+ nni_pipe_stop(p);
+ }
+
+ nni_mtx_lock(&l->l_mtx);
+ switch (rv) {
+ case 0:
+ nni_pipe_set_listener(p, l);
+ nni_list_append(&l->l_pipes, p);
+ if (l->l_closing) {
+ nni_mtx_unlock(&l->l_mtx);
+ nni_pipe_stop(p);
+ return;
+ }
+ listener_accept_start(l);
+ break;
+ case NNG_ECONNABORTED: // remote condition, no cooldown
+ case NNG_ECONNRESET: // remote condition, no cooldown
+ listener_accept_start(l);
+ break;
+ case NNG_ECLOSED: // no further action
+ case NNG_ECANCELED: // no further action
+ break;
+ default:
+ // We don't really know why we failed, but we backoff
+ // here. This is because errors here are probably due
+ // to system failures (resource exhaustion) and we hope
+ // by not thrashing we give the system a chance to
+ // recover. 100 msec is enough to cool down.
+ nni_sleep_aio(100, l->l_tmo_aio);
+ break;
+ }
+ nni_mtx_unlock(&l->l_mtx);
+}
+
+static void
+listener_accept_start(nni_listener *l)
+{
+ nni_aio *aio = l->l_acc_aio;
+
+ // Call with the listener lock held.
+ if (l->l_closing) {
+ return;
+ }
+ l->l_ops.l_accept(l->l_data, aio);
+}
+
+int
+nni_listener_start(nni_listener *l, int flags)
+{
+ int rv = 0;
+ NNI_ARG_UNUSED(flags);
+
+ nni_mtx_lock(&l->l_mtx);
+ if (l->l_closing) {
+ nni_mtx_unlock(&l->l_mtx);
+ return (NNG_ECLOSED);
+ }
+ if (l->l_started) {
+ nni_mtx_unlock(&l->l_mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((rv = l->l_ops.l_bind(l->l_data)) != 0) {
+ nni_mtx_unlock(&l->l_mtx);
+ return (rv);
+ }
+
+ l->l_started = true;
+ listener_accept_start(l);
+ nni_mtx_unlock(&l->l_mtx);
+
+ return (0);
+}
+
+void
+nni_listener_remove_pipe(nni_listener *l, nni_pipe *p)
+{
+ if (l == NULL) {
+ return;
+ }
+ // Break up relationship between listener and pipe.
+ nni_mtx_lock(&l->l_mtx);
+ // During early init, the pipe might not have this set.
+ if (nni_list_active(&l->l_pipes, p)) {
+ nni_list_remove(&l->l_pipes, p);
+ }
+ // Wake up the closer if it is waiting.
+ if (l->l_closed && nni_list_empty(&l->l_pipes)) {
+ nni_cv_wake(&l->l_cv);
+ }
+ nni_mtx_unlock(&l->l_mtx);
+}
+
+int
+nni_listener_setopt(nni_listener *l, const char *name, const void *val,
+ size_t sz, nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (NNG_EREADONLY);
+ }
+
+ for (o = l->l_ops.l_options; o && o->o_name; o++) {
+ int rv;
+
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_set == NULL) {
+ return (NNG_EREADONLY);
+ }
+
+ nni_mtx_lock(&l->l_mtx);
+ rv = o->o_set(l->l_data, val, sz, t);
+ nni_mtx_unlock(&l->l_mtx);
+ return (rv);
+ }
+
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_listener_getopt(
+ nni_listener *l, const char *name, void *valp, size_t *szp, nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ for (o = l->l_ops.l_options; o && o->o_name; o++) {
+ int rv;
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_get == NULL) {
+ return (NNG_EWRITEONLY);
+ }
+ nni_mtx_lock(&l->l_mtx);
+ rv = o->o_get(l->l_data, valp, szp, t);
+ nni_mtx_unlock(&l->l_mtx);
+ return (rv);
+ }
+
+ // We provide a fallback on the URL, but let the implementation
+ // override. This allows the URL to be created with wildcards,
+ // that are resolved later.
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (nni_copyout_str(l->l_url->u_rawurl, valp, szp, t));
+ }
+
+ return (nni_sock_getopt(l->l_sock, name, valp, szp, t));
+}
+
+void
+nni_listener_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_listener, l_node);
+}
diff --git a/src/core/listener.h b/src/core/listener.h
new file mode 100644
index 00000000..41b1a678
--- /dev/null
+++ b/src/core/listener.h
@@ -0,0 +1,33 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_LISTENER_H
+#define CORE_LISTENER_H
+
+extern int nni_listener_sys_init(void);
+extern void nni_listener_sys_fini(void);
+extern int nni_listener_find(nni_listener **, uint32_t);
+extern int nni_listener_hold(nni_listener *);
+extern void nni_listener_rele(nni_listener *);
+extern uint32_t nni_listener_id(nni_listener *);
+extern int nni_listener_create(nni_listener **, nni_sock *, const char *);
+extern int nni_listener_shutdown(nni_listener *);
+extern void nni_listener_close(nni_listener *);
+extern int nni_listener_start(nni_listener *, int);
+extern void nni_listener_list_init(nni_list *);
+extern int nni_listener_add_pipe(nni_listener *, nni_pipe *);
+extern void nni_listener_remove_pipe(nni_listener *, nni_pipe *);
+
+extern int nni_listener_setopt(
+ nni_listener *, const char *, const void *, size_t, nni_opt_type);
+extern int nni_listener_getopt(
+ nni_listener *, const char *, void *, size_t *, nni_opt_type);
+
+#endif // CORE_LISTENER_H
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index fdb2ce94..9af12720 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -52,7 +52,8 @@
// These have to come after the others - particularly transport.h
-#include "core/endpt.h"
+#include "core/dialer.h"
+#include "core/listener.h"
#include "core/pipe.h"
#include "core/socket.h"
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 93fbae99..a42cdeff 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -26,7 +26,8 @@ struct nni_pipe {
nni_list_node p_sock_node;
nni_list_node p_ep_node;
nni_sock * p_sock;
- nni_ep * p_ep;
+ nni_listener * p_listener;
+ nni_dialer * p_dialer;
bool p_closed;
bool p_stop;
bool p_cbs;
@@ -98,7 +99,7 @@ nni_pipe_sys_fini(void)
}
}
-static void
+void
nni_pipe_destroy(nni_pipe *p)
{
bool cbs;
@@ -126,9 +127,8 @@ nni_pipe_destroy(nni_pipe *p)
// We have exclusive access at this point, so we can check if
// we are still on any lists.
- if (nni_list_node_active(&p->p_ep_node)) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
+ nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL
+ nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL
if (nni_list_node_active(&p->p_sock_node)) {
nni_sock_pipe_remove(p->p_sock, p);
@@ -303,12 +303,10 @@ nni_pipe_start_cb(void *arg)
}
int
-nni_pipe_create(nni_ep *ep, void *tdata)
+nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
nni_pipe * p;
int rv;
- nni_tran * tran = nni_ep_tran(ep);
- nni_sock * sock = nni_ep_sock(ep);
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
@@ -324,7 +322,6 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
p->p_proto_data = NULL;
- p->p_ep = ep;
p->p_sock = sock;
p->p_closed = false;
p->p_stop = false;
@@ -348,16 +345,27 @@ nni_pipe_create(nni_ep *ep, void *tdata)
}
if ((rv != 0) ||
- ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) ||
- ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
- ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
+ ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_destroy(p);
return (rv);
}
+ *pp = p;
return (0);
}
+void
+nni_pipe_set_listener(nni_pipe *p, nni_listener *l)
+{
+ p->p_listener = l;
+}
+
+void
+nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d)
+{
+ p->p_dialer = d;
+}
+
int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
@@ -371,7 +379,13 @@ nni_pipe_getopt(
return (o->o_get(p->p_tran_data, val, szp, t));
}
// Maybe the endpoint knows?
- return (nni_ep_getopt(p->p_ep, name, val, szp, t));
+ if (p->p_dialer != NULL) {
+ return (nni_dialer_getopt(p->p_dialer, name, val, szp, t));
+ }
+ if (p->p_listener != NULL) {
+ return (nni_listener_getopt(p->p_listener, name, val, szp, t));
+ }
+ return (NNG_ENOTSUP);
}
void
@@ -409,15 +423,20 @@ nni_pipe_sock_id(nni_pipe *p)
}
uint32_t
-nni_pipe_ep_id(nni_pipe *p)
+nni_pipe_listener_id(nni_pipe *p)
{
- return (nni_ep_id(p->p_ep));
+ if (p->p_listener != NULL) {
+ return (nni_listener_id(p->p_listener));
+ }
+ return (0);
}
-
-int
-nni_pipe_ep_mode(nni_pipe *p)
+uint32_t
+nni_pipe_dialer_id(nni_pipe *p)
{
- return (nni_ep_mode(p->p_ep));
+ if (p->p_dialer != NULL) {
+ return (nni_dialer_id(p->p_dialer));
+ }
+ return (0);
}
static void
diff --git a/src/core/pipe.h b/src/core/pipe.h
index bd66d4ed..5c505514 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -29,6 +29,10 @@ extern void nni_pipe_send(nni_pipe *, nni_aio *);
// Pipe operations that protocols use.
extern uint32_t nni_pipe_id(nni_pipe *);
+// nni_pipe_destroy destroys a pipe -- there must not be any other
+// references to it; this is used only during creation failures.
+extern void nni_pipe_destroy(nni_pipe *);
+
// nni_pipe_close closes the underlying transport for the pipe. Further
// operations against will return NNG_ECLOSED.
extern void nni_pipe_close(nni_pipe *);
@@ -48,7 +52,9 @@ extern void nni_pipe_stop(nni_pipe *);
// endpoint, grabbing each of those locks. The function takes ownership of
// the transport specific pipe (3rd argument), regardless of whether it
// succeeds or not. The endpoint should be held when calling this.
-extern int nni_pipe_create(nni_ep *, void *);
+extern int nni_pipe_create2(nni_pipe **, nni_sock *, nni_tran *, void *);
+extern void nni_pipe_set_dialer(nni_pipe *, nni_dialer *);
+extern void nni_pipe_set_listener(nni_pipe *, nni_listener *);
// nni_pipe_start is called by the socket to begin any startup activities
// on the pipe before making it ready for use by protocols. For example,
@@ -82,11 +88,11 @@ extern int nni_pipe_find(nni_pipe **, uint32_t);
// nni_pipe_sock_id returns the socket id for the pipe (used by public API).
extern uint32_t nni_pipe_sock_id(nni_pipe *);
-// nni_pipe_ep_id returns the endpoint id for the pipe.
-extern uint32_t nni_pipe_ep_id(nni_pipe *);
+// nni_pipe_listener_id returns the listener id for the pipe (or 0 if none).
+extern uint32_t nni_pipe_listener_id(nni_pipe *);
-// nni_pipe_ep_mode returns the endpoint mode for the pipe.
-extern int nni_pipe_ep_mode(nni_pipe *);
+// nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none).
+extern uint32_t nni_pipe_dialer_id(nni_pipe *);
// nni_pipe_closed returns true if nni_pipe_close was called.
// (This is used by the socket to determine if user closed the pipe
diff --git a/src/core/socket.c b/src/core/socket.c
index 4cf624e2..894e4fee 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -82,9 +82,10 @@ struct nni_socket {
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
- nni_list s_eps; // active endpoints
- nni_list s_pipes; // active pipes
- nni_list s_ctxs; // active contexts (protected by global sock_lk)
+ nni_list s_listeners; // active listeners
+ nni_list s_dialers; // active dialers
+ nni_list s_pipes; // active pipes
+ nni_list s_ctxs; // active contexts (protected by global sock_lk)
bool s_closing; // Socket is closing
bool s_closed; // Socket closed, protected by global lock
@@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
nni_pipe_sock_list_init(&s->s_pipes);
- nni_ep_list_init(&s->s_eps);
+ nni_listener_list_init(&s->s_listeners);
+ nni_dialer_list_init(&s->s_dialers);
nni_mtx_init(&s->s_mx);
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
@@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
int
nni_sock_shutdown(nni_sock *sock)
{
- nni_pipe *pipe;
- nni_ep * ep;
- nni_ep * nep;
- nni_ctx * ctx;
- nni_ctx * nctx;
+ nni_pipe * pipe;
+ nni_dialer * d;
+ nni_dialer * nd;
+ nni_listener *l;
+ nni_listener *nl;
+ nni_ctx * ctx;
+ nni_ctx * nctx;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming
// but but allows existing ones to drain.
- NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_shutdown(ep);
+ NNI_LIST_FOREACH (&sock->s_listeners, l) {
+ nni_listener_shutdown(l);
}
+ NNI_LIST_FOREACH (&sock->s_dialers, d) {
+ nni_dialer_shutdown(d);
+ }
+
nni_mtx_unlock(&sock->s_mx);
// We now mark any owned contexts as closing.
@@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // Go through the endpoint list, attempting to close them.
+ // Go through the dialers and listeners, attempting to close them.
// We might already have a close in progress, in which case
// we skip past it; it will be removed from another thread.
- nep = nni_list_first(&sock->s_eps);
- while ((ep = nep) != NULL) {
- nep = nni_list_next(&sock->s_eps, nep);
+ nl = nni_list_first(&sock->s_listeners);
+ while ((l = nl) != NULL) {
+ nl = nni_list_next(&sock->s_listeners, nl);
+
+ if (nni_listener_hold(l) == 0) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_listener_close(l);
+ nni_mtx_lock(&sock->s_mx);
+ }
+ }
+ nd = nni_list_first(&sock->s_dialers);
+ while ((d = nd) != NULL) {
+ nd = nni_list_next(&sock->s_dialers, nd);
- if (nni_ep_hold(ep) == 0) {
+ if (nni_dialer_hold(d) == 0) {
nni_mtx_unlock(&sock->s_mx);
- nni_ep_close(ep);
+ nni_dialer_close(d);
nni_mtx_lock(&sock->s_mx);
}
}
@@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock)
// We have to wait for *both* endpoints and pipes to be
// removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
- (!nni_list_empty(&sock->s_eps))) {
+ (!nni_list_empty(&sock->s_listeners)) ||
+ (!nni_list_empty(&sock->s_dialers))) {
nni_cv_wait(&sock->s_cv);
}
@@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s)
// Wait for pipes, eps, and contexts to finish closing.
nni_mtx_lock(&s->s_mx);
- while (
- (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) {
+ while ((!nni_list_empty(&s->s_pipes)) ||
+ (!nni_list_empty(&s->s_dialers)) ||
+ (!nni_list_empty(&s->s_listeners))) {
nni_cv_wait(&s->s_cv);
}
nni_mtx_unlock(&s->s_mx);
@@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
}
int
-nni_sock_ep_add(nni_sock *s, nni_ep *ep)
+nni_sock_add_listener(nni_sock *s, nni_listener *l)
+{
+ nni_sockopt *sopt;
+
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closing) {
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ NNI_LIST_FOREACH (&s->s_options, sopt) {
+ int rv;
+ rv = nni_listener_setopt(
+ l, sopt->name, sopt->data, sopt->sz, sopt->typ);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+ }
+ }
+
+ nni_list_append(&s->s_listeners, l);
+ nni_mtx_unlock(&s->s_mx);
+ return (0);
+}
+
+int
+nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
{
nni_sockopt *sopt;
@@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep)
NNI_LIST_FOREACH (&s->s_options, sopt) {
int rv;
- rv = nni_ep_setopt(
- ep, sopt->name, sopt->data, sopt->sz, sopt->typ);
+ rv = nni_dialer_setopt(
+ d, sopt->name, sopt->data, sopt->sz, sopt->typ);
if ((rv != 0) && (rv != NNG_ENOTSUP)) {
nni_mtx_unlock(&s->s_mx);
return (rv);
}
}
- nni_list_append(&s->s_eps, ep);
+ nni_list_append(&s->s_dialers, d);
nni_mtx_unlock(&s->s_mx);
return (0);
}
void
-nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
+nni_sock_remove_listener(nni_sock *s, nni_listener *l)
{
- nni_mtx_lock(&sock->s_mx);
- if (nni_list_active(&sock->s_eps, ep)) {
- nni_list_remove(&sock->s_eps, ep);
- if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) {
- nni_cv_wake(&sock->s_cv);
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_listeners, l)) {
+ nni_list_remove(&s->s_listeners, l);
+ if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
+ nni_cv_wake(&s->s_cv);
}
}
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_sock_remove_dialer(nni_sock *s, nni_dialer *d)
+{
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_dialers, d)) {
+ nni_list_remove(&s->s_dialers, d);
+ if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
+ nni_cv_wake(&s->s_cv);
+ }
+ }
+ nni_mtx_unlock(&s->s_mx);
}
int
@@ -948,7 +1007,8 @@ nni_sock_setopt(
nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t)
{
int rv = NNG_ENOTSUP;
- nni_ep * ep;
+ nni_dialer * d;
+ nni_listener * l;
nni_sockopt * optv;
nni_sockopt * oldv = NULL;
const sock_option * sso;
@@ -1042,9 +1102,20 @@ nni_sock_setopt(
// transport (other than ENOTSUP) stops the operation
// altogether. Its important that transport wide checks
// properly pre-validate.
- NNI_LIST_FOREACH (&s->s_eps, ep) {
+ NNI_LIST_FOREACH (&s->s_listeners, l) {
+ int x;
+ x = nni_listener_setopt(l, optv->name, optv->data, sz, t);
+ if (x != NNG_ENOTSUP) {
+ if ((rv = x) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_free_opt(optv);
+ return (rv);
+ }
+ }
+ }
+ NNI_LIST_FOREACH (&s->s_dialers, d) {
int x;
- x = nni_ep_setopt(ep, optv->name, optv->data, sz, t);
+ x = nni_dialer_setopt(d, optv->name, optv->data, sz, t);
if (x != NNG_ENOTSUP) {
if ((rv = x) != 0) {
nni_mtx_unlock(&s->s_mx);
diff --git a/src/core/socket.h b/src/core/socket.h
index 7c10b195..184cfb64 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -46,8 +46,11 @@ extern uint32_t nni_sock_id(nni_sock *);
extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
-extern int nni_sock_ep_add(nni_sock *, nni_ep *);
-extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
+extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
+extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
+
+extern int nni_sock_add_listener(nni_sock *, nni_listener *);
+extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
// These are socket methods that protocol operations can expect to call.
// Note that each of these should be called without any locks held, since
diff --git a/src/core/transport.c b/src/core/transport.c
index 4733b6bd..8485d048 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -118,12 +118,28 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz, int typ)
nni_mtx_lock(&nni_tran_lk);
NNI_LIST_FOREACH (&nni_tran_list, t) {
- const nni_tran_ep_ops *ep;
- const nni_tran_option *o;
+ const nni_tran_dialer_ops * dops;
+ const nni_tran_listener_ops *lops;
+ const nni_tran_option * o;
+
+ // Generally we look for endpoint options. We check both
+ // dialers and listeners.
+ dops = t->t_tran.tran_dialer;
+ for (o = dops->d_options; o && o->o_name != NULL; o++) {
+ if (strcmp(name, o->o_name) != 0) {
+ continue;
+ }
+ if (o->o_set == NULL) {
+ nni_mtx_unlock(&nni_tran_lk);
+ return (NNG_EREADONLY);
+ }
- // Generally we look for endpoint options.
- ep = t->t_tran.tran_ep;
- for (o = ep->ep_options; o && o->o_name != NULL; o++) {
+ rv = (o->o_chk != NULL) ? o->o_chk(v, sz, typ) : 0;
+ nni_mtx_unlock(&nni_tran_lk);
+ return (rv);
+ }
+ lops = t->t_tran.tran_listener;
+ for (o = lops->l_options; o && o->o_name != NULL; o++) {
if (strcmp(name, o->o_name) != 0) {
continue;
}
diff --git a/src/core/transport.h b/src/core/transport.h
index e45aa7ec..257d232d 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -11,30 +11,11 @@
#ifndef CORE_TRANSPORT_H
#define CORE_TRANSPORT_H
-// Transport implementation details. Transports must implement the
-// interfaces in this file.
-struct nni_tran {
- // tran_version is the version of the transport ops that this
- // transport implements. We only bother to version the main
- // ops vector.
- uint32_t tran_version;
-
- // tran_scheme is the transport scheme, such as "tcp" or "inproc".
- const char *tran_scheme;
-
- // tran_ep links our endpoint-specific operations.
- const nni_tran_ep_ops *tran_ep;
-
- // tran_pipe links our pipe-specific operations.
- const nni_tran_pipe_ops *tran_pipe;
-
- // tran_init, if not NULL, is called once during library
- // initialization.
- int (*tran_init)(void);
-
- // tran_fini, if not NULL, is called during library deinitialization.
- // It should release any global resources, close any open files, etc.
- void (*tran_fini)(void);
+// Endpoint modes. Currently used by transports. Remove this when we make
+// transport dialers and listeners explicit.
+enum nni_ep_mode {
+ NNI_EP_MODE_DIAL = 1,
+ NNI_EP_MODE_LISTEN = 2,
};
// We quite intentionally use a signature where the upper word is nonzero,
@@ -48,7 +29,8 @@ struct nni_tran {
#define NNI_TRANSPORT_V0 0x54520000
#define NNI_TRANSPORT_V1 0x54520001
#define NNI_TRANSPORT_V2 0x54520002
-#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V2
+#define NNI_TRANSPORT_V3 0x54520003
+#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3
// Option handlers.
struct nni_tran_option {
@@ -81,40 +63,60 @@ struct nni_tran_option {
// For a given endpoint, the framework holds a lock so that each entry
// point is run exclusively of the others. (Transports must still guard
// against any asynchronous operations they manage themselves, though.)
-struct nni_tran_ep_ops {
- // ep_init creates a vanilla endpoint. The value created is
- // used for the first argument for all other endpoint
- // functions.
- int (*ep_init)(void **, nni_url *, nni_sock *, int);
- // ep_fini frees the resources associated with the endpoint.
- // The endpoint will already have been closed.
- void (*ep_fini)(void *);
+struct nni_tran_dialer_ops {
+ // d_init creates a vanilla dialer. The value created is
+ // used for the first argument for all other dialer functions.
+ int (*d_init)(void **, nni_url *, nni_sock *);
+
+ // d_fini frees the resources associated with the dialer.
+ // The dialer will already have been closed.
+ void (*d_fini)(void *);
- // ep_connect establishes a connection. It can return errors
+ // d_connect establishes a connection. It can return errors
// NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR,
// NNG_ECONNFAILED, NNG_ETIMEDOUT, and NNG_EPROTO.
- void (*ep_connect)(void *, nni_aio *);
+ void (*d_connect)(void *, nni_aio *);
+
+ // d_close stops the dialer from operating altogether. It
+ // does not affect pipes that have already been created. It is
+ // nonblocking.
+ void (*d_close)(void *);
+
+ // d_options is an array of dialer options. The final
+ // element must have a NULL name. If this member is NULL, then
+ // no dialer specific options are available.
+ nni_tran_option *d_options;
+};
+
+struct nni_tran_listener_ops {
+ // l_init creates a vanilla listener. The value created is
+ // used for the first argument for all other listener functions.
+ int (*l_init)(void **, nni_url *, nni_sock *);
- // ep_bind just does the bind() and listen() work,
+ // l_fini frees the resources associated with the listener.
+ // The listener will already have been closed.
+ void (*l_fini)(void *);
+
+ // l_bind just does the bind() and listen() work,
// reserving the address but not creating any connections.
// It should return NNG_EADDRINUSE if the address is already
// taken. It can also return NNG_EBADADDR for an unsuitable
// address, or NNG_EACCESS for permission problems.
- int (*ep_bind)(void *);
+ int (*l_bind)(void *);
- // ep_accept accepts an inbound connection.
- void (*ep_accept)(void *, nni_aio *);
+ // l_accept accepts an inbound connection.
+ void (*l_accept)(void *, nni_aio *);
- // ep_close stops the endpoint from operating altogether. It
+ // l_close stops the listener from operating altogether. It
// does not affect pipes that have already been created. It is
// nonblocking.
- void (*ep_close)(void *);
+ void (*l_close)(void *);
- // ep_options is an array of endpoint options. The final
+ // l_options is an array of listener options. The final
// element must have a NULL name. If this member is NULL, then
- // no transport specific options are available.
- nni_tran_option *ep_options;
+ // no dialer specific options are available.
+ nni_tran_option *l_options;
};
// Pipe operations are entry points called by the socket. These may be
@@ -168,6 +170,35 @@ struct nni_tran_pipe_ops {
nni_tran_option *p_options;
};
+// Transport implementation details. Transports must implement the
+// interfaces in this file.
+struct nni_tran {
+ // tran_version is the version of the transport ops that this
+ // transport implements. We only bother to version the main
+ // ops vector.
+ uint32_t tran_version;
+
+ // tran_scheme is the transport scheme, such as "tcp" or "inproc".
+ const char *tran_scheme;
+
+ // tran_dialer links our dialer-specific operations.
+ const nni_tran_dialer_ops *tran_dialer;
+
+ // tran_listener links our listener-specific operations.
+ const nni_tran_listener_ops *tran_listener;
+
+ // tran_pipe links our pipe-specific operations.
+ const nni_tran_pipe_ops *tran_pipe;
+
+ // tran_init, if not NULL, is called once during library
+ // initialization.
+ int (*tran_init)(void);
+
+ // tran_fini, if not NULL, is called during library deinitialization.
+ // It should release any global resources, close any open files, etc.
+ void (*tran_fini)(void);
+};
+
// These APIs are used by the framework internally, and not for use by
// transport implementations.
extern nni_tran *nni_tran_find(nni_url *);