diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | src/core/defs.h | 19 | ||||
| -rw-r--r-- | src/core/dialer.c | 512 | ||||
| -rw-r--r-- | src/core/dialer.h | 32 | ||||
| -rw-r--r-- | src/core/endpt.c | 665 | ||||
| -rw-r--r-- | src/core/endpt.h | 45 | ||||
| -rw-r--r-- | src/core/init.c | 6 | ||||
| -rw-r--r-- | src/core/listener.c | 443 | ||||
| -rw-r--r-- | src/core/listener.h | 33 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 3 | ||||
| -rw-r--r-- | src/core/pipe.c | 57 | ||||
| -rw-r--r-- | src/core/pipe.h | 16 | ||||
| -rw-r--r-- | src/core/socket.c | 139 | ||||
| -rw-r--r-- | src/core/socket.h | 7 | ||||
| -rw-r--r-- | src/core/transport.c | 26 | ||||
| -rw-r--r-- | src/core/transport.h | 119 | ||||
| -rw-r--r-- | src/nng.c | 265 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 70 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 69 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 126 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 156 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 757 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 48 |
23 files changed, 2256 insertions, 1363 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cfb6ff16..0ad53392 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -36,8 +36,8 @@ set (NNG_SOURCES core/clock.h core/device.c core/device.h - core/endpt.c - core/endpt.h + core/dialer.c + core/dialer.h core/file.c core/file.h core/idhash.c @@ -46,6 +46,8 @@ set (NNG_SOURCES core/init.h core/list.c core/list.h + core/listener.c + core/listener.h core/message.c core/message.h core/msgqueue.c diff --git a/src/core/defs.h b/src/core/defs.h index 77078a7a..a0cca368 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -40,14 +40,17 @@ typedef struct nng_event nni_event; typedef struct nng_notify nni_notify; // These are our own names. -typedef struct nni_socket nni_sock; -typedef struct nni_ctx nni_ctx; -typedef struct nni_ep nni_ep; -typedef struct nni_pipe nni_pipe; -typedef struct nni_tran nni_tran; -typedef struct nni_tran_option nni_tran_option; -typedef struct nni_tran_ep_ops nni_tran_ep_ops; -typedef struct nni_tran_pipe_ops nni_tran_pipe_ops; +typedef struct nni_socket nni_sock; +typedef struct nni_ctx nni_ctx; +typedef struct nni_dialer nni_dialer; +typedef struct nni_listener nni_listener; +typedef struct nni_pipe nni_pipe; + +typedef struct nni_tran nni_tran; +typedef struct nni_tran_option nni_tran_option; +typedef struct nni_tran_dialer_ops nni_tran_dialer_ops; +typedef struct nni_tran_listener_ops nni_tran_listener_ops; +typedef struct nni_tran_pipe_ops nni_tran_pipe_ops; typedef struct nni_proto_option nni_proto_option; typedef struct nni_proto_ctx_ops nni_proto_ctx_ops; diff --git a/src/core/dialer.c b/src/core/dialer.c new file mode 100644 index 00000000..ee0d2916 --- /dev/null +++ b/src/core/dialer.c @@ -0,0 +1,512 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +struct nni_dialer { + nni_tran_dialer_ops d_ops; // transport ops + nni_tran * d_tran; // transport pointer + void * d_data; // transport private + uint64_t d_id; // endpoint id + nni_list_node d_node; // per socket list + nni_sock * d_sock; + nni_url * d_url; + int d_refcnt; + int d_lastrv; // last result from synchronous + bool d_synch; // synchronous connect in progress? + bool d_started; + bool d_closed; // full shutdown + bool d_closing; // close pending (waiting on refcnt) + nni_mtx d_mtx; + nni_cv d_cv; + nni_list d_pipes; + nni_aio * d_con_aio; + nni_aio * d_tmo_aio; // backoff timer + nni_duration d_maxrtime; // maximum time for reconnect + nni_duration d_currtime; // current time for reconnect + nni_duration d_inirtime; // initial time for reconnect + nni_time d_conntime; // time of last good connect +}; + +// Functionality related to dialers. +static void dialer_connect_start(nni_dialer *); +static void dialer_connect_cb(void *); +static void dialer_timer_cb(void *); + +static nni_idhash *dialers; +static nni_mtx dialers_lk; + +int +nni_dialer_sys_init(void) +{ + int rv; + + if ((rv = nni_idhash_init(&dialers)) != 0) { + return (rv); + } + nni_mtx_init(&dialers_lk); + nni_idhash_set_limits( + dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff); + + return (0); +} + +void +nni_dialer_sys_fini(void) +{ + nni_mtx_fini(&dialers_lk); + nni_idhash_fini(dialers); + dialers = NULL; +} + +uint32_t +nni_dialer_id(nni_dialer *d) +{ + return ((uint32_t) d->d_id); +} + +static void +dialer_destroy(nni_dialer *d) +{ + if (d == NULL) { + return; + } + + // Remove us from the table so we cannot be found. + if (d->d_id != 0) { + nni_idhash_remove(dialers, d->d_id); + } + + nni_aio_stop(d->d_con_aio); + nni_aio_stop(d->d_tmo_aio); + + nni_sock_remove_dialer(d->d_sock, d); + + nni_aio_fini(d->d_con_aio); + nni_aio_fini(d->d_tmo_aio); + + nni_mtx_lock(&d->d_mtx); + if (d->d_data != NULL) { + d->d_ops.d_fini(d->d_data); + } + nni_mtx_unlock(&d->d_mtx); + nni_cv_fini(&d->d_cv); + nni_mtx_fini(&d->d_mtx); + nni_url_free(d->d_url); + NNI_FREE_STRUCT(d); +} + +int +nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) +{ + nni_tran * tran; + nni_dialer *d; + int rv; + nni_url * url; + + if ((rv = nni_url_parse(&url, urlstr)) != 0) { + return (rv); + } + if (((tran = nni_tran_find(url)) == NULL) || + (tran->tran_dialer == NULL)) { + nni_url_free(url); + return (NNG_ENOTSUP); + } + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + nni_url_free(url); + return (NNG_ENOMEM); + } + d->d_url = url; + d->d_closed = false; + d->d_closing = false; + d->d_started = false; + d->d_data = NULL; + d->d_refcnt = 1; + d->d_sock = s; + d->d_tran = tran; + + // Make a copy of the endpoint operations. This allows us to + // modify them (to override NULLs for example), and avoids an extra + // dereference on hot paths. + d->d_ops = *tran->tran_dialer; + + NNI_LIST_NODE_INIT(&d->d_node); + + nni_pipe_ep_list_init(&d->d_pipes); + + nni_mtx_init(&d->d_mtx); + nni_cv_init(&d->d_cv, &d->d_mtx); + + if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || + ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || + ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) || + ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) || + ((rv = nni_sock_add_dialer(s, d)) != 0)) { + dialer_destroy(d); + return (rv); + } + + *dp = d; + return (0); +} + +int +nni_dialer_find(nni_dialer **dp, uint32_t id) +{ + int rv; + nni_dialer *d; + + if ((rv = nni_init()) != 0) { + return (rv); + } + + nni_mtx_lock(&dialers_lk); + if ((rv = nni_idhash_find(dialers, id, (void **) &d)) == 0) { + if (d->d_closed) { + rv = NNG_ECLOSED; + } else { + d->d_refcnt++; + *dp = d; + } + } + nni_mtx_unlock(&dialers_lk); + return (rv); +} + +int +nni_dialer_hold(nni_dialer *d) +{ + int rv; + nni_mtx_lock(&dialers_lk); + if (d->d_closed) { + rv = NNG_ECLOSED; + } else { + d->d_refcnt++; + rv = 0; + } + nni_mtx_unlock(&dialers_lk); + return (rv); +} + +void +nni_dialer_rele(nni_dialer *d) +{ + nni_mtx_lock(&dialers_lk); + d->d_refcnt--; + if (d->d_closing) { + nni_cv_wake(&d->d_cv); + } + nni_mtx_unlock(&dialers_lk); +} + +int +nni_dialer_shutdown(nni_dialer *d) +{ + nni_mtx_lock(&d->d_mtx); + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ECLOSED); + } + d->d_closing = true; + nni_mtx_unlock(&d->d_mtx); + + // Abort any remaining in-flight operations. + nni_aio_close(d->d_con_aio); + nni_aio_close(d->d_tmo_aio); + + // Stop the underlying transport. + d->d_ops.d_close(d->d_data); + + return (0); +} + +void +nni_dialer_close(nni_dialer *d) +{ + nni_pipe *p; + + nni_mtx_lock(&d->d_mtx); + if (d->d_closed) { + nni_mtx_unlock(&d->d_mtx); + nni_dialer_rele(d); + return; + } + d->d_closed = true; + nni_mtx_unlock(&d->d_mtx); + + nni_dialer_shutdown(d); + + nni_aio_stop(d->d_con_aio); + nni_aio_stop(d->d_tmo_aio); + + nni_mtx_lock(&d->d_mtx); + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_stop(p); + } + while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) { + nni_cv_wait(&d->d_cv); + } + nni_mtx_unlock(&d->d_mtx); + + dialer_destroy(d); +} + +// This function starts an exponential backoff timer for reconnecting. +static void +dialer_timer_start(nni_dialer *d) +{ + nni_duration backoff; + + if (d->d_closing) { + return; + } + backoff = d->d_currtime; + d->d_currtime *= 2; + if (d->d_currtime > d->d_maxrtime) { + d->d_currtime = d->d_maxrtime; + } + + // To minimize damage from storms, etc., we select a backoff + // value randomly, in the range of [0, backoff-1]; this is + // pretty similar to 802 style backoff, except that we have a + // nearly uniform time period instead of discrete slot times. + // This algorithm may lead to slight biases because we don't + // have a statistically perfect distribution with the modulo of + // the random number, but this really doesn't matter. + nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); +} + +static void +dialer_timer_cb(void *arg) +{ + nni_dialer *d = arg; + nni_aio * aio = d->d_tmo_aio; + + nni_mtx_lock(&d->d_mtx); + if (nni_aio_result(aio) == 0) { + dialer_connect_start(d); + } + nni_mtx_unlock(&d->d_mtx); +} + +static void +dialer_connect_cb(void *arg) +{ + nni_dialer *d = arg; + nni_pipe * p; + nni_aio * aio = d->d_con_aio; + int rv; + + if ((rv = nni_aio_result(aio)) == 0) { + void *data = nni_aio_get_output(aio, 0); + NNI_ASSERT(data != NULL); + rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data); + } + if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) { + nni_pipe_stop(p); + } + + nni_mtx_lock(&d->d_mtx); + switch (rv) { + case 0: + nni_pipe_set_dialer(p, d); + nni_list_append(&d->d_pipes, p); + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + nni_pipe_stop(p); + return; + } + + // Good connect, so reset the backoff timer. + // Note that a host that accepts the connect, but drops + // us immediately, is going to get hit pretty hard + // (depending on the initial backoff) with no + // exponential backoff. This can happen if we wind up + // trying to connect to some port that does not speak + // SP for example. + d->d_currtime = d->d_inirtime; + + // No further outgoing connects -- we will restart a + // connection from the pipe when the pipe is removed. + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled/closed -- stop everything. + break; + default: + // redial, but only if we are not synchronous + if (!d->d_synch) { + dialer_timer_start(d); + } + break; + } + if (d->d_synch) { + if (rv != 0) { + d->d_started = false; + } + d->d_lastrv = rv; + d->d_synch = false; + nni_cv_wake(&d->d_cv); + } + nni_mtx_unlock(&d->d_mtx); +} + +static void +dialer_connect_start(nni_dialer *d) +{ + nni_aio *aio = d->d_con_aio; + + // Call with the Endpoint lock held. + if (d->d_closing) { + return; + } + + d->d_ops.d_connect(d->d_data, aio); +} + +int +nni_dialer_start(nni_dialer *d, int flags) +{ + int rv = 0; + + nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime); + d->d_currtime = d->d_inirtime; + + nni_mtx_lock(&d->d_mtx); + + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ECLOSED); + } + + if (d->d_started) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ESTATE); + } + + if ((flags & NNG_FLAG_NONBLOCK) != 0) { + d->d_started = true; + dialer_connect_start(d); + nni_mtx_unlock(&d->d_mtx); + return (0); + } + + d->d_synch = true; + d->d_started = true; + dialer_connect_start(d); + + while (d->d_synch && !d->d_closing) { + nni_cv_wait(&d->d_cv); + } + rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv; + nni_cv_wake(&d->d_cv); + + nni_mtx_unlock(&d->d_mtx); + return (rv); +} + +void +nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p) +{ + if (d == NULL) { + return; + } + + // Break up the relationship between the dialer and the pipe. + nni_mtx_lock(&d->d_mtx); + // During early init, the pipe might not have this set. + if (nni_list_active(&d->d_pipes, p)) { + nni_list_remove(&d->d_pipes, p); + } + // Wake up the close thread if it is waiting. + if (d->d_closed) { + if (nni_list_empty(&d->d_pipes)) { + nni_cv_wake(&d->d_cv); + } + } else { + // If this pipe closed, then lets restart the dial operation. + // Since the remote side seems to have closed, lets start with + // a backoff. This keeps us from pounding the crap out of the + // thing if a remote server accepts but then disconnects + // immediately. + dialer_timer_start(d); + } + nni_mtx_unlock(&d->d_mtx); +} + +int +nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, + nni_opt_type t) +{ + nni_tran_option *o; + + if (strcmp(name, NNG_OPT_URL) == 0) { + return (NNG_EREADONLY); + } + + for (o = d->d_ops.d_options; o && o->o_name; o++) { + int rv; + + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_set == NULL) { + return (NNG_EREADONLY); + } + + nni_mtx_lock(&d->d_mtx); + rv = o->o_set(d->d_data, val, sz, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + + return (NNG_ENOTSUP); +} + +int +nni_dialer_getopt( + nni_dialer *d, const char *name, void *valp, size_t *szp, nni_opt_type t) +{ + nni_tran_option *o; + + for (o = d->d_ops.d_options; o && o->o_name; o++) { + int rv; + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_get == NULL) { + return (NNG_EWRITEONLY); + } + nni_mtx_lock(&d->d_mtx); + rv = o->o_get(d->d_data, valp, szp, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + + // We provide a fallback on the URL, but let the implementation + // override. This allows the URL to be created with wildcards, + // that are resolved later. + if (strcmp(name, NNG_OPT_URL) == 0) { + return (nni_copyout_str(d->d_url->u_rawurl, valp, szp, t)); + } + + return (nni_sock_getopt(d->d_sock, name, valp, szp, t)); +} + +void +nni_dialer_list_init(nni_list *list) +{ + NNI_LIST_INIT(list, nni_dialer, d_node); +} diff --git a/src/core/dialer.h b/src/core/dialer.h new file mode 100644 index 00000000..56b0fb1b --- /dev/null +++ b/src/core/dialer.h @@ -0,0 +1,32 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_DIALER_H +#define CORE_DIALER_H + +extern int nni_dialer_sys_init(void); +extern void nni_dialer_sys_fini(void); +extern int nni_dialer_find(nni_dialer **, uint32_t); +extern int nni_dialer_hold(nni_dialer *); +extern void nni_dialer_rele(nni_dialer *); +extern uint32_t nni_dialer_id(nni_dialer *); +extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *); +extern int nni_dialer_shutdown(nni_dialer *); +extern void nni_dialer_close(nni_dialer *); +extern int nni_dialer_start(nni_dialer *, int); +extern void nni_dialer_list_init(nni_list *); +extern void nni_dialer_remove_pipe(nni_dialer *, nni_pipe *); + +extern int nni_dialer_setopt( + nni_dialer *, const char *, const void *, size_t, nni_opt_type); +extern int nni_dialer_getopt( + nni_dialer *, const char *, void *, size_t *, nni_opt_type); + +#endif // CORE_DIALER_H diff --git a/src/core/endpt.c b/src/core/endpt.c deleted file mode 100644 index 8e678fb0..00000000 --- a/src/core/endpt.c +++ /dev/null @@ -1,665 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -struct nni_ep { - nni_tran_ep_ops ep_ops; // transport ops - nni_tran * ep_tran; // transport pointer - void * ep_data; // transport private - uint64_t ep_id; // endpoint id - nni_list_node ep_node; // per socket list - nni_sock * ep_sock; - nni_url * ep_url; - int ep_mode; - int ep_refcnt; - bool ep_started; - bool ep_closed; // full shutdown - bool ep_closing; // close pending (waiting on refcnt) - bool ep_tmo_run; - nni_mtx ep_mtx; - nni_cv ep_cv; - nni_list ep_pipes; - nni_aio * ep_acc_aio; - nni_aio * ep_con_aio; - nni_aio * ep_con_syn; // used for sync connect - nni_aio * ep_tmo_aio; // backoff timer - nni_duration ep_maxrtime; // maximum time for reconnect - nni_duration ep_currtime; // current time for reconnect - nni_duration ep_inirtime; // initial time for reconnect - nni_time ep_conntime; // time of last good connect -}; - -// Functionality related to end points. - -static void nni_ep_acc_start(nni_ep *); -static void nni_ep_acc_cb(void *); -static void nni_ep_con_start(nni_ep *); -static void nni_ep_con_cb(void *); -static void nni_ep_tmo_start(nni_ep *); -static void nni_ep_tmo_cb(void *); - -static nni_idhash *nni_eps; -static nni_mtx nni_ep_lk; - -int -nni_ep_sys_init(void) -{ - int rv; - - if ((rv = nni_idhash_init(&nni_eps)) != 0) { - return (rv); - } - nni_mtx_init(&nni_ep_lk); - nni_idhash_set_limits( - nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); - - return (0); -} - -void -nni_ep_sys_fini(void) -{ - nni_mtx_fini(&nni_ep_lk); - nni_idhash_fini(nni_eps); - nni_eps = NULL; -} - -uint32_t -nni_ep_id(nni_ep *ep) -{ - return ((uint32_t) ep->ep_id); -} - -static void -nni_ep_destroy(nni_ep *ep) -{ - if (ep == NULL) { - return; - } - - // Remove us from the table so we cannot be found. - if (ep->ep_id != 0) { - nni_idhash_remove(nni_eps, ep->ep_id); - } - - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - nni_sock_ep_remove(ep->ep_sock, ep); - - nni_aio_fini(ep->ep_acc_aio); - nni_aio_fini(ep->ep_con_aio); - nni_aio_fini(ep->ep_con_syn); - nni_aio_fini(ep->ep_tmo_aio); - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_data != NULL) { - ep->ep_ops.ep_fini(ep->ep_data); - } - nni_mtx_unlock(&ep->ep_mtx); - nni_cv_fini(&ep->ep_cv); - nni_mtx_fini(&ep->ep_mtx); - nni_url_free(ep->ep_url); - NNI_FREE_STRUCT(ep); -} - -static int -nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode) -{ - nni_tran *tran; - nni_ep * ep; - int rv; - nni_url * url; - - if ((rv = nni_url_parse(&url, urlstr)) != 0) { - return (rv); - } - if ((tran = nni_tran_find(url)) == NULL) { - nni_url_free(url); - return (NNG_ENOTSUP); - } - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - nni_url_free(url); - return (NNG_ENOMEM); - } - ep->ep_url = url; - ep->ep_closed = false; - ep->ep_closing = false; - ep->ep_started = false; - ep->ep_data = NULL; - ep->ep_refcnt = 1; - ep->ep_sock = s; - ep->ep_tran = tran; - ep->ep_mode = mode; - - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. - ep->ep_ops = *tran->tran_ep; - - NNI_LIST_NODE_INIT(&ep->ep_node); - - nni_pipe_ep_list_init(&ep->ep_pipes); - - nni_mtx_init(&ep->ep_mtx); - nni_cv_init(&ep->ep_cv, &ep->ep_mtx); - - if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || - ((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) || - ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || - ((rv = nni_sock_ep_add(s, ep)) != 0)) { - nni_ep_destroy(ep); - return (rv); - } - - *epp = ep; - return (0); -} - -int -nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *urlstr) -{ - return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_DIAL)); -} - -int -nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *urlstr) -{ - return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_LISTEN)); -} - -int -nni_ep_find(nni_ep **epp, uint32_t id) -{ - int rv; - nni_ep *ep; - - if ((rv = nni_init()) != 0) { - return (rv); - } - - nni_mtx_lock(&nni_ep_lk); - if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) { - if (ep->ep_closed) { - rv = NNG_ECLOSED; - } else { - ep->ep_refcnt++; - *epp = ep; - } - } - nni_mtx_unlock(&nni_ep_lk); - return (rv); -} - -int -nni_ep_hold(nni_ep *ep) -{ - int rv; - nni_mtx_lock(&nni_ep_lk); - if (ep->ep_closed) { - rv = NNG_ECLOSED; - } else { - ep->ep_refcnt++; - rv = 0; - } - nni_mtx_unlock(&nni_ep_lk); - return (rv); -} - -void -nni_ep_rele(nni_ep *ep) -{ - nni_mtx_lock(&nni_ep_lk); - ep->ep_refcnt--; - if (ep->ep_closing) { - nni_cv_wake(&ep->ep_cv); - } - nni_mtx_unlock(&nni_ep_lk); -} - -int -nni_ep_shutdown(nni_ep *ep) -{ - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - ep->ep_closing = true; - nni_mtx_unlock(&ep->ep_mtx); - - // Abort any remaining in-flight operations. - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - // Stop the underlying transport. - ep->ep_ops.ep_close(ep->ep_data); - - return (0); -} - -void -nni_ep_close(nni_ep *ep) -{ - nni_pipe *p; - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); - return; - } - ep->ep_closed = true; - nni_mtx_unlock(&ep->ep_mtx); - - nni_ep_shutdown(ep); - - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - nni_mtx_lock(&ep->ep_mtx); - NNI_LIST_FOREACH (&ep->ep_pipes, p) { - nni_pipe_stop(p); - } - while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) { - nni_cv_wait(&ep->ep_cv); - } - nni_mtx_unlock(&ep->ep_mtx); - - nni_ep_destroy(ep); -} - -static void -nni_ep_tmo_cancel(nni_aio *aio, int rv) -{ - nni_ep *ep = nni_aio_get_prov_data(aio); - // The only way this ever gets "finished", is via cancellation. - if (ep != NULL) { - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_tmo_run) { - nni_aio_finish_error(aio, rv); - } - ep->ep_tmo_run = false; - nni_mtx_unlock(&ep->ep_mtx); - } -} - -static void -nni_ep_tmo_start(nni_ep *ep) -{ - nni_duration backoff; - int rv; - - if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) { - return; - } - backoff = ep->ep_currtime; - ep->ep_currtime *= 2; - if (ep->ep_currtime > ep->ep_maxrtime) { - ep->ep_currtime = ep->ep_maxrtime; - } - - // To minimize damage from storms, etc., we select a backoff - // value randomly, in the range of [0, backoff-1]; this is - // pretty similar to 802 style backoff, except that we have a - // nearly uniform time period instead of discrete slot times. - // This algorithm may lead to slight biases because we don't - // have a statistically perfect distribution with the modulo of - // the random number, but this really doesn't matter. - - nni_aio_set_timeout( - ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0)); - - if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) != - 0) { - nni_aio_finish_error(ep->ep_tmo_aio, rv); - } - - ep->ep_tmo_run = true; -} - -static void -nni_ep_tmo_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_tmo_aio; - - nni_mtx_lock(&ep->ep_mtx); - if (nni_aio_result(aio) == NNG_ETIMEDOUT) { - if (ep->ep_mode == NNI_EP_MODE_DIAL) { - nni_ep_con_start(ep); - } else { - nni_ep_acc_start(ep); - } - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_con_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_con_aio; - int rv; - - if ((rv = nni_aio_result(aio)) == 0) { - rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); - } - nni_mtx_lock(&ep->ep_mtx); - switch (rv) { - case 0: - // Good connect, so reset the backoff timer. - // Note that a host that accepts the connect, but drops - // us immediately, is going to get hit pretty hard - // (depending on the initial backoff) with no - // exponential backoff. This can happen if we wind up - // trying to connect to some port that does not speak - // SP for example. - ep->ep_currtime = ep->ep_inirtime; - - // No further outgoing connects -- we will restart a - // connection from the pipe when the pipe is removed. - break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled/closed -- stop everything. - break; - default: - // Other errors involve the use of the backoff timer. - nni_ep_tmo_start(ep); - break; - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_con_start(nni_ep *ep) -{ - nni_aio *aio = ep->ep_con_aio; - - // Call with the Endpoint lock held. - if (ep->ep_closing) { - return; - } - - ep->ep_ops.ep_connect(ep->ep_data, aio); -} - -int -nni_ep_dial(nni_ep *ep, int flags) -{ - int rv = 0; - nni_aio *aio; - - nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); - ep->ep_currtime = ep->ep_inirtime; - - nni_mtx_lock(&ep->ep_mtx); - - if (ep->ep_mode != NNI_EP_MODE_DIAL) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ENOTSUP); - } - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ESTATE); - } - - if ((flags & NNG_FLAG_NONBLOCK) != 0) { - ep->ep_started = true; - nni_ep_con_start(ep); - nni_mtx_unlock(&ep->ep_mtx); - return (0); - } - - // Synchronous mode: so we have to wait for it to complete. - aio = ep->ep_con_syn; - ep->ep_ops.ep_connect(ep->ep_data, aio); - ep->ep_started = true; - nni_mtx_unlock(&ep->ep_mtx); - - nni_aio_wait(aio); - - // As we're synchronous, we also have to handle the completion. - if (((rv = nni_aio_result(aio)) != 0) || - ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) { - nni_mtx_lock(&ep->ep_mtx); - ep->ep_started = false; - nni_mtx_unlock(&ep->ep_mtx); - } - return (rv); -} - -static void -nni_ep_acc_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_acc_aio; - int rv; - - if ((rv = nni_aio_result(aio)) == 0) { - NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL); - rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); - } - - nni_mtx_lock(&ep->ep_mtx); - switch (rv) { - case 0: - nni_ep_acc_start(ep); - break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled or closed, no further action. - break; - case NNG_ECONNABORTED: - case NNG_ECONNRESET: - // These are remote conditions, no cool down. - nni_ep_acc_start(ep); - break; - default: - // We don't really know why we failed, but we backoff - // here. This is because errors here are probably due - // to system failures (resource exhaustion) and we hope - // by not thrashing we give the system a chance to - // recover. - nni_ep_tmo_start(ep); - break; - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_acc_start(nni_ep *ep) -{ - nni_aio *aio = ep->ep_acc_aio; - - // Call with the Endpoint lock held. - if (ep->ep_closing) { - return; - } - ep->ep_ops.ep_accept(ep->ep_data, aio); -} - -int -nni_ep_listen(nni_ep *ep, int flags) -{ - int rv = 0; - NNI_ARG_UNUSED(flags); - - nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); - ep->ep_currtime = ep->ep_inirtime; - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_mode != NNI_EP_MODE_LISTEN) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ENOTSUP); - } - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ESTATE); - } - - rv = ep->ep_ops.ep_bind(ep->ep_data); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - ep->ep_started = true; - nni_ep_acc_start(ep); - nni_mtx_unlock(&ep->ep_mtx); - - return (0); -} - -int -nni_ep_pipe_add(nni_ep *ep, nni_pipe *p) -{ - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - nni_list_append(&ep->ep_pipes, p); - nni_mtx_unlock(&ep->ep_mtx); - return (0); -} - -void -nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) -{ - // Break up the relationship between the EP and the pipe. - nni_mtx_lock(&ep->ep_mtx); - // During early init, the pipe might not have this set. - if (nni_list_active(&ep->ep_pipes, pipe)) { - nni_list_remove(&ep->ep_pipes, pipe); - } - // Wake up the close thread if it is waiting. - if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) { - nni_cv_wake(&ep->ep_cv); - } - - // If this pipe closed, then lets restart the dial operation. - // Since the remote side seems to have closed, lets start with - // a backoff. This keeps us from pounding the crap out of the - // thing if a remote server accepts but then disconnects - // immediately. - if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) { - nni_ep_tmo_start(ep); - } - nni_mtx_unlock(&ep->ep_mtx); -} - -int -nni_ep_setopt( - nni_ep *ep, const char *name, const void *val, size_t sz, nni_opt_type t) -{ - nni_tran_option *o; - - if (strcmp(name, NNG_OPT_URL) == 0) { - return (NNG_EREADONLY); - } - - for (o = ep->ep_ops.ep_options; o && o->o_name; o++) { - int rv; - - if (strcmp(o->o_name, name) != 0) { - continue; - } - if (o->o_set == NULL) { - return (NNG_EREADONLY); - } - - nni_mtx_lock(&ep->ep_mtx); - rv = o->o_set(ep->ep_data, val, sz, t); - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - return (NNG_ENOTSUP); -} - -int -nni_ep_mode(nni_ep *ep) -{ - return (ep->ep_mode); -} - -int -nni_ep_getopt( - nni_ep *ep, const char *name, void *valp, size_t *szp, nni_opt_type t) -{ - nni_tran_option *o; - - for (o = ep->ep_ops.ep_options; o && o->o_name; o++) { - int rv; - if (strcmp(o->o_name, name) != 0) { - continue; - } - if (o->o_get == NULL) { - return (NNG_EWRITEONLY); - } - nni_mtx_lock(&ep->ep_mtx); - rv = o->o_get(ep->ep_data, valp, szp, t); - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - // We provide a fallback on the URL, but let the implementation - // override. This allows the URL to be created with wildcards, - // that are resolved later. - if (strcmp(name, NNG_OPT_URL) == 0) { - return (nni_copyout_str(ep->ep_url->u_rawurl, valp, szp, t)); - } - - return (nni_sock_getopt(ep->ep_sock, name, valp, szp, t)); -} - -void -nni_ep_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_ep, ep_node); -} - -nni_tran * -nni_ep_tran(nni_ep *ep) -{ - return (ep->ep_tran); -} - -nni_sock * -nni_ep_sock(nni_ep *ep) -{ - return (ep->ep_sock); -} diff --git a/src/core/endpt.h b/src/core/endpt.h deleted file mode 100644 index bf251d41..00000000 --- a/src/core/endpt.h +++ /dev/null @@ -1,45 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef CORE_ENDPT_H -#define CORE_ENDPT_H - -extern int nni_ep_sys_init(void); -extern void nni_ep_sys_fini(void); -extern nni_tran *nni_ep_tran(nni_ep *); -extern nni_sock *nni_ep_sock(nni_ep *); -extern int nni_ep_find(nni_ep **, uint32_t); -extern int nni_ep_hold(nni_ep *); -extern void nni_ep_rele(nni_ep *); -extern uint32_t nni_ep_id(nni_ep *); -extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *); -extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *); -extern int nni_ep_shutdown(nni_ep *); -extern void nni_ep_close(nni_ep *); -extern int nni_ep_dial(nni_ep *, int); -extern int nni_ep_listen(nni_ep *, int); -extern void nni_ep_list_init(nni_list *); -extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *); -extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); -extern int nni_ep_mode(nni_ep *); - -extern int nni_ep_setopt( - nni_ep *, const char *, const void *, size_t, nni_opt_type); -extern int nni_ep_getopt( - nni_ep *, const char *, void *, size_t *, nni_opt_type); - -// Endpoint modes. Currently used by transports. Remove this when we make -// transport dialers and listeners explicit. -enum nni_ep_mode { - NNI_EP_MODE_DIAL = 1, - NNI_EP_MODE_LISTEN = 2, -}; - -#endif // CORE_ENDPT_H diff --git a/src/core/init.c b/src/core/init.c index c1b7bbac..30a7a547 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -33,7 +33,8 @@ nni_init_helper(void) ((rv = nni_aio_sys_init()) != 0) || ((rv = nni_random_sys_init()) != 0) || ((rv = nni_sock_sys_init()) != 0) || - ((rv = nni_ep_sys_init()) != 0) || + ((rv = nni_listener_sys_init()) != 0) || + ((rv = nni_dialer_sys_init()) != 0) || ((rv = nni_pipe_sys_init()) != 0) || ((rv = nni_proto_sys_init()) != 0) || ((rv = nni_tran_sys_init()) != 0)) { @@ -71,7 +72,8 @@ nni_fini(void) nni_tran_sys_fini(); nni_proto_sys_fini(); nni_pipe_sys_fini(); - nni_ep_sys_fini(); + nni_dialer_sys_fini(); + nni_listener_sys_fini(); nni_sock_sys_fini(); nni_reap_sys_fini(); // must be before timer and aio (expire) nni_random_sys_fini(); diff --git a/src/core/listener.c b/src/core/listener.c new file mode 100644 index 00000000..6d580cd8 --- /dev/null +++ b/src/core/listener.c @@ -0,0 +1,443 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +struct nni_listener { + nni_tran_listener_ops l_ops; // transport ops + nni_tran * l_tran; // transport pointer + void * l_data; // transport private + uint64_t l_id; // endpoint id + nni_list_node l_node; // per socket list + nni_sock * l_sock; + nni_url * l_url; + int l_refcnt; + bool l_started; + bool l_closed; // full shutdown + bool l_closing; // close pending (waiting on refcnt) + nni_mtx l_mtx; + nni_cv l_cv; + nni_list l_pipes; + nni_aio * l_acc_aio; + nni_aio * l_tmo_aio; +}; + +// Functionality related to listeners. + +static void listener_accept_start(nni_listener *); +static void listener_accept_cb(void *); +static void listener_timer_cb(void *); + +static nni_idhash *listeners; +static nni_mtx listeners_lk; + +int +nni_listener_sys_init(void) +{ + int rv; + + if ((rv = nni_idhash_init(&listeners)) != 0) { + return (rv); + } + nni_mtx_init(&listeners_lk); + nni_idhash_set_limits( + listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff); + + return (0); +} + +void +nni_listener_sys_fini(void) +{ + nni_mtx_fini(&listeners_lk); + nni_idhash_fini(listeners); + listeners = NULL; +} + +uint32_t +nni_listener_id(nni_listener *l) +{ + return ((uint32_t) l->l_id); +} + +static void +listener_destroy(nni_listener *l) +{ + if (l == NULL) { + return; + } + + // Remove us from the table so we cannot be found. + if (l->l_id != 0) { + nni_idhash_remove(listeners, l->l_id); + } + + nni_aio_stop(l->l_acc_aio); + + nni_sock_remove_listener(l->l_sock, l); + + nni_aio_fini(l->l_acc_aio); + + nni_mtx_lock(&l->l_mtx); + if (l->l_data != NULL) { + l->l_ops.l_fini(l->l_data); + } + nni_mtx_unlock(&l->l_mtx); + nni_cv_fini(&l->l_cv); + nni_mtx_fini(&l->l_mtx); + nni_url_free(l->l_url); + NNI_FREE_STRUCT(l); +} + +int +nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) +{ + nni_tran * tran; + nni_listener *l; + int rv; + nni_url * url; + + if ((rv = nni_url_parse(&url, urlstr)) != 0) { + return (rv); + } + if (((tran = nni_tran_find(url)) == NULL) || + (tran->tran_listener == NULL)) { + nni_url_free(url); + return (NNG_ENOTSUP); + } + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + nni_url_free(url); + return (NNG_ENOMEM); + } + l->l_url = url; + l->l_closed = false; + l->l_closing = false; + l->l_started = false; + l->l_data = NULL; + l->l_refcnt = 1; + l->l_sock = s; + l->l_tran = tran; + + // Make a copy of the endpoint operations. This allows us to + // modify them (to override NULLs for example), and avoids an extra + // dereference on hot paths. + l->l_ops = *tran->tran_listener; + + NNI_LIST_NODE_INIT(&l->l_node); + + nni_pipe_ep_list_init(&l->l_pipes); + + nni_mtx_init(&l->l_mtx); + nni_cv_init(&l->l_cv, &l->l_mtx); + + if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || + ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || + ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) || + ((rv = nni_idhash_alloc(listeners, &l->l_id, l)) != 0) || + ((rv = nni_sock_add_listener(s, l)) != 0)) { + listener_destroy(l); + return (rv); + } + + *lp = l; + return (0); +} + +int +nni_listener_find(nni_listener **lp, uint32_t id) +{ + int rv; + nni_listener *l; + + if ((rv = nni_init()) != 0) { + return (rv); + } + + nni_mtx_lock(&listeners_lk); + if ((rv = nni_idhash_find(listeners, id, (void **) &l)) == 0) { + if (l->l_closed) { + rv = NNG_ECLOSED; + } else { + l->l_refcnt++; + *lp = l; + } + } + nni_mtx_unlock(&listeners_lk); + return (rv); +} + +int +nni_listener_hold(nni_listener *l) +{ + int rv; + nni_mtx_lock(&listeners_lk); + if (l->l_closed) { + rv = NNG_ECLOSED; + } else { + l->l_refcnt++; + rv = 0; + } + nni_mtx_unlock(&listeners_lk); + return (rv); +} + +void +nni_listener_rele(nni_listener *l) +{ + nni_mtx_lock(&listeners_lk); + l->l_refcnt--; + if (l->l_closing) { + nni_cv_wake(&l->l_cv); + } + nni_mtx_unlock(&listeners_lk); +} + +int +nni_listener_shutdown(nni_listener *l) +{ + nni_mtx_lock(&l->l_mtx); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ECLOSED); + } + l->l_closing = true; + nni_mtx_unlock(&l->l_mtx); + + // Abort any remaining in-flight accepts. + nni_aio_close(l->l_acc_aio); + nni_aio_close(l->l_tmo_aio); + + // Stop the underlying transport. + l->l_ops.l_close(l->l_data); + + return (0); +} + +void +nni_listener_close(nni_listener *l) +{ + nni_pipe *p; + + nni_mtx_lock(&l->l_mtx); + if (l->l_closed) { + nni_mtx_unlock(&l->l_mtx); + nni_listener_rele(l); + return; + } + l->l_closed = true; + nni_mtx_unlock(&l->l_mtx); + + nni_listener_shutdown(l); + + nni_aio_stop(l->l_acc_aio); + nni_aio_stop(l->l_tmo_aio); + + nni_mtx_lock(&l->l_mtx); + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_stop(p); + } + while ((!nni_list_empty(&l->l_pipes)) || (l->l_refcnt != 1)) { + nni_cv_wait(&l->l_cv); + } + nni_mtx_unlock(&l->l_mtx); + + listener_destroy(l); +} + +static void +listener_timer_cb(void *arg) +{ + nni_listener *l = arg; + nni_aio * aio = l->l_tmo_aio; + + nni_mtx_lock(&l->l_mtx); + if (nni_aio_result(aio) == 0) { + listener_accept_start(l); + } + nni_mtx_unlock(&l->l_mtx); +} + +static void +listener_accept_cb(void *arg) +{ + nni_listener *l = arg; + nni_pipe * p; + nni_aio * aio = l->l_acc_aio; + int rv; + + if ((rv = nni_aio_result(aio)) == 0) { + void *data = nni_aio_get_output(aio, 0); + NNI_ASSERT(data != NULL); + rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data); + } + + if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { + nni_pipe_stop(p); + } + + nni_mtx_lock(&l->l_mtx); + switch (rv) { + case 0: + nni_pipe_set_listener(p, l); + nni_list_append(&l->l_pipes, p); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + nni_pipe_stop(p); + return; + } + listener_accept_start(l); + break; + case NNG_ECONNABORTED: // remote condition, no cooldown + case NNG_ECONNRESET: // remote condition, no cooldown + listener_accept_start(l); + break; + case NNG_ECLOSED: // no further action + case NNG_ECANCELED: // no further action + break; + default: + // We don't really know why we failed, but we backoff + // here. This is because errors here are probably due + // to system failures (resource exhaustion) and we hope + // by not thrashing we give the system a chance to + // recover. 100 msec is enough to cool down. + nni_sleep_aio(100, l->l_tmo_aio); + break; + } + nni_mtx_unlock(&l->l_mtx); +} + +static void +listener_accept_start(nni_listener *l) +{ + nni_aio *aio = l->l_acc_aio; + + // Call with the listener lock held. + if (l->l_closing) { + return; + } + l->l_ops.l_accept(l->l_data, aio); +} + +int +nni_listener_start(nni_listener *l, int flags) +{ + int rv = 0; + NNI_ARG_UNUSED(flags); + + nni_mtx_lock(&l->l_mtx); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ECLOSED); + } + if (l->l_started) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ESTATE); + } + + if ((rv = l->l_ops.l_bind(l->l_data)) != 0) { + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + l->l_started = true; + listener_accept_start(l); + nni_mtx_unlock(&l->l_mtx); + + return (0); +} + +void +nni_listener_remove_pipe(nni_listener *l, nni_pipe *p) +{ + if (l == NULL) { + return; + } + // Break up relationship between listener and pipe. + nni_mtx_lock(&l->l_mtx); + // During early init, the pipe might not have this set. + if (nni_list_active(&l->l_pipes, p)) { + nni_list_remove(&l->l_pipes, p); + } + // Wake up the closer if it is waiting. + if (l->l_closed && nni_list_empty(&l->l_pipes)) { + nni_cv_wake(&l->l_cv); + } + nni_mtx_unlock(&l->l_mtx); +} + +int +nni_listener_setopt(nni_listener *l, const char *name, const void *val, + size_t sz, nni_opt_type t) +{ + nni_tran_option *o; + + if (strcmp(name, NNG_OPT_URL) == 0) { + return (NNG_EREADONLY); + } + + for (o = l->l_ops.l_options; o && o->o_name; o++) { + int rv; + + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_set == NULL) { + return (NNG_EREADONLY); + } + + nni_mtx_lock(&l->l_mtx); + rv = o->o_set(l->l_data, val, sz, t); + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + return (NNG_ENOTSUP); +} + +int +nni_listener_getopt( + nni_listener *l, const char *name, void *valp, size_t *szp, nni_opt_type t) +{ + nni_tran_option *o; + + for (o = l->l_ops.l_options; o && o->o_name; o++) { + int rv; + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_get == NULL) { + return (NNG_EWRITEONLY); + } + nni_mtx_lock(&l->l_mtx); + rv = o->o_get(l->l_data, valp, szp, t); + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + // We provide a fallback on the URL, but let the implementation + // override. This allows the URL to be created with wildcards, + // that are resolved later. + if (strcmp(name, NNG_OPT_URL) == 0) { + return (nni_copyout_str(l->l_url->u_rawurl, valp, szp, t)); + } + + return (nni_sock_getopt(l->l_sock, name, valp, szp, t)); +} + +void +nni_listener_list_init(nni_list *list) +{ + NNI_LIST_INIT(list, nni_listener, l_node); +} diff --git a/src/core/listener.h b/src/core/listener.h new file mode 100644 index 00000000..41b1a678 --- /dev/null +++ b/src/core/listener.h @@ -0,0 +1,33 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_LISTENER_H +#define CORE_LISTENER_H + +extern int nni_listener_sys_init(void); +extern void nni_listener_sys_fini(void); +extern int nni_listener_find(nni_listener **, uint32_t); +extern int nni_listener_hold(nni_listener *); +extern void nni_listener_rele(nni_listener *); +extern uint32_t nni_listener_id(nni_listener *); +extern int nni_listener_create(nni_listener **, nni_sock *, const char *); +extern int nni_listener_shutdown(nni_listener *); +extern void nni_listener_close(nni_listener *); +extern int nni_listener_start(nni_listener *, int); +extern void nni_listener_list_init(nni_list *); +extern int nni_listener_add_pipe(nni_listener *, nni_pipe *); +extern void nni_listener_remove_pipe(nni_listener *, nni_pipe *); + +extern int nni_listener_setopt( + nni_listener *, const char *, const void *, size_t, nni_opt_type); +extern int nni_listener_getopt( + nni_listener *, const char *, void *, size_t *, nni_opt_type); + +#endif // CORE_LISTENER_H diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index fdb2ce94..9af12720 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -52,7 +52,8 @@ // These have to come after the others - particularly transport.h -#include "core/endpt.h" +#include "core/dialer.h" +#include "core/listener.h" #include "core/pipe.h" #include "core/socket.h" diff --git a/src/core/pipe.c b/src/core/pipe.c index 93fbae99..a42cdeff 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -26,7 +26,8 @@ struct nni_pipe { nni_list_node p_sock_node; nni_list_node p_ep_node; nni_sock * p_sock; - nni_ep * p_ep; + nni_listener * p_listener; + nni_dialer * p_dialer; bool p_closed; bool p_stop; bool p_cbs; @@ -98,7 +99,7 @@ nni_pipe_sys_fini(void) } } -static void +void nni_pipe_destroy(nni_pipe *p) { bool cbs; @@ -126,9 +127,8 @@ nni_pipe_destroy(nni_pipe *p) // We have exclusive access at this point, so we can check if // we are still on any lists. - if (nni_list_node_active(&p->p_ep_node)) { - nni_ep_pipe_remove(p->p_ep, p); - } + nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL + nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL if (nni_list_node_active(&p->p_sock_node)) { nni_sock_pipe_remove(p->p_sock, p); @@ -303,12 +303,10 @@ nni_pipe_start_cb(void *arg) } int -nni_pipe_create(nni_ep *ep, void *tdata) +nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { nni_pipe * p; int rv; - nni_tran * tran = nni_ep_tran(ep); - nni_sock * sock = nni_ep_sock(ep); void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); @@ -324,7 +322,6 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_tran_data = tdata; p->p_proto_ops = *pops; p->p_proto_data = NULL; - p->p_ep = ep; p->p_sock = sock; p->p_closed = false; p->p_stop = false; @@ -348,16 +345,27 @@ nni_pipe_create(nni_ep *ep, void *tdata) } if ((rv != 0) || - ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) || - ((rv = nni_ep_pipe_add(ep, p)) != 0) || - ((rv = nni_sock_pipe_add(sock, p)) != 0)) { + ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { nni_pipe_destroy(p); return (rv); } + *pp = p; return (0); } +void +nni_pipe_set_listener(nni_pipe *p, nni_listener *l) +{ + p->p_listener = l; +} + +void +nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d) +{ + p->p_dialer = d; +} + int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) @@ -371,7 +379,13 @@ nni_pipe_getopt( return (o->o_get(p->p_tran_data, val, szp, t)); } // Maybe the endpoint knows? - return (nni_ep_getopt(p->p_ep, name, val, szp, t)); + if (p->p_dialer != NULL) { + return (nni_dialer_getopt(p->p_dialer, name, val, szp, t)); + } + if (p->p_listener != NULL) { + return (nni_listener_getopt(p->p_listener, name, val, szp, t)); + } + return (NNG_ENOTSUP); } void @@ -409,15 +423,20 @@ nni_pipe_sock_id(nni_pipe *p) } uint32_t -nni_pipe_ep_id(nni_pipe *p) +nni_pipe_listener_id(nni_pipe *p) { - return (nni_ep_id(p->p_ep)); + if (p->p_listener != NULL) { + return (nni_listener_id(p->p_listener)); + } + return (0); } - -int -nni_pipe_ep_mode(nni_pipe *p) +uint32_t +nni_pipe_dialer_id(nni_pipe *p) { - return (nni_ep_mode(p->p_ep)); + if (p->p_dialer != NULL) { + return (nni_dialer_id(p->p_dialer)); + } + return (0); } static void diff --git a/src/core/pipe.h b/src/core/pipe.h index bd66d4ed..5c505514 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -29,6 +29,10 @@ extern void nni_pipe_send(nni_pipe *, nni_aio *); // Pipe operations that protocols use. extern uint32_t nni_pipe_id(nni_pipe *); +// nni_pipe_destroy destroys a pipe -- there must not be any other +// references to it; this is used only during creation failures. +extern void nni_pipe_destroy(nni_pipe *); + // nni_pipe_close closes the underlying transport for the pipe. Further // operations against will return NNG_ECLOSED. extern void nni_pipe_close(nni_pipe *); @@ -48,7 +52,9 @@ extern void nni_pipe_stop(nni_pipe *); // endpoint, grabbing each of those locks. The function takes ownership of // the transport specific pipe (3rd argument), regardless of whether it // succeeds or not. The endpoint should be held when calling this. -extern int nni_pipe_create(nni_ep *, void *); +extern int nni_pipe_create2(nni_pipe **, nni_sock *, nni_tran *, void *); +extern void nni_pipe_set_dialer(nni_pipe *, nni_dialer *); +extern void nni_pipe_set_listener(nni_pipe *, nni_listener *); // nni_pipe_start is called by the socket to begin any startup activities // on the pipe before making it ready for use by protocols. For example, @@ -82,11 +88,11 @@ extern int nni_pipe_find(nni_pipe **, uint32_t); // nni_pipe_sock_id returns the socket id for the pipe (used by public API). extern uint32_t nni_pipe_sock_id(nni_pipe *); -// nni_pipe_ep_id returns the endpoint id for the pipe. -extern uint32_t nni_pipe_ep_id(nni_pipe *); +// nni_pipe_listener_id returns the listener id for the pipe (or 0 if none). +extern uint32_t nni_pipe_listener_id(nni_pipe *); -// nni_pipe_ep_mode returns the endpoint mode for the pipe. -extern int nni_pipe_ep_mode(nni_pipe *); +// nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none). +extern uint32_t nni_pipe_dialer_id(nni_pipe *); // nni_pipe_closed returns true if nni_pipe_close was called. // (This is used by the socket to determine if user closed the pipe diff --git a/src/core/socket.c b/src/core/socket.c index 4cf624e2..894e4fee 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -82,9 +82,10 @@ struct nni_socket { nni_list s_options; // opts not handled by sock/proto char s_name[64]; // socket name (legacy compat) - nni_list s_eps; // active endpoints - nni_list s_pipes; // active pipes - nni_list s_ctxs; // active contexts (protected by global sock_lk) + nni_list s_listeners; // active listeners + nni_list s_dialers; // active dialers + nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global sock_lk) bool s_closing; // Socket is closing bool s_closed; // Socket closed, protected by global lock @@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); nni_pipe_sock_list_init(&s->s_pipes); - nni_ep_list_init(&s->s_eps); + nni_listener_list_init(&s->s_listeners); + nni_dialer_list_init(&s->s_dialers); nni_mtx_init(&s->s_mx); nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); @@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) int nni_sock_shutdown(nni_sock *sock) { - nni_pipe *pipe; - nni_ep * ep; - nni_ep * nep; - nni_ctx * ctx; - nni_ctx * nctx; + nni_pipe * pipe; + nni_dialer * d; + nni_dialer * nd; + nni_listener *l; + nni_listener *nl; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming // but but allows existing ones to drain. - NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_shutdown(ep); + NNI_LIST_FOREACH (&sock->s_listeners, l) { + nni_listener_shutdown(l); } + NNI_LIST_FOREACH (&sock->s_dialers, d) { + nni_dialer_shutdown(d); + } + nni_mtx_unlock(&sock->s_mx); // We now mark any owned contexts as closing. @@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Go through the endpoint list, attempting to close them. + // Go through the dialers and listeners, attempting to close them. // We might already have a close in progress, in which case // we skip past it; it will be removed from another thread. - nep = nni_list_first(&sock->s_eps); - while ((ep = nep) != NULL) { - nep = nni_list_next(&sock->s_eps, nep); + nl = nni_list_first(&sock->s_listeners); + while ((l = nl) != NULL) { + nl = nni_list_next(&sock->s_listeners, nl); + + if (nni_listener_hold(l) == 0) { + nni_mtx_unlock(&sock->s_mx); + nni_listener_close(l); + nni_mtx_lock(&sock->s_mx); + } + } + nd = nni_list_first(&sock->s_dialers); + while ((d = nd) != NULL) { + nd = nni_list_next(&sock->s_dialers, nd); - if (nni_ep_hold(ep) == 0) { + if (nni_dialer_hold(d) == 0) { nni_mtx_unlock(&sock->s_mx); - nni_ep_close(ep); + nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } } @@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock) // We have to wait for *both* endpoints and pipes to be // removed. while ((!nni_list_empty(&sock->s_pipes)) || - (!nni_list_empty(&sock->s_eps))) { + (!nni_list_empty(&sock->s_listeners)) || + (!nni_list_empty(&sock->s_dialers))) { nni_cv_wait(&sock->s_cv); } @@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s) // Wait for pipes, eps, and contexts to finish closing. nni_mtx_lock(&s->s_mx); - while ( - (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) { + while ((!nni_list_empty(&s->s_pipes)) || + (!nni_list_empty(&s->s_dialers)) || + (!nni_list_empty(&s->s_listeners))) { nni_cv_wait(&s->s_cv); } nni_mtx_unlock(&s->s_mx); @@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) } int -nni_sock_ep_add(nni_sock *s, nni_ep *ep) +nni_sock_add_listener(nni_sock *s, nni_listener *l) +{ + nni_sockopt *sopt; + + nni_mtx_lock(&s->s_mx); + if (s->s_closing) { + nni_mtx_unlock(&s->s_mx); + return (NNG_ECLOSED); + } + + NNI_LIST_FOREACH (&s->s_options, sopt) { + int rv; + rv = nni_listener_setopt( + l, sopt->name, sopt->data, sopt->sz, sopt->typ); + if ((rv != 0) && (rv != NNG_ENOTSUP)) { + nni_mtx_unlock(&s->s_mx); + return (rv); + } + } + + nni_list_append(&s->s_listeners, l); + nni_mtx_unlock(&s->s_mx); + return (0); +} + +int +nni_sock_add_dialer(nni_sock *s, nni_dialer *d) { nni_sockopt *sopt; @@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep) NNI_LIST_FOREACH (&s->s_options, sopt) { int rv; - rv = nni_ep_setopt( - ep, sopt->name, sopt->data, sopt->sz, sopt->typ); + rv = nni_dialer_setopt( + d, sopt->name, sopt->data, sopt->sz, sopt->typ); if ((rv != 0) && (rv != NNG_ENOTSUP)) { nni_mtx_unlock(&s->s_mx); return (rv); } } - nni_list_append(&s->s_eps, ep); + nni_list_append(&s->s_dialers, d); nni_mtx_unlock(&s->s_mx); return (0); } void -nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) +nni_sock_remove_listener(nni_sock *s, nni_listener *l) { - nni_mtx_lock(&sock->s_mx); - if (nni_list_active(&sock->s_eps, ep)) { - nni_list_remove(&sock->s_eps, ep); - if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { - nni_cv_wake(&sock->s_cv); + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_listeners, l)) { + nni_list_remove(&s->s_listeners, l); + if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { + nni_cv_wake(&s->s_cv); } } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_sock_remove_dialer(nni_sock *s, nni_dialer *d) +{ + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_dialers, d)) { + nni_list_remove(&s->s_dialers, d); + if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { + nni_cv_wake(&s->s_cv); + } + } + nni_mtx_unlock(&s->s_mx); } int @@ -948,7 +1007,8 @@ nni_sock_setopt( nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t) { int rv = NNG_ENOTSUP; - nni_ep * ep; + nni_dialer * d; + nni_listener * l; nni_sockopt * optv; nni_sockopt * oldv = NULL; const sock_option * sso; @@ -1042,9 +1102,20 @@ nni_sock_setopt( // transport (other than ENOTSUP) stops the operation // altogether. Its important that transport wide checks // properly pre-validate. - NNI_LIST_FOREACH (&s->s_eps, ep) { + NNI_LIST_FOREACH (&s->s_listeners, l) { + int x; + x = nni_listener_setopt(l, optv->name, optv->data, sz, t); + if (x != NNG_ENOTSUP) { + if ((rv = x) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_free_opt(optv); + return (rv); + } + } + } + NNI_LIST_FOREACH (&s->s_dialers, d) { int x; - x = nni_ep_setopt(ep, optv->name, optv->data, sz, t); + x = nni_dialer_setopt(d, optv->name, optv->data, sz, t); if (x != NNG_ENOTSUP) { if ((rv = x) != 0) { nni_mtx_unlock(&s->s_mx); diff --git a/src/core/socket.h b/src/core/socket.h index 7c10b195..184cfb64 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -46,8 +46,11 @@ extern uint32_t nni_sock_id(nni_sock *); extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); -extern int nni_sock_ep_add(nni_sock *, nni_ep *); -extern void nni_sock_ep_remove(nni_sock *, nni_ep *); +extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); +extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); + +extern int nni_sock_add_listener(nni_sock *, nni_listener *); +extern void nni_sock_remove_listener(nni_sock *, nni_listener *); // These are socket methods that protocol operations can expect to call. // Note that each of these should be called without any locks held, since diff --git a/src/core/transport.c b/src/core/transport.c index 4733b6bd..8485d048 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -118,12 +118,28 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz, int typ) nni_mtx_lock(&nni_tran_lk); NNI_LIST_FOREACH (&nni_tran_list, t) { - const nni_tran_ep_ops *ep; - const nni_tran_option *o; + const nni_tran_dialer_ops * dops; + const nni_tran_listener_ops *lops; + const nni_tran_option * o; + + // Generally we look for endpoint options. We check both + // dialers and listeners. + dops = t->t_tran.tran_dialer; + for (o = dops->d_options; o && o->o_name != NULL; o++) { + if (strcmp(name, o->o_name) != 0) { + continue; + } + if (o->o_set == NULL) { + nni_mtx_unlock(&nni_tran_lk); + return (NNG_EREADONLY); + } - // Generally we look for endpoint options. - ep = t->t_tran.tran_ep; - for (o = ep->ep_options; o && o->o_name != NULL; o++) { + rv = (o->o_chk != NULL) ? o->o_chk(v, sz, typ) : 0; + nni_mtx_unlock(&nni_tran_lk); + return (rv); + } + lops = t->t_tran.tran_listener; + for (o = lops->l_options; o && o->o_name != NULL; o++) { if (strcmp(name, o->o_name) != 0) { continue; } diff --git a/src/core/transport.h b/src/core/transport.h index e45aa7ec..257d232d 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -11,30 +11,11 @@ #ifndef CORE_TRANSPORT_H #define CORE_TRANSPORT_H -// Transport implementation details. Transports must implement the -// interfaces in this file. -struct nni_tran { - // tran_version is the version of the transport ops that this - // transport implements. We only bother to version the main - // ops vector. - uint32_t tran_version; - - // tran_scheme is the transport scheme, such as "tcp" or "inproc". - const char *tran_scheme; - - // tran_ep links our endpoint-specific operations. - const nni_tran_ep_ops *tran_ep; - - // tran_pipe links our pipe-specific operations. - const nni_tran_pipe_ops *tran_pipe; - - // tran_init, if not NULL, is called once during library - // initialization. - int (*tran_init)(void); - - // tran_fini, if not NULL, is called during library deinitialization. - // It should release any global resources, close any open files, etc. - void (*tran_fini)(void); +// Endpoint modes. Currently used by transports. Remove this when we make +// transport dialers and listeners explicit. +enum nni_ep_mode { + NNI_EP_MODE_DIAL = 1, + NNI_EP_MODE_LISTEN = 2, }; // We quite intentionally use a signature where the upper word is nonzero, @@ -48,7 +29,8 @@ struct nni_tran { #define NNI_TRANSPORT_V0 0x54520000 #define NNI_TRANSPORT_V1 0x54520001 #define NNI_TRANSPORT_V2 0x54520002 -#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V2 +#define NNI_TRANSPORT_V3 0x54520003 +#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3 // Option handlers. struct nni_tran_option { @@ -81,40 +63,60 @@ struct nni_tran_option { // For a given endpoint, the framework holds a lock so that each entry // point is run exclusively of the others. (Transports must still guard // against any asynchronous operations they manage themselves, though.) -struct nni_tran_ep_ops { - // ep_init creates a vanilla endpoint. The value created is - // used for the first argument for all other endpoint - // functions. - int (*ep_init)(void **, nni_url *, nni_sock *, int); - // ep_fini frees the resources associated with the endpoint. - // The endpoint will already have been closed. - void (*ep_fini)(void *); +struct nni_tran_dialer_ops { + // d_init creates a vanilla dialer. The value created is + // used for the first argument for all other dialer functions. + int (*d_init)(void **, nni_url *, nni_sock *); + + // d_fini frees the resources associated with the dialer. + // The dialer will already have been closed. + void (*d_fini)(void *); - // ep_connect establishes a connection. It can return errors + // d_connect establishes a connection. It can return errors // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, // NNG_ECONNFAILED, NNG_ETIMEDOUT, and NNG_EPROTO. - void (*ep_connect)(void *, nni_aio *); + void (*d_connect)(void *, nni_aio *); + + // d_close stops the dialer from operating altogether. It + // does not affect pipes that have already been created. It is + // nonblocking. + void (*d_close)(void *); + + // d_options is an array of dialer options. The final + // element must have a NULL name. If this member is NULL, then + // no dialer specific options are available. + nni_tran_option *d_options; +}; + +struct nni_tran_listener_ops { + // l_init creates a vanilla listener. The value created is + // used for the first argument for all other listener functions. + int (*l_init)(void **, nni_url *, nni_sock *); - // ep_bind just does the bind() and listen() work, + // l_fini frees the resources associated with the listener. + // The listener will already have been closed. + void (*l_fini)(void *); + + // l_bind just does the bind() and listen() work, // reserving the address but not creating any connections. // It should return NNG_EADDRINUSE if the address is already // taken. It can also return NNG_EBADADDR for an unsuitable // address, or NNG_EACCESS for permission problems. - int (*ep_bind)(void *); + int (*l_bind)(void *); - // ep_accept accepts an inbound connection. - void (*ep_accept)(void *, nni_aio *); + // l_accept accepts an inbound connection. + void (*l_accept)(void *, nni_aio *); - // ep_close stops the endpoint from operating altogether. It + // l_close stops the listener from operating altogether. It // does not affect pipes that have already been created. It is // nonblocking. - void (*ep_close)(void *); + void (*l_close)(void *); - // ep_options is an array of endpoint options. The final + // l_options is an array of listener options. The final // element must have a NULL name. If this member is NULL, then - // no transport specific options are available. - nni_tran_option *ep_options; + // no dialer specific options are available. + nni_tran_option *l_options; }; // Pipe operations are entry points called by the socket. These may be @@ -168,6 +170,35 @@ struct nni_tran_pipe_ops { nni_tran_option *p_options; }; +// Transport implementation details. Transports must implement the +// interfaces in this file. +struct nni_tran { + // tran_version is the version of the transport ops that this + // transport implements. We only bother to version the main + // ops vector. + uint32_t tran_version; + + // tran_scheme is the transport scheme, such as "tcp" or "inproc". + const char *tran_scheme; + + // tran_dialer links our dialer-specific operations. + const nni_tran_dialer_ops *tran_dialer; + + // tran_listener links our listener-specific operations. + const nni_tran_listener_ops *tran_listener; + + // tran_pipe links our pipe-specific operations. + const nni_tran_pipe_ops *tran_pipe; + + // tran_init, if not NULL, is called once during library + // initialization. + int (*tran_init)(void); + + // tran_fini, if not NULL, is called during library deinitialization. + // It should release any global resources, close any open files, etc. + void (*tran_fini)(void); +}; + // These APIs are used by the framework internally, and not for use by // transport implementations. extern nni_tran *nni_tran_find(nni_url *); @@ -316,7 +316,7 @@ nng_ctx_send(nng_ctx cid, nng_aio *aio) } static int -nng_ctx_getx(nng_ctx id, const char *n, void *v, size_t *szp, int t) +nng_ctx_getx(nng_ctx id, const char *n, void *v, size_t *szp, nni_opt_type t) { nni_ctx *ctx; int rv; @@ -367,7 +367,8 @@ nng_ctx_getopt_ms(nng_ctx id, const char *name, nng_duration *vp) } static int -nng_ctx_setx(nng_ctx id, const char *n, const void *v, size_t sz, int t) +nng_ctx_setx( + nng_ctx id, const char *n, const void *v, size_t sz, nni_opt_type t) { nni_ctx *ctx; int rv; @@ -414,97 +415,97 @@ nng_ctx_setopt_ms(nng_ctx id, const char *name, nng_duration v) } int -nng_dial(nng_socket s, const char *addr, nng_dialer *dp, int flags) +nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags) { - nni_ep * ep; - int rv; - nni_sock *sock; + nni_dialer *d; + int rv; + nni_sock * s; - if ((rv = nni_sock_find(&sock, s.id)) != 0) { + if ((rv = nni_sock_find(&s, sid.id)) != 0) { return (rv); } - if ((rv = nni_ep_create_dialer(&ep, sock, addr)) != 0) { - nni_sock_rele(sock); + if ((rv = nni_dialer_create(&d, s, addr)) != 0) { + nni_sock_rele(s); return (rv); } - if ((rv = nni_ep_dial(ep, flags)) != 0) { - nni_ep_close(ep); - nni_sock_rele(sock); + if ((rv = nni_dialer_start(d, flags)) != 0) { + nni_dialer_close(d); + nni_sock_rele(s); return (rv); } if (dp != NULL) { - nng_dialer d; - d.id = nni_ep_id(ep); - *dp = d; + nng_dialer did; + did.id = nni_dialer_id(d); + *dp = did; } - nni_ep_rele(ep); - nni_sock_rele(sock); + nni_dialer_rele(d); + nni_sock_rele(s); return (0); } int -nng_listen(nng_socket s, const char *addr, nng_listener *lp, int flags) +nng_listen(nng_socket sid, const char *addr, nng_listener *lp, int flags) { - nni_ep * ep; - int rv; - nni_sock *sock; + int rv; + nni_sock * s; + nni_listener *l; - if ((rv = nni_sock_find(&sock, s.id)) != 0) { + if ((rv = nni_sock_find(&s, sid.id)) != 0) { return (rv); } - if ((rv = nni_ep_create_listener(&ep, sock, addr)) != 0) { - nni_sock_rele(sock); + if ((rv = nni_listener_create(&l, s, addr)) != 0) { + nni_sock_rele(s); return (rv); } - if ((rv = nni_ep_listen(ep, flags)) != 0) { - nni_ep_close(ep); - nni_sock_rele(sock); + if ((rv = nni_listener_start(l, flags)) != 0) { + nni_listener_close(l); + nni_sock_rele(s); return (rv); } if (lp != NULL) { - nng_listener l; - l.id = nni_ep_id(ep); - *lp = l; + nng_listener lid; + lid.id = nni_listener_id(l); + *lp = lid; } - nni_ep_rele(ep); - nni_sock_rele(sock); + nni_listener_rele(l); + nni_sock_rele(s); return (rv); } int -nng_listener_create(nng_listener *lp, nng_socket s, const char *addr) +nng_listener_create(nng_listener *lp, nng_socket sid, const char *addr) { - nni_sock * sock; - nni_ep * ep; - int rv; - nng_listener l; + nni_sock * s; + int rv; + nni_listener *l; + nng_listener lid; - if ((rv = nni_sock_find(&sock, s.id)) != 0) { + if ((rv = nni_sock_find(&s, sid.id)) != 0) { return (rv); } - if ((rv = nni_ep_create_listener(&ep, sock, addr)) != 0) { - nni_sock_rele(sock); + if ((rv = nni_listener_create(&l, s, addr)) != 0) { + nni_sock_rele(s); return (rv); } - l.id = nni_ep_id(ep); - *lp = l; - nni_ep_rele(ep); - nni_sock_rele(sock); + lid.id = nni_listener_id(l); + *lp = lid; + nni_listener_rele(l); + nni_sock_rele(s); return (0); } int -nng_listener_start(nng_listener l, int flags) +nng_listener_start(nng_listener lid, int flags) { - nni_ep *ep; - int rv; + nni_listener *l; + int rv; - if ((rv = nni_ep_find(&ep, l.id)) != 0) { + if ((rv = nni_listener_find(&l, lid.id)) != 0) { return (rv); } - rv = nni_ep_listen(ep, flags); - nni_ep_rele(ep); + rv = nni_listener_start(l, flags); + nni_listener_rele(l); return (rv); } @@ -515,38 +516,38 @@ nng_listener_id(nng_listener l) } int -nng_dialer_create(nng_dialer *dp, nng_socket s, const char *addr) +nng_dialer_create(nng_dialer *dp, nng_socket sid, const char *addr) { - nni_sock * sock; - nni_ep * ep; - int rv; - nng_dialer d; + nni_sock * s; + nni_dialer *d; + int rv; + nng_dialer did; - if ((rv = nni_sock_find(&sock, s.id)) != 0) { + if ((rv = nni_sock_find(&s, sid.id)) != 0) { return (rv); } - if ((rv = nni_ep_create_dialer(&ep, sock, addr)) != 0) { - nni_sock_rele(sock); + if ((rv = nni_dialer_create(&d, s, addr)) != 0) { + nni_sock_rele(s); return (rv); } - d.id = nni_ep_id(ep); - *dp = d; - nni_ep_rele(ep); - nni_sock_rele(sock); + did.id = nni_dialer_id(d); + *dp = did; + nni_dialer_rele(d); + nni_sock_rele(s); return (0); } int -nng_dialer_start(nng_dialer d, int flags) +nng_dialer_start(nng_dialer did, int flags) { - nni_ep *ep; - int rv; + nni_dialer *d; + int rv; - if ((rv = nni_ep_find(&ep, d.id)) != 0) { + if ((rv = nni_dialer_find(&d, did.id)) != 0) { return (rv); } - rv = nni_ep_dial(ep, flags); - nni_ep_rele(ep); + rv = nni_dialer_start(d, flags); + nni_dialer_rele(d); return (rv); } @@ -557,54 +558,41 @@ nng_dialer_id(nng_dialer d) } static int -nng_ep_setx( - uint32_t id, const char *n, const void *v, size_t sz, int mode, int t) +nng_dialer_setx( + nng_dialer did, const char *n, const void *v, size_t sz, nni_opt_type t) { - nni_ep *ep; - int rv; + nni_dialer *d; + int rv; if ((rv = nni_init()) != 0) { return (rv); } - if ((rv = nni_ep_find(&ep, id)) != 0) { + if ((rv = nni_dialer_find(&d, did.id)) != 0) { return (rv); } - if (nni_ep_mode(ep) == mode) { - rv = nni_ep_setopt(ep, n, v, sz, t); - } else { - rv = NNG_ENOENT; - } - nni_ep_rele(ep); + rv = nni_dialer_setopt(d, n, v, sz, t); + nni_dialer_rele(d); return (rv); } static int -nng_ep_getx(uint32_t id, const char *n, void *v, size_t *szp, int mode, int t) +nng_dialer_getx( + nng_dialer did, const char *n, void *v, size_t *szp, nni_opt_type t) { - nni_ep *ep; - int rv; + nni_dialer *d; + int rv; if ((rv = nni_init()) != 0) { return (rv); } - if ((rv = nni_ep_find(&ep, id)) != 0) { + if ((rv = nni_dialer_find(&d, did.id)) != 0) { return (rv); } - if (nni_ep_mode(ep) == mode) { - rv = nni_ep_getopt(ep, n, v, szp, t); - } else { - rv = NNG_ENOENT; - } - nni_ep_rele(ep); + rv = nni_dialer_getopt(d, n, v, szp, t); + nni_dialer_rele(d); return (rv); } -static int -nng_dialer_setx(nng_dialer d, const char *nm, const void *v, size_t sz, int t) -{ - return (nng_ep_setx(d.id, nm, v, sz, NNI_EP_MODE_DIAL, t)); -} - int nng_dialer_setopt(nng_dialer d, const char *name, const void *v, size_t sz) { @@ -653,12 +641,6 @@ nng_dialer_setopt_string(nng_dialer d, const char *name, const char *v) return (nng_dialer_setx(d, name, v, strlen(v) + 1, NNI_TYPE_STRING)); } -static int -nng_dialer_getx(nng_dialer d, const char *n, void *v, size_t *szp, int t) -{ - return (nng_ep_getx(d.id, n, v, szp, NNI_EP_MODE_DIAL, t)); -} - int nng_dialer_getopt(nng_dialer d, const char *name, void *val, size_t *szp) { @@ -722,10 +704,21 @@ nng_dialer_getopt_ms(nng_dialer d, const char *name, nng_duration *vp) } int -nng_listener_setx( - nng_listener l, const char *name, const void *v, size_t sz, int t) +nng_listener_setx(nng_listener lid, const char *name, const void *v, size_t sz, + nni_opt_type t) { - return (nng_ep_setx(l.id, name, v, sz, NNI_EP_MODE_LISTEN, t)); + nni_listener *l; + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_listener_find(&l, lid.id)) != 0) { + return (rv); + } + rv = nni_listener_setopt(l, name, v, sz, t); + nni_listener_rele(l); + return (rv); } int @@ -778,9 +771,20 @@ nng_listener_setopt_string(nng_listener l, const char *n, const char *v) int nng_listener_getx( - nng_listener l, const char *name, void *v, size_t *szp, int t) + nng_listener lid, const char *name, void *v, size_t *szp, nni_opt_type t) { - return (nng_ep_getx(l.id, name, v, szp, NNI_EP_MODE_LISTEN, t)); + nni_listener *l; + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_listener_find(&l, lid.id)) != 0) { + return (rv); + } + rv = nni_listener_getopt(l, name, v, szp, t); + nni_listener_rele(l); + return (rv); } int @@ -846,38 +850,35 @@ nng_listener_getopt_ms(nng_listener l, const char *name, nng_duration *vp) return (nng_listener_getx(l, name, vp, &sz, NNI_TYPE_DURATION)); } -static int -nng_ep_close(uint32_t id, int mode) +int +nng_dialer_close(nng_dialer did) { - nni_ep *ep; - int rv; + nni_dialer *d; + int rv; - if ((rv = nni_ep_find(&ep, id)) != 0) { + if ((rv = nni_dialer_find(&d, did.id)) != 0) { return (rv); } - if (nni_ep_mode(ep) != mode) { - nni_ep_rele(ep); - return (NNG_ENOENT); - } - - nni_ep_close(ep); + nni_dialer_close(d); return (0); } int -nng_dialer_close(nng_dialer d) +nng_listener_close(nng_listener lid) { - return (nng_ep_close(d.id, NNI_EP_MODE_DIAL)); -} + nni_listener *l; + int rv; -int -nng_listener_close(nng_listener l) -{ - return (nng_ep_close(l.id, NNI_EP_MODE_LISTEN)); + if ((rv = nni_listener_find(&l, lid.id)) != 0) { + return (rv); + } + nni_listener_close(l); + return (0); } static int -nng_setx(nng_socket s, const char *name, const void *val, size_t sz, int t) +nng_setx( + nng_socket s, const char *name, const void *val, size_t sz, nni_opt_type t) { nni_sock *sock; int rv; @@ -900,7 +901,8 @@ nng_setopt(nng_socket s, const char *name, const void *val, size_t sz) } static int -nng_getx(nng_socket s, const char *name, void *val, size_t *szp, int t) +nng_getx( + nng_socket s, const char *name, void *val, size_t *szp, nni_opt_type t) { nni_sock *sock; int rv; @@ -1130,7 +1132,8 @@ nng_strerror(int num) } static int -nng_pipe_getx(nng_pipe p, const char *name, void *val, size_t *szp, int t) +nng_pipe_getx( + nng_pipe p, const char *name, void *val, size_t *szp, nni_opt_type t) { int rv; nni_pipe *pipe; @@ -1227,9 +1230,7 @@ nng_pipe_dialer(nng_pipe p) nng_dialer d = NNG_DIALER_INITIALIZER; nni_pipe * pipe; if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) { - if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_DIAL) { - d.id = nni_pipe_ep_id(pipe); - } + d.id = nni_pipe_dialer_id(pipe); nni_pipe_rele(pipe); } return (d); @@ -1241,9 +1242,7 @@ nng_pipe_listener(nng_pipe p) nng_listener l = NNG_LISTENER_INITIALIZER; nni_pipe * pipe; if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) { - if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_LISTEN) { - l.id = nni_pipe_ep_id(pipe); - } + l.id = nni_pipe_listener_id(pipe); nni_pipe_rele(pipe); } return (l); diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 7a52d89f..db8aeff5 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -48,7 +48,7 @@ struct nni_inproc_pair { struct nni_inproc_ep { const char * addr; - int mode; + bool listener; nni_list_node node; uint16_t proto; nni_cv cv; @@ -189,7 +189,7 @@ nni_inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) } static int -nni_inproc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock) { nni_inproc_ep *ep; @@ -197,8 +197,26 @@ nni_inproc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (NNG_ENOMEM); } - ep->mode = mode; - ep->proto = nni_sock_proto_id(sock); + ep->listener = false; + ep->proto = nni_sock_proto_id(sock); + NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); + nni_aio_list_init(&ep->aios); + + ep->addr = url->u_rawurl; // we match on the full URL. + *epp = ep; + return (0); +} +static int +nni_inproc_listener_init(void **epp, nni_url *url, nni_sock *sock) +{ + nni_inproc_ep *ep; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + + ep->listener = true; + ep->proto = nni_sock_proto_id(sock); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -222,8 +240,7 @@ nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) nni_aio_list_remove(aio); - if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && - nni_list_empty(&ep->aios)) { + if ((ep != NULL) && (!ep->listener) && nni_list_empty(&ep->aios)) { nni_list_node_remove(&ep->node); } @@ -354,10 +371,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) if (nni_aio_begin(aio) != 0) { return; } - if (ep->mode != NNI_EP_MODE_DIAL) { - nni_aio_finish_error(aio, NNG_EINVAL); - return; - } + nni_mtx_lock(&nni_inproc.mx); // Find a server. @@ -468,25 +482,33 @@ static nni_tran_option nni_inproc_ep_options[] = { }, }; -static nni_tran_ep_ops nni_inproc_ep_ops = { - .ep_init = nni_inproc_ep_init, - .ep_fini = nni_inproc_ep_fini, - .ep_connect = nni_inproc_ep_connect, - .ep_bind = nni_inproc_ep_bind, - .ep_accept = nni_inproc_ep_accept, - .ep_close = nni_inproc_ep_close, - .ep_options = nni_inproc_ep_options, +static nni_tran_dialer_ops nni_inproc_dialer_ops = { + .d_init = nni_inproc_dialer_init, + .d_fini = nni_inproc_ep_fini, + .d_connect = nni_inproc_ep_connect, + .d_close = nni_inproc_ep_close, + .d_options = nni_inproc_ep_options, +}; + +static nni_tran_listener_ops nni_inproc_listener_ops = { + .l_init = nni_inproc_listener_init, + .l_fini = nni_inproc_ep_fini, + .l_bind = nni_inproc_ep_bind, + .l_accept = nni_inproc_ep_accept, + .l_close = nni_inproc_ep_close, + .l_options = nni_inproc_ep_options, }; // This is the inproc transport linkage, and should be the only global // symbol in this entire file. struct nni_tran nni_inproc_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "inproc", - .tran_ep = &nni_inproc_ep_ops, - .tran_pipe = &nni_inproc_pipe_ops, - .tran_init = nni_inproc_init, - .tran_fini = nni_inproc_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "inproc", + .tran_dialer = &nni_inproc_dialer_ops, + .tran_listener = &nni_inproc_listener_ops, + .tran_pipe = &nni_inproc_pipe_ops, + .tran_init = nni_inproc_init, + .tran_fini = nni_inproc_fini, }; int diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index c5c7032a..b48b82d9 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -656,6 +656,18 @@ ipc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (0); } +static int +ipc_dialer_init(void **epp, nni_url *url, nni_sock *sock) +{ + return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); +} + +static int +ipc_listener_init(void **epp, nni_url *url, nni_sock *sock) +{ + return (ipc_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); +} + static void ipc_ep_close(void *arg) { @@ -915,7 +927,26 @@ static nni_tran_pipe_ops ipc_pipe_ops = { .p_options = ipc_pipe_options, }; -static nni_tran_option ipc_ep_options[] = { +static nni_tran_option ipc_dialer_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_type = NNI_TYPE_SIZE, + .o_get = ipc_ep_get_recvmaxsz, + .o_set = ipc_ep_set_recvmaxsz, + .o_chk = ipc_ep_chk_recvmaxsz, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_type = NNI_TYPE_SOCKADDR, + .o_get = ipc_ep_get_addr, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_tran_option ipc_listener_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, @@ -948,23 +979,31 @@ static nni_tran_option ipc_ep_options[] = { }, }; -static nni_tran_ep_ops ipc_ep_ops = { - .ep_init = ipc_ep_init, - .ep_fini = ipc_ep_fini, - .ep_connect = ipc_ep_connect, - .ep_bind = ipc_ep_bind, - .ep_accept = ipc_ep_accept, - .ep_close = ipc_ep_close, - .ep_options = ipc_ep_options, +static nni_tran_dialer_ops ipc_dialer_ops = { + .d_init = ipc_dialer_init, + .d_fini = ipc_ep_fini, + .d_connect = ipc_ep_connect, + .d_close = ipc_ep_close, + .d_options = ipc_dialer_options, +}; + +static nni_tran_listener_ops ipc_listener_ops = { + .l_init = ipc_listener_init, + .l_fini = ipc_ep_fini, + .l_bind = ipc_ep_bind, + .l_accept = ipc_ep_accept, + .l_close = ipc_ep_close, + .l_options = ipc_listener_options, }; static nni_tran ipc_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "ipc", - .tran_ep = &ipc_ep_ops, - .tran_pipe = &ipc_pipe_ops, - .tran_init = ipc_tran_init, - .tran_fini = ipc_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "ipc", + .tran_dialer = &ipc_dialer_ops, + .tran_listener = &ipc_listener_ops, + .tran_pipe = &ipc_pipe_ops, + .tran_init = ipc_tran_init, + .tran_fini = ipc_tran_fini, }; int diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index f23d5b3a..1a183ecd 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -56,7 +56,6 @@ struct tcp_ep { nni_aio * user_aio; nni_url * url; nng_sockaddr bsa; // bound addr - int mode; nni_mtx mtx; }; @@ -688,7 +687,6 @@ tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (rv); } ep->proto = nni_sock_proto_id(sock); - ep->mode = mode; ep->nodelay = true; ep->keepalive = false; @@ -696,6 +694,18 @@ tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (0); } +static int +tcp_dialer_init(void **epp, nni_url *url, nni_sock *sock) +{ + return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); +} + +static int +tcp_listener_init(void **epp, nni_url *url, nni_sock *sock) +{ + return (tcp_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); +} + static void tcp_ep_close(void *arg) { @@ -897,16 +907,21 @@ tcp_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tcp_ep_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tcp_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + tcp_ep *ep = arg; + + return (nni_copyout_str(ep->url->u_rawurl, v, szp, t)); +} + +static int +tcp_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { tcp_ep *ep = arg; char ustr[128]; char ipstr[48]; // max for IPv6 addresses including [] char portstr[6]; // max for 16-bit port - if (ep->mode == NNI_EP_MODE_DIAL) { - return (nni_copyout_str(ep->url->u_rawurl, v, szp, t)); - } nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr); snprintf(ustr, sizeof(ustr), "tcp://%s:%s", ipstr, portstr); return (nni_copyout_str(ustr, v, szp, t)); @@ -957,7 +972,40 @@ static nni_tran_pipe_ops tcp_pipe_ops = { .p_options = tcp_pipe_options, }; -static nni_tran_option tcp_ep_options[] = { +static nni_tran_option tcp_dialer_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_type = NNI_TYPE_SIZE, + .o_get = tcp_ep_get_recvmaxsz, + .o_set = tcp_ep_set_recvmaxsz, + .o_chk = tcp_ep_chk_recvmaxsz, + }, + { + .o_name = NNG_OPT_URL, + .o_type = NNI_TYPE_STRING, + .o_get = tcp_dialer_get_url, + }, + { + .o_name = NNG_OPT_TCP_NODELAY, + .o_type = NNI_TYPE_BOOL, + .o_get = tcp_ep_get_nodelay, + .o_set = tcp_ep_set_nodelay, + .o_chk = tcp_ep_chk_bool, + }, + { + .o_name = NNG_OPT_TCP_KEEPALIVE, + .o_type = NNI_TYPE_BOOL, + .o_get = tcp_ep_get_keepalive, + .o_set = tcp_ep_set_keepalive, + .o_chk = tcp_ep_chk_bool, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_tran_option tcp_listener_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, @@ -968,7 +1016,7 @@ static nni_tran_option tcp_ep_options[] = { { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tcp_ep_get_url, + .o_get = tcp_listener_get_url, }, { .o_name = NNG_OPT_TCP_NODELAY, @@ -990,41 +1038,51 @@ static nni_tran_option tcp_ep_options[] = { }, }; -static nni_tran_ep_ops tcp_ep_ops = { - .ep_init = tcp_ep_init, - .ep_fini = tcp_ep_fini, - .ep_connect = tcp_ep_connect, - .ep_bind = tcp_ep_bind, - .ep_accept = tcp_ep_accept, - .ep_close = tcp_ep_close, - .ep_options = tcp_ep_options, +static nni_tran_dialer_ops tcp_dialer_ops = { + .d_init = tcp_dialer_init, + .d_fini = tcp_ep_fini, + .d_connect = tcp_ep_connect, + .d_close = tcp_ep_close, + .d_options = tcp_dialer_options, +}; + +static nni_tran_listener_ops tcp_listener_ops = { + .l_init = tcp_listener_init, + .l_fini = tcp_ep_fini, + .l_bind = tcp_ep_bind, + .l_accept = tcp_ep_accept, + .l_close = tcp_ep_close, + .l_options = tcp_listener_options, }; static nni_tran tcp_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "tcp", - .tran_ep = &tcp_ep_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "tcp", + .tran_dialer = &tcp_dialer_ops, + .tran_listener = &tcp_listener_ops, + .tran_pipe = &tcp_pipe_ops, + .tran_init = tcp_tran_init, + .tran_fini = tcp_tran_fini, }; static nni_tran tcp4_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "tcp4", - .tran_ep = &tcp_ep_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "tcp4", + .tran_dialer = &tcp_dialer_ops, + .tran_listener = &tcp_listener_ops, + .tran_pipe = &tcp_pipe_ops, + .tran_init = tcp_tran_init, + .tran_fini = tcp_tran_fini, }; static nni_tran tcp6_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "tcp6", - .tran_ep = &tcp_ep_ops, - .tran_pipe = &tcp_pipe_ops, - .tran_init = tcp_tran_init, - .tran_fini = tcp_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "tcp6", + .tran_dialer = &tcp_dialer_ops, + .tran_listener = &tcp_listener_ops, + .tran_pipe = &tcp_pipe_ops, + .tran_init = tcp_tran_init, + .tran_fini = tcp_tran_fini, }; int diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 35f88e25..b4f555da 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -691,7 +691,6 @@ tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) } nni_mtx_init(&ep->mtx); ep->url = url; - ep->mode = mode; ep->keepalive = false; ep->nodelay = true; @@ -715,6 +714,18 @@ tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (0); } +static int +tls_dialer_init(void **epp, nni_url *url, nni_sock *sock) +{ + return (tls_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); +} + +static int +tls_listener_init(void **epp, nni_url *url, nni_sock *sock) +{ + return (tls_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); +} + static void tls_ep_close(void *arg) { @@ -891,16 +902,21 @@ tls_ep_get_keepalive(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -tls_ep_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +tls_dialer_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + tls_ep *ep = arg; + + return (nni_copyout_str(ep->url->u_rawurl, v, szp, t)); +} + +static int +tls_listener_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) { tls_ep *ep = arg; char ustr[128]; char ipstr[48]; // max for IPv6 addresses including [] char portstr[6]; // max for 16-bit port - if (ep->mode == NNI_EP_MODE_DIAL) { - return (nni_copyout_str(ep->url->u_rawurl, v, szp, t)); - } nni_plat_tcp_ntop(&ep->bsa, ipstr, portstr); snprintf(ustr, sizeof(ustr), "tls+tcp://%s:%s", ipstr, portstr); return (nni_copyout_str(ustr, v, szp, t)); @@ -1095,7 +1111,7 @@ static nni_tran_pipe_ops tls_pipe_ops = { .p_options = tls_pipe_options, }; -static nni_tran_option tls_ep_options[] = { +static nni_tran_option tls_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, @@ -1106,7 +1122,7 @@ static nni_tran_option tls_ep_options[] = { { .o_name = NNG_OPT_URL, .o_type = NNI_TYPE_STRING, - .o_get = tls_ep_get_url, + .o_get = tls_dialer_get_url, }, { .o_name = NNG_OPT_TLS_CONFIG, @@ -1159,41 +1175,115 @@ static nni_tran_option tls_ep_options[] = { }, }; -static nni_tran_ep_ops tls_ep_ops = { - .ep_init = tls_ep_init, - .ep_fini = tls_ep_fini, - .ep_connect = tls_ep_connect, - .ep_bind = tls_ep_bind, - .ep_accept = tls_ep_accept, - .ep_close = tls_ep_close, - .ep_options = tls_ep_options, +static nni_tran_option tls_listener_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_type = NNI_TYPE_SIZE, + .o_get = tls_ep_get_recvmaxsz, + .o_set = tls_ep_set_recvmaxsz, + .o_chk = tls_ep_chk_recvmaxsz, + }, + { + .o_name = NNG_OPT_URL, + .o_type = NNI_TYPE_STRING, + .o_get = tls_listener_get_url, + }, + { + .o_name = NNG_OPT_TLS_CONFIG, + .o_type = NNI_TYPE_POINTER, + .o_get = tls_ep_get_config, + .o_set = tls_ep_set_config, + .o_chk = tls_ep_chk_config, + }, + { + .o_name = NNG_OPT_TLS_CERT_KEY_FILE, + .o_type = NNI_TYPE_STRING, + .o_set = tls_ep_set_cert_key_file, + .o_chk = tls_ep_chk_string, + }, + { + .o_name = NNG_OPT_TLS_CA_FILE, + .o_type = NNI_TYPE_STRING, + .o_set = tls_ep_set_ca_file, + .o_chk = tls_ep_chk_string, + }, + { + .o_name = NNG_OPT_TLS_AUTH_MODE, + .o_type = NNI_TYPE_INT32, // enum really + .o_set = tls_ep_set_auth_mode, + .o_chk = tls_ep_chk_auth_mode, + }, + { + .o_name = NNG_OPT_TLS_SERVER_NAME, + .o_type = NNI_TYPE_STRING, + .o_set = tls_ep_set_server_name, + .o_chk = tls_ep_chk_string, + }, + { + .o_name = NNG_OPT_TCP_NODELAY, + .o_type = NNI_TYPE_BOOL, + .o_get = tls_ep_get_nodelay, + .o_set = tls_ep_set_nodelay, + .o_chk = tls_ep_chk_bool, + }, + { + .o_name = NNG_OPT_TCP_KEEPALIVE, + .o_type = NNI_TYPE_BOOL, + .o_get = tls_ep_get_keepalive, + .o_set = tls_ep_set_keepalive, + .o_chk = tls_ep_chk_bool, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_tran_dialer_ops tls_dialer_ops = { + .d_init = tls_dialer_init, + .d_fini = tls_ep_fini, + .d_connect = tls_ep_connect, + .d_close = tls_ep_close, + .d_options = tls_dialer_options, +}; + +static nni_tran_listener_ops tls_listener_ops = { + .l_init = tls_listener_init, + .l_fini = tls_ep_fini, + .l_bind = tls_ep_bind, + .l_accept = tls_ep_accept, + .l_close = tls_ep_close, + .l_options = tls_listener_options, }; static nni_tran tls_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "tls+tcp", - .tran_ep = &tls_ep_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "tls+tcp", + .tran_dialer = &tls_dialer_ops, + .tran_listener = &tls_listener_ops, + .tran_pipe = &tls_pipe_ops, + .tran_init = tls_tran_init, + .tran_fini = tls_tran_fini, }; static nni_tran tls4_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "tls+tcp4", - .tran_ep = &tls_ep_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "tls+tcp4", + .tran_dialer = &tls_dialer_ops, + .tran_listener = &tls_listener_ops, + .tran_pipe = &tls_pipe_ops, + .tran_init = tls_tran_init, + .tran_fini = tls_tran_fini, }; static nni_tran tls6_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "tls+tcp6", - .tran_ep = &tls_ep_ops, - .tran_pipe = &tls_pipe_ops, - .tran_init = tls_tran_init, - .tran_fini = tls_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "tls+tcp6", + .tran_dialer = &tls_dialer_ops, + .tran_listener = &tls_listener_ops, + .tran_pipe = &tls_pipe_ops, + .tran_init = tls_tran_init, + .tran_fini = tls_tran_fini, }; int diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 19cc347c..2fa0fd67 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -21,8 +21,9 @@ #include "websocket.h" -typedef struct ws_ep ws_ep; -typedef struct ws_pipe ws_pipe; +typedef struct ws_dialer ws_dialer; +typedef struct ws_listener ws_listener; +typedef struct ws_pipe ws_pipe; typedef struct ws_hdr { nni_list_node node; @@ -30,26 +31,35 @@ typedef struct ws_hdr { char * value; } ws_hdr; -struct ws_ep { - int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN +struct ws_dialer { + uint16_t lproto; // local protocol + uint16_t rproto; // remote protocol + size_t rcvmax; + char * prname; + nni_list aios; + nni_mtx mtx; + nni_aio * connaio; + nni_ws_dialer *dialer; + nni_list headers; // req headers + bool started; +}; + +struct ws_listener { uint16_t lproto; // local protocol uint16_t rproto; // remote protocol size_t rcvmax; - char * protoname; + char * prname; nni_list aios; nni_mtx mtx; - nni_aio * connaio; nni_aio * accaio; nni_ws_listener *listener; - nni_ws_dialer * dialer; - nni_list headers; // to send, res or req + nni_list headers; // res headers bool started; }; struct ws_pipe { - int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN nni_mtx mtx; - size_t rcvmax; // inherited from EP + size_t rcvmax; bool closed; uint16_t rproto; uint16_t lproto; @@ -220,7 +230,7 @@ ws_pipe_close(void *arg) } static int -ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) +ws_pipe_init(ws_pipe **pipep, void *ws) { ws_pipe *p; int rv; @@ -236,12 +246,7 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) ws_pipe_fini(p); return (rv); } - - p->mode = ep->mode; - p->rcvmax = ep->rcvmax; - p->rproto = ep->rproto; - p->lproto = ep->lproto; - p->ws = ws; + p->ws = ws; *pipep = p; return (0); @@ -261,14 +266,14 @@ ws_pipe_peer(void *arg) static int ws_hook(void *arg, nni_http_req *req, nni_http_res *res) { - ws_ep * ep = arg; - ws_hdr *h; + ws_listener *l = arg; + ws_hdr * h; NNI_ARG_UNUSED(req); // Eventually we'll want user customizable hooks. // For now we just set the headers we want. - NNI_LIST_FOREACH (&ep->headers, h) { + NNI_LIST_FOREACH (&l->headers, h) { int rv; rv = nng_http_res_set_header(res, h->name, h->value); if (rv != 0) { @@ -279,38 +284,38 @@ ws_hook(void *arg, nni_http_req *req, nni_http_res *res) } static int -ws_ep_bind(void *arg) +ws_listener_bind(void *arg) { - ws_ep *ep = arg; - int rv; + ws_listener *l = arg; + int rv; - nni_ws_listener_set_maxframe(ep->listener, ep->rcvmax); - nni_ws_listener_hook(ep->listener, ws_hook, ep); + nni_ws_listener_set_maxframe(l->listener, l->rcvmax); + nni_ws_listener_hook(l->listener, ws_hook, l); - if ((rv = nni_ws_listener_listen(ep->listener)) == 0) { - ep->started = true; + if ((rv = nni_ws_listener_listen(l->listener)) == 0) { + l->started = true; } return (rv); } static void -ws_ep_cancel(nni_aio *aio, int rv) +ws_listener_cancel(nni_aio *aio, int rv) { - ws_ep *ep = nni_aio_get_prov_data(aio); + ws_listener *l = nni_aio_get_prov_data(aio); - nni_mtx_lock(&ep->mtx); + nni_mtx_lock(&l->mtx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } - nni_mtx_unlock(&ep->mtx); + nni_mtx_unlock(&l->mtx); } static void -ws_ep_accept(void *arg, nni_aio *aio) +ws_listener_accept(void *arg, nni_aio *aio) { - ws_ep *ep = arg; - int rv; + ws_listener *l = arg; + int rv; // We already bound, so we just need to look for an available // pipe (created by the handler), and match it. @@ -318,33 +323,46 @@ ws_ep_accept(void *arg, nni_aio *aio) if (nni_aio_begin(aio) != 0) { return; } - nni_mtx_lock(&ep->mtx); - if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&l->mtx); + if ((rv = nni_aio_schedule(aio, ws_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); nni_aio_finish_error(aio, rv); return; } - nni_list_append(&ep->aios, aio); - if (aio == nni_list_first(&ep->aios)) { - nni_ws_listener_accept(ep->listener, ep->accaio); + nni_list_append(&l->aios, aio); + if (aio == nni_list_first(&l->aios)) { + nni_ws_listener_accept(l->listener, l->accaio); + } + nni_mtx_unlock(&l->mtx); +} + +static void +ws_dialer_cancel(nni_aio *aio, int rv) +{ + ws_dialer *d = nni_aio_get_prov_data(aio); + + nni_mtx_lock(&d->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); } - nni_mtx_unlock(&ep->mtx); + nni_mtx_unlock(&d->mtx); } static void -ws_ep_connect(void *arg, nni_aio *aio) +ws_dialer_connect(void *arg, nni_aio *aio) { - ws_ep *ep = arg; - int rv; + ws_dialer *d = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } - if (!ep->started) { + if (!d->started) { ws_hdr *h; - NNI_LIST_FOREACH (&ep->headers, h) { - int rv = nni_ws_dialer_header( - ep->dialer, h->name, h->value); + NNI_LIST_FOREACH (&d->headers, h) { + int rv = + nni_ws_dialer_header(d->dialer, h->name, h->value); if (rv != 0) { nni_aio_finish_error(aio, rv); return; @@ -352,22 +370,22 @@ ws_ep_connect(void *arg, nni_aio *aio) } } - nni_mtx_lock(&ep->mtx); - if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + nni_mtx_lock(&d->mtx); + if ((rv = nni_aio_schedule(aio, ws_dialer_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); return; } - NNI_ASSERT(nni_list_empty(&ep->aios)); - ep->started = true; - nni_list_append(&ep->aios, aio); - nni_ws_dialer_set_maxframe(ep->dialer, ep->rcvmax); - nni_ws_dialer_dial(ep->dialer, ep->connaio); - nni_mtx_unlock(&ep->mtx); + NNI_ASSERT(nni_list_empty(&d->aios)); + d->started = true; + nni_list_append(&d->aios, aio); + nni_ws_dialer_set_maxframe(d->dialer, d->rcvmax); + nni_ws_dialer_dial(d->dialer, d->connaio); + nni_mtx_unlock(&d->mtx); } static int -ws_ep_chk_string(const void *v, size_t sz, nni_opt_type t) +ws_check_string(const void *v, size_t sz, nni_opt_type t) { if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) { return (NNG_EBADTYPE); @@ -379,40 +397,59 @@ ws_ep_chk_string(const void *v, size_t sz, nni_opt_type t) } static int -ws_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +ws_dialer_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) { - ws_ep *ep = arg; - size_t val; - int rv; + ws_dialer *d = arg; + size_t val; + int rv; if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { - nni_mtx_lock(&ep->mtx); - ep->rcvmax = val; - nni_mtx_unlock(&ep->mtx); - if (ep->mode == NNI_EP_MODE_DIAL) { - nni_ws_dialer_set_maxframe(ep->dialer, val); - } else { - nni_ws_listener_set_maxframe(ep->listener, val); - } + nni_mtx_lock(&d->mtx); + d->rcvmax = val; + nni_mtx_unlock(&d->mtx); + nni_ws_dialer_set_maxframe(d->dialer, val); } return (rv); } static int -ws_ep_chk_recvmaxsz(const void *v, size_t sz, nni_opt_type t) +ws_dialer_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); + ws_dialer *d = arg; + return (nni_copyout_size(d->rcvmax, v, szp, t)); +} + +static int +ws_listener_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + ws_listener *l = arg; + size_t val; + int rv; + + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&l->mtx); + l->rcvmax = val; + nni_mtx_unlock(&l->mtx); + nni_ws_listener_set_maxframe(l->listener, val); + } + return (rv); +} + +static int +ws_listener_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + ws_listener *l = arg; + return (nni_copyout_size(l->rcvmax, v, szp, t)); } static int -ws_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +ws_check_recvmaxsz(const void *v, size_t sz, nni_opt_type t) { - ws_ep *ep = arg; - return (nni_copyout_size(ep->rcvmax, v, szp, t)); + return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); } static int -ws_ep_set_headers(ws_ep *ep, const char *v) +ws_set_headers(nni_list *headers, const char *v) { char * dupstr; size_t duplen; @@ -423,10 +460,6 @@ ws_ep_set_headers(ws_ep *ep, const char *v) ws_hdr * h; int rv; - if (ep->started) { - return (NNG_EBUSY); - } - NNI_LIST_INIT(&l, ws_hdr, node); if ((dupstr = nni_strdup(v)) == NULL) { return (NNG_ENOMEM); @@ -471,15 +504,15 @@ ws_ep_set_headers(ws_ep *ep, const char *v) name = nl; } - while ((h = nni_list_first(&ep->headers)) != NULL) { - nni_list_remove(&ep->headers, h); + while ((h = nni_list_first(headers)) != NULL) { + nni_list_remove(headers, h); nni_strfree(h->name); nni_strfree(h->value); NNI_FREE_STRUCT(h); } while ((h = nni_list_first(&l)) != NULL) { nni_list_remove(&l, h); - nni_list_append(&ep->headers, h); + nni_list_append(headers, h); } rv = 0; @@ -495,33 +528,32 @@ done: } static int -ws_ep_set_reqhdrs(void *arg, const void *v, size_t sz, nni_opt_type t) +ws_dialer_set_reqhdrs(void *arg, const void *v, size_t sz, nni_opt_type t) { - ws_ep *ep = arg; - int rv; + ws_dialer *d = arg; + int rv; - if ((rv = ws_ep_chk_string(v, sz, t)) == 0) { - if (ep->mode == NNI_EP_MODE_LISTEN) { - rv = NNG_EREADONLY; - } else { - rv = ws_ep_set_headers(ep, v); - } + if (d->started) { + return (NNG_EBUSY); + } + + if ((rv = ws_check_string(v, sz, t)) == 0) { + rv = ws_set_headers(&d->headers, v); } return (rv); } static int -ws_ep_set_reshdrs(void *arg, const void *v, size_t sz, nni_opt_type t) +ws_listener_set_reshdrs(void *arg, const void *v, size_t sz, nni_opt_type t) { - ws_ep *ep = arg; - int rv; + ws_listener *l = arg; + int rv; - if ((rv = ws_ep_chk_string(v, sz, t)) == 0) { - if (ep->mode == NNI_EP_MODE_DIAL) { - rv = NNG_EREADONLY; - } else { - rv = ws_ep_set_headers(ep, v); - } + if (l->started) { + return (NNG_EBUSY); + } + if ((rv = ws_check_string(v, sz, t)) == 0) { + rv = ws_set_headers(&l->headers, v); } return (rv); } @@ -628,25 +660,39 @@ static nni_tran_pipe_ops ws_pipe_ops = { .p_options = ws_pipe_options, }; -static nni_tran_option ws_ep_options[] = { +static nni_tran_option ws_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = ws_ep_get_recvmaxsz, - .o_set = ws_ep_set_recvmaxsz, - .o_chk = ws_ep_chk_recvmaxsz, + .o_get = ws_dialer_get_recvmaxsz, + .o_set = ws_dialer_set_recvmaxsz, + .o_chk = ws_check_recvmaxsz, }, { .o_name = NNG_OPT_WS_REQUEST_HEADERS, .o_type = NNI_TYPE_STRING, - .o_set = ws_ep_set_reqhdrs, - .o_chk = ws_ep_chk_string, + .o_set = ws_dialer_set_reqhdrs, + .o_chk = ws_check_string, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_tran_option ws_listener_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_type = NNI_TYPE_SIZE, + .o_get = ws_listener_get_recvmaxsz, + .o_set = ws_listener_set_recvmaxsz, + .o_chk = ws_check_recvmaxsz, }, { .o_name = NNG_OPT_WS_RESPONSE_HEADERS, .o_type = NNI_TYPE_STRING, - .o_set = ws_ep_set_reshdrs, - .o_chk = ws_ep_chk_string, + .o_set = ws_listener_set_reshdrs, + .o_chk = ws_check_string, }, // terminate list { @@ -655,93 +701,117 @@ static nni_tran_option ws_ep_options[] = { }; static void -ws_ep_fini(void *arg) +ws_dialer_fini(void *arg) { - ws_ep * ep = arg; - ws_hdr *hdr; + ws_dialer *d = arg; + ws_hdr * hdr; - nni_aio_stop(ep->accaio); - nni_aio_stop(ep->connaio); - if (ep->listener != NULL) { - nni_ws_listener_fini(ep->listener); + nni_aio_stop(d->connaio); + if (d->dialer != NULL) { + nni_ws_dialer_fini(d->dialer); } - if (ep->dialer != NULL) { - nni_ws_dialer_fini(ep->dialer); - } - nni_aio_fini(ep->accaio); - nni_aio_fini(ep->connaio); - while ((hdr = nni_list_first(&ep->headers)) != NULL) { - nni_list_remove(&ep->headers, hdr); + nni_aio_fini(d->connaio); + while ((hdr = nni_list_first(&d->headers)) != NULL) { + nni_list_remove(&d->headers, hdr); nni_strfree(hdr->name); nni_strfree(hdr->value); NNI_FREE_STRUCT(hdr); } - nni_strfree(ep->protoname); - nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); + nni_strfree(d->prname); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); } static void -ws_ep_conn_cb(void *arg) +ws_listener_fini(void *arg) { - ws_ep * ep = arg; - ws_pipe *p; - nni_aio *caio = ep->connaio; - nni_aio *uaio; - int rv; - nni_ws * ws = NULL; + ws_listener *l = arg; + ws_hdr * hdr; + + nni_aio_stop(l->accaio); + if (l->listener != NULL) { + nni_ws_listener_fini(l->listener); + } + nni_aio_fini(l->accaio); + while ((hdr = nni_list_first(&l->headers)) != NULL) { + nni_list_remove(&l->headers, hdr); + nni_strfree(hdr->name); + nni_strfree(hdr->value); + NNI_FREE_STRUCT(hdr); + } + nni_strfree(l->prname); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} - nni_mtx_lock(&ep->mtx); +static void +ws_connect_cb(void *arg) +{ + ws_dialer *d = arg; + ws_pipe * p; + nni_aio * caio = d->connaio; + nni_aio * uaio; + int rv; + nni_ws * ws = NULL; + + nni_mtx_lock(&d->mtx); if (nni_aio_result(caio) == 0) { ws = nni_aio_get_output(caio, 0); } - if ((uaio = nni_list_first(&ep->aios)) == NULL) { + if ((uaio = nni_list_first(&d->aios)) == NULL) { // The client stopped caring about this! if (ws != NULL) { nni_ws_fini(ws); } - nni_mtx_unlock(&ep->mtx); + nni_mtx_unlock(&d->mtx); return; } nni_aio_list_remove(uaio); - NNI_ASSERT(nni_list_empty(&ep->aios)); + NNI_ASSERT(nni_list_empty(&d->aios)); if ((rv = nni_aio_result(caio)) != 0) { nni_aio_finish_error(uaio, rv); - } else if ((rv = ws_pipe_init(&p, ep, ws)) != 0) { + } else if ((rv = ws_pipe_init(&p, ws)) != 0) { nni_ws_fini(ws); nni_aio_finish_error(uaio, rv); } else { + p->rcvmax = d->rcvmax; + p->rproto = d->rproto; + p->lproto = d->lproto; + nni_aio_set_output(uaio, 0, p); nni_aio_finish(uaio, 0, 0); } - nni_mtx_unlock(&ep->mtx); + nni_mtx_unlock(&d->mtx); } static void -ws_ep_close(void *arg) +ws_dialer_close(void *arg) { - ws_ep *ep = arg; + ws_dialer *d = arg; - nni_aio_close(ep->accaio); - nni_aio_close(ep->connaio); + nni_aio_close(d->connaio); + nni_ws_dialer_close(d->dialer); +} - if (ep->mode == NNI_EP_MODE_LISTEN) { - nni_ws_listener_close(ep->listener); - } else { - nni_ws_dialer_close(ep->dialer); - } +static void +ws_listener_close(void *arg) +{ + ws_listener *l = arg; + + nni_aio_close(l->accaio); + nni_ws_listener_close(l->listener); } static void -ws_ep_acc_cb(void *arg) +ws_accept_cb(void *arg) { - ws_ep * ep = arg; - nni_aio *aaio = ep->accaio; - nni_aio *uaio; - int rv; + ws_listener *l = arg; + nni_aio * aaio = l->accaio; + nni_aio * uaio; + int rv; - nni_mtx_lock(&ep->mtx); - uaio = nni_list_first(&ep->aios); + nni_mtx_lock(&l->mtx); + uaio = nni_list_first(&l->aios); if ((rv = nni_aio_result(aaio)) != 0) { if (uaio != NULL) { nni_aio_list_remove(uaio); @@ -753,72 +823,86 @@ ws_ep_acc_cb(void *arg) ws_pipe *p; // Make a pipe nni_aio_list_remove(uaio); - if ((rv = ws_pipe_init(&p, ep, ws)) != 0) { + if ((rv = ws_pipe_init(&p, ws)) != 0) { nni_ws_close(ws); nni_aio_finish_error(uaio, rv); } else { + p->rcvmax = l->rcvmax; + p->rproto = l->rproto; + p->lproto = l->lproto; + nni_aio_set_output(uaio, 0, p); nni_aio_finish(uaio, 0, 0); } } } - if (!nni_list_empty(&ep->aios)) { - nni_ws_listener_accept(ep->listener, aaio); + if (!nni_list_empty(&l->aios)) { + nni_ws_listener_accept(l->listener, aaio); } - nni_mtx_unlock(&ep->mtx); + nni_mtx_unlock(&l->mtx); } static int -ws_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +ws_dialer_init(void **dp, nni_url *url, nni_sock *s) { - ws_ep * ep; - const char *pname; + ws_dialer * d; + const char *n; int rv; - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->headers, ws_hdr, node); + nni_mtx_init(&d->mtx); + NNI_LIST_INIT(&d->headers, ws_hdr, node); - // List of pipes (server only). - nni_aio_list_init(&ep->aios); + nni_aio_list_init(&d->aios); - ep->mode = mode; - ep->lproto = nni_sock_proto_id(sock); - ep->rproto = nni_sock_peer_id(sock); + d->lproto = nni_sock_proto_id(s); + d->rproto = nni_sock_peer_id(s); + n = nni_sock_peer_name(s); - if (mode == NNI_EP_MODE_DIAL) { - pname = nni_sock_peer_name(sock); - rv = nni_ws_dialer_init(&ep->dialer, url); - } else { - pname = nni_sock_proto_name(sock); - rv = nni_ws_listener_init(&ep->listener, url); - } - - if ((rv != 0) || - ((rv = nni_aio_init(&ep->connaio, ws_ep_conn_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->accaio, ws_ep_acc_cb, ep)) != 0) || - ((rv = nni_asprintf(&ep->protoname, "%s.sp.nanomsg.org", pname)) != - 0)) { - ws_ep_fini(ep); + if (((rv = nni_ws_dialer_init(&d->dialer, url)) != 0) || + ((rv = nni_aio_init(&d->connaio, ws_connect_cb, d)) != 0) || + ((rv = nni_asprintf(&d->prname, "%s.sp.nanomsg.org", n)) != 0) || + ((rv = nni_ws_dialer_proto(d->dialer, d->prname)) != 0)) { + ws_dialer_fini(d); return (rv); } - if (mode == NNI_EP_MODE_DIAL) { - rv = nni_ws_dialer_proto(ep->dialer, ep->protoname); - } else { - rv = nni_ws_listener_proto(ep->listener, ep->protoname); + *dp = d; + return (0); +} + +static int +ws_listener_init(void **lp, nni_url *url, nni_sock *sock) +{ + ws_listener *l; + const char * n; + int rv; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); } + nni_mtx_init(&l->mtx); + NNI_LIST_INIT(&l->headers, ws_hdr, node); + + nni_aio_list_init(&l->aios); + + l->lproto = nni_sock_proto_id(sock); + l->rproto = nni_sock_peer_id(sock); + n = nni_sock_proto_name(sock); - if (rv != 0) { - ws_ep_fini(ep); + if (((rv = nni_ws_listener_init(&l->listener, url)) != 0) || + ((rv = nni_aio_init(&l->accaio, ws_accept_cb, l)) != 0) || + ((rv = nni_asprintf(&l->prname, "%s.sp.nanomsg.org", n)) != 0) || + ((rv = nni_ws_listener_proto(l->listener, l->prname)) != 0)) { + ws_listener_fini(l); return (rv); } - - *epp = ep; + *lp = l; return (0); } + static int ws_tran_init(void) { @@ -830,23 +914,31 @@ ws_tran_fini(void) { } -static nni_tran_ep_ops ws_ep_ops = { - .ep_init = ws_ep_init, - .ep_fini = ws_ep_fini, - .ep_connect = ws_ep_connect, - .ep_bind = ws_ep_bind, - .ep_accept = ws_ep_accept, - .ep_close = ws_ep_close, - .ep_options = ws_ep_options, +static nni_tran_dialer_ops ws_dialer_ops = { + .d_init = ws_dialer_init, + .d_fini = ws_dialer_fini, + .d_connect = ws_dialer_connect, + .d_close = ws_dialer_close, + .d_options = ws_dialer_options, +}; + +static nni_tran_listener_ops ws_listener_ops = { + .l_init = ws_listener_init, + .l_fini = ws_listener_fini, + .l_bind = ws_listener_bind, + .l_accept = ws_listener_accept, + .l_close = ws_listener_close, + .l_options = ws_listener_options, }; static nni_tran ws_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "ws", - .tran_ep = &ws_ep_ops, - .tran_pipe = &ws_pipe_ops, - .tran_init = ws_tran_init, - .tran_fini = ws_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "ws", + .tran_dialer = &ws_dialer_ops, + .tran_listener = &ws_listener_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = ws_tran_init, + .tran_fini = ws_tran_fini, }; int @@ -858,25 +950,27 @@ nng_ws_register(void) #ifdef NNG_TRANSPORT_WSS static int -wss_get_tls(ws_ep *ep, nng_tls_config **tlsp) +wss_dialer_get_tlsconfig(void *arg, void *v, size_t *szp, nni_opt_type t) { - switch (ep->mode) { - case NNI_EP_MODE_DIAL: - return (nni_ws_dialer_get_tls(ep->dialer, tlsp)); - case NNI_EP_MODE_LISTEN: - return (nni_ws_listener_get_tls(ep->listener, tlsp)); + ws_dialer * d = arg; + nng_tls_config *tls; + int rv; + + if (((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0) || + ((rv = nni_copyout_ptr(tls, v, szp, t)) != 0)) { + return (rv); } - return (NNG_EINVAL); + return (0); } static int -wss_ep_get_tlsconfig(void *arg, void *v, size_t *szp, nni_opt_type t) +wss_listener_get_tlsconfig(void *arg, void *v, size_t *szp, nni_opt_type t) { - ws_ep * ep = arg; + ws_listener * l = arg; nng_tls_config *tls; int rv; - if (((rv = wss_get_tls(ep, &tls)) != 0) || + if (((rv = nni_ws_listener_get_tls(l->listener, &tls)) != 0) || ((rv = nni_copyout_ptr(tls, v, szp, t)) != 0)) { return (rv); } @@ -884,7 +978,7 @@ wss_ep_get_tlsconfig(void *arg, void *v, size_t *szp, nni_opt_type t) } static int -wss_ep_chk_tlsconfig(const void *v, size_t sz, nni_opt_type t) +wss_check_tlsconfig(const void *v, size_t sz, nni_opt_type t) { void *p; int rv; @@ -895,9 +989,9 @@ wss_ep_chk_tlsconfig(const void *v, size_t sz, nni_opt_type t) } static int -wss_ep_set_tlsconfig(void *arg, const void *v, size_t sz, nni_opt_type t) +wss_dialer_set_tlsconfig(void *arg, const void *v, size_t sz, nni_opt_type t) { - ws_ep * ep = arg; + ws_dialer * d = arg; nng_tls_config *cfg; int rv; @@ -905,56 +999,114 @@ wss_ep_set_tlsconfig(void *arg, const void *v, size_t sz, nni_opt_type t) return (rv); } if (cfg == NULL) { - // NULL is clearly invalid. return (NNG_EINVAL); } - if (ep->mode == NNI_EP_MODE_LISTEN) { - rv = nni_ws_listener_set_tls(ep->listener, cfg); - } else { - rv = nni_ws_dialer_set_tls(ep->dialer, cfg); + return (nni_ws_dialer_set_tls(d->dialer, cfg)); +} + +static int +wss_listener_set_tlsconfig(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + ws_listener * l = arg; + nng_tls_config *cfg; + int rv; + + if ((rv = nni_copyin_ptr((void **) &cfg, v, sz, t)) != 0) { + return (rv); } - return (rv); + if (cfg == NULL) { + return (NNG_EINVAL); + } + return (nni_ws_listener_set_tls(l->listener, cfg)); } static int -wss_ep_set_cert_key_file(void *arg, const void *v, size_t sz, nni_opt_type t) +wss_dialer_set_cert_key_file( + void *arg, const void *v, size_t sz, nni_opt_type t) { - ws_ep * ep = arg; + ws_dialer * d = arg; int rv; nng_tls_config *tls; - if (((rv = ws_ep_chk_string(v, sz, t)) != 0) || - ((rv = wss_get_tls(ep, &tls)) != 0)) { + if (((rv = ws_check_string(v, sz, t)) != 0) || + ((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0)) { return (rv); } return (nng_tls_config_cert_key_file(tls, v, NULL)); } static int -wss_ep_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t) +wss_listener_set_cert_key_file( + void *arg, const void *v, size_t sz, nni_opt_type t) { - ws_ep * ep = arg; + ws_listener * l = arg; int rv; nng_tls_config *tls; - if (((rv = ws_ep_chk_string(v, sz, t)) != 0) || - ((rv = wss_get_tls(ep, &tls)) != 0)) { + if (((rv = ws_check_string(v, sz, t)) != 0) || + ((rv = nni_ws_listener_get_tls(l->listener, &tls)) != 0)) { + return (rv); + } + return (nng_tls_config_cert_key_file(tls, v, NULL)); +} + +static int +wss_dialer_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + ws_dialer * d = arg; + int rv; + nng_tls_config *tls; + + if (((rv = ws_check_string(v, sz, t)) != 0) || + ((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0)) { return (rv); } return (nng_tls_config_ca_file(tls, v)); } static int -wss_ep_chk_auth_mode(const void *v, size_t sz, nni_opt_type t) +wss_listener_set_ca_file(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + ws_listener * l = arg; + int rv; + nng_tls_config *tls; + + if (((rv = ws_check_string(v, sz, t)) != 0) || + ((rv = nni_ws_listener_get_tls(l->listener, &tls)) != 0)) { + return (rv); + } + return (nng_tls_config_ca_file(tls, v)); +} + +static int +wss_check_auth_mode(const void *v, size_t sz, nni_opt_type t) { return (nni_copyin_int(NULL, v, sz, NNG_TLS_AUTH_MODE_NONE, NNG_TLS_AUTH_MODE_REQUIRED, t)); } static int -wss_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t) +wss_dialer_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + ws_dialer * d = arg; + int rv; + nng_tls_config *tls; + int mode; + + rv = nni_copyin_int(&mode, v, sz, NNG_TLS_AUTH_MODE_NONE, + NNG_TLS_AUTH_MODE_REQUIRED, t); + + if ((rv != 0) || + ((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0)) { + return (rv); + } + return (nng_tls_config_auth_mode(tls, mode)); +} + +static int +wss_listener_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t) { - ws_ep * ep = arg; + ws_listener * l = arg; int rv; nng_tls_config *tls; int mode; @@ -962,77 +1114,73 @@ wss_ep_set_auth_mode(void *arg, const void *v, size_t sz, nni_opt_type t) rv = nni_copyin_int(&mode, v, sz, NNG_TLS_AUTH_MODE_NONE, NNG_TLS_AUTH_MODE_REQUIRED, t); - if ((rv != 0) || ((rv = wss_get_tls(ep, &tls)) != 0)) { + if ((rv != 0) || + ((rv = nni_ws_listener_get_tls(l->listener, &tls)) != 0)) { return (rv); } return (nng_tls_config_auth_mode(tls, mode)); } static int -wss_ep_set_tls_server_name(void *arg, const void *v, size_t sz, nni_opt_type t) +wss_dialer_set_tls_server_name( + void *arg, const void *v, size_t sz, nni_opt_type t) { - ws_ep * ep = arg; + ws_dialer * d = arg; int rv; nng_tls_config *tls; - if (((rv = ws_ep_chk_string(v, sz, t)) != 0) || - ((rv = wss_get_tls(ep, &tls)) != 0)) { + if (((rv = ws_check_string(v, sz, t)) != 0) || + ((rv = nni_ws_dialer_get_tls(d->dialer, &tls)) != 0)) { return (rv); } return (nng_tls_config_server_name(tls, v)); } -static nni_tran_option wss_ep_options[] = { +static nni_tran_option wss_dialer_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_type = NNI_TYPE_SIZE, - .o_get = ws_ep_get_recvmaxsz, - .o_set = ws_ep_set_recvmaxsz, - .o_chk = ws_ep_chk_recvmaxsz, + .o_get = ws_dialer_get_recvmaxsz, + .o_set = ws_dialer_set_recvmaxsz, + .o_chk = ws_check_recvmaxsz, }, { .o_name = NNG_OPT_WS_REQUEST_HEADERS, .o_type = NNI_TYPE_STRING, - .o_set = ws_ep_set_reqhdrs, - .o_chk = ws_ep_chk_string, - }, - { - .o_name = NNG_OPT_WS_RESPONSE_HEADERS, - .o_type = NNI_TYPE_STRING, - .o_set = ws_ep_set_reshdrs, - .o_chk = ws_ep_chk_string, + .o_set = ws_dialer_set_reqhdrs, + .o_chk = ws_check_string, }, { .o_name = NNG_OPT_TLS_CONFIG, .o_type = NNI_TYPE_POINTER, - .o_get = wss_ep_get_tlsconfig, - .o_set = wss_ep_set_tlsconfig, - .o_chk = wss_ep_chk_tlsconfig, + .o_get = wss_dialer_get_tlsconfig, + .o_set = wss_dialer_set_tlsconfig, + .o_chk = wss_check_tlsconfig, }, { .o_name = NNG_OPT_TLS_CERT_KEY_FILE, .o_type = NNI_TYPE_STRING, - .o_set = wss_ep_set_cert_key_file, - .o_chk = ws_ep_chk_string, + .o_set = wss_dialer_set_cert_key_file, + .o_chk = ws_check_string, }, { .o_name = NNG_OPT_TLS_CA_FILE, .o_type = NNI_TYPE_STRING, - .o_set = wss_ep_set_ca_file, - .o_chk = ws_ep_chk_string, + .o_set = wss_dialer_set_ca_file, + .o_chk = ws_check_string, }, { .o_name = NNG_OPT_TLS_AUTH_MODE, .o_type = NNI_TYPE_INT32, - .o_set = wss_ep_set_auth_mode, - .o_chk = wss_ep_chk_auth_mode, + .o_set = wss_dialer_set_auth_mode, + .o_chk = wss_check_auth_mode, }, { .o_name = NNG_OPT_TLS_SERVER_NAME, .o_type = NNI_TYPE_STRING, - .o_set = wss_ep_set_tls_server_name, - .o_chk = ws_ep_chk_string, + .o_set = wss_dialer_set_tls_server_name, + .o_chk = ws_check_string, }, // terminate list { @@ -1040,23 +1188,76 @@ static nni_tran_option wss_ep_options[] = { }, }; -static nni_tran_ep_ops wss_ep_ops = { - .ep_init = ws_ep_init, - .ep_fini = ws_ep_fini, - .ep_connect = ws_ep_connect, - .ep_bind = ws_ep_bind, - .ep_accept = ws_ep_accept, - .ep_close = ws_ep_close, - .ep_options = wss_ep_options, +static nni_tran_option wss_listener_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_type = NNI_TYPE_SIZE, + .o_get = ws_listener_get_recvmaxsz, + .o_set = ws_listener_set_recvmaxsz, + .o_chk = ws_check_recvmaxsz, + }, + { + .o_name = NNG_OPT_WS_RESPONSE_HEADERS, + .o_type = NNI_TYPE_STRING, + .o_set = ws_listener_set_reshdrs, + .o_chk = ws_check_string, + }, + { + .o_name = NNG_OPT_TLS_CONFIG, + .o_type = NNI_TYPE_POINTER, + .o_get = wss_listener_get_tlsconfig, + .o_set = wss_listener_set_tlsconfig, + .o_chk = wss_check_tlsconfig, + }, + { + .o_name = NNG_OPT_TLS_CERT_KEY_FILE, + .o_type = NNI_TYPE_STRING, + .o_set = wss_listener_set_cert_key_file, + .o_chk = ws_check_string, + }, + { + .o_name = NNG_OPT_TLS_CA_FILE, + .o_type = NNI_TYPE_STRING, + .o_set = wss_listener_set_ca_file, + .o_chk = ws_check_string, + }, + { + .o_name = NNG_OPT_TLS_AUTH_MODE, + .o_type = NNI_TYPE_INT32, + .o_set = wss_listener_set_auth_mode, + .o_chk = wss_check_auth_mode, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static nni_tran_dialer_ops wss_dialer_ops = { + .d_init = ws_dialer_init, + .d_fini = ws_dialer_fini, + .d_connect = ws_dialer_connect, + .d_close = ws_dialer_close, + .d_options = wss_dialer_options, +}; + +static nni_tran_listener_ops wss_listener_ops = { + .l_init = ws_listener_init, + .l_fini = ws_listener_fini, + .l_bind = ws_listener_bind, + .l_accept = ws_listener_accept, + .l_close = ws_listener_close, + .l_options = wss_listener_options, }; static nni_tran wss_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "wss", - .tran_ep = &wss_ep_ops, - .tran_pipe = &ws_pipe_ops, - .tran_init = ws_tran_init, - .tran_fini = ws_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "wss", + .tran_dialer = &wss_dialer_ops, + .tran_listener = &wss_listener_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = ws_tran_init, + .tran_fini = ws_tran_fini, }; int diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index fa8458c1..3535a248 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -2251,6 +2251,18 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (0); } +static int +zt_dialer_init(void **epp, nni_url *url, nni_sock *sock) +{ + return (zt_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); +} + +static int +zt_listener_init(void **epp, nni_url *url, nni_sock *sock) +{ + return (zt_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); +} + static void zt_ep_close(void *arg) { @@ -3022,25 +3034,33 @@ static nni_tran_option zt_ep_options[] = { }, }; -static nni_tran_ep_ops zt_ep_ops = { - .ep_init = zt_ep_init, - .ep_fini = zt_ep_fini, - .ep_connect = zt_ep_connect, - .ep_bind = zt_ep_bind, - .ep_accept = zt_ep_accept, - .ep_close = zt_ep_close, - .ep_options = zt_ep_options, +static nni_tran_dialer_ops zt_dialer_ops = { + .d_init = zt_dialer_init, + .d_fini = zt_ep_fini, + .d_connect = zt_ep_connect, + .d_close = zt_ep_close, + .d_options = zt_ep_options, +}; + +static nni_tran_listener_ops zt_listener_ops = { + .l_init = zt_listener_init, + .l_fini = zt_ep_fini, + .l_bind = zt_ep_bind, + .l_accept = zt_ep_accept, + .l_close = zt_ep_close, + .l_options = zt_ep_options, }; // This is the ZeroTier transport linkage, and should be the // only global symbol in this entire file. static struct nni_tran zt_tran = { - .tran_version = NNI_TRANSPORT_VERSION, - .tran_scheme = "zt", - .tran_ep = &zt_ep_ops, - .tran_pipe = &zt_pipe_ops, - .tran_init = zt_tran_init, - .tran_fini = zt_tran_fini, + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "zt", + .tran_dialer = &zt_dialer_ops, + .tran_listener = &zt_listener_ops, + .tran_pipe = &zt_pipe_ops, + .tran_init = zt_tran_init, + .tran_fini = zt_tran_fini, }; int |
