aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt6
-rw-r--r--src/core/defs.h19
-rw-r--r--src/core/dialer.c512
-rw-r--r--src/core/dialer.h32
-rw-r--r--src/core/endpt.c665
-rw-r--r--src/core/endpt.h45
-rw-r--r--src/core/init.c6
-rw-r--r--src/core/listener.c443
-rw-r--r--src/core/listener.h33
-rw-r--r--src/core/nng_impl.h3
-rw-r--r--src/core/pipe.c57
-rw-r--r--src/core/pipe.h16
-rw-r--r--src/core/socket.c139
-rw-r--r--src/core/socket.h7
-rw-r--r--src/core/transport.c26
-rw-r--r--src/core/transport.h119
-rw-r--r--src/nng.c265
-rw-r--r--src/transport/inproc/inproc.c70
-rw-r--r--src/transport/ipc/ipc.c69
-rw-r--r--src/transport/tcp/tcp.c126
-rw-r--r--src/transport/tls/tls.c156
-rw-r--r--src/transport/ws/websocket.c757
-rw-r--r--src/transport/zerotier/zerotier.c48
23 files changed, 2256 insertions, 1363 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index cfb6ff16..0ad53392 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -36,8 +36,8 @@ set (NNG_SOURCES
core/clock.h
core/device.c
core/device.h
- core/endpt.c
- core/endpt.h
+ core/dialer.c
+ core/dialer.h
core/file.c
core/file.h
core/idhash.c
@@ -46,6 +46,8 @@ set (NNG_SOURCES
core/init.h
core/list.c
core/list.h
+ core/listener.c
+ core/listener.h
core/message.c
core/message.h
core/msgqueue.c
diff --git a/src/core/defs.h b/src/core/defs.h
index 77078a7a..a0cca368 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -40,14 +40,17 @@ typedef struct nng_event nni_event;
typedef struct nng_notify nni_notify;
// These are our own names.
-typedef struct nni_socket nni_sock;
-typedef struct nni_ctx nni_ctx;
-typedef struct nni_ep nni_ep;
-typedef struct nni_pipe nni_pipe;
-typedef struct nni_tran nni_tran;
-typedef struct nni_tran_option nni_tran_option;
-typedef struct nni_tran_ep_ops nni_tran_ep_ops;
-typedef struct nni_tran_pipe_ops nni_tran_pipe_ops;
+typedef struct nni_socket nni_sock;
+typedef struct nni_ctx nni_ctx;
+typedef struct nni_dialer nni_dialer;
+typedef struct nni_listener nni_listener;
+typedef struct nni_pipe nni_pipe;
+
+typedef struct nni_tran nni_tran;
+typedef struct nni_tran_option nni_tran_option;
+typedef struct nni_tran_dialer_ops nni_tran_dialer_ops;
+typedef struct nni_tran_listener_ops nni_tran_listener_ops;
+typedef struct nni_tran_pipe_ops nni_tran_pipe_ops;
typedef struct nni_proto_option nni_proto_option;
typedef struct nni_proto_ctx_ops nni_proto_ctx_ops;
diff --git a/src/core/dialer.c b/src/core/dialer.c
new file mode 100644
index 00000000..ee0d2916
--- /dev/null
+++ b/src/core/dialer.c
@@ -0,0 +1,512 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct nni_dialer {
+ nni_tran_dialer_ops d_ops; // transport ops
+ nni_tran * d_tran; // transport pointer
+ void * d_data; // transport private
+ uint64_t d_id; // endpoint id
+ nni_list_node d_node; // per socket list
+ nni_sock * d_sock;
+ nni_url * d_url;
+ int d_refcnt;
+ int d_lastrv; // last result from synchronous
+ bool d_synch; // synchronous connect in progress?
+ bool d_started;
+ bool d_closed; // full shutdown
+ bool d_closing; // close pending (waiting on refcnt)
+ nni_mtx d_mtx;
+ nni_cv d_cv;
+ nni_list d_pipes;
+ nni_aio * d_con_aio;
+ nni_aio * d_tmo_aio; // backoff timer
+ nni_duration d_maxrtime; // maximum time for reconnect
+ nni_duration d_currtime; // current time for reconnect
+ nni_duration d_inirtime; // initial time for reconnect
+ nni_time d_conntime; // time of last good connect
+};
+
+// Functionality related to dialers.
+static void dialer_connect_start(nni_dialer *);
+static void dialer_connect_cb(void *);
+static void dialer_timer_cb(void *);
+
+static nni_idhash *dialers;
+static nni_mtx dialers_lk;
+
+int
+nni_dialer_sys_init(void)
+{
+ int rv;
+
+ if ((rv = nni_idhash_init(&dialers)) != 0) {
+ return (rv);
+ }
+ nni_mtx_init(&dialers_lk);
+ nni_idhash_set_limits(
+ dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+
+ return (0);
+}
+
+void
+nni_dialer_sys_fini(void)
+{
+ nni_mtx_fini(&dialers_lk);
+ nni_idhash_fini(dialers);
+ dialers = NULL;
+}
+
+uint32_t
+nni_dialer_id(nni_dialer *d)
+{
+ return ((uint32_t) d->d_id);
+}
+
+static void
+dialer_destroy(nni_dialer *d)
+{
+ if (d == NULL) {
+ return;
+ }
+
+ // Remove us from the table so we cannot be found.
+ if (d->d_id != 0) {
+ nni_idhash_remove(dialers, d->d_id);
+ }
+
+ nni_aio_stop(d->d_con_aio);
+ nni_aio_stop(d->d_tmo_aio);
+
+ nni_sock_remove_dialer(d->d_sock, d);
+
+ nni_aio_fini(d->d_con_aio);
+ nni_aio_fini(d->d_tmo_aio);
+
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_data != NULL) {
+ d->d_ops.d_fini(d->d_data);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+ nni_cv_fini(&d->d_cv);
+ nni_mtx_fini(&d->d_mtx);
+ nni_url_free(d->d_url);
+ NNI_FREE_STRUCT(d);
+}
+
+int
+nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
+{
+ nni_tran * tran;
+ nni_dialer *d;
+ int rv;
+ nni_url * url;
+
+ if ((rv = nni_url_parse(&url, urlstr)) != 0) {
+ return (rv);
+ }
+ if (((tran = nni_tran_find(url)) == NULL) ||
+ (tran->tran_dialer == NULL)) {
+ nni_url_free(url);
+ return (NNG_ENOTSUP);
+ }
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ nni_url_free(url);
+ return (NNG_ENOMEM);
+ }
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_closing = false;
+ d->d_started = false;
+ d->d_data = NULL;
+ d->d_refcnt = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
+
+ // Make a copy of the endpoint operations. This allows us to
+ // modify them (to override NULLs for example), and avoids an extra
+ // dereference on hot paths.
+ d->d_ops = *tran->tran_dialer;
+
+ NNI_LIST_NODE_INIT(&d->d_node);
+
+ nni_pipe_ep_list_init(&d->d_pipes);
+
+ nni_mtx_init(&d->d_mtx);
+ nni_cv_init(&d->d_cv, &d->d_mtx);
+
+ if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
+ ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
+ ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) ||
+ ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) ||
+ ((rv = nni_sock_add_dialer(s, d)) != 0)) {
+ dialer_destroy(d);
+ return (rv);
+ }
+
+ *dp = d;
+ return (0);
+}
+
+int
+nni_dialer_find(nni_dialer **dp, uint32_t id)
+{
+ int rv;
+ nni_dialer *d;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&dialers_lk);
+ if ((rv = nni_idhash_find(dialers, id, (void **) &d)) == 0) {
+ if (d->d_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ d->d_refcnt++;
+ *dp = d;
+ }
+ }
+ nni_mtx_unlock(&dialers_lk);
+ return (rv);
+}
+
+int
+nni_dialer_hold(nni_dialer *d)
+{
+ int rv;
+ nni_mtx_lock(&dialers_lk);
+ if (d->d_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ d->d_refcnt++;
+ rv = 0;
+ }
+ nni_mtx_unlock(&dialers_lk);
+ return (rv);
+}
+
+void
+nni_dialer_rele(nni_dialer *d)
+{
+ nni_mtx_lock(&dialers_lk);
+ d->d_refcnt--;
+ if (d->d_closing) {
+ nni_cv_wake(&d->d_cv);
+ }
+ nni_mtx_unlock(&dialers_lk);
+}
+
+int
+nni_dialer_shutdown(nni_dialer *d)
+{
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ECLOSED);
+ }
+ d->d_closing = true;
+ nni_mtx_unlock(&d->d_mtx);
+
+ // Abort any remaining in-flight operations.
+ nni_aio_close(d->d_con_aio);
+ nni_aio_close(d->d_tmo_aio);
+
+ // Stop the underlying transport.
+ d->d_ops.d_close(d->d_data);
+
+ return (0);
+}
+
+void
+nni_dialer_close(nni_dialer *d)
+{
+ nni_pipe *p;
+
+ nni_mtx_lock(&d->d_mtx);
+ if (d->d_closed) {
+ nni_mtx_unlock(&d->d_mtx);
+ nni_dialer_rele(d);
+ return;
+ }
+ d->d_closed = true;
+ nni_mtx_unlock(&d->d_mtx);
+
+ nni_dialer_shutdown(d);
+
+ nni_aio_stop(d->d_con_aio);
+ nni_aio_stop(d->d_tmo_aio);
+
+ nni_mtx_lock(&d->d_mtx);
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_stop(p);
+ }
+ while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) {
+ nni_cv_wait(&d->d_cv);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+
+ dialer_destroy(d);
+}
+
+// This function starts an exponential backoff timer for reconnecting.
+static void
+dialer_timer_start(nni_dialer *d)
+{
+ nni_duration backoff;
+
+ if (d->d_closing) {
+ return;
+ }
+ backoff = d->d_currtime;
+ d->d_currtime *= 2;
+ if (d->d_currtime > d->d_maxrtime) {
+ d->d_currtime = d->d_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is
+ // pretty similar to 802 style backoff, except that we have a
+ // nearly uniform time period instead of discrete slot times.
+ // This algorithm may lead to slight biases because we don't
+ // have a statistically perfect distribution with the modulo of
+ // the random number, but this really doesn't matter.
+ nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+}
+
+static void
+dialer_timer_cb(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_aio * aio = d->d_tmo_aio;
+
+ nni_mtx_lock(&d->d_mtx);
+ if (nni_aio_result(aio) == 0) {
+ dialer_connect_start(d);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+static void
+dialer_connect_cb(void *arg)
+{
+ nni_dialer *d = arg;
+ nni_pipe * p;
+ nni_aio * aio = d->d_con_aio;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) == 0) {
+ void *data = nni_aio_get_output(aio, 0);
+ NNI_ASSERT(data != NULL);
+ rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data);
+ }
+ if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
+ nni_pipe_stop(p);
+ }
+
+ nni_mtx_lock(&d->d_mtx);
+ switch (rv) {
+ case 0:
+ nni_pipe_set_dialer(p, d);
+ nni_list_append(&d->d_pipes, p);
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ nni_pipe_stop(p);
+ return;
+ }
+
+ // Good connect, so reset the backoff timer.
+ // Note that a host that accepts the connect, but drops
+ // us immediately, is going to get hit pretty hard
+ // (depending on the initial backoff) with no
+ // exponential backoff. This can happen if we wind up
+ // trying to connect to some port that does not speak
+ // SP for example.
+ d->d_currtime = d->d_inirtime;
+
+ // No further outgoing connects -- we will restart a
+ // connection from the pipe when the pipe is removed.
+ break;
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ // Canceled/closed -- stop everything.
+ break;
+ default:
+ // redial, but only if we are not synchronous
+ if (!d->d_synch) {
+ dialer_timer_start(d);
+ }
+ break;
+ }
+ if (d->d_synch) {
+ if (rv != 0) {
+ d->d_started = false;
+ }
+ d->d_lastrv = rv;
+ d->d_synch = false;
+ nni_cv_wake(&d->d_cv);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+static void
+dialer_connect_start(nni_dialer *d)
+{
+ nni_aio *aio = d->d_con_aio;
+
+ // Call with the Endpoint lock held.
+ if (d->d_closing) {
+ return;
+ }
+
+ d->d_ops.d_connect(d->d_data, aio);
+}
+
+int
+nni_dialer_start(nni_dialer *d, int flags)
+{
+ int rv = 0;
+
+ nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime);
+ d->d_currtime = d->d_inirtime;
+
+ nni_mtx_lock(&d->d_mtx);
+
+ if (d->d_closing) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ECLOSED);
+ }
+
+ if (d->d_started) {
+ nni_mtx_unlock(&d->d_mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((flags & NNG_FLAG_NONBLOCK) != 0) {
+ d->d_started = true;
+ dialer_connect_start(d);
+ nni_mtx_unlock(&d->d_mtx);
+ return (0);
+ }
+
+ d->d_synch = true;
+ d->d_started = true;
+ dialer_connect_start(d);
+
+ while (d->d_synch && !d->d_closing) {
+ nni_cv_wait(&d->d_cv);
+ }
+ rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv;
+ nni_cv_wake(&d->d_cv);
+
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+}
+
+void
+nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p)
+{
+ if (d == NULL) {
+ return;
+ }
+
+ // Break up the relationship between the dialer and the pipe.
+ nni_mtx_lock(&d->d_mtx);
+ // During early init, the pipe might not have this set.
+ if (nni_list_active(&d->d_pipes, p)) {
+ nni_list_remove(&d->d_pipes, p);
+ }
+ // Wake up the close thread if it is waiting.
+ if (d->d_closed) {
+ if (nni_list_empty(&d->d_pipes)) {
+ nni_cv_wake(&d->d_cv);
+ }
+ } else {
+ // If this pipe closed, then lets restart the dial operation.
+ // Since the remote side seems to have closed, lets start with
+ // a backoff. This keeps us from pounding the crap out of the
+ // thing if a remote server accepts but then disconnects
+ // immediately.
+ dialer_timer_start(d);
+ }
+ nni_mtx_unlock(&d->d_mtx);
+}
+
+int
+nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
+ nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (NNG_EREADONLY);
+ }
+
+ for (o = d->d_ops.d_options; o && o->o_name; o++) {
+ int rv;
+
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_set == NULL) {
+ return (NNG_EREADONLY);
+ }
+
+ nni_mtx_lock(&d->d_mtx);
+ rv = o->o_set(d->d_data, val, sz, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_dialer_getopt(
+ nni_dialer *d, const char *name, void *valp, size_t *szp, nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ for (o = d->d_ops.d_options; o && o->o_name; o++) {
+ int rv;
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_get == NULL) {
+ return (NNG_EWRITEONLY);
+ }
+ nni_mtx_lock(&d->d_mtx);
+ rv = o->o_get(d->d_data, valp, szp, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
+ // We provide a fallback on the URL, but let the implementation
+ // override. This allows the URL to be created with wildcards,
+ // that are resolved later.
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (nni_copyout_str(d->d_url->u_rawurl, valp, szp, t));
+ }
+
+ return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
+}
+
+void
+nni_dialer_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_dialer, d_node);
+}
diff --git a/src/core/dialer.h b/src/core/dialer.h
new file mode 100644
index 00000000..56b0fb1b
--- /dev/null
+++ b/src/core/dialer.h
@@ -0,0 +1,32 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_DIALER_H
+#define CORE_DIALER_H
+
+extern int nni_dialer_sys_init(void);
+extern void nni_dialer_sys_fini(void);
+extern int nni_dialer_find(nni_dialer **, uint32_t);
+extern int nni_dialer_hold(nni_dialer *);
+extern void nni_dialer_rele(nni_dialer *);
+extern uint32_t nni_dialer_id(nni_dialer *);
+extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *);
+extern int nni_dialer_shutdown(nni_dialer *);
+extern void nni_dialer_close(nni_dialer *);
+extern int nni_dialer_start(nni_dialer *, int);
+extern void nni_dialer_list_init(nni_list *);
+extern void nni_dialer_remove_pipe(nni_dialer *, nni_pipe *);
+
+extern int nni_dialer_setopt(
+ nni_dialer *, const char *, const void *, size_t, nni_opt_type);
+extern int nni_dialer_getopt(
+ nni_dialer *, const char *, void *, size_t *, nni_opt_type);
+
+#endif // CORE_DIALER_H
diff --git a/src/core/endpt.c b/src/core/endpt.c
deleted file mode 100644
index 8e678fb0..00000000
--- a/src/core/endpt.c
+++ /dev/null
@@ -1,665 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-struct nni_ep {
- nni_tran_ep_ops ep_ops; // transport ops
- nni_tran * ep_tran; // transport pointer
- void * ep_data; // transport private
- uint64_t ep_id; // endpoint id
- nni_list_node ep_node; // per socket list
- nni_sock * ep_sock;
- nni_url * ep_url;
- int ep_mode;
- int ep_refcnt;
- bool ep_started;
- bool ep_closed; // full shutdown
- bool ep_closing; // close pending (waiting on refcnt)
- bool ep_tmo_run;
- nni_mtx ep_mtx;
- nni_cv ep_cv;
- nni_list ep_pipes;
- nni_aio * ep_acc_aio;
- nni_aio * ep_con_aio;
- nni_aio * ep_con_syn; // used for sync connect
- nni_aio * ep_tmo_aio; // backoff timer
- nni_duration ep_maxrtime; // maximum time for reconnect
- nni_duration ep_currtime; // current time for reconnect
- nni_duration ep_inirtime; // initial time for reconnect
- nni_time ep_conntime; // time of last good connect
-};
-
-// Functionality related to end points.
-
-static void nni_ep_acc_start(nni_ep *);
-static void nni_ep_acc_cb(void *);
-static void nni_ep_con_start(nni_ep *);
-static void nni_ep_con_cb(void *);
-static void nni_ep_tmo_start(nni_ep *);
-static void nni_ep_tmo_cb(void *);
-
-static nni_idhash *nni_eps;
-static nni_mtx nni_ep_lk;
-
-int
-nni_ep_sys_init(void)
-{
- int rv;
-
- if ((rv = nni_idhash_init(&nni_eps)) != 0) {
- return (rv);
- }
- nni_mtx_init(&nni_ep_lk);
- nni_idhash_set_limits(
- nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff);
-
- return (0);
-}
-
-void
-nni_ep_sys_fini(void)
-{
- nni_mtx_fini(&nni_ep_lk);
- nni_idhash_fini(nni_eps);
- nni_eps = NULL;
-}
-
-uint32_t
-nni_ep_id(nni_ep *ep)
-{
- return ((uint32_t) ep->ep_id);
-}
-
-static void
-nni_ep_destroy(nni_ep *ep)
-{
- if (ep == NULL) {
- return;
- }
-
- // Remove us from the table so we cannot be found.
- if (ep->ep_id != 0) {
- nni_idhash_remove(nni_eps, ep->ep_id);
- }
-
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- nni_sock_ep_remove(ep->ep_sock, ep);
-
- nni_aio_fini(ep->ep_acc_aio);
- nni_aio_fini(ep->ep_con_aio);
- nni_aio_fini(ep->ep_con_syn);
- nni_aio_fini(ep->ep_tmo_aio);
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_data != NULL) {
- ep->ep_ops.ep_fini(ep->ep_data);
- }
- nni_mtx_unlock(&ep->ep_mtx);
- nni_cv_fini(&ep->ep_cv);
- nni_mtx_fini(&ep->ep_mtx);
- nni_url_free(ep->ep_url);
- NNI_FREE_STRUCT(ep);
-}
-
-static int
-nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode)
-{
- nni_tran *tran;
- nni_ep * ep;
- int rv;
- nni_url * url;
-
- if ((rv = nni_url_parse(&url, urlstr)) != 0) {
- return (rv);
- }
- if ((tran = nni_tran_find(url)) == NULL) {
- nni_url_free(url);
- return (NNG_ENOTSUP);
- }
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- nni_url_free(url);
- return (NNG_ENOMEM);
- }
- ep->ep_url = url;
- ep->ep_closed = false;
- ep->ep_closing = false;
- ep->ep_started = false;
- ep->ep_data = NULL;
- ep->ep_refcnt = 1;
- ep->ep_sock = s;
- ep->ep_tran = tran;
- ep->ep_mode = mode;
-
- // Make a copy of the endpoint operations. This allows us to
- // modify them (to override NULLs for example), and avoids an extra
- // dereference on hot paths.
- ep->ep_ops = *tran->tran_ep;
-
- NNI_LIST_NODE_INIT(&ep->ep_node);
-
- nni_pipe_ep_list_init(&ep->ep_pipes);
-
- nni_mtx_init(&ep->ep_mtx);
- nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
-
- if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
- ((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) ||
- ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
- ((rv = nni_sock_ep_add(s, ep)) != 0)) {
- nni_ep_destroy(ep);
- return (rv);
- }
-
- *epp = ep;
- return (0);
-}
-
-int
-nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *urlstr)
-{
- return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_DIAL));
-}
-
-int
-nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *urlstr)
-{
- return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_LISTEN));
-}
-
-int
-nni_ep_find(nni_ep **epp, uint32_t id)
-{
- int rv;
- nni_ep *ep;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
- nni_mtx_lock(&nni_ep_lk);
- if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) {
- if (ep->ep_closed) {
- rv = NNG_ECLOSED;
- } else {
- ep->ep_refcnt++;
- *epp = ep;
- }
- }
- nni_mtx_unlock(&nni_ep_lk);
- return (rv);
-}
-
-int
-nni_ep_hold(nni_ep *ep)
-{
- int rv;
- nni_mtx_lock(&nni_ep_lk);
- if (ep->ep_closed) {
- rv = NNG_ECLOSED;
- } else {
- ep->ep_refcnt++;
- rv = 0;
- }
- nni_mtx_unlock(&nni_ep_lk);
- return (rv);
-}
-
-void
-nni_ep_rele(nni_ep *ep)
-{
- nni_mtx_lock(&nni_ep_lk);
- ep->ep_refcnt--;
- if (ep->ep_closing) {
- nni_cv_wake(&ep->ep_cv);
- }
- nni_mtx_unlock(&nni_ep_lk);
-}
-
-int
-nni_ep_shutdown(nni_ep *ep)
-{
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- ep->ep_closing = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- // Abort any remaining in-flight operations.
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- // Stop the underlying transport.
- ep->ep_ops.ep_close(ep->ep_data);
-
- return (0);
-}
-
-void
-nni_ep_close(nni_ep *ep)
-{
- nni_pipe *p;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- nni_ep_rele(ep);
- return;
- }
- ep->ep_closed = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_ep_shutdown(ep);
-
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
-
- nni_mtx_lock(&ep->ep_mtx);
- NNI_LIST_FOREACH (&ep->ep_pipes, p) {
- nni_pipe_stop(p);
- }
- while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) {
- nni_cv_wait(&ep->ep_cv);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_ep_destroy(ep);
-}
-
-static void
-nni_ep_tmo_cancel(nni_aio *aio, int rv)
-{
- nni_ep *ep = nni_aio_get_prov_data(aio);
- // The only way this ever gets "finished", is via cancellation.
- if (ep != NULL) {
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_tmo_run) {
- nni_aio_finish_error(aio, rv);
- }
- ep->ep_tmo_run = false;
- nni_mtx_unlock(&ep->ep_mtx);
- }
-}
-
-static void
-nni_ep_tmo_start(nni_ep *ep)
-{
- nni_duration backoff;
- int rv;
-
- if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
- return;
- }
- backoff = ep->ep_currtime;
- ep->ep_currtime *= 2;
- if (ep->ep_currtime > ep->ep_maxrtime) {
- ep->ep_currtime = ep->ep_maxrtime;
- }
-
- // To minimize damage from storms, etc., we select a backoff
- // value randomly, in the range of [0, backoff-1]; this is
- // pretty similar to 802 style backoff, except that we have a
- // nearly uniform time period instead of discrete slot times.
- // This algorithm may lead to slight biases because we don't
- // have a statistically perfect distribution with the modulo of
- // the random number, but this really doesn't matter.
-
- nni_aio_set_timeout(
- ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
-
- if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) !=
- 0) {
- nni_aio_finish_error(ep->ep_tmo_aio, rv);
- }
-
- ep->ep_tmo_run = true;
-}
-
-static void
-nni_ep_tmo_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_tmo_aio;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
- if (ep->ep_mode == NNI_EP_MODE_DIAL) {
- nni_ep_con_start(ep);
- } else {
- nni_ep_acc_start(ep);
- }
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_con_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_con_aio;
- int rv;
-
- if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0));
- }
- nni_mtx_lock(&ep->ep_mtx);
- switch (rv) {
- case 0:
- // Good connect, so reset the backoff timer.
- // Note that a host that accepts the connect, but drops
- // us immediately, is going to get hit pretty hard
- // (depending on the initial backoff) with no
- // exponential backoff. This can happen if we wind up
- // trying to connect to some port that does not speak
- // SP for example.
- ep->ep_currtime = ep->ep_inirtime;
-
- // No further outgoing connects -- we will restart a
- // connection from the pipe when the pipe is removed.
- break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled/closed -- stop everything.
- break;
- default:
- // Other errors involve the use of the backoff timer.
- nni_ep_tmo_start(ep);
- break;
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_con_start(nni_ep *ep)
-{
- nni_aio *aio = ep->ep_con_aio;
-
- // Call with the Endpoint lock held.
- if (ep->ep_closing) {
- return;
- }
-
- ep->ep_ops.ep_connect(ep->ep_data, aio);
-}
-
-int
-nni_ep_dial(nni_ep *ep, int flags)
-{
- int rv = 0;
- nni_aio *aio;
-
- nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
- ep->ep_currtime = ep->ep_inirtime;
-
- nni_mtx_lock(&ep->ep_mtx);
-
- if (ep->ep_mode != NNI_EP_MODE_DIAL) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ENOTSUP);
- }
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
-
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ESTATE);
- }
-
- if ((flags & NNG_FLAG_NONBLOCK) != 0) {
- ep->ep_started = true;
- nni_ep_con_start(ep);
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
- }
-
- // Synchronous mode: so we have to wait for it to complete.
- aio = ep->ep_con_syn;
- ep->ep_ops.ep_connect(ep->ep_data, aio);
- ep->ep_started = true;
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_aio_wait(aio);
-
- // As we're synchronous, we also have to handle the completion.
- if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) {
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_started = false;
- nni_mtx_unlock(&ep->ep_mtx);
- }
- return (rv);
-}
-
-static void
-nni_ep_acc_cb(void *arg)
-{
- nni_ep * ep = arg;
- nni_aio *aio = ep->ep_acc_aio;
- int rv;
-
- if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL);
- rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0));
- }
-
- nni_mtx_lock(&ep->ep_mtx);
- switch (rv) {
- case 0:
- nni_ep_acc_start(ep);
- break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled or closed, no further action.
- break;
- case NNG_ECONNABORTED:
- case NNG_ECONNRESET:
- // These are remote conditions, no cool down.
- nni_ep_acc_start(ep);
- break;
- default:
- // We don't really know why we failed, but we backoff
- // here. This is because errors here are probably due
- // to system failures (resource exhaustion) and we hope
- // by not thrashing we give the system a chance to
- // recover.
- nni_ep_tmo_start(ep);
- break;
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-static void
-nni_ep_acc_start(nni_ep *ep)
-{
- nni_aio *aio = ep->ep_acc_aio;
-
- // Call with the Endpoint lock held.
- if (ep->ep_closing) {
- return;
- }
- ep->ep_ops.ep_accept(ep->ep_data, aio);
-}
-
-int
-nni_ep_listen(nni_ep *ep, int flags)
-{
- int rv = 0;
- NNI_ARG_UNUSED(flags);
-
- nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
- ep->ep_currtime = ep->ep_inirtime;
-
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_mode != NNI_EP_MODE_LISTEN) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ENOTSUP);
- }
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ESTATE);
- }
-
- rv = ep->ep_ops.ep_bind(ep->ep_data);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- ep->ep_started = true;
- nni_ep_acc_start(ep);
- nni_mtx_unlock(&ep->ep_mtx);
-
- return (0);
-}
-
-int
-nni_ep_pipe_add(nni_ep *ep, nni_pipe *p)
-{
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closing) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- nni_list_append(&ep->ep_pipes, p);
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
-}
-
-void
-nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
-{
- // Break up the relationship between the EP and the pipe.
- nni_mtx_lock(&ep->ep_mtx);
- // During early init, the pipe might not have this set.
- if (nni_list_active(&ep->ep_pipes, pipe)) {
- nni_list_remove(&ep->ep_pipes, pipe);
- }
- // Wake up the close thread if it is waiting.
- if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) {
- nni_cv_wake(&ep->ep_cv);
- }
-
- // If this pipe closed, then lets restart the dial operation.
- // Since the remote side seems to have closed, lets start with
- // a backoff. This keeps us from pounding the crap out of the
- // thing if a remote server accepts but then disconnects
- // immediately.
- if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) {
- nni_ep_tmo_start(ep);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
-int
-nni_ep_setopt(
- nni_ep *ep, const char *name, const void *val, size_t sz, nni_opt_type t)
-{
- nni_tran_option *o;
-
- if (strcmp(name, NNG_OPT_URL) == 0) {
- return (NNG_EREADONLY);
- }
-
- for (o = ep->ep_ops.ep_options; o && o->o_name; o++) {
- int rv;
-
- if (strcmp(o->o_name, name) != 0) {
- continue;
- }
- if (o->o_set == NULL) {
- return (NNG_EREADONLY);
- }
-
- nni_mtx_lock(&ep->ep_mtx);
- rv = o->o_set(ep->ep_data, val, sz, t);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- return (NNG_ENOTSUP);
-}
-
-int
-nni_ep_mode(nni_ep *ep)
-{
- return (ep->ep_mode);
-}
-
-int
-nni_ep_getopt(
- nni_ep *ep, const char *name, void *valp, size_t *szp, nni_opt_type t)
-{
- nni_tran_option *o;
-
- for (o = ep->ep_ops.ep_options; o && o->o_name; o++) {
- int rv;
- if (strcmp(o->o_name, name) != 0) {
- continue;
- }
- if (o->o_get == NULL) {
- return (NNG_EWRITEONLY);
- }
- nni_mtx_lock(&ep->ep_mtx);
- rv = o->o_get(ep->ep_data, valp, szp, t);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- // We provide a fallback on the URL, but let the implementation
- // override. This allows the URL to be created with wildcards,
- // that are resolved later.
- if (strcmp(name, NNG_OPT_URL) == 0) {
- return (nni_copyout_str(ep->ep_url->u_rawurl, valp, szp, t));
- }
-
- return (nni_sock_getopt(ep->ep_sock, name, valp, szp, t));
-}
-
-void
-nni_ep_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_ep, ep_node);
-}
-
-nni_tran *
-nni_ep_tran(nni_ep *ep)
-{
- return (ep->ep_tran);
-}
-
-nni_sock *
-nni_ep_sock(nni_ep *ep)
-{
- return (ep->ep_sock);
-}
diff --git a/src/core/endpt.h b/src/core/endpt.h
deleted file mode 100644
index bf251d41..00000000
--- a/src/core/endpt.h
+++ /dev/null
@@ -1,45 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#ifndef CORE_ENDPT_H
-#define CORE_ENDPT_H
-
-extern int nni_ep_sys_init(void);
-extern void nni_ep_sys_fini(void);
-extern nni_tran *nni_ep_tran(nni_ep *);
-extern nni_sock *nni_ep_sock(nni_ep *);
-extern int nni_ep_find(nni_ep **, uint32_t);
-extern int nni_ep_hold(nni_ep *);
-extern void nni_ep_rele(nni_ep *);
-extern uint32_t nni_ep_id(nni_ep *);
-extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
-extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
-extern int nni_ep_shutdown(nni_ep *);
-extern void nni_ep_close(nni_ep *);
-extern int nni_ep_dial(nni_ep *, int);
-extern int nni_ep_listen(nni_ep *, int);
-extern void nni_ep_list_init(nni_list *);
-extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
-extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
-extern int nni_ep_mode(nni_ep *);
-
-extern int nni_ep_setopt(
- nni_ep *, const char *, const void *, size_t, nni_opt_type);
-extern int nni_ep_getopt(
- nni_ep *, const char *, void *, size_t *, nni_opt_type);
-
-// Endpoint modes. Currently used by transports. Remove this when we make
-// transport dialers and listeners explicit.
-enum nni_ep_mode {
- NNI_EP_MODE_DIAL = 1,
- NNI_EP_MODE_LISTEN = 2,
-};
-
-#endif // CORE_ENDPT_H
diff --git a/src/core/init.c b/src/core/init.c
index c1b7bbac..30a7a547 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -33,7 +33,8 @@ nni_init_helper(void)
((rv = nni_aio_sys_init()) != 0) ||
((rv = nni_random_sys_init()) != 0) ||
((rv = nni_sock_sys_init()) != 0) ||
- ((rv = nni_ep_sys_init()) != 0) ||
+ ((rv = nni_listener_sys_init()) != 0) ||
+ ((rv = nni_dialer_sys_init()) != 0) ||
((rv = nni_pipe_sys_init()) != 0) ||
((rv = nni_proto_sys_init()) != 0) ||
((rv = nni_tran_sys_init()) != 0)) {
@@ -71,7 +72,8 @@ nni_fini(void)
nni_tran_sys_fini();
nni_proto_sys_fini();
nni_pipe_sys_fini();
- nni_ep_sys_fini();
+ nni_dialer_sys_fini();
+ nni_listener_sys_fini();
nni_sock_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_random_sys_fini();
diff --git a/src/core/listener.c b/src/core/listener.c
new file mode 100644
index 00000000..6d580cd8
--- /dev/null
+++ b/src/core/listener.c
@@ -0,0 +1,443 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct nni_listener {
+ nni_tran_listener_ops l_ops; // transport ops
+ nni_tran * l_tran; // transport pointer
+ void * l_data; // transport private
+ uint64_t l_id; // endpoint id
+ nni_list_node l_node; // per socket list
+ nni_sock * l_sock;
+ nni_url * l_url;
+ int l_refcnt;
+ bool l_started;
+ bool l_closed; // full shutdown
+ bool l_closing; // close pending (waiting on refcnt)
+ nni_mtx l_mtx;
+ nni_cv l_cv;
+ nni_list l_pipes;
+ nni_aio * l_acc_aio;
+ nni_aio * l_tmo_aio;
+};
+
+// Functionality related to listeners.
+
+static void listener_accept_start(nni_listener *);
+static void listener_accept_cb(void *);
+static void listener_timer_cb(void *);
+
+static nni_idhash *listeners;
+static nni_mtx listeners_lk;
+
+int
+nni_listener_sys_init(void)
+{
+ int rv;
+
+ if ((rv = nni_idhash_init(&listeners)) != 0) {
+ return (rv);
+ }
+ nni_mtx_init(&listeners_lk);
+ nni_idhash_set_limits(
+ listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff);
+
+ return (0);
+}
+
+void
+nni_listener_sys_fini(void)
+{
+ nni_mtx_fini(&listeners_lk);
+ nni_idhash_fini(listeners);
+ listeners = NULL;
+}
+
+uint32_t
+nni_listener_id(nni_listener *l)
+{
+ return ((uint32_t) l->l_id);
+}
+
+static void
+listener_destroy(nni_listener *l)
+{
+ if (l == NULL) {
+ return;
+ }
+
+ // Remove us from the table so we cannot be found.
+ if (l->l_id != 0) {
+ nni_idhash_remove(listeners, l->l_id);
+ }
+
+ nni_aio_stop(l->l_acc_aio);
+
+ nni_sock_remove_listener(l->l_sock, l);
+
+ nni_aio_fini(l->l_acc_aio);
+
+ nni_mtx_lock(&l->l_mtx);
+ if (l->l_data != NULL) {
+ l->l_ops.l_fini(l->l_data);
+ }
+ nni_mtx_unlock(&l->l_mtx);
+ nni_cv_fini(&l->l_cv);
+ nni_mtx_fini(&l->l_mtx);
+ nni_url_free(l->l_url);
+ NNI_FREE_STRUCT(l);
+}
+
+int
+nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
+{
+ nni_tran * tran;
+ nni_listener *l;
+ int rv;
+ nni_url * url;
+
+ if ((rv = nni_url_parse(&url, urlstr)) != 0) {
+ return (rv);
+ }
+ if (((tran = nni_tran_find(url)) == NULL) ||
+ (tran->tran_listener == NULL)) {
+ nni_url_free(url);
+ return (NNG_ENOTSUP);
+ }
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ nni_url_free(url);
+ return (NNG_ENOMEM);
+ }
+ l->l_url = url;
+ l->l_closed = false;
+ l->l_closing = false;
+ l->l_started = false;
+ l->l_data = NULL;
+ l->l_refcnt = 1;
+ l->l_sock = s;
+ l->l_tran = tran;
+
+ // Make a copy of the endpoint operations. This allows us to
+ // modify them (to override NULLs for example), and avoids an extra
+ // dereference on hot paths.
+ l->l_ops = *tran->tran_listener;
+
+ NNI_LIST_NODE_INIT(&l->l_node);
+
+ nni_pipe_ep_list_init(&l->l_pipes);
+
+ nni_mtx_init(&l->l_mtx);
+ nni_cv_init(&l->l_cv, &l->l_mtx);
+
+ if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
+ ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
+ ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) ||
+ ((rv = nni_idhash_alloc(listeners, &l->l_id, l)) != 0) ||
+ ((rv = nni_sock_add_listener(s, l)) != 0)) {
+ listener_destroy(l);
+ return (rv);
+ }
+
+ *lp = l;
+ return (0);
+}
+
+int
+nni_listener_find(nni_listener **lp, uint32_t id)
+{
+ int rv;
+ nni_listener *l;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&listeners_lk);
+ if ((rv = nni_idhash_find(listeners, id, (void **) &l)) == 0) {
+ if (l->l_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ l->l_refcnt++;
+ *lp = l;
+ }
+ }
+ nni_mtx_unlock(&listeners_lk);
+ return (rv);
+}
+
+int
+nni_listener_hold(nni_listener *l)
+{
+ int rv;
+ nni_mtx_lock(&listeners_lk);
+ if (l->l_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ l->l_refcnt++;
+ rv = 0;
+ }
+ nni_mtx_unlock(&listeners_lk);
+ return (rv);
+}
+
+void
+nni_listener_rele(nni_listener *l)
+{
+ nni_mtx_lock(&listeners_lk);
+ l->l_refcnt--;
+ if (l->l_closing) {
+ nni_cv_wake(&l->l_cv);
+ }
+ nni_mtx_unlock(&listeners_lk);
+}
+
+int
+nni_listener_shutdown(nni_listener *l)
+{
+ nni_mtx_lock(&l->l_mtx);
+ if (l->l_closing) {
+ nni_mtx_unlock(&l->l_mtx);
+ return (NNG_ECLOSED);
+ }
+ l->l_closing = true;
+ nni_mtx_unlock(&l->l_mtx);
+
+ // Abort any remaining in-flight accepts.
+ nni_aio_close(l->l_acc_aio);
+ nni_aio_close(l->l_tmo_aio);
+
+ // Stop the underlying transport.
+ l->l_ops.l_close(l->l_data);
+
+ return (0);
+}
+
+void
+nni_listener_close(nni_listener *l)
+{
+ nni_pipe *p;
+
+ nni_mtx_lock(&l->l_mtx);
+ if (l->l_closed) {
+ nni_mtx_unlock(&l->l_mtx);
+ nni_listener_rele(l);
+ return;
+ }
+ l->l_closed = true;
+ nni_mtx_unlock(&l->l_mtx);
+
+ nni_listener_shutdown(l);
+
+ nni_aio_stop(l->l_acc_aio);
+ nni_aio_stop(l->l_tmo_aio);
+
+ nni_mtx_lock(&l->l_mtx);
+ NNI_LIST_FOREACH (&l->l_pipes, p) {
+ nni_pipe_stop(p);
+ }
+ while ((!nni_list_empty(&l->l_pipes)) || (l->l_refcnt != 1)) {
+ nni_cv_wait(&l->l_cv);
+ }
+ nni_mtx_unlock(&l->l_mtx);
+
+ listener_destroy(l);
+}
+
+static void
+listener_timer_cb(void *arg)
+{
+ nni_listener *l = arg;
+ nni_aio * aio = l->l_tmo_aio;
+
+ nni_mtx_lock(&l->l_mtx);
+ if (nni_aio_result(aio) == 0) {
+ listener_accept_start(l);
+ }
+ nni_mtx_unlock(&l->l_mtx);
+}
+
+static void
+listener_accept_cb(void *arg)
+{
+ nni_listener *l = arg;
+ nni_pipe * p;
+ nni_aio * aio = l->l_acc_aio;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) == 0) {
+ void *data = nni_aio_get_output(aio, 0);
+ NNI_ASSERT(data != NULL);
+ rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data);
+ }
+
+ if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) {
+ nni_pipe_stop(p);
+ }
+
+ nni_mtx_lock(&l->l_mtx);
+ switch (rv) {
+ case 0:
+ nni_pipe_set_listener(p, l);
+ nni_list_append(&l->l_pipes, p);
+ if (l->l_closing) {
+ nni_mtx_unlock(&l->l_mtx);
+ nni_pipe_stop(p);
+ return;
+ }
+ listener_accept_start(l);
+ break;
+ case NNG_ECONNABORTED: // remote condition, no cooldown
+ case NNG_ECONNRESET: // remote condition, no cooldown
+ listener_accept_start(l);
+ break;
+ case NNG_ECLOSED: // no further action
+ case NNG_ECANCELED: // no further action
+ break;
+ default:
+ // We don't really know why we failed, but we backoff
+ // here. This is because errors here are probably due
+ // to system failures (resource exhaustion) and we hope
+ // by not thrashing we give the system a chance to
+ // recover. 100 msec is enough to cool down.
+ nni_sleep_aio(100, l->l_tmo_aio);
+ break;
+ }
+ nni_mtx_unlock(&l->l_mtx);
+}
+
+static void
+listener_accept_start(nni_listener *l)
+{
+ nni_aio *aio = l->l_acc_aio;
+
+ // Call with the listener lock held.
+ if (l->l_closing) {
+ return;
+ }
+ l->l_ops.l_accept(l->l_data, aio);
+}
+
+int
+nni_listener_start(nni_listener *l, int flags)
+{
+ int rv = 0;
+ NNI_ARG_UNUSED(flags);
+
+ nni_mtx_lock(&l->l_mtx);
+ if (l->l_closing) {
+ nni_mtx_unlock(&l->l_mtx);
+ return (NNG_ECLOSED);
+ }
+ if (l->l_started) {
+ nni_mtx_unlock(&l->l_mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((rv = l->l_ops.l_bind(l->l_data)) != 0) {
+ nni_mtx_unlock(&l->l_mtx);
+ return (rv);
+ }
+
+ l->l_started = true;
+ listener_accept_start(l);
+ nni_mtx_unlock(&l->l_mtx);
+
+ return (0);
+}
+
+void
+nni_listener_remove_pipe(nni_listener *l, nni_pipe *p)
+{
+ if (l == NULL) {
+ return;
+ }
+ // Break up relationship between listener and pipe.
+ nni_mtx_lock(&l->l_mtx);
+ // During early init, the pipe might not have this set.
+ if (nni_list_active(&l->l_pipes, p)) {
+ nni_list_remove(&l->l_pipes, p);
+ }
+ // Wake up the closer if it is waiting.
+ if (l->l_closed && nni_list_empty(&l->l_pipes)) {
+ nni_cv_wake(&l->l_cv);
+ }
+ nni_mtx_unlock(&l->l_mtx);
+}
+
+int
+nni_listener_setopt(nni_listener *l, const char *name, const void *val,
+ size_t sz, nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (NNG_EREADONLY);
+ }
+
+ for (o = l->l_ops.l_options; o && o->o_name; o++) {
+ int rv;
+
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_set == NULL) {
+ return (NNG_EREADONLY);
+ }
+
+ nni_mtx_lock(&l->l_mtx);
+ rv = o->o_set(l->l_data, val, sz, t);
+ nni_mtx_unlock(&l->l_mtx);
+ return (rv);
+ }
+
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_listener_getopt(
+ nni_listener *l, const char *name, void *valp, size_t *szp, nni_opt_type t)
+{
+ nni_tran_option *o;
+
+ for (o = l->l_ops.l_options; o && o->o_name; o++) {
+ int rv;
+ if (strcmp(o->o_name, name) != 0) {
+ continue;
+ }
+ if (o->o_get == NULL) {
+ return (NNG_EWRITEONLY);
+ }
+ nni_mtx_lock(&l->l_mtx);
+ rv = o->o_get(l->l_data, valp, szp, t);
+ nni_mtx_unlock(&l->l_mtx);
+ return (rv);
+ }
+
+ // We provide a fallback on the URL, but let the implementation
+ // override. This allows the URL to be created with wildcards,
+ // that are resolved later.
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (nni_copyout_str(l->l_url->u_rawurl, valp, szp, t));
+ }
+
+ return (nni_sock_getopt(l->l_sock, name, valp, szp, t));
+}
+
+void
+nni_listener_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_listener, l_node);
+}
diff --git a/src/core/listener.h b/src/core/listener.h
new file mode 100644
index 00000000..41b1a678
--- /dev/null
+++ b/src/core/listener.h
@@ -0,0 +1,33 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_LISTENER_H
+#define CORE_LISTENER_H
+
+extern int nni_listener_sys_init(void);
+extern void nni_listener_sys_fini(void);
+extern int nni_listener_find(nni_listener **, uint32_t);
+extern int nni_listener_hold(nni_listener *);
+extern void nni_listener_rele(nni_listener *);
+extern uint32_t nni_listener_id(nni_listener *);
+extern int nni_listener_create(nni_listener **, nni_sock *, const char *);
+extern int nni_listener_shutdown(nni_listener *);
+extern void nni_listener_close(nni_listener *);
+extern int nni_listener_start(nni_listener *, int);
+extern void nni_listener_list_init(nni_list *);
+extern int nni_listener_add_pipe(nni_listener *, nni_pipe *);
+extern void nni_listener_remove_pipe(nni_listener *, nni_pipe *);
+
+extern int nni_listener_setopt(
+ nni_listener *, const char *, const void *, size_t, nni_opt_type);
+extern int nni_listener_getopt(
+ nni_listener *, const char *, void *, size_t *, nni_opt_type);
+
+#endif // CORE_LISTENER_H
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index fdb2ce94..9af12720 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -52,7 +52,8 @@
// These have to come after the others - particularly transport.h
-#include "core/endpt.h"
+#include "core/dialer.h"
+#include "core/listener.h"
#include "core/pipe.h"
#include "core/socket.h"
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 93fbae99..a42cdeff 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -26,7 +26,8 @@ struct nni_pipe {
nni_list_node p_sock_node;
nni_list_node p_ep_node;
nni_sock * p_sock;
- nni_ep * p_ep;
+ nni_listener * p_listener;
+ nni_dialer * p_dialer;
bool p_closed;
bool p_stop;
bool p_cbs;
@@ -98,7 +99,7 @@ nni_pipe_sys_fini(void)
}
}
-static void
+void
nni_pipe_destroy(nni_pipe *p)
{
bool cbs;
@@ -126,9 +127,8 @@ nni_pipe_destroy(nni_pipe *p)
// We have exclusive access at this point, so we can check if
// we are still on any lists.
- if (nni_list_node_active(&p->p_ep_node)) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
+ nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL
+ nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL
if (nni_list_node_active(&p->p_sock_node)) {
nni_sock_pipe_remove(p->p_sock, p);
@@ -303,12 +303,10 @@ nni_pipe_start_cb(void *arg)
}
int
-nni_pipe_create(nni_ep *ep, void *tdata)
+nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
nni_pipe * p;
int rv;
- nni_tran * tran = nni_ep_tran(ep);
- nni_sock * sock = nni_ep_sock(ep);
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
@@ -324,7 +322,6 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
p->p_proto_data = NULL;
- p->p_ep = ep;
p->p_sock = sock;
p->p_closed = false;
p->p_stop = false;
@@ -348,16 +345,27 @@ nni_pipe_create(nni_ep *ep, void *tdata)
}
if ((rv != 0) ||
- ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) ||
- ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
- ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
+ ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_destroy(p);
return (rv);
}
+ *pp = p;
return (0);
}
+void
+nni_pipe_set_listener(nni_pipe *p, nni_listener *l)
+{
+ p->p_listener = l;
+}
+
+void
+nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d)
+{
+ p->p_dialer = d;
+}
+
int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
@@ -371,7 +379,13 @@ nni_pipe_getopt(
return (o->o_get(p->p_tran_data, val, szp, t));
}
// Maybe the endpoint knows?
- return (nni_ep_getopt(p->p_ep, name, val, szp, t));
+ if (p->p_dialer != NULL) {
+ return (nni_dialer_getopt(p->p_dialer, name, val, szp, t));
+ }
+ if (p->p_listener != NULL) {
+ return (nni_listener_getopt(p->p_listener, name, val, szp, t));
+ }
+ return (NNG_ENOTSUP);
}
void
@@ -409,15 +423,20 @@ nni_pipe_sock_id(nni_pipe *p)
}
uint32_t
-nni_pipe_ep_id(nni_pipe *p)
+nni_pipe_listener_id(nni_pipe *p)
{
- return (nni_ep_id(p->p_ep));
+ if (p->p_listener != NULL) {
+ return (nni_listener_id(p->p_listener));
+ }
+ return (0);
}
-
-int
-nni_pipe_ep_mode(nni_pipe *p)
+uint32_t
+nni_pipe_dialer_id(nni_pipe *p)
{
- return (nni_ep_mode(p->p_ep));
+ if (p->p_dialer != NULL) {
+ return (nni_dialer_id(p->p_dialer));
+ }
+ return (0);
}
static void
diff --git a/src/core/pipe.h b/src/core/pipe.h
index bd66d4ed..5c505514 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -29,6 +29,10 @@ extern void nni_pipe_send(nni_pipe *, nni_aio *);
// Pipe operations that protocols use.
extern uint32_t nni_pipe_id(nni_pipe *);
+// nni_pipe_destroy destroys a pipe -- there must not be any other
+// references to it; this is used only during creation failures.
+extern void nni_pipe_destroy(nni_pipe *);
+
// nni_pipe_close closes the underlying transport for the pipe. Further
// operations against will return NNG_ECLOSED.
extern void nni_pipe_close(nni_pipe *);
@@ -48,7 +52,9 @@ extern void nni_pipe_stop(nni_pipe *);
// endpoint, grabbing each of those locks. The function takes ownership of
// the transport specific pipe (3rd argument), regardless of whether it
// succeeds or not. The endpoint should be held when calling this.
-extern int nni_pipe_create(nni_ep *, void *);
+extern int nni_pipe_create2(nni_pipe **, nni_sock *, nni_tran *, void *);
+extern void nni_pipe_set_dialer(nni_pipe *, nni_dialer *);
+extern void nni_pipe_set_listener(nni_pipe *, nni_listener *);
// nni_pipe_start is called by the socket to begin any startup activities
// on the pipe before making it ready for use by protocols. For example,
@@ -82,11 +88,11 @@ extern int nni_pipe_find(nni_pipe **, uint32_t);
// nni_pipe_sock_id returns the socket id for the pipe (used by public API).
extern uint32_t nni_pipe_sock_id(nni_pipe *);
-// nni_pipe_ep_id returns the endpoint id for the pipe.
-extern uint32_t nni_pipe_ep_id(nni_pipe *);
+// nni_pipe_listener_id returns the listener id for the pipe (or 0 if none).
+extern uint32_t nni_pipe_listener_id(nni_pipe *);
-// nni_pipe_ep_mode returns the endpoint mode for the pipe.
-extern int nni_pipe_ep_mode(nni_pipe *);
+// nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none).
+extern uint32_t nni_pipe_dialer_id(nni_pipe *);
// nni_pipe_closed returns true if nni_pipe_close was called.
// (This is used by the socket to determine if user closed the pipe
diff --git a/src/core/socket.c b/src/core/socket.c
index 4cf624e2..894e4fee 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -82,9 +82,10 @@ struct nni_socket {
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
- nni_list s_eps; // active endpoints
- nni_list s_pipes; // active pipes
- nni_list s_ctxs; // active contexts (protected by global sock_lk)
+ nni_list s_listeners; // active listeners
+ nni_list s_dialers; // active dialers
+ nni_list s_pipes; // active pipes
+ nni_list s_ctxs; // active contexts (protected by global sock_lk)
bool s_closing; // Socket is closing
bool s_closed; // Socket closed, protected by global lock
@@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
nni_pipe_sock_list_init(&s->s_pipes);
- nni_ep_list_init(&s->s_eps);
+ nni_listener_list_init(&s->s_listeners);
+ nni_dialer_list_init(&s->s_dialers);
nni_mtx_init(&s->s_mx);
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
@@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
int
nni_sock_shutdown(nni_sock *sock)
{
- nni_pipe *pipe;
- nni_ep * ep;
- nni_ep * nep;
- nni_ctx * ctx;
- nni_ctx * nctx;
+ nni_pipe * pipe;
+ nni_dialer * d;
+ nni_dialer * nd;
+ nni_listener *l;
+ nni_listener *nl;
+ nni_ctx * ctx;
+ nni_ctx * nctx;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming
// but but allows existing ones to drain.
- NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_shutdown(ep);
+ NNI_LIST_FOREACH (&sock->s_listeners, l) {
+ nni_listener_shutdown(l);
}
+ NNI_LIST_FOREACH (&sock->s_dialers, d) {
+ nni_dialer_shutdown(d);
+ }
+
nni_mtx_unlock(&sock->s_mx);
// We now mark any owned contexts as closing.
@@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // Go through the endpoint list, attempting to close them.
+ // Go through the dialers and listeners, attempting to close them.
// We might already have a close in progress, in which case
// we skip past it; it will be removed from another thread.
- nep = nni_list_first(&sock->s_eps);
- while ((ep = nep) != NULL) {
- nep = nni_list_next(&sock->s_eps, nep);
+ nl = nni_list_first(&sock->s_listeners);
+ while ((l = nl) != NULL) {
+ nl = nni_list_next(&sock->s_listeners, nl);
+
+ if (nni_listener_hold(l) == 0) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_listener_close(l);
+ nni_mtx_lock(&sock->s_mx);
+ }
+ }
+ nd = nni_list_first(&sock->s_dialers);
+ while ((d = nd) != NULL) {
+ nd = nni_list_next(&sock->s_dialers, nd);
- if (nni_ep_hold(ep) == 0) {
+ if (nni_dialer_hold(d) == 0) {
nni_mtx_unlock(&sock->s_mx);
- nni_ep_close(ep);
+ nni_dialer_close(d);
nni_mtx_lock(&sock->s_mx);
}
}
@@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock)
// We have to wait for *both* endpoints and pipes to be
// removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
- (!nni_list_empty(&sock->s_eps))) {
+ (!nni_list_empty(&sock->s_listeners)) ||
+ (!nni_list_empty(&sock->s_dialers))) {
nni_cv_wait(&sock->s_cv);
}
@@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s)
// Wait for pipes, eps, and contexts to finish closing.
nni_mtx_lock(&s->s_mx);
- while (
- (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) {
+ while ((!nni_list_empty(&s->s_pipes)) ||
+ (!nni_list_empty(&s->s_dialers)) ||
+ (!nni_list_empty(&s->s_listeners))) {
nni_cv_wait(&s->s_cv);
}
nni_mtx_unlock(&s->s_mx);
@@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
}
int
-nni_sock_ep_add(nni_sock *s, nni_ep *ep)
+nni_sock_add_listener(nni_sock *s, nni_listener *l)
+{
+ nni_sockopt *sopt;
+
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closing) {
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ NNI_LIST_FOREACH (&s->s_options, sopt) {
+ int rv;
+ rv = nni_listener_setopt(
+ l, sopt->name, sopt->data, sopt->sz, sopt->typ);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+ }
+ }
+
+ nni_list_append(&s->s_listeners, l);
+ nni_mtx_unlock(&s->s_mx);
+ return (0);
+}
+
+int
+nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
{
nni_sockopt *sopt;
@@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep)
NNI_LIST_FOREACH (&s->s_options, sopt) {
int rv;
- rv = nni_ep_setopt(
- ep, sopt->name, sopt->data, sopt->sz, sopt->typ);
+ rv = nni_dialer_setopt(
+ d, sopt->name, sopt->data, sopt->sz, sopt->typ);
if ((rv != 0) && (rv != NNG_ENOTSUP)) {
nni_mtx_unlock(&s->s_mx);
return (rv);
}
}
- nni_list_append(&s->s_eps, ep);
+ nni_list_append(&s->s_dialers, d);
nni_mtx_unlock(&s->s_mx);
return (0);
}
void
-nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
+nni_sock_remove_listener(nni_sock *s, nni_listener *l)
{
- nni_mtx_lock(&sock->s_mx);
- if (nni_list_active(&sock->s_eps, ep)) {
- nni_list_remove(&sock->s_eps, ep);
- if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) {
- nni_cv_wake(&sock->s_cv);
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_listeners, l)) {
+ nni_list_remove(&s->s_listeners, l);
+ if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
+ nni_cv_wake(&s->s_cv);
}
}
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_sock_remove_dialer(nni_sock *s, nni_dialer *d)
+{
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_dialers, d)) {
+ nni_list_remove(&s->s_dialers, d);
+ if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
+ nni_cv_wake(&s->s_cv);
+ }
+ }
+ nni_mtx_unlock(&s->s_mx);
}
int
@@ -948,7 +1007,8 @@ nni_sock_setopt(
nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t)
{
int rv = NNG_ENOTSUP;
- nni_ep * ep;
+ nni_dialer * d;
+ nni_listener * l;
nni_sockopt * optv;
nni_sockopt * oldv = NULL;
const sock_option * sso;
@@ -1042,9 +1102,20 @@ nni_sock_setopt(
// transport (other than ENOTSUP) stops the operation
// altogether. Its important that transport wide checks
// properly pre-validate.
- NNI_LIST_FOREACH (&s->s_eps, ep) {
+ NNI_LIST_FOREACH (&s->s_listeners, l) {
+ int x;
+ x = nni_listener_setopt(l, optv->name, optv->data, sz, t);
+ if (x != NNG_ENOTSUP) {
+ if ((rv = x) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_free_opt(optv);
+ return (rv);
+ }
+ }
+ }
+ NNI_LIST_FOREACH (&s->s_dialers, d) {
int x;
- x = nni_ep_setopt(ep, optv->name, optv->data, sz, t);
+ x = nni_dialer_setopt(d, optv->name, optv->data, sz, t);
if (x != NNG_ENOTSUP) {
if ((rv = x) != 0) {
nni_mtx_unlock(&s->s_mx);
diff --git a/src/core/socket.h b/src/core/socket.h
index 7c10b195..184cfb64 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -46,8 +46,11 @@ extern uint32_t nni_sock_id(nni_sock *);
extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
-extern int nni_sock_ep_add(nni_sock *, nni_ep *);
-extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
+extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
+extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
+
+extern int nni_sock_add_listener(nni_sock *, nni_listener *);
+extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
// These are socket methods that protocol operations can expect to call.
// Note that each of these should be called without any locks held, since
diff --git a/src/core/transport.c b/src/core/transport.c
index 4733b6bd..8485d048 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -118,12 +118,28 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz, int typ)
nni_mtx_lock(&nni_tran_lk);
NNI_LIST_FOREACH (&nni_tran_list, t) {
- const nni_tran_ep_ops *ep;
- const nni_tran_option *o;
+ const nni_tran_dialer_ops * dops;
+ const nni_tran_listener_ops *lops;
+ const nni_tran_option * o;
+
+ // Generally we look for endpoint options. We check both
+ // dialers and listeners.
+ dops = t->t_tran.tran_dialer;
+ for (o = dops->d_options; o && o->o_name != NULL; o++) {
+ if (strcmp(name, o->o_name) != 0) {
+ continue;
+ }
+ if (o->o_set == NULL) {
+ nni_mtx_unlock(&nni_tran_lk);
+ return (NNG_EREADONLY);
+ }
- // Generally we look for endpoint options.
- ep = t->t_tran.tran_ep;
- for (o = ep->ep_options; o && o->o_name != NULL; o++) {
+ rv = (o->o_chk != NULL) ? o->o_chk(v, sz, typ) : 0;
+ nni_mtx_unlock(&nni_tran_lk);
+ return (rv);
+ }
+ lops = t->t_tran.tran_listener;
+ for (o = lops->l_options; o && o->o_name != NULL; o++) {
if (strcmp(name, o->o_name) != 0) {
continue;
}
diff --git a/src/core/transport.h b/src/core/transport.h
index e45aa7ec..257d232d 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -11,30 +11,11 @@
#ifndef CORE_TRANSPORT_H
#define CORE_TRANSPORT_H
-// Transport implementation details. Transports must implement the
-// interfaces in this file.
-struct nni_tran {
- // tran_version is the version of the transport ops that this
- // transport implements. We only bother to version the main
- // ops vector.
- uint32_t tran_version;
-
- // tran_scheme is the transport scheme, such as "tcp" or "inproc".
- const char *tran_scheme;
-
- // tran_ep links our endpoint-specific operations.
- const nni_tran_ep_ops *tran_ep;
-
- // tran_pipe links our pipe-specific operations.
- const nni_tran_pipe_ops *tran_pipe;
-
- // tran_init, if not NULL, is called once during library
- // initialization.
- int (*tran_init)(void);
-
- // tran_fini, if not NULL, is called during library deinitialization.
- // It should release any global resources, close any open files, etc.
- void (*tran_fini)(void);
+// Endpoint modes. Currently used by transports. Remove this when we make
+// transport dialers and listeners explicit.
+enum nni_ep_mode {
+ NNI_EP_MODE_DIAL = 1,
+ NNI_EP_MODE_LISTEN = 2,
};
// We quite intentionally use a signature where the upper word is nonzero,
@@ -48,7 +29,8 @@ struct nni_tran {
#define NNI_TRANSPORT_V0 0x54520000
#define NNI_TRANSPORT_V1 0x54520001
#define NNI_TRANSPORT_V2 0x54520002
-#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V2
+#define NNI_TRANSPORT_V3 0x54520003
+#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3
// Option handlers.
struct nni_tran_option {
@@ -81,40 +63,60 @@ struct nni_tran_option {
// For a given endpoint, the framework holds a lock so that each entry
// point is run exclusively of the others. (Transports must still guard
// against any asynchronous operations they manage themselves, though.)
-struct nni_tran_ep_ops {
- // ep_init creates a vanilla endpoint. The value created is
- // used for the first argument for all other endpoint
- // functions.
- int (*ep_init)(void **, nni_url *, nni_sock *, int);
- // ep_fini frees the resources associated with the endpoint.
- // The endpoint will already have been closed.
- void (*ep_fini)(void *);
+struct nni_tran_dialer_ops {
+ // d_init creates a vanilla dialer. The value created is
+ // used for the first argument for all other dialer functions.
+ int (*d_init)(void **, nni_url *, nni_sock *);
+
+ // d_fini frees the resources associated with the dialer.
+ // The dialer will already have been closed.
+ void (*d_fini)(void *);
- // ep_connect establishes a connection. It can return errors
+ // d_connect establishes a connection. It can return errors
// NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR,
// NNG_ECONNFAILED, NNG_ETIMEDOUT, and NNG_EPROTO.
- void (*ep_connect)(void *, nni_aio *);
+ void (*d_connect)(void *, nni_aio *);
+
+ // d_close stops the dialer from operating altogether. It
+ // does not affect pipes that have already been created. It is
+ // nonblocking.
+ void (*d_close)(void *);
+
+ // d_options is an array of dialer options. The final
+ // element must have a NULL name. If this member is NULL, then
+ // no dialer specific options are available.
+ nni_tran_option *d_options;
+};
+
+struct nni_tran_listener_ops {
+ // l_init creates a vanilla listener. The value created is
+ // used for the first argument for all other listener functions.
+ int (*l_init)(void **, nni_url *, nni_sock *);
- // ep_bind just does the bind() and listen() work,
+ // l_fini frees the resources associated with the listener.
+ // The listener will already have been closed.
+ void (*l_fini)(void *);
+
+ // l_bind just does the bind() and listen() work,
// reserving the address but not creating any connections.
// It should return NNG_EADDRINUSE if the address is already
// taken. It can also return NNG_EBADADDR for an unsuitable
// address, or NNG_EACCESS for permission problems.
- int (*ep_bind)(void *);
+ int (*l_bind)(void *);
- // ep_accept accepts an inbound connection.
- void (*ep_accept)(void *, nni_aio *);
+ // l_accept accepts an inbound connection.
+ void (*l_accept)(void *, nni_aio *);
- // ep_close stops the endpoint from operating altogether. It
+ // l_close stops the listener from operating altogether. It
// does not affect pipes that have already been created. It is
// nonblocking.
- void (*ep_close)(void *);
+ void (*l_close)(void *);
- // ep_options is an array of endpoint options. The final
+ // l_options is an array of listener options. The final
// element must have a NULL name. If this member is NULL, then
- // no transport specific options are available.
- nni_tran_option *ep_options;
+ // no dialer specific options are available.
+ nni_tran_option *l_options;
};
// Pipe operations are entry points called by the socket. These may be
@@ -168,6 +170,35 @@ struct nni_tran_pipe_ops {
nni_tran_option *p_options;
};
+// Transport implementation details. Transports must implement the
+// interfaces in this file.
+struct nni_tran {
+ // tran_version is the version of the transport ops that this
+ // transport implements. We only bother to version the main
+ // ops vector.
+ uint32_t tran_version;
+
+ // tran_scheme is the transport scheme, such as "tcp" or "inproc".
+ const char *tran_scheme;
+
+ // tran_dialer links our dialer-specific operations.
+ const nni_tran_dialer_ops *tran_dialer;
+
+ // tran_listener links our listener-specific operations.
+ const nni_tran_listener_ops *tran_listener;
+
+ // tran_pipe links our pipe-specific operations.
+ const nni_tran_pipe_ops *tran_pipe;
+
+ // tran_init, if not NULL, is called once during library
+ // initialization.
+ int (*tran_init)(void);
+
+ // tran_fini, if not NULL, is called during library deinitialization.
+ // It should release any global resources, close any open files, etc.
+ void (*tran_fini)(void);
+};
+
// These APIs are used by the framework internally, and not for use by
// transport implementations.
extern nni_tran *nni_tran_find(nni_url *);
diff --git a/src/nng.c b/src/nng.c
index acdabe69..f7f16954 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -316,7 +316,7 @@ nng_ctx_send(nng_ctx cid, nng_aio *aio)
}
static int
-nng_ctx_getx(nng_ctx id, const char *n, void *v, size_t *szp, int t)
+nng_ctx_getx(nng_ctx id, const char *n, void *v, size_t *szp, nni_opt_type t)
{
nni_ctx *ctx;
int rv;
@@ -367,7 +367,8 @@ nng_ctx_getopt_ms(nng_ctx id, const char *name, nng_duration *vp)
}
static int
-nng_ctx_setx(nng_ctx id, const char *n, const void *v, size_t sz, int t)
+nng_ctx_setx(
+ nng_ctx id, const char *n, const void *v, size_t sz, nni_opt_type t)
{
nni_ctx *ctx;
int rv;
@@ -414,97 +415,97 @@ nng_ctx_setopt_ms(nng_ctx id, const char *name, nng_duration v)
}
int
-nng_dial(nng_socket s, const char *addr, nng_dialer *dp, int flags)
+nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags)
{
- nni_ep * ep;
- int rv;
- nni_sock *sock;
+ nni_dialer *d;
+ int rv;
+ nni_sock * s;
- if ((rv = nni_sock_find(&sock, s.id)) != 0) {
+ if ((rv = nni_sock_find(&s, sid.id)) != 0) {
return (rv);
}
- if ((rv = nni_ep_create_dialer(&ep, sock, addr)) != 0) {
- nni_sock_rele(sock);
+ if ((rv = nni_dialer_create(&d, s, addr)) != 0) {
+ nni_sock_rele(s);
return (rv);
}
- if ((rv = nni_ep_dial(ep, flags)) != 0) {
- nni_ep_close(ep);
- nni_sock_rele(sock);
+ if ((rv = nni_dialer_start(d, flags)) != 0) {
+ nni_dialer_close(d);
+ nni_sock_rele(s);
return (rv);
}
if (dp != NULL) {
- nng_dialer d;
- d.id = nni_ep_id(ep);
- *dp = d;
+ nng_dialer did;
+ did.id = nni_dialer_id(d);
+ *dp = did;
}
- nni_ep_rele(ep);
- nni_sock_rele(sock);
+ nni_dialer_rele(d);
+ nni_sock_rele(s);
return (0);
}
int
-nng_listen(nng_socket s, const char *addr, nng_listener *lp, int flags)
+nng_listen(nng_socket sid, const char *addr, nng_listener *lp, int flags)
{
- nni_ep * ep;
- int rv;
- nni_sock *sock;
+ int rv;
+ nni_sock * s;
+ nni_listener *l;
- if ((rv = nni_sock_find(&sock, s.id)) != 0) {
+ if ((rv = nni_sock_find(&s, sid.id)) != 0) {
return (rv);
}
- if ((rv = nni_ep_create_listener(&ep, sock, addr)) != 0) {
- nni_sock_rele(sock);
+ if ((rv = nni_listener_create(&l, s, addr)) != 0) {
+ nni_sock_rele(s);
return (rv);
}
- if ((rv = nni_ep_listen(ep, flags)) != 0) {
- nni_ep_close(ep);
- nni_sock_rele(sock);
+ if ((rv = nni_listener_start(l, flags)) != 0) {
+ nni_listener_close(l);
+ nni_sock_rele(s);
return (rv);
}
if (lp != NULL) {
- nng_listener l;
- l.id = nni_ep_id(ep);
- *lp = l;
+ nng_listener lid;
+ lid.id = nni_listener_id(l);
+ *lp = lid;
}
- nni_ep_rele(ep);
- nni_sock_rele(sock);
+ nni_listener_rele(l);
+ nni_sock_rele(s);
return (rv);
}
int
-nng_listener_create(nng_listener *lp, nng_socket s, const char *addr)
+nng_listener_create(nng_listener *lp, nng_socket sid, const char *addr)
{
- nni_sock * sock;
- nni_ep * ep;
- int rv;
- nng_listener l;
+ nni_sock * s;
+ int rv;
+ nni_listener *l;
+ nng_listener lid;
- if ((rv = nni_sock_find(&sock, s.id)) != 0) {
+ if ((rv = nni_sock_find(&s, sid.id)) != 0) {
return (rv);
}
- if ((rv = nni_ep_create_listener(&ep, sock, addr)) != 0) {
- nni_sock_rele(sock);
+ if ((rv = nni_listener_create(&l, s, addr)) != 0) {
+ nni_sock_rele(s);
return (rv);
}
- l.id = nni_ep_id(ep);
- *lp = l;
- nni_ep_rele(ep);
- nni_sock_rele(sock);
+ lid.id = nni_listener_id(l);
+ *lp = lid;
+ nni_listener_rele(l);
+ nni_sock_rele(s);
return (0);
}
int
-nng_listener_start(nng_listener l, int flags)
+nng_listener_start(nng_listener lid, int flags)
{
- nni_ep *ep;
- int rv;
+ nni_listener *l;
+ int rv;
- if ((rv = nni_ep_find(&ep, l.id)) != 0) {
+ if ((rv = nni_listener_find(&l, lid.id)) != 0) {
return (rv);
}
- rv = nni_ep_listen(ep, flags);
- nni_ep_rele(ep);
+ rv = nni_listener_start(l, flags);
+ nni_listener_rele(l);
return (rv);
}
@@ -515,38 +516,38 @@ nng_listener_id(nng_listener l)
}
int
-nng_dialer_create(nng_dialer *dp, nng_socket s, const char *addr)
+nng_dialer_create(nng_dialer *dp, nng_socket sid, const char *addr)
{
- nni_sock * sock;
- nni_ep * ep;
- int rv;
- nng_dialer d;
+ nni_sock * s;
+ nni_dialer *d;
+ int rv;
+ nng_dialer did;
- if ((rv = nni_sock_find(&sock, s.id)) != 0) {
+ if ((rv = nni_sock_find(&s, sid.id)) != 0) {
return (rv);
}
- if ((rv = nni_ep_create_dialer(&ep, sock, addr)) != 0) {
- nni_sock_rele(sock);
+ if ((rv = nni_dialer_create(&d, s, addr)) != 0) {
+ nni_sock_rele(s);
return (rv);
}
- d.id = nni_ep_id(ep);
- *dp = d;
- nni_ep_rele(ep);
- nni_sock_rele(sock);
+ did.id = nni_dialer_id(d);
+ *dp = did;
+ nni_dialer_rele(d);
+ nni_sock_rele(s);
return (0);
}
int
-nng_dialer_start(nng_dialer d, int flags)
+nng_dialer_start(nng_dialer did, int flags)
{
- nni_ep *ep;
- int rv;
+ nni_dialer *d;
+ int rv;
- if ((rv = nni_ep_find(&ep, d.id)) != 0) {
+ if ((rv = nni_dialer_find(&d, did.id)) != 0) {
return (rv);
}
- rv = nni_ep_dial(ep, flags);
- nni_ep_rele(ep);
+ rv = nni_dialer_start(d, flags);
+ nni_dialer_rele(d);
return (rv);
}
@@ -557,54 +558,41 @@ nng_dialer_id(nng_dialer d)
}
static int
-nng_ep_setx(
- uint32_t id, const char *n, const void *v, size_t sz, int mode, int t)
+nng_dialer_setx(
+ nng_dialer did, const char *n, const void *v, size_t sz, nni_opt_type t)
{
- nni_ep *ep;
- int rv;
+ nni_dialer *d;
+ int rv;
if ((rv = nni_init()) != 0) {
return (rv);
}
- if ((rv = nni_ep_find(&ep, id)) != 0) {
+ if ((rv = nni_dialer_find(&d, did.id)) != 0) {
return (rv);
}
- if (nni_ep_mode(ep) == mode) {
- rv = nni_ep_setopt(ep, n, v, sz, t);
- } else {
- rv = NNG_ENOENT;
- }
- nni_ep_rele(ep);
+ rv = nni_dialer_setopt(d, n, v, sz, t);
+ nni_dialer_rele(d);
return (rv);
}
static int
-nng_ep_getx(uint32_t id, const char *n, void *v, size_t *szp, int mode, int t)
+nng_dialer_getx(
+ nng_dialer did, const char *n, void *v, size_t *szp, nni_opt_type t)
{
- nni_ep *ep;
- int rv;
+ nni_dialer *d;
+ int rv;
if ((rv = nni_init()) != 0) {
return (rv);
}
- if ((rv = nni_ep_find(&ep, id)) != 0) {
+ if ((rv = nni_dialer_find(&d, did.id)) != 0) {
return (rv);
}
- if (nni_ep_mode(ep) == mode) {
- rv = nni_ep_getopt(ep, n, v, szp, t);
- } else {
- rv = NNG_ENOENT;
- }
- nni_ep_rele(ep);
+ rv = nni_dialer_getopt(d, n, v, szp, t);
+ nni_dialer_rele(d);
return (rv);
}
-static int
-nng_dialer_setx(nng_dialer d, const char *nm, const void *v, size_t sz, int t)
-{
- return (nng_ep_setx(d.id, nm, v, sz, NNI_EP_MODE_DIAL, t));
-}
-
int
nng_dialer_setopt(nng_dialer d, const char *name, const void *v, size_t sz)
{
@@ -653,12 +641,6 @@ nng_dialer_setopt_string(nng_dialer d, const char *name, const char *v)
return (nng_dialer_setx(d, name, v, strlen(v) + 1, NNI_TYPE_STRING));
}
-static int
-nng_dialer_getx(nng_dialer d, const char *n, void *v, size_t *szp, int t)
-{
- return (nng_ep_getx(d.id, n, v, szp, NNI_EP_MODE_DIAL, t));
-}
-
int
nng_dialer_getopt(nng_dialer d, const char *name, void *val, size_t *szp)
{
@@ -722,10 +704,21 @@ nng_dialer_getopt_ms(nng_dialer d, const char *name, nng_duration *vp)
}
int
-nng_listener_setx(
- nng_listener l, const char *name, const void *v, size_t sz, int t)
+nng_listener_setx(nng_listener lid, const char *name, const void *v, size_t sz,
+ nni_opt_type t)
{
- return (nng_ep_setx(l.id, name, v, sz, NNI_EP_MODE_LISTEN, t));
+ nni_listener *l;
+ int rv;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_listener_find(&l, lid.id)) != 0) {
+ return (rv);
+ }
+ rv = nni_listener_setopt(l, name, v, sz, t);
+ nni_listener_rele(l);
+ return (rv);
}
int
@@ -778,9 +771,20 @@ nng_listener_setopt_string(nng_listener l, const char *n, const char *v)
int
nng_listener_getx(
- nng_listener l, const char *name, void *v, size_t *szp, int t)
+ nng_listener lid, const char *name, void *v, size_t *szp, nni_opt_type t)
{
- return (nng_ep_getx(l.id, name, v, szp, NNI_EP_MODE_LISTEN, t));
+ nni_listener *l;
+ int rv;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_listener_find(&l, lid.id)) != 0) {
+ return (rv);
+ }
+ rv = nni_listener_getopt(l, name, v, szp, t);
+ nni_listener_rele(l);
+ return (rv);
}
int
@@ -846,38 +850,35 @@ nng_listener_getopt_ms(nng_listener l, const char *name, nng_duration *vp)
return (nng_listener_getx(l, name, vp, &sz, NNI_TYPE_DURATION));
}
-static int
-nng_ep_close(uint32_t id, int mode)
+int
+nng_dialer_close(nng_dialer did)
{
- nni_ep *ep;
- int rv;
+ nni_dialer *d;
+ int rv;
- if ((rv = nni_ep_find(&ep, id)) != 0) {
+ if ((rv = nni_dialer_find(&d, did.id)) != 0) {
return (rv);
}
- if (nni_ep_mode(ep) != mode) {
- nni_ep_rele(ep);
- return (NNG_ENOENT);
- }
-
- nni_ep_close(ep);
+ nni_dialer_close(d);
return (0);
}
int
-nng_dialer_close(nng_dialer d)
+nng_listener_close(nng_listener lid)
{
- return (nng_ep_close(d.id, NNI_EP_MODE_DIAL));
-}
+ nni_listener *l;
+ int rv;
-int
-nng_listener_close(nng_listener l)
-{
- return (nng_ep_close(l.id, NNI_EP_MODE_LISTEN));
+ if ((rv = nni_listener_find(&l, lid.id)) != 0) {
+ return (rv);
+ }
+ nni_listener_close(l);
+ return (0);
}
static int
-nng_setx(nng_socket s, const char *name, const void *val, size_t sz, int t)
+nng_setx(
+ nng_socket s, const char *name, const void *val, size_t sz, nni_opt_type t)
{
nni_sock *sock;
int rv;
@@ -900,7 +901,8 @@ nng_setopt(nng_socket s, const char *name, const void *val, size_t sz)
}
static int
-nng_getx(nng_socket s, const char *name, void *val, size_t *szp, int t)
+nng_getx(
+ nng_socket s, const char *name, void *val, size_t *szp, nni_opt_type t)
{
nni_sock *sock;
int rv;
@@ -1130,7 +1132,8 @@ nng_strerror(int num)
}
static int
-nng_pipe_getx(nng_pipe p, const char *name, void *val, size_t *szp, int t)
+nng_pipe_getx(
+ nng_pipe p, const char *name, void *val, size_t *szp, nni_opt_type t)
{
int rv;
nni_pipe *pipe;
@@ -1227,9 +1230,7 @@ nng_pipe_dialer(nng_pipe p)
nng_dialer d = NNG_DIALER_INITIALIZER;
nni_pipe * pipe;
if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
- if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_DIAL) {
- d.id = nni_pipe_ep_id(pipe);
- }
+ d.id = nni_pipe_dialer_id(pipe);
nni_pipe_rele(pipe);
}
return (d);
@@ -1241,9 +1242,7 @@ nng_pipe_listener(nng_pipe p)
nng_listener l = NNG_LISTENER_INITIALIZER;
nni_pipe * pipe;
if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
- if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_LISTEN) {
- l.id = nni_pipe_ep_id(pipe);
- }
+ l.id = nni_pipe_listener_id(pipe);
nni_pipe_rele(pipe);
}
return (l);
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 7a52d89f..db8aeff5 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -48,7 +48,7 @@ struct nni_inproc_pair {
struct nni_inproc_ep {
const char * addr;
- int mode;
+ bool listener;
nni_list_node node;
uint16_t proto;
nni_cv cv;
@@ -189,7 +189,7 @@ nni_inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t)
}
static int
-nni_inproc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
+nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
{
nni_inproc_ep *ep;
@@ -197,8 +197,26 @@ nni_inproc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (NNG_ENOMEM);
}
- ep->mode = mode;
- ep->proto = nni_sock_proto_id(sock);
+ ep->listener = false;
+ ep->proto = nni_sock_proto_id(sock);
+ NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
+ nni_aio_list_init(&ep->aios);
+
+ ep->addr = url->u_rawurl; // we match on the full URL.
+ *epp = ep;
+ return (0);
+}
+static int
+nni_inproc_listener_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ nni_inproc_ep *ep;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ ep->listener = true;
+ ep->proto = nni_sock_proto_id(sock);
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
nni_aio_list_init(&ep->aios);
@@ -222,8 +240,7 @@ nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe)
nni_aio_list_remove(aio);
- if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) &&
- nni_list_empty(&ep->aios)) {
+ if ((ep != NULL) && (!ep->listener) && nni_list_empty(&ep->aios)) {
nni_list_node_remove(&ep->node);
}
@@ -354,10 +371,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
if (nni_aio_begin(aio) != 0) {
return;
}
- if (ep->mode != NNI_EP_MODE_DIAL) {
- nni_aio_finish_error(aio, NNG_EINVAL);
- return;
- }
+
nni_mtx_lock(&nni_inproc.mx);
// Find a server.
@@ -468,25 +482,33 @@ static nni_tran_option nni_inproc_ep_options[] = {
},
};
-static nni_tran_ep_ops nni_inproc_ep_ops = {
- .ep_init = nni_inproc_ep_init,
- .ep_fini = nni_inproc_ep_fini,
- .ep_connect = nni_inproc_ep_connect,
- .ep_bind = nni_inproc_ep_bind,
- .ep_accept = nni_inproc_ep_accept,
- .ep_close = nni_inproc_ep_close,
- .ep_options = nni_inproc_ep_options,
+static nni_tran_dialer_ops nni_inproc_dialer_ops = {
+ .d_init = nni_inproc_dialer_init,
+ .d_fini = nni_inproc_ep_fini,
+ .d_connect = nni_inproc_ep_connect,
+ .d_close = nni_inproc_ep_close,
+ .d_options = nni_inproc_ep_options,
+};
+
+static nni_tran_listener_ops nni_inproc_listener_ops = {
+ .l_init = nni_inproc_listener_init,
+ .l_fini = nni_inproc_ep_fini,
+ .l_bind = nni_inproc_ep_bind,
+ .l_accept = nni_inproc_ep_accept,
+ .l_close = nni_inproc_ep_close,
+ .l_options = nni_inproc_ep_options,
};
// This is the inproc transport linkage, and should be the only global
// symbol in this entire file.
struct nni_tran nni_inproc_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "inproc",
- .tran_ep = &nni_inproc_ep_ops,
- .tran_pipe = &nni_inproc_pipe_ops,
- .tran_init = nni_inproc_init,
- .tran_fini = nni_inproc_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "inproc",
+ .tran_dialer = &nni_inproc_dialer_ops,
+ .tran_listener = &nni_inproc_listener_ops,
+ .tran_pipe = &nni_inproc_pipe_ops,
+ .tran_init = nni_inproc_init,
+ .tran_fini = nni_inproc_fini,
};
int
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index c5c7032a..b48b82d9 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -656,6 +656,18 @@ ipc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (0);
}
+static int
+ipc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+}
+
+static int
+ipc_listener_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+}
+
static void
ipc_ep_close(void *arg)
{
@@ -915,7 +927,26 @@ static nni_tran_pipe_ops ipc_pipe_ops = {
.p_options = ipc_pipe_options,
};
-static nni_tran_option ipc_ep_options[] = {
+static nni_tran_option ipc_dialer_options[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_type = NNI_TYPE_SIZE,
+ .o_get = ipc_ep_get_recvmaxsz,
+ .o_set = ipc_ep_set_recvmaxsz,
+ .o_chk = ipc_ep_chk_recvmaxsz,
+ },
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ .o_type = NNI_TYPE_SOCKADDR,
+ .o_get = ipc_ep_get_addr,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_tran_option ipc_listener_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
@@ -948,23 +979,31 @@ static nni_tran_option ipc_ep_options[] = {
},
};
-static nni_tran_ep_ops ipc_ep_ops = {
- .ep_init = ipc_ep_init,
- .ep_fini = ipc_ep_fini,
- .ep_connect = ipc_ep_connect,
- .ep_bind = ipc_ep_bind,
- .ep_accept = ipc_ep_accept,
- .ep_close = ipc_ep_close,
- .ep_options = ipc_ep_options,
+static nni_tran_dialer_ops ipc_dialer_ops = {
+ .d_init = ipc_dialer_init,
+ .d_fini = ipc_ep_fini,
+ .d_connect = ipc_ep_connect,
+ .d_close = ipc_ep_close,
+ .d_options = ipc_dialer_options,
+};
+
+static nni_tran_listener_ops ipc_listener_ops = {
+ .l_init = ipc_listener_init,
+ .l_fini = ipc_ep_fini,
+ .l_bind = ipc_ep_bind,
+ .l_accept = ipc_ep_accept,
+ .l_close = ipc_ep_close,
+ .l_options = ipc_listener_options,
};
static nni_tran ipc_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "ipc",
- .tran_ep = &ipc_ep_ops,
- .tran_pipe = &ipc_pipe_ops,
- .tran_init = ipc_tran_init,
- .tran_fini = ipc_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "ipc",
+ .tran_dialer = &ipc_dialer_ops,
+ .tran_listener = &ipc_listener_ops,
+ .tran_pipe = &ipc_pipe_ops,
+ .tran_init = ipc_tran_init,
+ .tran_fini = ipc_tran_fini,
};
int
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index f23d5b3a..1a183ecd 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -56,7 +56,6 @@ struct tcp_ep {
nni_aio * user_aio;
nni_url * url;
nng_sockaddr bsa; // bound addr
- int mode;
nni_mtx mtx;
};
@@ -688,7 +687,6 @@ tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (rv);
}
ep->proto = nni_sock_proto_id(sock);
- ep->mode = mode;
ep->nodelay = true;
ep->keepalive = false;
@@ -696,6 +694,18 @@ tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (0);
}
+static int
+tcp_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+}
+
+static int
+tcp_listener_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+}
+
static void
tcp_ep_close(void *arg)
{
@@ -897,16 +907,21 @@ tcp_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
-tcp_ep_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
+tcp_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ tcp_ep *ep = arg;
+
+ return (nni_copyout_str(ep->url->u_rawurl, v, szp, t));
+}
+
+static int
+tcp_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tcp_ep *ep = arg;
char ustr[128];
char ipstr[48]; // max for IPv6 addresses including []
char portstr[6]; // max for 16-bit port
- if (ep->mode == NNI_EP_MODE_DIAL) {
- return (nni_copyout_str(ep->url->u_rawurl, v, szp, t));
- }
nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr);
snprintf(ustr, sizeof(ustr), "tcp://%s:%s", ipstr, portstr);
return (nni_copyout_str(ustr, v, szp, t));
@@ -957,7 +972,40 @@ static nni_tran_pipe_ops tcp_pipe_ops = {
.p_options = tcp_pipe_options,
};
-static nni_tran_option tcp_ep_options[] = {
+static nni_tran_option tcp_dialer_options[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_type = NNI_TYPE_SIZE,
+ .o_get = tcp_ep_get_recvmaxsz,
+ .o_set = tcp_ep_set_recvmaxsz,
+ .o_chk = tcp_ep_chk_recvmaxsz,
+ },
+ {
+ .o_name = NNG_OPT_URL,
+ .o_type = NNI_TYPE_STRING,
+ .o_get = tcp_dialer_get_url,
+ },
+ {
+ .o_name = NNG_OPT_TCP_NODELAY,
+ .o_type = NNI_TYPE_BOOL,
+ .o_get = tcp_ep_get_nodelay,
+ .o_set = tcp_ep_set_nodelay,
+ .o_chk = tcp_ep_chk_bool,
+ },
+ {
+ .o_name = NNG_OPT_TCP_KEEPALIVE,
+ .o_type = NNI_TYPE_BOOL,
+ .o_get = tcp_ep_get_keepalive,
+ .o_set = tcp_ep_set_keepalive,
+ .o_chk = tcp_ep_chk_bool,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_tran_option tcp_listener_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
@@ -968,7 +1016,7 @@ static nni_tran_option tcp_ep_options[] = {
{
.o_name = NNG_OPT_URL,
.o_type = NNI_TYPE_STRING,
- .o_get = tcp_ep_get_url,
+ .o_get = tcp_listener_get_url,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
@@ -990,41 +1038,51 @@ static nni_tran_option tcp_ep_options[] = {
},
};
-static nni_tran_ep_ops tcp_ep_ops = {
- .ep_init = tcp_ep_init,
- .ep_fini = tcp_ep_fini,
- .ep_connect = tcp_ep_connect,
- .ep_bind = tcp_ep_bind,
- .ep_accept = tcp_ep_accept,
- .ep_close = tcp_ep_close,
- .ep_options = tcp_ep_options,
+static nni_tran_dialer_ops tcp_dialer_ops = {
+ .d_init = tcp_dialer_init,
+ .d_fini = tcp_ep_fini,
+ .d_connect = tcp_ep_connect,
+ .d_close = tcp_ep_close,
+ .d_options = tcp_dialer_options,
+};
+
+static nni_tran_listener_ops tcp_listener_ops = {
+ .l_init = tcp_listener_init,
+ .l_fini = tcp_ep_fini,
+ .l_bind = tcp_ep_bind,
+ .l_accept = tcp_ep_accept,
+ .l_close = tcp_ep_close,
+ .l_options = tcp_listener_options,
};
static nni_tran tcp_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "tcp",
- .tran_ep = &tcp_ep_ops,
- .tran_pipe = &tcp_pipe_ops,
- .tran_init = tcp_tran_init,
- .tran_fini = tcp_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "tcp",
+ .tran_dialer = &tcp_dialer_ops,
+ .tran_listener = &tcp_listener_ops,
+ .tran_pipe = &tcp_pipe_ops,
+ .tran_init = tcp_tran_init,
+ .tran_fini = tcp_tran_fini,
};
static nni_tran tcp4_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "tcp4",
- .tran_ep = &tcp_ep_ops,
- .tran_pipe = &tcp_pipe_ops,
- .tran_init = tcp_tran_init,
- .tran_fini = tcp_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "tcp4",
+ .tran_dialer = &tcp_dialer_ops,
+ .tran_listener = &tcp_listener_ops,
+ .tran_pipe = &tcp_pipe_ops,
+ .tran_init = tcp_tran_init,
+ .tran_fini = tcp_tran_fini,
};
static nni_tran tcp6_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "tcp6",
- .tran_ep = &tcp_ep_ops,
- .tran_pipe = &tcp_pipe_ops,
- .tran_init = tcp_tran_init,
- .tran_fini = tcp_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "tcp6",
+ .tran_dialer = &tcp_dialer_ops,
+ .tran_listener = &tcp_listener_ops,
+ .tran_pipe = &tcp_pipe_ops,
+ .tran_init = tcp_tran_init,
+ .tran_fini = tcp_tran_fini,
};
int
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 35f88e25..b4f555da 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -691,7 +691,6 @@ tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
}
nni_mtx_init(&ep->mtx);
ep->url = url;
- ep->mode = mode;
ep->keepalive = false;
ep->nodelay = true;
@@ -715,6 +714,18 @@ tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (0);
}
+static int
+tls_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ return (tls_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+}
+
+static int
+tls_listener_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ return (tls_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+}
+
static void
tls_ep_close(void *arg)
{
@@ -891,16 +902,21 @@ tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
-tls_ep_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
+tls_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ tls_ep *ep = arg;
+
+ return (nni_copyout_str(ep->url->u_rawurl, v, szp, t));
+}
+
+static int
+tls_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t)
{
tls_ep *ep = arg;
char ustr[128];
char ipstr[48]; // max for IPv6 addresses including []
char portstr[6]; // max for 16-bit port
- if (ep->mode == NNI_EP_MODE_DIAL) {
- return (nni_copyout_str(ep->url->u_rawurl, v, szp, t));
- }
nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr);
snprintf(ustr, sizeof(ustr), "tls+tcp://%s:%s", ipstr, portstr);
return (nni_copyout_str(ustr, v, szp, t));
@@ -1095,7 +1111,7 @@ static nni_tran_pipe_ops tls_pipe_ops = {
.p_options = tls_pipe_options,
};
-static nni_tran_option tls_ep_options[] = {
+static nni_tran_option tls_dialer_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
@@ -1106,7 +1122,7 @@ static nni_tran_option tls_ep_options[] = {
{
.o_name = NNG_OPT_URL,
.o_type = NNI_TYPE_STRING,
- .o_get = tls_ep_get_url,
+ .o_get = tls_dialer_get_url,
},
{
.o_name = NNG_OPT_TLS_CONFIG,
@@ -1159,41 +1175,115 @@ static nni_tran_option tls_ep_options[] = {
},
};
-static nni_tran_ep_ops tls_ep_ops = {
- .ep_init = tls_ep_init,
- .ep_fini = tls_ep_fini,
- .ep_connect = tls_ep_connect,
- .ep_bind = tls_ep_bind,
- .ep_accept = tls_ep_accept,
- .ep_close = tls_ep_close,
- .ep_options = tls_ep_options,
+static nni_tran_option tls_listener_options[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_type = NNI_TYPE_SIZE,
+ .o_get = tls_ep_get_recvmaxsz,
+ .o_set = tls_ep_set_recvmaxsz,
+ .o_chk = tls_ep_chk_recvmaxsz,
+ },
+ {
+ .o_name = NNG_OPT_URL,
+ .o_type = NNI_TYPE_STRING,
+ .o_get = tls_listener_get_url,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CONFIG,
+ .o_type = NNI_TYPE_POINTER,
+ .o_get = tls_ep_get_config,
+ .o_set = tls_ep_set_config,
+ .o_chk = tls_ep_chk_config,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CERT_KEY_FILE,
+ .o_type = NNI_TYPE_STRING,
+ .o_set = tls_ep_set_cert_key_file,
+ .o_chk = tls_ep_chk_string,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CA_FILE,
+ .o_type = NNI_TYPE_STRING,
+ .o_set = tls_ep_set_ca_file,
+ .o_chk = tls_ep_chk_string,
+ },
+ {
+ .o_name = NNG_OPT_TLS_AUTH_MODE,
+ .o_type = NNI_TYPE_INT32, // enum really
+ .o_set = tls_ep_set_auth_mode,
+ .o_chk = tls_ep_chk_auth_mode,
+ },
+ {
+ .o_name = NNG_OPT_TLS_SERVER_NAME,
+ .o_type = NNI_TYPE_STRING,
+ .o_set = tls_ep_set_server_name,
+ .o_chk = tls_ep_chk_string,
+ },
+ {
+ .o_name = NNG_OPT_TCP_NODELAY,
+ .o_type = NNI_TYPE_BOOL,
+ .o_get = tls_ep_get_nodelay,
+ .o_set = tls_ep_set_nodelay,
+ .o_chk = tls_ep_chk_bool,
+ },
+ {
+ .o_name = NNG_OPT_TCP_KEEPALIVE,
+ .o_type = NNI_TYPE_BOOL,
+ .o_get = tls_ep_get_keepalive,
+ .o_set = tls_ep_set_keepalive,
+ .o_chk = tls_ep_chk_bool,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_tran_dialer_ops tls_dialer_ops = {
+ .d_init = tls_dialer_init,
+ .d_fini = tls_ep_fini,
+ .d_connect = tls_ep_connect,
+ .d_close = tls_ep_close,
+ .d_options = tls_dialer_options,
+};
+
+static nni_tran_listener_ops tls_listener_ops = {
+ .l_init = tls_listener_init,
+ .l_fini = tls_ep_fini,
+ .l_bind = tls_ep_bind,
+ .l_accept = tls_ep_accept,
+ .l_close = tls_ep_close,
+ .l_options = tls_listener_options,
};
static nni_tran tls_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "tls+tcp",
- .tran_ep = &tls_ep_ops,
- .tran_pipe = &tls_pipe_ops,
- .tran_init = tls_tran_init,
- .tran_fini = tls_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "tls+tcp",
+ .tran_dialer = &tls_dialer_ops,
+ .tran_listener = &tls_listener_ops,
+ .tran_pipe = &tls_pipe_ops,
+ .tran_init = tls_tran_init,
+ .tran_fini = tls_tran_fini,
};
static nni_tran tls4_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "tls+tcp4",
- .tran_ep = &tls_ep_ops,
- .tran_pipe = &tls_pipe_ops,
- .tran_init = tls_tran_init,
- .tran_fini = tls_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "tls+tcp4",
+ .tran_dialer = &tls_dialer_ops,
+ .tran_listener = &tls_listener_ops,
+ .tran_pipe = &tls_pipe_ops,
+ .tran_init = tls_tran_init,
+ .tran_fini = tls_tran_fini,
};
static nni_tran tls6_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "tls+tcp6",
- .tran_ep = &tls_ep_ops,
- .tran_pipe = &tls_pipe_ops,
- .tran_init = tls_tran_init,
- .tran_fini = tls_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "tls+tcp6",
+ .tran_dialer = &tls_dialer_ops,
+ .tran_listener = &tls_listener_ops,
+ .tran_pipe = &tls_pipe_ops,
+ .tran_init = tls_tran_init,
+ .tran_fini = tls_tran_fini,
};
int
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 19cc347c..2fa0fd67 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -21,8 +21,9 @@
#include "websocket.h"
-typedef struct ws_ep ws_ep;
-typedef struct ws_pipe ws_pipe;
+typedef struct ws_dialer ws_dialer;
+typedef struct ws_listener ws_listener;
+typedef struct ws_pipe ws_pipe;
typedef struct ws_hdr {
nni_list_node node;
@@ -30,26 +31,35 @@ typedef struct ws_hdr {
char * value;
} ws_hdr;
-struct ws_ep {
- int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+struct ws_dialer {
+ uint16_t lproto; // local protocol
+ uint16_t rproto; // remote protocol
+ size_t rcvmax;
+ char * prname;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_aio * connaio;
+ nni_ws_dialer *dialer;
+ nni_list headers; // req headers
+ bool started;
+};
+
+struct ws_listener {
uint16_t lproto; // local protocol
uint16_t rproto; // remote protocol
size_t rcvmax;
- char * protoname;
+ char * prname;
nni_list aios;
nni_mtx mtx;
- nni_aio * connaio;
nni_aio * accaio;
nni_ws_listener *listener;
- nni_ws_dialer * dialer;
- nni_list headers; // to send, res or req
+ nni_list headers; // res headers
bool started;
};
struct ws_pipe {
- int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
nni_mtx mtx;
- size_t rcvmax; // inherited from EP
+ size_t rcvmax;
bool closed;
uint16_t rproto;
uint16_t lproto;
@@ -220,7 +230,7 @@ ws_pipe_close(void *arg)
}
static int
-ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws)
+ws_pipe_init(ws_pipe **pipep, void *ws)
{
ws_pipe *p;
int rv;
@@ -236,12 +246,7 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws)
ws_pipe_fini(p);
return (rv);
}
-
- p->mode = ep->mode;
- p->rcvmax = ep->rcvmax;
- p->rproto = ep->rproto;
- p->lproto = ep->lproto;
- p->ws = ws;
+ p->ws = ws;
*pipep = p;
return (0);
@@ -261,14 +266,14 @@ ws_pipe_peer(void *arg)
static int
ws_hook(void *arg, nni_http_req *req, nni_http_res *res)
{
- ws_ep * ep = arg;
- ws_hdr *h;
+ ws_listener *l = arg;
+ ws_hdr * h;
NNI_ARG_UNUSED(req);
// Eventually we'll want user customizable hooks.
// For now we just set the headers we want.
- NNI_LIST_FOREACH (&ep->headers, h) {
+ NNI_LIST_FOREACH (&l->headers, h) {
int rv;
rv = nng_http_res_set_header(res, h->name, h->value);
if (rv != 0) {
@@ -279,38 +284,38 @@ ws_hook(void *arg, nni_http_req *req, nni_http_res *res)
}
static int
-ws_ep_bind(void *arg)
+ws_listener_bind(void *arg)
{
- ws_ep *ep = arg;
- int rv;
+ ws_listener *l = arg;
+ int rv;
- nni_ws_listener_set_maxframe(ep->listener, ep->rcvmax);
- nni_ws_listener_hook(ep->listener, ws_hook, ep);
+ nni_ws_listener_set_maxframe(l->listener, l->rcvmax);
+ nni_ws_listener_hook(l->listener, ws_hook, l);
- if ((rv = nni_ws_listener_listen(ep->listener)) == 0) {
- ep->started = true;
+ if ((rv = nni_ws_listener_listen(l->listener)) == 0) {
+ l->started = true;
}
return (rv);
}
static void
-ws_ep_cancel(nni_aio *aio, int rv)
+ws_listener_cancel(nni_aio *aio, int rv)
{
- ws_ep *ep = nni_aio_get_prov_data(aio);
+ ws_listener *l = nni_aio_get_prov_data(aio);
- nni_mtx_lock(&ep->mtx);
+ nni_mtx_lock(&l->mtx);
if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_unlock(&l->mtx);
}
static void
-ws_ep_accept(void *arg, nni_aio *aio)
+ws_listener_accept(void *arg, nni_aio *aio)
{
- ws_ep *ep = arg;
- int rv;
+ ws_listener *l = arg;
+ int rv;
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
@@ -318,33 +323,46 @@ ws_ep_accept(void *arg, nni_aio *aio)
if (nni_aio_begin(aio) != 0) {
return;
}
- nni_mtx_lock(&ep->mtx);
- if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_lock(&l->mtx);
+ if ((rv = nni_aio_schedule(aio, ws_listener_cancel, l)) != 0) {
+ nni_mtx_unlock(&l->mtx);
nni_aio_finish_error(aio, rv);
return;
}
- nni_list_append(&ep->aios, aio);
- if (aio == nni_list_first(&ep->aios)) {
- nni_ws_listener_accept(ep->listener, ep->accaio);
+ nni_list_append(&l->aios, aio);
+ if (aio == nni_list_first(&l->aios)) {
+ nni_ws_listener_accept(l->listener, l->accaio);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ws_dialer_cancel(nni_aio *aio, int rv)
+{
+ ws_dialer *d = nni_aio_get_prov_data(aio);
+
+ nni_mtx_lock(&d->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
}
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_unlock(&d->mtx);
}
static void
-ws_ep_connect(void *arg, nni_aio *aio)
+ws_dialer_connect(void *arg, nni_aio *aio)
{
- ws_ep *ep = arg;
- int rv;
+ ws_dialer *d = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
- if (!ep->started) {
+ if (!d->started) {
ws_hdr *h;
- NNI_LIST_FOREACH (&ep->headers, h) {
- int rv = nni_ws_dialer_header(
- ep->dialer, h->name, h->value);
+ NNI_LIST_FOREACH (&d->headers, h) {
+ int rv =
+ nni_ws_dialer_header(d->dialer, h->name, h->value);
if (rv != 0) {
nni_aio_finish_error(aio, rv);
return;
@@ -352,22 +370,22 @@ ws_ep_connect(void *arg, nni_aio *aio)
}
}
- nni_mtx_lock(&ep->mtx);
- if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_lock(&d->mtx);
+ if ((rv = nni_aio_schedule(aio, ws_dialer_cancel, d)) != 0) {
+ nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
return;
}
- NNI_ASSERT(nni_list_empty(&ep->aios));
- ep->started = true;
- nni_list_append(&ep->aios, aio);
- nni_ws_dialer_set_maxframe(ep->dialer, ep->rcvmax);
- nni_ws_dialer_dial(ep->dialer, ep->connaio);
- nni_mtx_unlock(&ep->mtx);
+ NNI_ASSERT(nni_list_empty(&d->aios));
+ d->started = true;
+ nni_list_append(&d->aios, aio);
+ nni_ws_dialer_set_maxframe(d->dialer, d->rcvmax);
+ nni_ws_dialer_dial(d->dialer, d->connaio);
+ nni_mtx_unlock(&d->mtx);
}
static int
-ws_ep_chk_string(const void *v, size_t sz, nni_opt_type t)
+ws_check_string(const void *v, size_t sz, nni_opt_type t)
{
if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) {
return (NNG_EBADTYPE);
@@ -379,40 +397,59 @@ ws_ep_chk_string(const void *v, size_t sz, nni_opt_type t)
}
static int
-ws_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
+ws_dialer_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- ws_ep *ep = arg;
- size_t val;
- int rv;
+ ws_dialer *d = arg;
+ size_t val;
+ int rv;
if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
- nni_mtx_lock(&ep->mtx);
- ep->rcvmax = val;
- nni_mtx_unlock(&ep->mtx);
- if (ep->mode == NNI_EP_MODE_DIAL) {
- nni_ws_dialer_set_maxframe(ep->dialer, val);
- } else {
- nni_ws_listener_set_maxframe(ep->listener, val);
- }
+ nni_mtx_lock(&d->mtx);
+ d->rcvmax = val;
+ nni_mtx_unlock(&d->mtx);
+ nni_ws_dialer_set_maxframe(d->dialer, val);
}
return (rv);
}
static int
-ws_ep_chk_recvmaxsz(const void *v, size_t sz, nni_opt_type t)
+ws_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
+ ws_dialer *d = arg;
+ return (nni_copyout_size(d->rcvmax, v, szp, t));
+}
+
+static int
+ws_listener_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ ws_listener *l = arg;
+ size_t val;
+ int rv;
+
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ l->rcvmax = val;
+ nni_mtx_unlock(&l->mtx);
+ nni_ws_listener_set_maxframe(l->listener, val);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
+{
+ ws_listener *l = arg;
+ return (nni_copyout_size(l->rcvmax, v, szp, t));
}
static int
-ws_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
+ws_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t)
{
- ws_ep *ep = arg;
- return (nni_copyout_size(ep->rcvmax, v, szp, t));
+ return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
}
static int
-ws_ep_set_headers(ws_ep *ep, const char *v)
+ws_set_headers(nni_list *headers, const char *v)
{
char * dupstr;
size_t duplen;
@@ -423,10 +460,6 @@ ws_ep_set_headers(ws_ep *ep, const char *v)
ws_hdr * h;
int rv;
- if (ep->started) {
- return (NNG_EBUSY);
- }
-
NNI_LIST_INIT(&l, ws_hdr, node);
if ((dupstr = nni_strdup(v)) == NULL) {
return (NNG_ENOMEM);
@@ -471,15 +504,15 @@ ws_ep_set_headers(ws_ep *ep, const char *v)
name = nl;
}
- while ((h = nni_list_first(&ep->headers)) != NULL) {
- nni_list_remove(&ep->headers, h);
+ while ((h = nni_list_first(headers)) != NULL) {
+ nni_list_remove(headers, h);
nni_strfree(h->name);
nni_strfree(h->value);
NNI_FREE_STRUCT(h);
}
while ((h = nni_list_first(&l)) != NULL) {
nni_list_remove(&l, h);
- nni_list_append(&ep->headers, h);
+ nni_list_append(headers, h);
}
rv = 0;
@@ -495,33 +528,32 @@ done:
}
static int
-ws_ep_set_reqhdrs(void *arg, const void *v, size_t sz, nni_opt_type t)
+ws_dialer_set_reqhdrs(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- ws_ep *ep = arg;
- int rv;
+ ws_dialer *d = arg;
+ int rv;
- if ((rv = ws_ep_chk_string(v, sz, t)) == 0) {
- if (ep->mode == NNI_EP_MODE_LISTEN) {
- rv = NNG_EREADONLY;
- } else {
- rv = ws_ep_set_headers(ep, v);
- }
+ if (d->started) {
+ return (NNG_EBUSY);
+ }
+
+ if ((rv = ws_check_string(v, sz, t)) == 0) {
+ rv = ws_set_headers(&d->headers, v);
}
return (rv);
}
static int
-ws_ep_set_reshdrs(void *arg, const void *v, size_t sz, nni_opt_type t)
+ws_listener_set_reshdrs(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- ws_ep *ep = arg;
- int rv;
+ ws_listener *l = arg;
+ int rv;
- if ((rv = ws_ep_chk_string(v, sz, t)) == 0) {
- if (ep->mode == NNI_EP_MODE_DIAL) {
- rv = NNG_EREADONLY;
- } else {
- rv = ws_ep_set_headers(ep, v);
- }
+ if (l->started) {
+ return (NNG_EBUSY);
+ }
+ if ((rv = ws_check_string(v, sz, t)) == 0) {
+ rv = ws_set_headers(&l->headers, v);
}
return (rv);
}
@@ -628,25 +660,39 @@ static nni_tran_pipe_ops ws_pipe_ops = {
.p_options = ws_pipe_options,
};
-static nni_tran_option ws_ep_options[] = {
+static nni_tran_option ws_dialer_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
- .o_get = ws_ep_get_recvmaxsz,
- .o_set = ws_ep_set_recvmaxsz,
- .o_chk = ws_ep_chk_recvmaxsz,
+ .o_get = ws_dialer_get_recvmaxsz,
+ .o_set = ws_dialer_set_recvmaxsz,
+ .o_chk = ws_check_recvmaxsz,
},
{
.o_name = NNG_OPT_WS_REQUEST_HEADERS,
.o_type = NNI_TYPE_STRING,
- .o_set = ws_ep_set_reqhdrs,
- .o_chk = ws_ep_chk_string,
+ .o_set = ws_dialer_set_reqhdrs,
+ .o_chk = ws_check_string,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_tran_option ws_listener_options[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_type = NNI_TYPE_SIZE,
+ .o_get = ws_listener_get_recvmaxsz,
+ .o_set = ws_listener_set_recvmaxsz,
+ .o_chk = ws_check_recvmaxsz,
},
{
.o_name = NNG_OPT_WS_RESPONSE_HEADERS,
.o_type = NNI_TYPE_STRING,
- .o_set = ws_ep_set_reshdrs,
- .o_chk = ws_ep_chk_string,
+ .o_set = ws_listener_set_reshdrs,
+ .o_chk = ws_check_string,
},
// terminate list
{
@@ -655,93 +701,117 @@ static nni_tran_option ws_ep_options[] = {
};
static void
-ws_ep_fini(void *arg)
+ws_dialer_fini(void *arg)
{
- ws_ep * ep = arg;
- ws_hdr *hdr;
+ ws_dialer *d = arg;
+ ws_hdr * hdr;
- nni_aio_stop(ep->accaio);
- nni_aio_stop(ep->connaio);
- if (ep->listener != NULL) {
- nni_ws_listener_fini(ep->listener);
+ nni_aio_stop(d->connaio);
+ if (d->dialer != NULL) {
+ nni_ws_dialer_fini(d->dialer);
}
- if (ep->dialer != NULL) {
- nni_ws_dialer_fini(ep->dialer);
- }
- nni_aio_fini(ep->accaio);
- nni_aio_fini(ep->connaio);
- while ((hdr = nni_list_first(&ep->headers)) != NULL) {
- nni_list_remove(&ep->headers, hdr);
+ nni_aio_fini(d->connaio);
+ while ((hdr = nni_list_first(&d->headers)) != NULL) {
+ nni_list_remove(&d->headers, hdr);
nni_strfree(hdr->name);
nni_strfree(hdr->value);
NNI_FREE_STRUCT(hdr);
}
- nni_strfree(ep->protoname);
- nni_mtx_fini(&ep->mtx);
- NNI_FREE_STRUCT(ep);
+ nni_strfree(d->prname);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
}
static void
-ws_ep_conn_cb(void *arg)
+ws_listener_fini(void *arg)
{
- ws_ep * ep = arg;
- ws_pipe *p;
- nni_aio *caio = ep->connaio;
- nni_aio *uaio;
- int rv;
- nni_ws * ws = NULL;
+ ws_listener *l = arg;
+ ws_hdr * hdr;
+
+ nni_aio_stop(l->accaio);
+ if (l->listener != NULL) {
+ nni_ws_listener_fini(l->listener);
+ }
+ nni_aio_fini(l->accaio);
+ while ((hdr = nni_list_first(&l->headers)) != NULL) {
+ nni_list_remove(&l->headers, hdr);
+ nni_strfree(hdr->name);
+ nni_strfree(hdr->value);
+ NNI_FREE_STRUCT(hdr);
+ }
+ nni_strfree(l->prname);
+ nni_mtx_fini(&l->mtx);
+ NNI_FREE_STRUCT(l);
+}
- nni_mtx_lock(&ep->mtx);
+static void
+ws_connect_cb(void *arg)
+{
+ ws_dialer *d = arg;
+ ws_pipe * p;
+ nni_aio * caio = d->connaio;
+ nni_aio * uaio;
+ int rv;
+ nni_ws * ws = NULL;
+
+ nni_mtx_lock(&d->mtx);
if (nni_aio_result(caio) == 0) {
ws = nni_aio_get_output(caio, 0);
}
- if ((uaio = nni_list_first(&ep->aios)) == NULL) {
+ if ((uaio = nni_list_first(&d->aios)) == NULL) {
// The client stopped caring about this!
if (ws != NULL) {
nni_ws_fini(ws);
}
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_unlock(&d->mtx);
return;
}
nni_aio_list_remove(uaio);
- NNI_ASSERT(nni_list_empty(&ep->aios));
+ NNI_ASSERT(nni_list_empty(&d->aios));
if ((rv = nni_aio_result(caio)) != 0) {
nni_aio_finish_error(uaio, rv);
- } else if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ } else if ((rv = ws_pipe_init(&p, ws)) != 0) {
nni_ws_fini(ws);
nni_aio_finish_error(uaio, rv);
} else {
+ p->rcvmax = d->rcvmax;
+ p->rproto = d->rproto;
+ p->lproto = d->lproto;
+
nni_aio_set_output(uaio, 0, p);
nni_aio_finish(uaio, 0, 0);
}
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_unlock(&d->mtx);
}
static void
-ws_ep_close(void *arg)
+ws_dialer_close(void *arg)
{
- ws_ep *ep = arg;
+ ws_dialer *d = arg;
- nni_aio_close(ep->accaio);
- nni_aio_close(ep->connaio);
+ nni_aio_close(d->connaio);
+ nni_ws_dialer_close(d->dialer);
+}
- if (ep->mode == NNI_EP_MODE_LISTEN) {
- nni_ws_listener_close(ep->listener);
- } else {
- nni_ws_dialer_close(ep->dialer);
- }
+static void
+ws_listener_close(void *arg)
+{
+ ws_listener *l = arg;
+
+ nni_aio_close(l->accaio);
+ nni_ws_listener_close(l->listener);
}
static void
-ws_ep_acc_cb(void *arg)
+ws_accept_cb(void *arg)
{
- ws_ep * ep = arg;
- nni_aio *aaio = ep->accaio;
- nni_aio *uaio;
- int rv;
+ ws_listener *l = arg;
+ nni_aio * aaio = l->accaio;
+ nni_aio * uaio;
+ int rv;
- nni_mtx_lock(&ep->mtx);
- uaio = nni_list_first(&ep->aios);
+ nni_mtx_lock(&l->mtx);
+ uaio = nni_list_first(&l->aios);
if ((rv = nni_aio_result(aaio)) != 0) {
if (uaio != NULL) {
nni_aio_list_remove(uaio);
@@ -753,72 +823,86 @@ ws_ep_acc_cb(void *arg)
ws_pipe *p;
// Make a pipe
nni_aio_list_remove(uaio);
- if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ if ((rv = ws_pipe_init(&p, ws)) != 0) {
nni_ws_close(ws);
nni_aio_finish_error(uaio, rv);
} else {
+ p->rcvmax = l->rcvmax;
+ p->rproto = l->rproto;
+ p->lproto = l->lproto;
+
nni_aio_set_output(uaio, 0, p);
nni_aio_finish(uaio, 0, 0);
}
}
}
- if (!nni_list_empty(&ep->aios)) {
- nni_ws_listener_accept(ep->listener, aaio);
+ if (!nni_list_empty(&l->aios)) {
+ nni_ws_listener_accept(l->listener, aaio);
}
- nni_mtx_unlock(&ep->mtx);
+ nni_mtx_unlock(&l->mtx);
}
static int
-ws_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
+ws_dialer_init(void **dp, nni_url *url, nni_sock *s)
{
- ws_ep * ep;
- const char *pname;
+ ws_dialer * d;
+ const char *n;
int rv;
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->headers, ws_hdr, node);
+ nni_mtx_init(&d->mtx);
+ NNI_LIST_INIT(&d->headers, ws_hdr, node);
- // List of pipes (server only).
- nni_aio_list_init(&ep->aios);
+ nni_aio_list_init(&d->aios);
- ep->mode = mode;
- ep->lproto = nni_sock_proto_id(sock);
- ep->rproto = nni_sock_peer_id(sock);
+ d->lproto = nni_sock_proto_id(s);
+ d->rproto = nni_sock_peer_id(s);
+ n = nni_sock_peer_name(s);
- if (mode == NNI_EP_MODE_DIAL) {
- pname = nni_sock_peer_name(sock);
- rv = nni_ws_dialer_init(&ep->dialer, url);
- } else {
- pname = nni_sock_proto_name(sock);
- rv = nni_ws_listener_init(&ep->listener, url);
- }
-
- if ((rv != 0) ||
- ((rv = nni_aio_init(&ep->connaio, ws_ep_conn_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->accaio, ws_ep_acc_cb, ep)) != 0) ||
- ((rv = nni_asprintf(&ep->protoname, "%s.sp.nanomsg.org", pname)) !=
- 0)) {
- ws_ep_fini(ep);
+ if (((rv = nni_ws_dialer_init(&d->dialer, url)) != 0) ||
+ ((rv = nni_aio_init(&d->connaio, ws_connect_cb, d)) != 0) ||
+ ((rv = nni_asprintf(&d->prname, "%s.sp.nanomsg.org", n)) != 0) ||
+ ((rv = nni_ws_dialer_proto(d->dialer, d->prname)) != 0)) {
+ ws_dialer_fini(d);
return (rv);
}
- if (mode == NNI_EP_MODE_DIAL) {
- rv = nni_ws_dialer_proto(ep->dialer, ep->protoname);
- } else {
- rv = nni_ws_listener_proto(ep->listener, ep->protoname);
+ *dp = d;
+ return (0);
+}
+
+static int
+ws_listener_init(void **lp, nni_url *url, nni_sock *sock)
+{
+ ws_listener *l;
+ const char * n;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
}
+ nni_mtx_init(&l->mtx);
+ NNI_LIST_INIT(&l->headers, ws_hdr, node);
+
+ nni_aio_list_init(&l->aios);
+
+ l->lproto = nni_sock_proto_id(sock);
+ l->rproto = nni_sock_peer_id(sock);
+ n = nni_sock_proto_name(sock);
- if (rv != 0) {
- ws_ep_fini(ep);
+ if (((rv = nni_ws_listener_init(&l->listener, url)) != 0) ||
+ ((rv = nni_aio_init(&l->accaio, ws_accept_cb, l)) != 0) ||
+ ((rv = nni_asprintf(&l->prname, "%s.sp.nanomsg.org", n)) != 0) ||
+ ((rv = nni_ws_listener_proto(l->listener, l->prname)) != 0)) {
+ ws_listener_fini(l);
return (rv);
}
-
- *epp = ep;
+ *lp = l;
return (0);
}
+
static int
ws_tran_init(void)
{
@@ -830,23 +914,31 @@ ws_tran_fini(void)
{
}
-static nni_tran_ep_ops ws_ep_ops = {
- .ep_init = ws_ep_init,
- .ep_fini = ws_ep_fini,
- .ep_connect = ws_ep_connect,
- .ep_bind = ws_ep_bind,
- .ep_accept = ws_ep_accept,
- .ep_close = ws_ep_close,
- .ep_options = ws_ep_options,
+static nni_tran_dialer_ops ws_dialer_ops = {
+ .d_init = ws_dialer_init,
+ .d_fini = ws_dialer_fini,
+ .d_connect = ws_dialer_connect,
+ .d_close = ws_dialer_close,
+ .d_options = ws_dialer_options,
+};
+
+static nni_tran_listener_ops ws_listener_ops = {
+ .l_init = ws_listener_init,
+ .l_fini = ws_listener_fini,
+ .l_bind = ws_listener_bind,
+ .l_accept = ws_listener_accept,
+ .l_close = ws_listener_close,
+ .l_options = ws_listener_options,
};
static nni_tran ws_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "ws",
- .tran_ep = &ws_ep_ops,
- .tran_pipe = &ws_pipe_ops,
- .tran_init = ws_tran_init,
- .tran_fini = ws_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "ws",
+ .tran_dialer = &ws_dialer_ops,
+ .tran_listener = &ws_listener_ops,
+ .tran_pipe = &ws_pipe_ops,
+ .tran_init = ws_tran_init,
+ .tran_fini = ws_tran_fini,
};
int
@@ -858,25 +950,27 @@ nng_ws_register(void)
#ifdef NNG_TRANSPORT_WSS
static int
-wss_get_tls(ws_ep *ep, nng_tls_config **tlsp)
+wss_dialer_get_tlsconfig(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- switch (ep->mode) {
- case NNI_EP_MODE_DIAL:
- return (nni_ws_dialer_get_tls(ep->dialer, tlsp));
- case NNI_EP_MODE_LISTEN:
- return (nni_ws_listener_get_tls(ep->listener, tlsp));
+ ws_dialer * d = arg;
+ nng_tls_config *tls;
+ int rv;
+
+ if (((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0) ||
+ ((rv = nni_copyout_ptr(tls, v, szp, t)) != 0)) {
+ return (rv);
}
- return (NNG_EINVAL);
+ return (0);
}
static int
-wss_ep_get_tlsconfig(void *arg, void *v, size_t *szp, nni_opt_type t)
+wss_listener_get_tlsconfig(void *arg, void *v, size_t *szp, nni_opt_type t)
{
- ws_ep * ep = arg;
+ ws_listener * l = arg;
nng_tls_config *tls;
int rv;
- if (((rv = wss_get_tls(ep, &tls)) != 0) ||
+ if (((rv = nni_ws_listener_get_tls(l->listener, &tls)) != 0) ||
((rv = nni_copyout_ptr(tls, v, szp, t)) != 0)) {
return (rv);
}
@@ -884,7 +978,7 @@ wss_ep_get_tlsconfig(void *arg, void *v, size_t *szp, nni_opt_type t)
}
static int
-wss_ep_chk_tlsconfig(const void *v, size_t sz, nni_opt_type t)
+wss_check_tlsconfig(const void *v, size_t sz, nni_opt_type t)
{
void *p;
int rv;
@@ -895,9 +989,9 @@ wss_ep_chk_tlsconfig(const void *v, size_t sz, nni_opt_type t)
}
static int
-wss_ep_set_tlsconfig(void *arg, const void *v, size_t sz, nni_opt_type t)
+wss_dialer_set_tlsconfig(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- ws_ep * ep = arg;
+ ws_dialer * d = arg;
nng_tls_config *cfg;
int rv;
@@ -905,56 +999,114 @@ wss_ep_set_tlsconfig(void *arg, const void *v, size_t sz, nni_opt_type t)
return (rv);
}
if (cfg == NULL) {
- // NULL is clearly invalid.
return (NNG_EINVAL);
}
- if (ep->mode == NNI_EP_MODE_LISTEN) {
- rv = nni_ws_listener_set_tls(ep->listener, cfg);
- } else {
- rv = nni_ws_dialer_set_tls(ep->dialer, cfg);
+ return (nni_ws_dialer_set_tls(d->dialer, cfg));
+}
+
+static int
+wss_listener_set_tlsconfig(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ ws_listener * l = arg;
+ nng_tls_config *cfg;
+ int rv;
+
+ if ((rv = nni_copyin_ptr((void **) &cfg, v, sz, t)) != 0) {
+ return (rv);
}
- return (rv);
+ if (cfg == NULL) {
+ return (NNG_EINVAL);
+ }
+ return (nni_ws_listener_set_tls(l->listener, cfg));
}
static int
-wss_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t)
+wss_dialer_set_cert_key_file(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
{
- ws_ep * ep = arg;
+ ws_dialer * d = arg;
int rv;
nng_tls_config *tls;
- if (((rv = ws_ep_chk_string(v, sz, t)) != 0) ||
- ((rv = wss_get_tls(ep, &tls)) != 0)) {
+ if (((rv = ws_check_string(v, sz, t)) != 0) ||
+ ((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0)) {
return (rv);
}
return (nng_tls_config_cert_key_file(tls, v, NULL));
}
static int
-wss_ep_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t)
+wss_listener_set_cert_key_file(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
{
- ws_ep * ep = arg;
+ ws_listener * l = arg;
int rv;
nng_tls_config *tls;
- if (((rv = ws_ep_chk_string(v, sz, t)) != 0) ||
- ((rv = wss_get_tls(ep, &tls)) != 0)) {
+ if (((rv = ws_check_string(v, sz, t)) != 0) ||
+ ((rv = nni_ws_listener_get_tls(l->listener, &tls)) != 0)) {
+ return (rv);
+ }
+ return (nng_tls_config_cert_key_file(tls, v, NULL));
+}
+
+static int
+wss_dialer_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ ws_dialer * d = arg;
+ int rv;
+ nng_tls_config *tls;
+
+ if (((rv = ws_check_string(v, sz, t)) != 0) ||
+ ((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0)) {
return (rv);
}
return (nng_tls_config_ca_file(tls, v));
}
static int
-wss_ep_chk_auth_mode(const void *v, size_t sz, nni_opt_type t)
+wss_listener_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ ws_listener * l = arg;
+ int rv;
+ nng_tls_config *tls;
+
+ if (((rv = ws_check_string(v, sz, t)) != 0) ||
+ ((rv = nni_ws_listener_get_tls(l->listener, &tls)) != 0)) {
+ return (rv);
+ }
+ return (nng_tls_config_ca_file(tls, v));
+}
+
+static int
+wss_check_auth_mode(const void *v, size_t sz, nni_opt_type t)
{
return (nni_copyin_int(NULL, v, sz, NNG_TLS_AUTH_MODE_NONE,
NNG_TLS_AUTH_MODE_REQUIRED, t));
}
static int
-wss_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t)
+wss_dialer_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t)
+{
+ ws_dialer * d = arg;
+ int rv;
+ nng_tls_config *tls;
+ int mode;
+
+ rv = nni_copyin_int(&mode, v, sz, NNG_TLS_AUTH_MODE_NONE,
+ NNG_TLS_AUTH_MODE_REQUIRED, t);
+
+ if ((rv != 0) ||
+ ((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0)) {
+ return (rv);
+ }
+ return (nng_tls_config_auth_mode(tls, mode));
+}
+
+static int
+wss_listener_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t)
{
- ws_ep * ep = arg;
+ ws_listener * l = arg;
int rv;
nng_tls_config *tls;
int mode;
@@ -962,77 +1114,73 @@ wss_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t)
rv = nni_copyin_int(&mode, v, sz, NNG_TLS_AUTH_MODE_NONE,
NNG_TLS_AUTH_MODE_REQUIRED, t);
- if ((rv != 0) || ((rv = wss_get_tls(ep, &tls)) != 0)) {
+ if ((rv != 0) ||
+ ((rv = nni_ws_listener_get_tls(l->listener, &tls)) != 0)) {
return (rv);
}
return (nng_tls_config_auth_mode(tls, mode));
}
static int
-wss_ep_set_tls_server_name(void *arg, const void *v, size_t sz, nni_opt_type t)
+wss_dialer_set_tls_server_name(
+ void *arg, const void *v, size_t sz, nni_opt_type t)
{
- ws_ep * ep = arg;
+ ws_dialer * d = arg;
int rv;
nng_tls_config *tls;
- if (((rv = ws_ep_chk_string(v, sz, t)) != 0) ||
- ((rv = wss_get_tls(ep, &tls)) != 0)) {
+ if (((rv = ws_check_string(v, sz, t)) != 0) ||
+ ((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0)) {
return (rv);
}
return (nng_tls_config_server_name(tls, v));
}
-static nni_tran_option wss_ep_options[] = {
+static nni_tran_option wss_dialer_options[] = {
{
.o_name = NNG_OPT_RECVMAXSZ,
.o_type = NNI_TYPE_SIZE,
- .o_get = ws_ep_get_recvmaxsz,
- .o_set = ws_ep_set_recvmaxsz,
- .o_chk = ws_ep_chk_recvmaxsz,
+ .o_get = ws_dialer_get_recvmaxsz,
+ .o_set = ws_dialer_set_recvmaxsz,
+ .o_chk = ws_check_recvmaxsz,
},
{
.o_name = NNG_OPT_WS_REQUEST_HEADERS,
.o_type = NNI_TYPE_STRING,
- .o_set = ws_ep_set_reqhdrs,
- .o_chk = ws_ep_chk_string,
- },
- {
- .o_name = NNG_OPT_WS_RESPONSE_HEADERS,
- .o_type = NNI_TYPE_STRING,
- .o_set = ws_ep_set_reshdrs,
- .o_chk = ws_ep_chk_string,
+ .o_set = ws_dialer_set_reqhdrs,
+ .o_chk = ws_check_string,
},
{
.o_name = NNG_OPT_TLS_CONFIG,
.o_type = NNI_TYPE_POINTER,
- .o_get = wss_ep_get_tlsconfig,
- .o_set = wss_ep_set_tlsconfig,
- .o_chk = wss_ep_chk_tlsconfig,
+ .o_get = wss_dialer_get_tlsconfig,
+ .o_set = wss_dialer_set_tlsconfig,
+ .o_chk = wss_check_tlsconfig,
},
{
.o_name = NNG_OPT_TLS_CERT_KEY_FILE,
.o_type = NNI_TYPE_STRING,
- .o_set = wss_ep_set_cert_key_file,
- .o_chk = ws_ep_chk_string,
+ .o_set = wss_dialer_set_cert_key_file,
+ .o_chk = ws_check_string,
},
{
.o_name = NNG_OPT_TLS_CA_FILE,
.o_type = NNI_TYPE_STRING,
- .o_set = wss_ep_set_ca_file,
- .o_chk = ws_ep_chk_string,
+ .o_set = wss_dialer_set_ca_file,
+ .o_chk = ws_check_string,
},
{
.o_name = NNG_OPT_TLS_AUTH_MODE,
.o_type = NNI_TYPE_INT32,
- .o_set = wss_ep_set_auth_mode,
- .o_chk = wss_ep_chk_auth_mode,
+ .o_set = wss_dialer_set_auth_mode,
+ .o_chk = wss_check_auth_mode,
},
{
.o_name = NNG_OPT_TLS_SERVER_NAME,
.o_type = NNI_TYPE_STRING,
- .o_set = wss_ep_set_tls_server_name,
- .o_chk = ws_ep_chk_string,
+ .o_set = wss_dialer_set_tls_server_name,
+ .o_chk = ws_check_string,
},
// terminate list
{
@@ -1040,23 +1188,76 @@ static nni_tran_option wss_ep_options[] = {
},
};
-static nni_tran_ep_ops wss_ep_ops = {
- .ep_init = ws_ep_init,
- .ep_fini = ws_ep_fini,
- .ep_connect = ws_ep_connect,
- .ep_bind = ws_ep_bind,
- .ep_accept = ws_ep_accept,
- .ep_close = ws_ep_close,
- .ep_options = wss_ep_options,
+static nni_tran_option wss_listener_options[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_type = NNI_TYPE_SIZE,
+ .o_get = ws_listener_get_recvmaxsz,
+ .o_set = ws_listener_set_recvmaxsz,
+ .o_chk = ws_check_recvmaxsz,
+ },
+ {
+ .o_name = NNG_OPT_WS_RESPONSE_HEADERS,
+ .o_type = NNI_TYPE_STRING,
+ .o_set = ws_listener_set_reshdrs,
+ .o_chk = ws_check_string,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CONFIG,
+ .o_type = NNI_TYPE_POINTER,
+ .o_get = wss_listener_get_tlsconfig,
+ .o_set = wss_listener_set_tlsconfig,
+ .o_chk = wss_check_tlsconfig,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CERT_KEY_FILE,
+ .o_type = NNI_TYPE_STRING,
+ .o_set = wss_listener_set_cert_key_file,
+ .o_chk = ws_check_string,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CA_FILE,
+ .o_type = NNI_TYPE_STRING,
+ .o_set = wss_listener_set_ca_file,
+ .o_chk = ws_check_string,
+ },
+ {
+ .o_name = NNG_OPT_TLS_AUTH_MODE,
+ .o_type = NNI_TYPE_INT32,
+ .o_set = wss_listener_set_auth_mode,
+ .o_chk = wss_check_auth_mode,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_tran_dialer_ops wss_dialer_ops = {
+ .d_init = ws_dialer_init,
+ .d_fini = ws_dialer_fini,
+ .d_connect = ws_dialer_connect,
+ .d_close = ws_dialer_close,
+ .d_options = wss_dialer_options,
+};
+
+static nni_tran_listener_ops wss_listener_ops = {
+ .l_init = ws_listener_init,
+ .l_fini = ws_listener_fini,
+ .l_bind = ws_listener_bind,
+ .l_accept = ws_listener_accept,
+ .l_close = ws_listener_close,
+ .l_options = wss_listener_options,
};
static nni_tran wss_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "wss",
- .tran_ep = &wss_ep_ops,
- .tran_pipe = &ws_pipe_ops,
- .tran_init = ws_tran_init,
- .tran_fini = ws_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "wss",
+ .tran_dialer = &wss_dialer_ops,
+ .tran_listener = &wss_listener_ops,
+ .tran_pipe = &ws_pipe_ops,
+ .tran_init = ws_tran_init,
+ .tran_fini = ws_tran_fini,
};
int
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index fa8458c1..3535a248 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -2251,6 +2251,18 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (0);
}
+static int
+zt_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ return (zt_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+}
+
+static int
+zt_listener_init(void **epp, nni_url *url, nni_sock *sock)
+{
+ return (zt_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+}
+
static void
zt_ep_close(void *arg)
{
@@ -3022,25 +3034,33 @@ static nni_tran_option zt_ep_options[] = {
},
};
-static nni_tran_ep_ops zt_ep_ops = {
- .ep_init = zt_ep_init,
- .ep_fini = zt_ep_fini,
- .ep_connect = zt_ep_connect,
- .ep_bind = zt_ep_bind,
- .ep_accept = zt_ep_accept,
- .ep_close = zt_ep_close,
- .ep_options = zt_ep_options,
+static nni_tran_dialer_ops zt_dialer_ops = {
+ .d_init = zt_dialer_init,
+ .d_fini = zt_ep_fini,
+ .d_connect = zt_ep_connect,
+ .d_close = zt_ep_close,
+ .d_options = zt_ep_options,
+};
+
+static nni_tran_listener_ops zt_listener_ops = {
+ .l_init = zt_listener_init,
+ .l_fini = zt_ep_fini,
+ .l_bind = zt_ep_bind,
+ .l_accept = zt_ep_accept,
+ .l_close = zt_ep_close,
+ .l_options = zt_ep_options,
};
// This is the ZeroTier transport linkage, and should be the
// only global symbol in this entire file.
static struct nni_tran zt_tran = {
- .tran_version = NNI_TRANSPORT_VERSION,
- .tran_scheme = "zt",
- .tran_ep = &zt_ep_ops,
- .tran_pipe = &zt_pipe_ops,
- .tran_init = zt_tran_init,
- .tran_fini = zt_tran_fini,
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "zt",
+ .tran_dialer = &zt_dialer_ops,
+ .tran_listener = &zt_listener_ops,
+ .tran_pipe = &zt_pipe_ops,
+ .tran_init = zt_tran_init,
+ .tran_fini = zt_tran_fini,
};
int