diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-06-26 17:39:17 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-06-27 17:28:05 -0700 |
| commit | 251553b13e6bc8019914b9edd1292f97e856dd43 (patch) | |
| tree | 9193b8b4d4df86253f0a469cd96d8bb304a64c82 /src/core | |
| parent | 91f9061ad9289afffb0111c03a8390d0f82d7114 (diff) | |
| download | nng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.gz nng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.bz2 nng-251553b13e6bc8019914b9edd1292f97e856dd43.zip | |
fixes #522 Separate out the endpoint plumbing
This separates the plumbing for endpoints into distinct
dialer and listeners. Some of the transports could benefit
from further separation, but we've done some rather larger
separation e.g. for the websocket transport.
IPC would be a good one to update later, when we start looking
at exposing a more natural underlying API.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/defs.h | 19 | ||||
| -rw-r--r-- | src/core/dialer.c | 512 | ||||
| -rw-r--r-- | src/core/dialer.h | 32 | ||||
| -rw-r--r-- | src/core/endpt.c | 665 | ||||
| -rw-r--r-- | src/core/endpt.h | 45 | ||||
| -rw-r--r-- | src/core/init.c | 6 | ||||
| -rw-r--r-- | src/core/listener.c | 443 | ||||
| -rw-r--r-- | src/core/listener.h | 33 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 3 | ||||
| -rw-r--r-- | src/core/pipe.c | 57 | ||||
| -rw-r--r-- | src/core/pipe.h | 16 | ||||
| -rw-r--r-- | src/core/socket.c | 139 | ||||
| -rw-r--r-- | src/core/socket.h | 7 | ||||
| -rw-r--r-- | src/core/transport.c | 26 | ||||
| -rw-r--r-- | src/core/transport.h | 119 |
15 files changed, 1292 insertions, 830 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index 77078a7a..a0cca368 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -40,14 +40,17 @@ typedef struct nng_event nni_event; typedef struct nng_notify nni_notify; // These are our own names. -typedef struct nni_socket nni_sock; -typedef struct nni_ctx nni_ctx; -typedef struct nni_ep nni_ep; -typedef struct nni_pipe nni_pipe; -typedef struct nni_tran nni_tran; -typedef struct nni_tran_option nni_tran_option; -typedef struct nni_tran_ep_ops nni_tran_ep_ops; -typedef struct nni_tran_pipe_ops nni_tran_pipe_ops; +typedef struct nni_socket nni_sock; +typedef struct nni_ctx nni_ctx; +typedef struct nni_dialer nni_dialer; +typedef struct nni_listener nni_listener; +typedef struct nni_pipe nni_pipe; + +typedef struct nni_tran nni_tran; +typedef struct nni_tran_option nni_tran_option; +typedef struct nni_tran_dialer_ops nni_tran_dialer_ops; +typedef struct nni_tran_listener_ops nni_tran_listener_ops; +typedef struct nni_tran_pipe_ops nni_tran_pipe_ops; typedef struct nni_proto_option nni_proto_option; typedef struct nni_proto_ctx_ops nni_proto_ctx_ops; diff --git a/src/core/dialer.c b/src/core/dialer.c new file mode 100644 index 00000000..ee0d2916 --- /dev/null +++ b/src/core/dialer.c @@ -0,0 +1,512 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +struct nni_dialer { + nni_tran_dialer_ops d_ops; // transport ops + nni_tran * d_tran; // transport pointer + void * d_data; // transport private + uint64_t d_id; // endpoint id + nni_list_node d_node; // per socket list + nni_sock * d_sock; + nni_url * d_url; + int d_refcnt; + int d_lastrv; // last result from synchronous + bool d_synch; // synchronous connect in progress? + bool d_started; + bool d_closed; // full shutdown + bool d_closing; // close pending (waiting on refcnt) + nni_mtx d_mtx; + nni_cv d_cv; + nni_list d_pipes; + nni_aio * d_con_aio; + nni_aio * d_tmo_aio; // backoff timer + nni_duration d_maxrtime; // maximum time for reconnect + nni_duration d_currtime; // current time for reconnect + nni_duration d_inirtime; // initial time for reconnect + nni_time d_conntime; // time of last good connect +}; + +// Functionality related to dialers. +static void dialer_connect_start(nni_dialer *); +static void dialer_connect_cb(void *); +static void dialer_timer_cb(void *); + +static nni_idhash *dialers; +static nni_mtx dialers_lk; + +int +nni_dialer_sys_init(void) +{ + int rv; + + if ((rv = nni_idhash_init(&dialers)) != 0) { + return (rv); + } + nni_mtx_init(&dialers_lk); + nni_idhash_set_limits( + dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff); + + return (0); +} + +void +nni_dialer_sys_fini(void) +{ + nni_mtx_fini(&dialers_lk); + nni_idhash_fini(dialers); + dialers = NULL; +} + +uint32_t +nni_dialer_id(nni_dialer *d) +{ + return ((uint32_t) d->d_id); +} + +static void +dialer_destroy(nni_dialer *d) +{ + if (d == NULL) { + return; + } + + // Remove us from the table so we cannot be found. + if (d->d_id != 0) { + nni_idhash_remove(dialers, d->d_id); + } + + nni_aio_stop(d->d_con_aio); + nni_aio_stop(d->d_tmo_aio); + + nni_sock_remove_dialer(d->d_sock, d); + + nni_aio_fini(d->d_con_aio); + nni_aio_fini(d->d_tmo_aio); + + nni_mtx_lock(&d->d_mtx); + if (d->d_data != NULL) { + d->d_ops.d_fini(d->d_data); + } + nni_mtx_unlock(&d->d_mtx); + nni_cv_fini(&d->d_cv); + nni_mtx_fini(&d->d_mtx); + nni_url_free(d->d_url); + NNI_FREE_STRUCT(d); +} + +int +nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) +{ + nni_tran * tran; + nni_dialer *d; + int rv; + nni_url * url; + + if ((rv = nni_url_parse(&url, urlstr)) != 0) { + return (rv); + } + if (((tran = nni_tran_find(url)) == NULL) || + (tran->tran_dialer == NULL)) { + nni_url_free(url); + return (NNG_ENOTSUP); + } + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + nni_url_free(url); + return (NNG_ENOMEM); + } + d->d_url = url; + d->d_closed = false; + d->d_closing = false; + d->d_started = false; + d->d_data = NULL; + d->d_refcnt = 1; + d->d_sock = s; + d->d_tran = tran; + + // Make a copy of the endpoint operations. This allows us to + // modify them (to override NULLs for example), and avoids an extra + // dereference on hot paths. + d->d_ops = *tran->tran_dialer; + + NNI_LIST_NODE_INIT(&d->d_node); + + nni_pipe_ep_list_init(&d->d_pipes); + + nni_mtx_init(&d->d_mtx); + nni_cv_init(&d->d_cv, &d->d_mtx); + + if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || + ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || + ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) || + ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) || + ((rv = nni_sock_add_dialer(s, d)) != 0)) { + dialer_destroy(d); + return (rv); + } + + *dp = d; + return (0); +} + +int +nni_dialer_find(nni_dialer **dp, uint32_t id) +{ + int rv; + nni_dialer *d; + + if ((rv = nni_init()) != 0) { + return (rv); + } + + nni_mtx_lock(&dialers_lk); + if ((rv = nni_idhash_find(dialers, id, (void **) &d)) == 0) { + if (d->d_closed) { + rv = NNG_ECLOSED; + } else { + d->d_refcnt++; + *dp = d; + } + } + nni_mtx_unlock(&dialers_lk); + return (rv); +} + +int +nni_dialer_hold(nni_dialer *d) +{ + int rv; + nni_mtx_lock(&dialers_lk); + if (d->d_closed) { + rv = NNG_ECLOSED; + } else { + d->d_refcnt++; + rv = 0; + } + nni_mtx_unlock(&dialers_lk); + return (rv); +} + +void +nni_dialer_rele(nni_dialer *d) +{ + nni_mtx_lock(&dialers_lk); + d->d_refcnt--; + if (d->d_closing) { + nni_cv_wake(&d->d_cv); + } + nni_mtx_unlock(&dialers_lk); +} + +int +nni_dialer_shutdown(nni_dialer *d) +{ + nni_mtx_lock(&d->d_mtx); + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ECLOSED); + } + d->d_closing = true; + nni_mtx_unlock(&d->d_mtx); + + // Abort any remaining in-flight operations. + nni_aio_close(d->d_con_aio); + nni_aio_close(d->d_tmo_aio); + + // Stop the underlying transport. + d->d_ops.d_close(d->d_data); + + return (0); +} + +void +nni_dialer_close(nni_dialer *d) +{ + nni_pipe *p; + + nni_mtx_lock(&d->d_mtx); + if (d->d_closed) { + nni_mtx_unlock(&d->d_mtx); + nni_dialer_rele(d); + return; + } + d->d_closed = true; + nni_mtx_unlock(&d->d_mtx); + + nni_dialer_shutdown(d); + + nni_aio_stop(d->d_con_aio); + nni_aio_stop(d->d_tmo_aio); + + nni_mtx_lock(&d->d_mtx); + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_stop(p); + } + while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) { + nni_cv_wait(&d->d_cv); + } + nni_mtx_unlock(&d->d_mtx); + + dialer_destroy(d); +} + +// This function starts an exponential backoff timer for reconnecting. +static void +dialer_timer_start(nni_dialer *d) +{ + nni_duration backoff; + + if (d->d_closing) { + return; + } + backoff = d->d_currtime; + d->d_currtime *= 2; + if (d->d_currtime > d->d_maxrtime) { + d->d_currtime = d->d_maxrtime; + } + + // To minimize damage from storms, etc., we select a backoff + // value randomly, in the range of [0, backoff-1]; this is + // pretty similar to 802 style backoff, except that we have a + // nearly uniform time period instead of discrete slot times. + // This algorithm may lead to slight biases because we don't + // have a statistically perfect distribution with the modulo of + // the random number, but this really doesn't matter. + nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); +} + +static void +dialer_timer_cb(void *arg) +{ + nni_dialer *d = arg; + nni_aio * aio = d->d_tmo_aio; + + nni_mtx_lock(&d->d_mtx); + if (nni_aio_result(aio) == 0) { + dialer_connect_start(d); + } + nni_mtx_unlock(&d->d_mtx); +} + +static void +dialer_connect_cb(void *arg) +{ + nni_dialer *d = arg; + nni_pipe * p; + nni_aio * aio = d->d_con_aio; + int rv; + + if ((rv = nni_aio_result(aio)) == 0) { + void *data = nni_aio_get_output(aio, 0); + NNI_ASSERT(data != NULL); + rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data); + } + if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) { + nni_pipe_stop(p); + } + + nni_mtx_lock(&d->d_mtx); + switch (rv) { + case 0: + nni_pipe_set_dialer(p, d); + nni_list_append(&d->d_pipes, p); + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + nni_pipe_stop(p); + return; + } + + // Good connect, so reset the backoff timer. + // Note that a host that accepts the connect, but drops + // us immediately, is going to get hit pretty hard + // (depending on the initial backoff) with no + // exponential backoff. This can happen if we wind up + // trying to connect to some port that does not speak + // SP for example. + d->d_currtime = d->d_inirtime; + + // No further outgoing connects -- we will restart a + // connection from the pipe when the pipe is removed. + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled/closed -- stop everything. + break; + default: + // redial, but only if we are not synchronous + if (!d->d_synch) { + dialer_timer_start(d); + } + break; + } + if (d->d_synch) { + if (rv != 0) { + d->d_started = false; + } + d->d_lastrv = rv; + d->d_synch = false; + nni_cv_wake(&d->d_cv); + } + nni_mtx_unlock(&d->d_mtx); +} + +static void +dialer_connect_start(nni_dialer *d) +{ + nni_aio *aio = d->d_con_aio; + + // Call with the Endpoint lock held. + if (d->d_closing) { + return; + } + + d->d_ops.d_connect(d->d_data, aio); +} + +int +nni_dialer_start(nni_dialer *d, int flags) +{ + int rv = 0; + + nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime); + d->d_currtime = d->d_inirtime; + + nni_mtx_lock(&d->d_mtx); + + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ECLOSED); + } + + if (d->d_started) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ESTATE); + } + + if ((flags & NNG_FLAG_NONBLOCK) != 0) { + d->d_started = true; + dialer_connect_start(d); + nni_mtx_unlock(&d->d_mtx); + return (0); + } + + d->d_synch = true; + d->d_started = true; + dialer_connect_start(d); + + while (d->d_synch && !d->d_closing) { + nni_cv_wait(&d->d_cv); + } + rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv; + nni_cv_wake(&d->d_cv); + + nni_mtx_unlock(&d->d_mtx); + return (rv); +} + +void +nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p) +{ + if (d == NULL) { + return; + } + + // Break up the relationship between the dialer and the pipe. + nni_mtx_lock(&d->d_mtx); + // During early init, the pipe might not have this set. + if (nni_list_active(&d->d_pipes, p)) { + nni_list_remove(&d->d_pipes, p); + } + // Wake up the close thread if it is waiting. + if (d->d_closed) { + if (nni_list_empty(&d->d_pipes)) { + nni_cv_wake(&d->d_cv); + } + } else { + // If this pipe closed, then lets restart the dial operation. + // Since the remote side seems to have closed, lets start with + // a backoff. This keeps us from pounding the crap out of the + // thing if a remote server accepts but then disconnects + // immediately. + dialer_timer_start(d); + } + nni_mtx_unlock(&d->d_mtx); +} + +int +nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, + nni_opt_type t) +{ + nni_tran_option *o; + + if (strcmp(name, NNG_OPT_URL) == 0) { + return (NNG_EREADONLY); + } + + for (o = d->d_ops.d_options; o && o->o_name; o++) { + int rv; + + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_set == NULL) { + return (NNG_EREADONLY); + } + + nni_mtx_lock(&d->d_mtx); + rv = o->o_set(d->d_data, val, sz, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + + return (NNG_ENOTSUP); +} + +int +nni_dialer_getopt( + nni_dialer *d, const char *name, void *valp, size_t *szp, nni_opt_type t) +{ + nni_tran_option *o; + + for (o = d->d_ops.d_options; o && o->o_name; o++) { + int rv; + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_get == NULL) { + return (NNG_EWRITEONLY); + } + nni_mtx_lock(&d->d_mtx); + rv = o->o_get(d->d_data, valp, szp, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + + // We provide a fallback on the URL, but let the implementation + // override. This allows the URL to be created with wildcards, + // that are resolved later. + if (strcmp(name, NNG_OPT_URL) == 0) { + return (nni_copyout_str(d->d_url->u_rawurl, valp, szp, t)); + } + + return (nni_sock_getopt(d->d_sock, name, valp, szp, t)); +} + +void +nni_dialer_list_init(nni_list *list) +{ + NNI_LIST_INIT(list, nni_dialer, d_node); +} diff --git a/src/core/dialer.h b/src/core/dialer.h new file mode 100644 index 00000000..56b0fb1b --- /dev/null +++ b/src/core/dialer.h @@ -0,0 +1,32 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_DIALER_H +#define CORE_DIALER_H + +extern int nni_dialer_sys_init(void); +extern void nni_dialer_sys_fini(void); +extern int nni_dialer_find(nni_dialer **, uint32_t); +extern int nni_dialer_hold(nni_dialer *); +extern void nni_dialer_rele(nni_dialer *); +extern uint32_t nni_dialer_id(nni_dialer *); +extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *); +extern int nni_dialer_shutdown(nni_dialer *); +extern void nni_dialer_close(nni_dialer *); +extern int nni_dialer_start(nni_dialer *, int); +extern void nni_dialer_list_init(nni_list *); +extern void nni_dialer_remove_pipe(nni_dialer *, nni_pipe *); + +extern int nni_dialer_setopt( + nni_dialer *, const char *, const void *, size_t, nni_opt_type); +extern int nni_dialer_getopt( + nni_dialer *, const char *, void *, size_t *, nni_opt_type); + +#endif // CORE_DIALER_H diff --git a/src/core/endpt.c b/src/core/endpt.c deleted file mode 100644 index 8e678fb0..00000000 --- a/src/core/endpt.c +++ /dev/null @@ -1,665 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -struct nni_ep { - nni_tran_ep_ops ep_ops; // transport ops - nni_tran * ep_tran; // transport pointer - void * ep_data; // transport private - uint64_t ep_id; // endpoint id - nni_list_node ep_node; // per socket list - nni_sock * ep_sock; - nni_url * ep_url; - int ep_mode; - int ep_refcnt; - bool ep_started; - bool ep_closed; // full shutdown - bool ep_closing; // close pending (waiting on refcnt) - bool ep_tmo_run; - nni_mtx ep_mtx; - nni_cv ep_cv; - nni_list ep_pipes; - nni_aio * ep_acc_aio; - nni_aio * ep_con_aio; - nni_aio * ep_con_syn; // used for sync connect - nni_aio * ep_tmo_aio; // backoff timer - nni_duration ep_maxrtime; // maximum time for reconnect - nni_duration ep_currtime; // current time for reconnect - nni_duration ep_inirtime; // initial time for reconnect - nni_time ep_conntime; // time of last good connect -}; - -// Functionality related to end points. - -static void nni_ep_acc_start(nni_ep *); -static void nni_ep_acc_cb(void *); -static void nni_ep_con_start(nni_ep *); -static void nni_ep_con_cb(void *); -static void nni_ep_tmo_start(nni_ep *); -static void nni_ep_tmo_cb(void *); - -static nni_idhash *nni_eps; -static nni_mtx nni_ep_lk; - -int -nni_ep_sys_init(void) -{ - int rv; - - if ((rv = nni_idhash_init(&nni_eps)) != 0) { - return (rv); - } - nni_mtx_init(&nni_ep_lk); - nni_idhash_set_limits( - nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); - - return (0); -} - -void -nni_ep_sys_fini(void) -{ - nni_mtx_fini(&nni_ep_lk); - nni_idhash_fini(nni_eps); - nni_eps = NULL; -} - -uint32_t -nni_ep_id(nni_ep *ep) -{ - return ((uint32_t) ep->ep_id); -} - -static void -nni_ep_destroy(nni_ep *ep) -{ - if (ep == NULL) { - return; - } - - // Remove us from the table so we cannot be found. - if (ep->ep_id != 0) { - nni_idhash_remove(nni_eps, ep->ep_id); - } - - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - nni_sock_ep_remove(ep->ep_sock, ep); - - nni_aio_fini(ep->ep_acc_aio); - nni_aio_fini(ep->ep_con_aio); - nni_aio_fini(ep->ep_con_syn); - nni_aio_fini(ep->ep_tmo_aio); - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_data != NULL) { - ep->ep_ops.ep_fini(ep->ep_data); - } - nni_mtx_unlock(&ep->ep_mtx); - nni_cv_fini(&ep->ep_cv); - nni_mtx_fini(&ep->ep_mtx); - nni_url_free(ep->ep_url); - NNI_FREE_STRUCT(ep); -} - -static int -nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode) -{ - nni_tran *tran; - nni_ep * ep; - int rv; - nni_url * url; - - if ((rv = nni_url_parse(&url, urlstr)) != 0) { - return (rv); - } - if ((tran = nni_tran_find(url)) == NULL) { - nni_url_free(url); - return (NNG_ENOTSUP); - } - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - nni_url_free(url); - return (NNG_ENOMEM); - } - ep->ep_url = url; - ep->ep_closed = false; - ep->ep_closing = false; - ep->ep_started = false; - ep->ep_data = NULL; - ep->ep_refcnt = 1; - ep->ep_sock = s; - ep->ep_tran = tran; - ep->ep_mode = mode; - - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. - ep->ep_ops = *tran->tran_ep; - - NNI_LIST_NODE_INIT(&ep->ep_node); - - nni_pipe_ep_list_init(&ep->ep_pipes); - - nni_mtx_init(&ep->ep_mtx); - nni_cv_init(&ep->ep_cv, &ep->ep_mtx); - - if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || - ((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) || - ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || - ((rv = nni_sock_ep_add(s, ep)) != 0)) { - nni_ep_destroy(ep); - return (rv); - } - - *epp = ep; - return (0); -} - -int -nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *urlstr) -{ - return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_DIAL)); -} - -int -nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *urlstr) -{ - return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_LISTEN)); -} - -int -nni_ep_find(nni_ep **epp, uint32_t id) -{ - int rv; - nni_ep *ep; - - if ((rv = nni_init()) != 0) { - return (rv); - } - - nni_mtx_lock(&nni_ep_lk); - if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) { - if (ep->ep_closed) { - rv = NNG_ECLOSED; - } else { - ep->ep_refcnt++; - *epp = ep; - } - } - nni_mtx_unlock(&nni_ep_lk); - return (rv); -} - -int -nni_ep_hold(nni_ep *ep) -{ - int rv; - nni_mtx_lock(&nni_ep_lk); - if (ep->ep_closed) { - rv = NNG_ECLOSED; - } else { - ep->ep_refcnt++; - rv = 0; - } - nni_mtx_unlock(&nni_ep_lk); - return (rv); -} - -void -nni_ep_rele(nni_ep *ep) -{ - nni_mtx_lock(&nni_ep_lk); - ep->ep_refcnt--; - if (ep->ep_closing) { - nni_cv_wake(&ep->ep_cv); - } - nni_mtx_unlock(&nni_ep_lk); -} - -int -nni_ep_shutdown(nni_ep *ep) -{ - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - ep->ep_closing = true; - nni_mtx_unlock(&ep->ep_mtx); - - // Abort any remaining in-flight operations. - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - // Stop the underlying transport. - ep->ep_ops.ep_close(ep->ep_data); - - return (0); -} - -void -nni_ep_close(nni_ep *ep) -{ - nni_pipe *p; - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); - return; - } - ep->ep_closed = true; - nni_mtx_unlock(&ep->ep_mtx); - - nni_ep_shutdown(ep); - - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - nni_mtx_lock(&ep->ep_mtx); - NNI_LIST_FOREACH (&ep->ep_pipes, p) { - nni_pipe_stop(p); - } - while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) { - nni_cv_wait(&ep->ep_cv); - } - nni_mtx_unlock(&ep->ep_mtx); - - nni_ep_destroy(ep); -} - -static void -nni_ep_tmo_cancel(nni_aio *aio, int rv) -{ - nni_ep *ep = nni_aio_get_prov_data(aio); - // The only way this ever gets "finished", is via cancellation. - if (ep != NULL) { - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_tmo_run) { - nni_aio_finish_error(aio, rv); - } - ep->ep_tmo_run = false; - nni_mtx_unlock(&ep->ep_mtx); - } -} - -static void -nni_ep_tmo_start(nni_ep *ep) -{ - nni_duration backoff; - int rv; - - if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) { - return; - } - backoff = ep->ep_currtime; - ep->ep_currtime *= 2; - if (ep->ep_currtime > ep->ep_maxrtime) { - ep->ep_currtime = ep->ep_maxrtime; - } - - // To minimize damage from storms, etc., we select a backoff - // value randomly, in the range of [0, backoff-1]; this is - // pretty similar to 802 style backoff, except that we have a - // nearly uniform time period instead of discrete slot times. - // This algorithm may lead to slight biases because we don't - // have a statistically perfect distribution with the modulo of - // the random number, but this really doesn't matter. - - nni_aio_set_timeout( - ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0)); - - if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) != - 0) { - nni_aio_finish_error(ep->ep_tmo_aio, rv); - } - - ep->ep_tmo_run = true; -} - -static void -nni_ep_tmo_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_tmo_aio; - - nni_mtx_lock(&ep->ep_mtx); - if (nni_aio_result(aio) == NNG_ETIMEDOUT) { - if (ep->ep_mode == NNI_EP_MODE_DIAL) { - nni_ep_con_start(ep); - } else { - nni_ep_acc_start(ep); - } - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_con_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_con_aio; - int rv; - - if ((rv = nni_aio_result(aio)) == 0) { - rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); - } - nni_mtx_lock(&ep->ep_mtx); - switch (rv) { - case 0: - // Good connect, so reset the backoff timer. - // Note that a host that accepts the connect, but drops - // us immediately, is going to get hit pretty hard - // (depending on the initial backoff) with no - // exponential backoff. This can happen if we wind up - // trying to connect to some port that does not speak - // SP for example. - ep->ep_currtime = ep->ep_inirtime; - - // No further outgoing connects -- we will restart a - // connection from the pipe when the pipe is removed. - break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled/closed -- stop everything. - break; - default: - // Other errors involve the use of the backoff timer. - nni_ep_tmo_start(ep); - break; - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_con_start(nni_ep *ep) -{ - nni_aio *aio = ep->ep_con_aio; - - // Call with the Endpoint lock held. - if (ep->ep_closing) { - return; - } - - ep->ep_ops.ep_connect(ep->ep_data, aio); -} - -int -nni_ep_dial(nni_ep *ep, int flags) -{ - int rv = 0; - nni_aio *aio; - - nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); - ep->ep_currtime = ep->ep_inirtime; - - nni_mtx_lock(&ep->ep_mtx); - - if (ep->ep_mode != NNI_EP_MODE_DIAL) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ENOTSUP); - } - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ESTATE); - } - - if ((flags & NNG_FLAG_NONBLOCK) != 0) { - ep->ep_started = true; - nni_ep_con_start(ep); - nni_mtx_unlock(&ep->ep_mtx); - return (0); - } - - // Synchronous mode: so we have to wait for it to complete. - aio = ep->ep_con_syn; - ep->ep_ops.ep_connect(ep->ep_data, aio); - ep->ep_started = true; - nni_mtx_unlock(&ep->ep_mtx); - - nni_aio_wait(aio); - - // As we're synchronous, we also have to handle the completion. - if (((rv = nni_aio_result(aio)) != 0) || - ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) { - nni_mtx_lock(&ep->ep_mtx); - ep->ep_started = false; - nni_mtx_unlock(&ep->ep_mtx); - } - return (rv); -} - -static void -nni_ep_acc_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_acc_aio; - int rv; - - if ((rv = nni_aio_result(aio)) == 0) { - NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL); - rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); - } - - nni_mtx_lock(&ep->ep_mtx); - switch (rv) { - case 0: - nni_ep_acc_start(ep); - break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled or closed, no further action. - break; - case NNG_ECONNABORTED: - case NNG_ECONNRESET: - // These are remote conditions, no cool down. - nni_ep_acc_start(ep); - break; - default: - // We don't really know why we failed, but we backoff - // here. This is because errors here are probably due - // to system failures (resource exhaustion) and we hope - // by not thrashing we give the system a chance to - // recover. - nni_ep_tmo_start(ep); - break; - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_acc_start(nni_ep *ep) -{ - nni_aio *aio = ep->ep_acc_aio; - - // Call with the Endpoint lock held. - if (ep->ep_closing) { - return; - } - ep->ep_ops.ep_accept(ep->ep_data, aio); -} - -int -nni_ep_listen(nni_ep *ep, int flags) -{ - int rv = 0; - NNI_ARG_UNUSED(flags); - - nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); - ep->ep_currtime = ep->ep_inirtime; - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_mode != NNI_EP_MODE_LISTEN) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ENOTSUP); - } - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ESTATE); - } - - rv = ep->ep_ops.ep_bind(ep->ep_data); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - ep->ep_started = true; - nni_ep_acc_start(ep); - nni_mtx_unlock(&ep->ep_mtx); - - return (0); -} - -int -nni_ep_pipe_add(nni_ep *ep, nni_pipe *p) -{ - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - nni_list_append(&ep->ep_pipes, p); - nni_mtx_unlock(&ep->ep_mtx); - return (0); -} - -void -nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) -{ - // Break up the relationship between the EP and the pipe. - nni_mtx_lock(&ep->ep_mtx); - // During early init, the pipe might not have this set. - if (nni_list_active(&ep->ep_pipes, pipe)) { - nni_list_remove(&ep->ep_pipes, pipe); - } - // Wake up the close thread if it is waiting. - if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) { - nni_cv_wake(&ep->ep_cv); - } - - // If this pipe closed, then lets restart the dial operation. - // Since the remote side seems to have closed, lets start with - // a backoff. This keeps us from pounding the crap out of the - // thing if a remote server accepts but then disconnects - // immediately. - if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) { - nni_ep_tmo_start(ep); - } - nni_mtx_unlock(&ep->ep_mtx); -} - -int -nni_ep_setopt( - nni_ep *ep, const char *name, const void *val, size_t sz, nni_opt_type t) -{ - nni_tran_option *o; - - if (strcmp(name, NNG_OPT_URL) == 0) { - return (NNG_EREADONLY); - } - - for (o = ep->ep_ops.ep_options; o && o->o_name; o++) { - int rv; - - if (strcmp(o->o_name, name) != 0) { - continue; - } - if (o->o_set == NULL) { - return (NNG_EREADONLY); - } - - nni_mtx_lock(&ep->ep_mtx); - rv = o->o_set(ep->ep_data, val, sz, t); - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - return (NNG_ENOTSUP); -} - -int -nni_ep_mode(nni_ep *ep) -{ - return (ep->ep_mode); -} - -int -nni_ep_getopt( - nni_ep *ep, const char *name, void *valp, size_t *szp, nni_opt_type t) -{ - nni_tran_option *o; - - for (o = ep->ep_ops.ep_options; o && o->o_name; o++) { - int rv; - if (strcmp(o->o_name, name) != 0) { - continue; - } - if (o->o_get == NULL) { - return (NNG_EWRITEONLY); - } - nni_mtx_lock(&ep->ep_mtx); - rv = o->o_get(ep->ep_data, valp, szp, t); - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - // We provide a fallback on the URL, but let the implementation - // override. This allows the URL to be created with wildcards, - // that are resolved later. - if (strcmp(name, NNG_OPT_URL) == 0) { - return (nni_copyout_str(ep->ep_url->u_rawurl, valp, szp, t)); - } - - return (nni_sock_getopt(ep->ep_sock, name, valp, szp, t)); -} - -void -nni_ep_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_ep, ep_node); -} - -nni_tran * -nni_ep_tran(nni_ep *ep) -{ - return (ep->ep_tran); -} - -nni_sock * -nni_ep_sock(nni_ep *ep) -{ - return (ep->ep_sock); -} diff --git a/src/core/endpt.h b/src/core/endpt.h deleted file mode 100644 index bf251d41..00000000 --- a/src/core/endpt.h +++ /dev/null @@ -1,45 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef CORE_ENDPT_H -#define CORE_ENDPT_H - -extern int nni_ep_sys_init(void); -extern void nni_ep_sys_fini(void); -extern nni_tran *nni_ep_tran(nni_ep *); -extern nni_sock *nni_ep_sock(nni_ep *); -extern int nni_ep_find(nni_ep **, uint32_t); -extern int nni_ep_hold(nni_ep *); -extern void nni_ep_rele(nni_ep *); -extern uint32_t nni_ep_id(nni_ep *); -extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *); -extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *); -extern int nni_ep_shutdown(nni_ep *); -extern void nni_ep_close(nni_ep *); -extern int nni_ep_dial(nni_ep *, int); -extern int nni_ep_listen(nni_ep *, int); -extern void nni_ep_list_init(nni_list *); -extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *); -extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); -extern int nni_ep_mode(nni_ep *); - -extern int nni_ep_setopt( - nni_ep *, const char *, const void *, size_t, nni_opt_type); -extern int nni_ep_getopt( - nni_ep *, const char *, void *, size_t *, nni_opt_type); - -// Endpoint modes. Currently used by transports. Remove this when we make -// transport dialers and listeners explicit. -enum nni_ep_mode { - NNI_EP_MODE_DIAL = 1, - NNI_EP_MODE_LISTEN = 2, -}; - -#endif // CORE_ENDPT_H diff --git a/src/core/init.c b/src/core/init.c index c1b7bbac..30a7a547 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -33,7 +33,8 @@ nni_init_helper(void) ((rv = nni_aio_sys_init()) != 0) || ((rv = nni_random_sys_init()) != 0) || ((rv = nni_sock_sys_init()) != 0) || - ((rv = nni_ep_sys_init()) != 0) || + ((rv = nni_listener_sys_init()) != 0) || + ((rv = nni_dialer_sys_init()) != 0) || ((rv = nni_pipe_sys_init()) != 0) || ((rv = nni_proto_sys_init()) != 0) || ((rv = nni_tran_sys_init()) != 0)) { @@ -71,7 +72,8 @@ nni_fini(void) nni_tran_sys_fini(); nni_proto_sys_fini(); nni_pipe_sys_fini(); - nni_ep_sys_fini(); + nni_dialer_sys_fini(); + nni_listener_sys_fini(); nni_sock_sys_fini(); nni_reap_sys_fini(); // must be before timer and aio (expire) nni_random_sys_fini(); diff --git a/src/core/listener.c b/src/core/listener.c new file mode 100644 index 00000000..6d580cd8 --- /dev/null +++ b/src/core/listener.c @@ -0,0 +1,443 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +struct nni_listener { + nni_tran_listener_ops l_ops; // transport ops + nni_tran * l_tran; // transport pointer + void * l_data; // transport private + uint64_t l_id; // endpoint id + nni_list_node l_node; // per socket list + nni_sock * l_sock; + nni_url * l_url; + int l_refcnt; + bool l_started; + bool l_closed; // full shutdown + bool l_closing; // close pending (waiting on refcnt) + nni_mtx l_mtx; + nni_cv l_cv; + nni_list l_pipes; + nni_aio * l_acc_aio; + nni_aio * l_tmo_aio; +}; + +// Functionality related to listeners. + +static void listener_accept_start(nni_listener *); +static void listener_accept_cb(void *); +static void listener_timer_cb(void *); + +static nni_idhash *listeners; +static nni_mtx listeners_lk; + +int +nni_listener_sys_init(void) +{ + int rv; + + if ((rv = nni_idhash_init(&listeners)) != 0) { + return (rv); + } + nni_mtx_init(&listeners_lk); + nni_idhash_set_limits( + listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff); + + return (0); +} + +void +nni_listener_sys_fini(void) +{ + nni_mtx_fini(&listeners_lk); + nni_idhash_fini(listeners); + listeners = NULL; +} + +uint32_t +nni_listener_id(nni_listener *l) +{ + return ((uint32_t) l->l_id); +} + +static void +listener_destroy(nni_listener *l) +{ + if (l == NULL) { + return; + } + + // Remove us from the table so we cannot be found. + if (l->l_id != 0) { + nni_idhash_remove(listeners, l->l_id); + } + + nni_aio_stop(l->l_acc_aio); + + nni_sock_remove_listener(l->l_sock, l); + + nni_aio_fini(l->l_acc_aio); + + nni_mtx_lock(&l->l_mtx); + if (l->l_data != NULL) { + l->l_ops.l_fini(l->l_data); + } + nni_mtx_unlock(&l->l_mtx); + nni_cv_fini(&l->l_cv); + nni_mtx_fini(&l->l_mtx); + nni_url_free(l->l_url); + NNI_FREE_STRUCT(l); +} + +int +nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) +{ + nni_tran * tran; + nni_listener *l; + int rv; + nni_url * url; + + if ((rv = nni_url_parse(&url, urlstr)) != 0) { + return (rv); + } + if (((tran = nni_tran_find(url)) == NULL) || + (tran->tran_listener == NULL)) { + nni_url_free(url); + return (NNG_ENOTSUP); + } + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + nni_url_free(url); + return (NNG_ENOMEM); + } + l->l_url = url; + l->l_closed = false; + l->l_closing = false; + l->l_started = false; + l->l_data = NULL; + l->l_refcnt = 1; + l->l_sock = s; + l->l_tran = tran; + + // Make a copy of the endpoint operations. This allows us to + // modify them (to override NULLs for example), and avoids an extra + // dereference on hot paths. + l->l_ops = *tran->tran_listener; + + NNI_LIST_NODE_INIT(&l->l_node); + + nni_pipe_ep_list_init(&l->l_pipes); + + nni_mtx_init(&l->l_mtx); + nni_cv_init(&l->l_cv, &l->l_mtx); + + if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || + ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || + ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) || + ((rv = nni_idhash_alloc(listeners, &l->l_id, l)) != 0) || + ((rv = nni_sock_add_listener(s, l)) != 0)) { + listener_destroy(l); + return (rv); + } + + *lp = l; + return (0); +} + +int +nni_listener_find(nni_listener **lp, uint32_t id) +{ + int rv; + nni_listener *l; + + if ((rv = nni_init()) != 0) { + return (rv); + } + + nni_mtx_lock(&listeners_lk); + if ((rv = nni_idhash_find(listeners, id, (void **) &l)) == 0) { + if (l->l_closed) { + rv = NNG_ECLOSED; + } else { + l->l_refcnt++; + *lp = l; + } + } + nni_mtx_unlock(&listeners_lk); + return (rv); +} + +int +nni_listener_hold(nni_listener *l) +{ + int rv; + nni_mtx_lock(&listeners_lk); + if (l->l_closed) { + rv = NNG_ECLOSED; + } else { + l->l_refcnt++; + rv = 0; + } + nni_mtx_unlock(&listeners_lk); + return (rv); +} + +void +nni_listener_rele(nni_listener *l) +{ + nni_mtx_lock(&listeners_lk); + l->l_refcnt--; + if (l->l_closing) { + nni_cv_wake(&l->l_cv); + } + nni_mtx_unlock(&listeners_lk); +} + +int +nni_listener_shutdown(nni_listener *l) +{ + nni_mtx_lock(&l->l_mtx); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ECLOSED); + } + l->l_closing = true; + nni_mtx_unlock(&l->l_mtx); + + // Abort any remaining in-flight accepts. + nni_aio_close(l->l_acc_aio); + nni_aio_close(l->l_tmo_aio); + + // Stop the underlying transport. + l->l_ops.l_close(l->l_data); + + return (0); +} + +void +nni_listener_close(nni_listener *l) +{ + nni_pipe *p; + + nni_mtx_lock(&l->l_mtx); + if (l->l_closed) { + nni_mtx_unlock(&l->l_mtx); + nni_listener_rele(l); + return; + } + l->l_closed = true; + nni_mtx_unlock(&l->l_mtx); + + nni_listener_shutdown(l); + + nni_aio_stop(l->l_acc_aio); + nni_aio_stop(l->l_tmo_aio); + + nni_mtx_lock(&l->l_mtx); + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_stop(p); + } + while ((!nni_list_empty(&l->l_pipes)) || (l->l_refcnt != 1)) { + nni_cv_wait(&l->l_cv); + } + nni_mtx_unlock(&l->l_mtx); + + listener_destroy(l); +} + +static void +listener_timer_cb(void *arg) +{ + nni_listener *l = arg; + nni_aio * aio = l->l_tmo_aio; + + nni_mtx_lock(&l->l_mtx); + if (nni_aio_result(aio) == 0) { + listener_accept_start(l); + } + nni_mtx_unlock(&l->l_mtx); +} + +static void +listener_accept_cb(void *arg) +{ + nni_listener *l = arg; + nni_pipe * p; + nni_aio * aio = l->l_acc_aio; + int rv; + + if ((rv = nni_aio_result(aio)) == 0) { + void *data = nni_aio_get_output(aio, 0); + NNI_ASSERT(data != NULL); + rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data); + } + + if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { + nni_pipe_stop(p); + } + + nni_mtx_lock(&l->l_mtx); + switch (rv) { + case 0: + nni_pipe_set_listener(p, l); + nni_list_append(&l->l_pipes, p); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + nni_pipe_stop(p); + return; + } + listener_accept_start(l); + break; + case NNG_ECONNABORTED: // remote condition, no cooldown + case NNG_ECONNRESET: // remote condition, no cooldown + listener_accept_start(l); + break; + case NNG_ECLOSED: // no further action + case NNG_ECANCELED: // no further action + break; + default: + // We don't really know why we failed, but we backoff + // here. This is because errors here are probably due + // to system failures (resource exhaustion) and we hope + // by not thrashing we give the system a chance to + // recover. 100 msec is enough to cool down. + nni_sleep_aio(100, l->l_tmo_aio); + break; + } + nni_mtx_unlock(&l->l_mtx); +} + +static void +listener_accept_start(nni_listener *l) +{ + nni_aio *aio = l->l_acc_aio; + + // Call with the listener lock held. + if (l->l_closing) { + return; + } + l->l_ops.l_accept(l->l_data, aio); +} + +int +nni_listener_start(nni_listener *l, int flags) +{ + int rv = 0; + NNI_ARG_UNUSED(flags); + + nni_mtx_lock(&l->l_mtx); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ECLOSED); + } + if (l->l_started) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ESTATE); + } + + if ((rv = l->l_ops.l_bind(l->l_data)) != 0) { + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + l->l_started = true; + listener_accept_start(l); + nni_mtx_unlock(&l->l_mtx); + + return (0); +} + +void +nni_listener_remove_pipe(nni_listener *l, nni_pipe *p) +{ + if (l == NULL) { + return; + } + // Break up relationship between listener and pipe. + nni_mtx_lock(&l->l_mtx); + // During early init, the pipe might not have this set. + if (nni_list_active(&l->l_pipes, p)) { + nni_list_remove(&l->l_pipes, p); + } + // Wake up the closer if it is waiting. + if (l->l_closed && nni_list_empty(&l->l_pipes)) { + nni_cv_wake(&l->l_cv); + } + nni_mtx_unlock(&l->l_mtx); +} + +int +nni_listener_setopt(nni_listener *l, const char *name, const void *val, + size_t sz, nni_opt_type t) +{ + nni_tran_option *o; + + if (strcmp(name, NNG_OPT_URL) == 0) { + return (NNG_EREADONLY); + } + + for (o = l->l_ops.l_options; o && o->o_name; o++) { + int rv; + + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_set == NULL) { + return (NNG_EREADONLY); + } + + nni_mtx_lock(&l->l_mtx); + rv = o->o_set(l->l_data, val, sz, t); + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + return (NNG_ENOTSUP); +} + +int +nni_listener_getopt( + nni_listener *l, const char *name, void *valp, size_t *szp, nni_opt_type t) +{ + nni_tran_option *o; + + for (o = l->l_ops.l_options; o && o->o_name; o++) { + int rv; + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_get == NULL) { + return (NNG_EWRITEONLY); + } + nni_mtx_lock(&l->l_mtx); + rv = o->o_get(l->l_data, valp, szp, t); + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + // We provide a fallback on the URL, but let the implementation + // override. This allows the URL to be created with wildcards, + // that are resolved later. + if (strcmp(name, NNG_OPT_URL) == 0) { + return (nni_copyout_str(l->l_url->u_rawurl, valp, szp, t)); + } + + return (nni_sock_getopt(l->l_sock, name, valp, szp, t)); +} + +void +nni_listener_list_init(nni_list *list) +{ + NNI_LIST_INIT(list, nni_listener, l_node); +} diff --git a/src/core/listener.h b/src/core/listener.h new file mode 100644 index 00000000..41b1a678 --- /dev/null +++ b/src/core/listener.h @@ -0,0 +1,33 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_LISTENER_H +#define CORE_LISTENER_H + +extern int nni_listener_sys_init(void); +extern void nni_listener_sys_fini(void); +extern int nni_listener_find(nni_listener **, uint32_t); +extern int nni_listener_hold(nni_listener *); +extern void nni_listener_rele(nni_listener *); +extern uint32_t nni_listener_id(nni_listener *); +extern int nni_listener_create(nni_listener **, nni_sock *, const char *); +extern int nni_listener_shutdown(nni_listener *); +extern void nni_listener_close(nni_listener *); +extern int nni_listener_start(nni_listener *, int); +extern void nni_listener_list_init(nni_list *); +extern int nni_listener_add_pipe(nni_listener *, nni_pipe *); +extern void nni_listener_remove_pipe(nni_listener *, nni_pipe *); + +extern int nni_listener_setopt( + nni_listener *, const char *, const void *, size_t, nni_opt_type); +extern int nni_listener_getopt( + nni_listener *, const char *, void *, size_t *, nni_opt_type); + +#endif // CORE_LISTENER_H diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index fdb2ce94..9af12720 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -52,7 +52,8 @@ // These have to come after the others - particularly transport.h -#include "core/endpt.h" +#include "core/dialer.h" +#include "core/listener.h" #include "core/pipe.h" #include "core/socket.h" diff --git a/src/core/pipe.c b/src/core/pipe.c index 93fbae99..a42cdeff 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -26,7 +26,8 @@ struct nni_pipe { nni_list_node p_sock_node; nni_list_node p_ep_node; nni_sock * p_sock; - nni_ep * p_ep; + nni_listener * p_listener; + nni_dialer * p_dialer; bool p_closed; bool p_stop; bool p_cbs; @@ -98,7 +99,7 @@ nni_pipe_sys_fini(void) } } -static void +void nni_pipe_destroy(nni_pipe *p) { bool cbs; @@ -126,9 +127,8 @@ nni_pipe_destroy(nni_pipe *p) // We have exclusive access at this point, so we can check if // we are still on any lists. - if (nni_list_node_active(&p->p_ep_node)) { - nni_ep_pipe_remove(p->p_ep, p); - } + nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL + nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL if (nni_list_node_active(&p->p_sock_node)) { nni_sock_pipe_remove(p->p_sock, p); @@ -303,12 +303,10 @@ nni_pipe_start_cb(void *arg) } int -nni_pipe_create(nni_ep *ep, void *tdata) +nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { nni_pipe * p; int rv; - nni_tran * tran = nni_ep_tran(ep); - nni_sock * sock = nni_ep_sock(ep); void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); @@ -324,7 +322,6 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_tran_data = tdata; p->p_proto_ops = *pops; p->p_proto_data = NULL; - p->p_ep = ep; p->p_sock = sock; p->p_closed = false; p->p_stop = false; @@ -348,16 +345,27 @@ nni_pipe_create(nni_ep *ep, void *tdata) } if ((rv != 0) || - ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) || - ((rv = nni_ep_pipe_add(ep, p)) != 0) || - ((rv = nni_sock_pipe_add(sock, p)) != 0)) { + ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { nni_pipe_destroy(p); return (rv); } + *pp = p; return (0); } +void +nni_pipe_set_listener(nni_pipe *p, nni_listener *l) +{ + p->p_listener = l; +} + +void +nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d) +{ + p->p_dialer = d; +} + int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) @@ -371,7 +379,13 @@ nni_pipe_getopt( return (o->o_get(p->p_tran_data, val, szp, t)); } // Maybe the endpoint knows? - return (nni_ep_getopt(p->p_ep, name, val, szp, t)); + if (p->p_dialer != NULL) { + return (nni_dialer_getopt(p->p_dialer, name, val, szp, t)); + } + if (p->p_listener != NULL) { + return (nni_listener_getopt(p->p_listener, name, val, szp, t)); + } + return (NNG_ENOTSUP); } void @@ -409,15 +423,20 @@ nni_pipe_sock_id(nni_pipe *p) } uint32_t -nni_pipe_ep_id(nni_pipe *p) +nni_pipe_listener_id(nni_pipe *p) { - return (nni_ep_id(p->p_ep)); + if (p->p_listener != NULL) { + return (nni_listener_id(p->p_listener)); + } + return (0); } - -int -nni_pipe_ep_mode(nni_pipe *p) +uint32_t +nni_pipe_dialer_id(nni_pipe *p) { - return (nni_ep_mode(p->p_ep)); + if (p->p_dialer != NULL) { + return (nni_dialer_id(p->p_dialer)); + } + return (0); } static void diff --git a/src/core/pipe.h b/src/core/pipe.h index bd66d4ed..5c505514 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -29,6 +29,10 @@ extern void nni_pipe_send(nni_pipe *, nni_aio *); // Pipe operations that protocols use. extern uint32_t nni_pipe_id(nni_pipe *); +// nni_pipe_destroy destroys a pipe -- there must not be any other +// references to it; this is used only during creation failures. +extern void nni_pipe_destroy(nni_pipe *); + // nni_pipe_close closes the underlying transport for the pipe. Further // operations against will return NNG_ECLOSED. extern void nni_pipe_close(nni_pipe *); @@ -48,7 +52,9 @@ extern void nni_pipe_stop(nni_pipe *); // endpoint, grabbing each of those locks. The function takes ownership of // the transport specific pipe (3rd argument), regardless of whether it // succeeds or not. The endpoint should be held when calling this. -extern int nni_pipe_create(nni_ep *, void *); +extern int nni_pipe_create2(nni_pipe **, nni_sock *, nni_tran *, void *); +extern void nni_pipe_set_dialer(nni_pipe *, nni_dialer *); +extern void nni_pipe_set_listener(nni_pipe *, nni_listener *); // nni_pipe_start is called by the socket to begin any startup activities // on the pipe before making it ready for use by protocols. For example, @@ -82,11 +88,11 @@ extern int nni_pipe_find(nni_pipe **, uint32_t); // nni_pipe_sock_id returns the socket id for the pipe (used by public API). extern uint32_t nni_pipe_sock_id(nni_pipe *); -// nni_pipe_ep_id returns the endpoint id for the pipe. -extern uint32_t nni_pipe_ep_id(nni_pipe *); +// nni_pipe_listener_id returns the listener id for the pipe (or 0 if none). +extern uint32_t nni_pipe_listener_id(nni_pipe *); -// nni_pipe_ep_mode returns the endpoint mode for the pipe. -extern int nni_pipe_ep_mode(nni_pipe *); +// nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none). +extern uint32_t nni_pipe_dialer_id(nni_pipe *); // nni_pipe_closed returns true if nni_pipe_close was called. // (This is used by the socket to determine if user closed the pipe diff --git a/src/core/socket.c b/src/core/socket.c index 4cf624e2..894e4fee 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -82,9 +82,10 @@ struct nni_socket { nni_list s_options; // opts not handled by sock/proto char s_name[64]; // socket name (legacy compat) - nni_list s_eps; // active endpoints - nni_list s_pipes; // active pipes - nni_list s_ctxs; // active contexts (protected by global sock_lk) + nni_list s_listeners; // active listeners + nni_list s_dialers; // active dialers + nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global sock_lk) bool s_closing; // Socket is closing bool s_closed; // Socket closed, protected by global lock @@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); nni_pipe_sock_list_init(&s->s_pipes); - nni_ep_list_init(&s->s_eps); + nni_listener_list_init(&s->s_listeners); + nni_dialer_list_init(&s->s_dialers); nni_mtx_init(&s->s_mx); nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); @@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) int nni_sock_shutdown(nni_sock *sock) { - nni_pipe *pipe; - nni_ep * ep; - nni_ep * nep; - nni_ctx * ctx; - nni_ctx * nctx; + nni_pipe * pipe; + nni_dialer * d; + nni_dialer * nd; + nni_listener *l; + nni_listener *nl; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming // but but allows existing ones to drain. - NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_shutdown(ep); + NNI_LIST_FOREACH (&sock->s_listeners, l) { + nni_listener_shutdown(l); } + NNI_LIST_FOREACH (&sock->s_dialers, d) { + nni_dialer_shutdown(d); + } + nni_mtx_unlock(&sock->s_mx); // We now mark any owned contexts as closing. @@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Go through the endpoint list, attempting to close them. + // Go through the dialers and listeners, attempting to close them. // We might already have a close in progress, in which case // we skip past it; it will be removed from another thread. - nep = nni_list_first(&sock->s_eps); - while ((ep = nep) != NULL) { - nep = nni_list_next(&sock->s_eps, nep); + nl = nni_list_first(&sock->s_listeners); + while ((l = nl) != NULL) { + nl = nni_list_next(&sock->s_listeners, nl); + + if (nni_listener_hold(l) == 0) { + nni_mtx_unlock(&sock->s_mx); + nni_listener_close(l); + nni_mtx_lock(&sock->s_mx); + } + } + nd = nni_list_first(&sock->s_dialers); + while ((d = nd) != NULL) { + nd = nni_list_next(&sock->s_dialers, nd); - if (nni_ep_hold(ep) == 0) { + if (nni_dialer_hold(d) == 0) { nni_mtx_unlock(&sock->s_mx); - nni_ep_close(ep); + nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } } @@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock) // We have to wait for *both* endpoints and pipes to be // removed. while ((!nni_list_empty(&sock->s_pipes)) || - (!nni_list_empty(&sock->s_eps))) { + (!nni_list_empty(&sock->s_listeners)) || + (!nni_list_empty(&sock->s_dialers))) { nni_cv_wait(&sock->s_cv); } @@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s) // Wait for pipes, eps, and contexts to finish closing. nni_mtx_lock(&s->s_mx); - while ( - (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) { + while ((!nni_list_empty(&s->s_pipes)) || + (!nni_list_empty(&s->s_dialers)) || + (!nni_list_empty(&s->s_listeners))) { nni_cv_wait(&s->s_cv); } nni_mtx_unlock(&s->s_mx); @@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) } int -nni_sock_ep_add(nni_sock *s, nni_ep *ep) +nni_sock_add_listener(nni_sock *s, nni_listener *l) +{ + nni_sockopt *sopt; + + nni_mtx_lock(&s->s_mx); + if (s->s_closing) { + nni_mtx_unlock(&s->s_mx); + return (NNG_ECLOSED); + } + + NNI_LIST_FOREACH (&s->s_options, sopt) { + int rv; + rv = nni_listener_setopt( + l, sopt->name, sopt->data, sopt->sz, sopt->typ); + if ((rv != 0) && (rv != NNG_ENOTSUP)) { + nni_mtx_unlock(&s->s_mx); + return (rv); + } + } + + nni_list_append(&s->s_listeners, l); + nni_mtx_unlock(&s->s_mx); + return (0); +} + +int +nni_sock_add_dialer(nni_sock *s, nni_dialer *d) { nni_sockopt *sopt; @@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep) NNI_LIST_FOREACH (&s->s_options, sopt) { int rv; - rv = nni_ep_setopt( - ep, sopt->name, sopt->data, sopt->sz, sopt->typ); + rv = nni_dialer_setopt( + d, sopt->name, sopt->data, sopt->sz, sopt->typ); if ((rv != 0) && (rv != NNG_ENOTSUP)) { nni_mtx_unlock(&s->s_mx); return (rv); } } - nni_list_append(&s->s_eps, ep); + nni_list_append(&s->s_dialers, d); nni_mtx_unlock(&s->s_mx); return (0); } void -nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) +nni_sock_remove_listener(nni_sock *s, nni_listener *l) { - nni_mtx_lock(&sock->s_mx); - if (nni_list_active(&sock->s_eps, ep)) { - nni_list_remove(&sock->s_eps, ep); - if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { - nni_cv_wake(&sock->s_cv); + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_listeners, l)) { + nni_list_remove(&s->s_listeners, l); + if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { + nni_cv_wake(&s->s_cv); } } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_sock_remove_dialer(nni_sock *s, nni_dialer *d) +{ + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_dialers, d)) { + nni_list_remove(&s->s_dialers, d); + if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { + nni_cv_wake(&s->s_cv); + } + } + nni_mtx_unlock(&s->s_mx); } int @@ -948,7 +1007,8 @@ nni_sock_setopt( nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t) { int rv = NNG_ENOTSUP; - nni_ep * ep; + nni_dialer * d; + nni_listener * l; nni_sockopt * optv; nni_sockopt * oldv = NULL; const sock_option * sso; @@ -1042,9 +1102,20 @@ nni_sock_setopt( // transport (other than ENOTSUP) stops the operation // altogether. Its important that transport wide checks // properly pre-validate. - NNI_LIST_FOREACH (&s->s_eps, ep) { + NNI_LIST_FOREACH (&s->s_listeners, l) { + int x; + x = nni_listener_setopt(l, optv->name, optv->data, sz, t); + if (x != NNG_ENOTSUP) { + if ((rv = x) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_free_opt(optv); + return (rv); + } + } + } + NNI_LIST_FOREACH (&s->s_dialers, d) { int x; - x = nni_ep_setopt(ep, optv->name, optv->data, sz, t); + x = nni_dialer_setopt(d, optv->name, optv->data, sz, t); if (x != NNG_ENOTSUP) { if ((rv = x) != 0) { nni_mtx_unlock(&s->s_mx); diff --git a/src/core/socket.h b/src/core/socket.h index 7c10b195..184cfb64 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -46,8 +46,11 @@ extern uint32_t nni_sock_id(nni_sock *); extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); -extern int nni_sock_ep_add(nni_sock *, nni_ep *); -extern void nni_sock_ep_remove(nni_sock *, nni_ep *); +extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); +extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); + +extern int nni_sock_add_listener(nni_sock *, nni_listener *); +extern void nni_sock_remove_listener(nni_sock *, nni_listener *); // These are socket methods that protocol operations can expect to call. // Note that each of these should be called without any locks held, since diff --git a/src/core/transport.c b/src/core/transport.c index 4733b6bd..8485d048 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -118,12 +118,28 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz, int typ) nni_mtx_lock(&nni_tran_lk); NNI_LIST_FOREACH (&nni_tran_list, t) { - const nni_tran_ep_ops *ep; - const nni_tran_option *o; + const nni_tran_dialer_ops * dops; + const nni_tran_listener_ops *lops; + const nni_tran_option * o; + + // Generally we look for endpoint options. We check both + // dialers and listeners. + dops = t->t_tran.tran_dialer; + for (o = dops->d_options; o && o->o_name != NULL; o++) { + if (strcmp(name, o->o_name) != 0) { + continue; + } + if (o->o_set == NULL) { + nni_mtx_unlock(&nni_tran_lk); + return (NNG_EREADONLY); + } - // Generally we look for endpoint options. - ep = t->t_tran.tran_ep; - for (o = ep->ep_options; o && o->o_name != NULL; o++) { + rv = (o->o_chk != NULL) ? o->o_chk(v, sz, typ) : 0; + nni_mtx_unlock(&nni_tran_lk); + return (rv); + } + lops = t->t_tran.tran_listener; + for (o = lops->l_options; o && o->o_name != NULL; o++) { if (strcmp(name, o->o_name) != 0) { continue; } diff --git a/src/core/transport.h b/src/core/transport.h index e45aa7ec..257d232d 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -11,30 +11,11 @@ #ifndef CORE_TRANSPORT_H #define CORE_TRANSPORT_H -// Transport implementation details. Transports must implement the -// interfaces in this file. -struct nni_tran { - // tran_version is the version of the transport ops that this - // transport implements. We only bother to version the main - // ops vector. - uint32_t tran_version; - - // tran_scheme is the transport scheme, such as "tcp" or "inproc". - const char *tran_scheme; - - // tran_ep links our endpoint-specific operations. - const nni_tran_ep_ops *tran_ep; - - // tran_pipe links our pipe-specific operations. - const nni_tran_pipe_ops *tran_pipe; - - // tran_init, if not NULL, is called once during library - // initialization. - int (*tran_init)(void); - - // tran_fini, if not NULL, is called during library deinitialization. - // It should release any global resources, close any open files, etc. - void (*tran_fini)(void); +// Endpoint modes. Currently used by transports. Remove this when we make +// transport dialers and listeners explicit. +enum nni_ep_mode { + NNI_EP_MODE_DIAL = 1, + NNI_EP_MODE_LISTEN = 2, }; // We quite intentionally use a signature where the upper word is nonzero, @@ -48,7 +29,8 @@ struct nni_tran { #define NNI_TRANSPORT_V0 0x54520000 #define NNI_TRANSPORT_V1 0x54520001 #define NNI_TRANSPORT_V2 0x54520002 -#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V2 +#define NNI_TRANSPORT_V3 0x54520003 +#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3 // Option handlers. struct nni_tran_option { @@ -81,40 +63,60 @@ struct nni_tran_option { // For a given endpoint, the framework holds a lock so that each entry // point is run exclusively of the others. (Transports must still guard // against any asynchronous operations they manage themselves, though.) -struct nni_tran_ep_ops { - // ep_init creates a vanilla endpoint. The value created is - // used for the first argument for all other endpoint - // functions. - int (*ep_init)(void **, nni_url *, nni_sock *, int); - // ep_fini frees the resources associated with the endpoint. - // The endpoint will already have been closed. - void (*ep_fini)(void *); +struct nni_tran_dialer_ops { + // d_init creates a vanilla dialer. The value created is + // used for the first argument for all other dialer functions. + int (*d_init)(void **, nni_url *, nni_sock *); + + // d_fini frees the resources associated with the dialer. + // The dialer will already have been closed. + void (*d_fini)(void *); - // ep_connect establishes a connection. It can return errors + // d_connect establishes a connection. It can return errors // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, // NNG_ECONNFAILED, NNG_ETIMEDOUT, and NNG_EPROTO. - void (*ep_connect)(void *, nni_aio *); + void (*d_connect)(void *, nni_aio *); + + // d_close stops the dialer from operating altogether. It + // does not affect pipes that have already been created. It is + // nonblocking. + void (*d_close)(void *); + + // d_options is an array of dialer options. The final + // element must have a NULL name. If this member is NULL, then + // no dialer specific options are available. + nni_tran_option *d_options; +}; + +struct nni_tran_listener_ops { + // l_init creates a vanilla listener. The value created is + // used for the first argument for all other listener functions. + int (*l_init)(void **, nni_url *, nni_sock *); - // ep_bind just does the bind() and listen() work, + // l_fini frees the resources associated with the listener. + // The listener will already have been closed. + void (*l_fini)(void *); + + // l_bind just does the bind() and listen() work, // reserving the address but not creating any connections. // It should return NNG_EADDRINUSE if the address is already // taken. It can also return NNG_EBADADDR for an unsuitable // address, or NNG_EACCESS for permission problems. - int (*ep_bind)(void *); + int (*l_bind)(void *); - // ep_accept accepts an inbound connection. - void (*ep_accept)(void *, nni_aio *); + // l_accept accepts an inbound connection. + void (*l_accept)(void *, nni_aio *); - // ep_close stops the endpoint from operating altogether. It + // l_close stops the listener from operating altogether. It // does not affect pipes that have already been created. It is // nonblocking. - void (*ep_close)(void *); + void (*l_close)(void *); - // ep_options is an array of endpoint options. The final + // l_options is an array of listener options. The final // element must have a NULL name. If this member is NULL, then - // no transport specific options are available. - nni_tran_option *ep_options; + // no dialer specific options are available. + nni_tran_option *l_options; }; // Pipe operations are entry points called by the socket. These may be @@ -168,6 +170,35 @@ struct nni_tran_pipe_ops { nni_tran_option *p_options; }; +// Transport implementation details. Transports must implement the +// interfaces in this file. +struct nni_tran { + // tran_version is the version of the transport ops that this + // transport implements. We only bother to version the main + // ops vector. + uint32_t tran_version; + + // tran_scheme is the transport scheme, such as "tcp" or "inproc". + const char *tran_scheme; + + // tran_dialer links our dialer-specific operations. + const nni_tran_dialer_ops *tran_dialer; + + // tran_listener links our listener-specific operations. + const nni_tran_listener_ops *tran_listener; + + // tran_pipe links our pipe-specific operations. + const nni_tran_pipe_ops *tran_pipe; + + // tran_init, if not NULL, is called once during library + // initialization. + int (*tran_init)(void); + + // tran_fini, if not NULL, is called during library deinitialization. + // It should release any global resources, close any open files, etc. + void (*tran_fini)(void); +}; + // These APIs are used by the framework internally, and not for use by // transport implementations. extern nni_tran *nni_tran_find(nni_url *); |
