diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/defs.h | 19 | ||||
| -rw-r--r-- | src/core/dialer.c | 512 | ||||
| -rw-r--r-- | src/core/dialer.h | 32 | ||||
| -rw-r--r-- | src/core/endpt.c | 665 | ||||
| -rw-r--r-- | src/core/endpt.h | 45 | ||||
| -rw-r--r-- | src/core/init.c | 6 | ||||
| -rw-r--r-- | src/core/listener.c | 443 | ||||
| -rw-r--r-- | src/core/listener.h | 33 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 3 | ||||
| -rw-r--r-- | src/core/pipe.c | 57 | ||||
| -rw-r--r-- | src/core/pipe.h | 16 | ||||
| -rw-r--r-- | src/core/socket.c | 139 | ||||
| -rw-r--r-- | src/core/socket.h | 7 | ||||
| -rw-r--r-- | src/core/transport.c | 26 | ||||
| -rw-r--r-- | src/core/transport.h | 119 |
15 files changed, 1292 insertions, 830 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index 77078a7a..a0cca368 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -40,14 +40,17 @@ typedef struct nng_event nni_event; typedef struct nng_notify nni_notify; // These are our own names. -typedef struct nni_socket nni_sock; -typedef struct nni_ctx nni_ctx; -typedef struct nni_ep nni_ep; -typedef struct nni_pipe nni_pipe; -typedef struct nni_tran nni_tran; -typedef struct nni_tran_option nni_tran_option; -typedef struct nni_tran_ep_ops nni_tran_ep_ops; -typedef struct nni_tran_pipe_ops nni_tran_pipe_ops; +typedef struct nni_socket nni_sock; +typedef struct nni_ctx nni_ctx; +typedef struct nni_dialer nni_dialer; +typedef struct nni_listener nni_listener; +typedef struct nni_pipe nni_pipe; + +typedef struct nni_tran nni_tran; +typedef struct nni_tran_option nni_tran_option; +typedef struct nni_tran_dialer_ops nni_tran_dialer_ops; +typedef struct nni_tran_listener_ops nni_tran_listener_ops; +typedef struct nni_tran_pipe_ops nni_tran_pipe_ops; typedef struct nni_proto_option nni_proto_option; typedef struct nni_proto_ctx_ops nni_proto_ctx_ops; diff --git a/src/core/dialer.c b/src/core/dialer.c new file mode 100644 index 00000000..ee0d2916 --- /dev/null +++ b/src/core/dialer.c @@ -0,0 +1,512 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +struct nni_dialer { + nni_tran_dialer_ops d_ops; // transport ops + nni_tran * d_tran; // transport pointer + void * d_data; // transport private + uint64_t d_id; // endpoint id + nni_list_node d_node; // per socket list + nni_sock * d_sock; + nni_url * d_url; + int d_refcnt; + int d_lastrv; // last result from synchronous + bool d_synch; // synchronous connect in progress? + bool d_started; + bool d_closed; // full shutdown + bool d_closing; // close pending (waiting on refcnt) + nni_mtx d_mtx; + nni_cv d_cv; + nni_list d_pipes; + nni_aio * d_con_aio; + nni_aio * d_tmo_aio; // backoff timer + nni_duration d_maxrtime; // maximum time for reconnect + nni_duration d_currtime; // current time for reconnect + nni_duration d_inirtime; // initial time for reconnect + nni_time d_conntime; // time of last good connect +}; + +// Functionality related to dialers. +static void dialer_connect_start(nni_dialer *); +static void dialer_connect_cb(void *); +static void dialer_timer_cb(void *); + +static nni_idhash *dialers; +static nni_mtx dialers_lk; + +int +nni_dialer_sys_init(void) +{ + int rv; + + if ((rv = nni_idhash_init(&dialers)) != 0) { + return (rv); + } + nni_mtx_init(&dialers_lk); + nni_idhash_set_limits( + dialers, 1, 0x7fffffff, nni_random() & 0x7fffffff); + + return (0); +} + +void +nni_dialer_sys_fini(void) +{ + nni_mtx_fini(&dialers_lk); + nni_idhash_fini(dialers); + dialers = NULL; +} + +uint32_t +nni_dialer_id(nni_dialer *d) +{ + return ((uint32_t) d->d_id); +} + +static void +dialer_destroy(nni_dialer *d) +{ + if (d == NULL) { + return; + } + + // Remove us from the table so we cannot be found. + if (d->d_id != 0) { + nni_idhash_remove(dialers, d->d_id); + } + + nni_aio_stop(d->d_con_aio); + nni_aio_stop(d->d_tmo_aio); + + nni_sock_remove_dialer(d->d_sock, d); + + nni_aio_fini(d->d_con_aio); + nni_aio_fini(d->d_tmo_aio); + + nni_mtx_lock(&d->d_mtx); + if (d->d_data != NULL) { + d->d_ops.d_fini(d->d_data); + } + nni_mtx_unlock(&d->d_mtx); + nni_cv_fini(&d->d_cv); + nni_mtx_fini(&d->d_mtx); + nni_url_free(d->d_url); + NNI_FREE_STRUCT(d); +} + +int +nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) +{ + nni_tran * tran; + nni_dialer *d; + int rv; + nni_url * url; + + if ((rv = nni_url_parse(&url, urlstr)) != 0) { + return (rv); + } + if (((tran = nni_tran_find(url)) == NULL) || + (tran->tran_dialer == NULL)) { + nni_url_free(url); + return (NNG_ENOTSUP); + } + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + nni_url_free(url); + return (NNG_ENOMEM); + } + d->d_url = url; + d->d_closed = false; + d->d_closing = false; + d->d_started = false; + d->d_data = NULL; + d->d_refcnt = 1; + d->d_sock = s; + d->d_tran = tran; + + // Make a copy of the endpoint operations. This allows us to + // modify them (to override NULLs for example), and avoids an extra + // dereference on hot paths. + d->d_ops = *tran->tran_dialer; + + NNI_LIST_NODE_INIT(&d->d_node); + + nni_pipe_ep_list_init(&d->d_pipes); + + nni_mtx_init(&d->d_mtx); + nni_cv_init(&d->d_cv, &d->d_mtx); + + if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || + ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || + ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) || + ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) || + ((rv = nni_sock_add_dialer(s, d)) != 0)) { + dialer_destroy(d); + return (rv); + } + + *dp = d; + return (0); +} + +int +nni_dialer_find(nni_dialer **dp, uint32_t id) +{ + int rv; + nni_dialer *d; + + if ((rv = nni_init()) != 0) { + return (rv); + } + + nni_mtx_lock(&dialers_lk); + if ((rv = nni_idhash_find(dialers, id, (void **) &d)) == 0) { + if (d->d_closed) { + rv = NNG_ECLOSED; + } else { + d->d_refcnt++; + *dp = d; + } + } + nni_mtx_unlock(&dialers_lk); + return (rv); +} + +int +nni_dialer_hold(nni_dialer *d) +{ + int rv; + nni_mtx_lock(&dialers_lk); + if (d->d_closed) { + rv = NNG_ECLOSED; + } else { + d->d_refcnt++; + rv = 0; + } + nni_mtx_unlock(&dialers_lk); + return (rv); +} + +void +nni_dialer_rele(nni_dialer *d) +{ + nni_mtx_lock(&dialers_lk); + d->d_refcnt--; + if (d->d_closing) { + nni_cv_wake(&d->d_cv); + } + nni_mtx_unlock(&dialers_lk); +} + +int +nni_dialer_shutdown(nni_dialer *d) +{ + nni_mtx_lock(&d->d_mtx); + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ECLOSED); + } + d->d_closing = true; + nni_mtx_unlock(&d->d_mtx); + + // Abort any remaining in-flight operations. + nni_aio_close(d->d_con_aio); + nni_aio_close(d->d_tmo_aio); + + // Stop the underlying transport. + d->d_ops.d_close(d->d_data); + + return (0); +} + +void +nni_dialer_close(nni_dialer *d) +{ + nni_pipe *p; + + nni_mtx_lock(&d->d_mtx); + if (d->d_closed) { + nni_mtx_unlock(&d->d_mtx); + nni_dialer_rele(d); + return; + } + d->d_closed = true; + nni_mtx_unlock(&d->d_mtx); + + nni_dialer_shutdown(d); + + nni_aio_stop(d->d_con_aio); + nni_aio_stop(d->d_tmo_aio); + + nni_mtx_lock(&d->d_mtx); + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_stop(p); + } + while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) { + nni_cv_wait(&d->d_cv); + } + nni_mtx_unlock(&d->d_mtx); + + dialer_destroy(d); +} + +// This function starts an exponential backoff timer for reconnecting. +static void +dialer_timer_start(nni_dialer *d) +{ + nni_duration backoff; + + if (d->d_closing) { + return; + } + backoff = d->d_currtime; + d->d_currtime *= 2; + if (d->d_currtime > d->d_maxrtime) { + d->d_currtime = d->d_maxrtime; + } + + // To minimize damage from storms, etc., we select a backoff + // value randomly, in the range of [0, backoff-1]; this is + // pretty similar to 802 style backoff, except that we have a + // nearly uniform time period instead of discrete slot times. + // This algorithm may lead to slight biases because we don't + // have a statistically perfect distribution with the modulo of + // the random number, but this really doesn't matter. + nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); +} + +static void +dialer_timer_cb(void *arg) +{ + nni_dialer *d = arg; + nni_aio * aio = d->d_tmo_aio; + + nni_mtx_lock(&d->d_mtx); + if (nni_aio_result(aio) == 0) { + dialer_connect_start(d); + } + nni_mtx_unlock(&d->d_mtx); +} + +static void +dialer_connect_cb(void *arg) +{ + nni_dialer *d = arg; + nni_pipe * p; + nni_aio * aio = d->d_con_aio; + int rv; + + if ((rv = nni_aio_result(aio)) == 0) { + void *data = nni_aio_get_output(aio, 0); + NNI_ASSERT(data != NULL); + rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data); + } + if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) { + nni_pipe_stop(p); + } + + nni_mtx_lock(&d->d_mtx); + switch (rv) { + case 0: + nni_pipe_set_dialer(p, d); + nni_list_append(&d->d_pipes, p); + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + nni_pipe_stop(p); + return; + } + + // Good connect, so reset the backoff timer. + // Note that a host that accepts the connect, but drops + // us immediately, is going to get hit pretty hard + // (depending on the initial backoff) with no + // exponential backoff. This can happen if we wind up + // trying to connect to some port that does not speak + // SP for example. + d->d_currtime = d->d_inirtime; + + // No further outgoing connects -- we will restart a + // connection from the pipe when the pipe is removed. + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled/closed -- stop everything. + break; + default: + // redial, but only if we are not synchronous + if (!d->d_synch) { + dialer_timer_start(d); + } + break; + } + if (d->d_synch) { + if (rv != 0) { + d->d_started = false; + } + d->d_lastrv = rv; + d->d_synch = false; + nni_cv_wake(&d->d_cv); + } + nni_mtx_unlock(&d->d_mtx); +} + +static void +dialer_connect_start(nni_dialer *d) +{ + nni_aio *aio = d->d_con_aio; + + // Call with the Endpoint lock held. + if (d->d_closing) { + return; + } + + d->d_ops.d_connect(d->d_data, aio); +} + +int +nni_dialer_start(nni_dialer *d, int flags) +{ + int rv = 0; + + nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime); + d->d_currtime = d->d_inirtime; + + nni_mtx_lock(&d->d_mtx); + + if (d->d_closing) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ECLOSED); + } + + if (d->d_started) { + nni_mtx_unlock(&d->d_mtx); + return (NNG_ESTATE); + } + + if ((flags & NNG_FLAG_NONBLOCK) != 0) { + d->d_started = true; + dialer_connect_start(d); + nni_mtx_unlock(&d->d_mtx); + return (0); + } + + d->d_synch = true; + d->d_started = true; + dialer_connect_start(d); + + while (d->d_synch && !d->d_closing) { + nni_cv_wait(&d->d_cv); + } + rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv; + nni_cv_wake(&d->d_cv); + + nni_mtx_unlock(&d->d_mtx); + return (rv); +} + +void +nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p) +{ + if (d == NULL) { + return; + } + + // Break up the relationship between the dialer and the pipe. + nni_mtx_lock(&d->d_mtx); + // During early init, the pipe might not have this set. + if (nni_list_active(&d->d_pipes, p)) { + nni_list_remove(&d->d_pipes, p); + } + // Wake up the close thread if it is waiting. + if (d->d_closed) { + if (nni_list_empty(&d->d_pipes)) { + nni_cv_wake(&d->d_cv); + } + } else { + // If this pipe closed, then lets restart the dial operation. + // Since the remote side seems to have closed, lets start with + // a backoff. This keeps us from pounding the crap out of the + // thing if a remote server accepts but then disconnects + // immediately. + dialer_timer_start(d); + } + nni_mtx_unlock(&d->d_mtx); +} + +int +nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, + nni_opt_type t) +{ + nni_tran_option *o; + + if (strcmp(name, NNG_OPT_URL) == 0) { + return (NNG_EREADONLY); + } + + for (o = d->d_ops.d_options; o && o->o_name; o++) { + int rv; + + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_set == NULL) { + return (NNG_EREADONLY); + } + + nni_mtx_lock(&d->d_mtx); + rv = o->o_set(d->d_data, val, sz, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + + return (NNG_ENOTSUP); +} + +int +nni_dialer_getopt( + nni_dialer *d, const char *name, void *valp, size_t *szp, nni_opt_type t) +{ + nni_tran_option *o; + + for (o = d->d_ops.d_options; o && o->o_name; o++) { + int rv; + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_get == NULL) { + return (NNG_EWRITEONLY); + } + nni_mtx_lock(&d->d_mtx); + rv = o->o_get(d->d_data, valp, szp, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + + // We provide a fallback on the URL, but let the implementation + // override. This allows the URL to be created with wildcards, + // that are resolved later. + if (strcmp(name, NNG_OPT_URL) == 0) { + return (nni_copyout_str(d->d_url->u_rawurl, valp, szp, t)); + } + + return (nni_sock_getopt(d->d_sock, name, valp, szp, t)); +} + +void +nni_dialer_list_init(nni_list *list) +{ + NNI_LIST_INIT(list, nni_dialer, d_node); +} diff --git a/src/core/dialer.h b/src/core/dialer.h new file mode 100644 index 00000000..56b0fb1b --- /dev/null +++ b/src/core/dialer.h @@ -0,0 +1,32 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_DIALER_H +#define CORE_DIALER_H + +extern int nni_dialer_sys_init(void); +extern void nni_dialer_sys_fini(void); +extern int nni_dialer_find(nni_dialer **, uint32_t); +extern int nni_dialer_hold(nni_dialer *); +extern void nni_dialer_rele(nni_dialer *); +extern uint32_t nni_dialer_id(nni_dialer *); +extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *); +extern int nni_dialer_shutdown(nni_dialer *); +extern void nni_dialer_close(nni_dialer *); +extern int nni_dialer_start(nni_dialer *, int); +extern void nni_dialer_list_init(nni_list *); +extern void nni_dialer_remove_pipe(nni_dialer *, nni_pipe *); + +extern int nni_dialer_setopt( + nni_dialer *, const char *, const void *, size_t, nni_opt_type); +extern int nni_dialer_getopt( + nni_dialer *, const char *, void *, size_t *, nni_opt_type); + +#endif // CORE_DIALER_H diff --git a/src/core/endpt.c b/src/core/endpt.c deleted file mode 100644 index 8e678fb0..00000000 --- a/src/core/endpt.c +++ /dev/null @@ -1,665 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -struct nni_ep { - nni_tran_ep_ops ep_ops; // transport ops - nni_tran * ep_tran; // transport pointer - void * ep_data; // transport private - uint64_t ep_id; // endpoint id - nni_list_node ep_node; // per socket list - nni_sock * ep_sock; - nni_url * ep_url; - int ep_mode; - int ep_refcnt; - bool ep_started; - bool ep_closed; // full shutdown - bool ep_closing; // close pending (waiting on refcnt) - bool ep_tmo_run; - nni_mtx ep_mtx; - nni_cv ep_cv; - nni_list ep_pipes; - nni_aio * ep_acc_aio; - nni_aio * ep_con_aio; - nni_aio * ep_con_syn; // used for sync connect - nni_aio * ep_tmo_aio; // backoff timer - nni_duration ep_maxrtime; // maximum time for reconnect - nni_duration ep_currtime; // current time for reconnect - nni_duration ep_inirtime; // initial time for reconnect - nni_time ep_conntime; // time of last good connect -}; - -// Functionality related to end points. - -static void nni_ep_acc_start(nni_ep *); -static void nni_ep_acc_cb(void *); -static void nni_ep_con_start(nni_ep *); -static void nni_ep_con_cb(void *); -static void nni_ep_tmo_start(nni_ep *); -static void nni_ep_tmo_cb(void *); - -static nni_idhash *nni_eps; -static nni_mtx nni_ep_lk; - -int -nni_ep_sys_init(void) -{ - int rv; - - if ((rv = nni_idhash_init(&nni_eps)) != 0) { - return (rv); - } - nni_mtx_init(&nni_ep_lk); - nni_idhash_set_limits( - nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); - - return (0); -} - -void -nni_ep_sys_fini(void) -{ - nni_mtx_fini(&nni_ep_lk); - nni_idhash_fini(nni_eps); - nni_eps = NULL; -} - -uint32_t -nni_ep_id(nni_ep *ep) -{ - return ((uint32_t) ep->ep_id); -} - -static void -nni_ep_destroy(nni_ep *ep) -{ - if (ep == NULL) { - return; - } - - // Remove us from the table so we cannot be found. - if (ep->ep_id != 0) { - nni_idhash_remove(nni_eps, ep->ep_id); - } - - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - nni_sock_ep_remove(ep->ep_sock, ep); - - nni_aio_fini(ep->ep_acc_aio); - nni_aio_fini(ep->ep_con_aio); - nni_aio_fini(ep->ep_con_syn); - nni_aio_fini(ep->ep_tmo_aio); - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_data != NULL) { - ep->ep_ops.ep_fini(ep->ep_data); - } - nni_mtx_unlock(&ep->ep_mtx); - nni_cv_fini(&ep->ep_cv); - nni_mtx_fini(&ep->ep_mtx); - nni_url_free(ep->ep_url); - NNI_FREE_STRUCT(ep); -} - -static int -nni_ep_create(nni_ep **epp, nni_sock *s, const char *urlstr, int mode) -{ - nni_tran *tran; - nni_ep * ep; - int rv; - nni_url * url; - - if ((rv = nni_url_parse(&url, urlstr)) != 0) { - return (rv); - } - if ((tran = nni_tran_find(url)) == NULL) { - nni_url_free(url); - return (NNG_ENOTSUP); - } - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - nni_url_free(url); - return (NNG_ENOMEM); - } - ep->ep_url = url; - ep->ep_closed = false; - ep->ep_closing = false; - ep->ep_started = false; - ep->ep_data = NULL; - ep->ep_refcnt = 1; - ep->ep_sock = s; - ep->ep_tran = tran; - ep->ep_mode = mode; - - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. - ep->ep_ops = *tran->tran_ep; - - NNI_LIST_NODE_INIT(&ep->ep_node); - - nni_pipe_ep_list_init(&ep->ep_pipes); - - nni_mtx_init(&ep->ep_mtx); - nni_cv_init(&ep->ep_cv, &ep->ep_mtx); - - if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || - ((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) || - ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || - ((rv = nni_sock_ep_add(s, ep)) != 0)) { - nni_ep_destroy(ep); - return (rv); - } - - *epp = ep; - return (0); -} - -int -nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *urlstr) -{ - return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_DIAL)); -} - -int -nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *urlstr) -{ - return (nni_ep_create(epp, s, urlstr, NNI_EP_MODE_LISTEN)); -} - -int -nni_ep_find(nni_ep **epp, uint32_t id) -{ - int rv; - nni_ep *ep; - - if ((rv = nni_init()) != 0) { - return (rv); - } - - nni_mtx_lock(&nni_ep_lk); - if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) { - if (ep->ep_closed) { - rv = NNG_ECLOSED; - } else { - ep->ep_refcnt++; - *epp = ep; - } - } - nni_mtx_unlock(&nni_ep_lk); - return (rv); -} - -int -nni_ep_hold(nni_ep *ep) -{ - int rv; - nni_mtx_lock(&nni_ep_lk); - if (ep->ep_closed) { - rv = NNG_ECLOSED; - } else { - ep->ep_refcnt++; - rv = 0; - } - nni_mtx_unlock(&nni_ep_lk); - return (rv); -} - -void -nni_ep_rele(nni_ep *ep) -{ - nni_mtx_lock(&nni_ep_lk); - ep->ep_refcnt--; - if (ep->ep_closing) { - nni_cv_wake(&ep->ep_cv); - } - nni_mtx_unlock(&nni_ep_lk); -} - -int -nni_ep_shutdown(nni_ep *ep) -{ - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - ep->ep_closing = true; - nni_mtx_unlock(&ep->ep_mtx); - - // Abort any remaining in-flight operations. - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - // Stop the underlying transport. - ep->ep_ops.ep_close(ep->ep_data); - - return (0); -} - -void -nni_ep_close(nni_ep *ep) -{ - nni_pipe *p; - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - nni_ep_rele(ep); - return; - } - ep->ep_closed = true; - nni_mtx_unlock(&ep->ep_mtx); - - nni_ep_shutdown(ep); - - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); - - nni_mtx_lock(&ep->ep_mtx); - NNI_LIST_FOREACH (&ep->ep_pipes, p) { - nni_pipe_stop(p); - } - while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) { - nni_cv_wait(&ep->ep_cv); - } - nni_mtx_unlock(&ep->ep_mtx); - - nni_ep_destroy(ep); -} - -static void -nni_ep_tmo_cancel(nni_aio *aio, int rv) -{ - nni_ep *ep = nni_aio_get_prov_data(aio); - // The only way this ever gets "finished", is via cancellation. - if (ep != NULL) { - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_tmo_run) { - nni_aio_finish_error(aio, rv); - } - ep->ep_tmo_run = false; - nni_mtx_unlock(&ep->ep_mtx); - } -} - -static void -nni_ep_tmo_start(nni_ep *ep) -{ - nni_duration backoff; - int rv; - - if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) { - return; - } - backoff = ep->ep_currtime; - ep->ep_currtime *= 2; - if (ep->ep_currtime > ep->ep_maxrtime) { - ep->ep_currtime = ep->ep_maxrtime; - } - - // To minimize damage from storms, etc., we select a backoff - // value randomly, in the range of [0, backoff-1]; this is - // pretty similar to 802 style backoff, except that we have a - // nearly uniform time period instead of discrete slot times. - // This algorithm may lead to slight biases because we don't - // have a statistically perfect distribution with the modulo of - // the random number, but this really doesn't matter. - - nni_aio_set_timeout( - ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0)); - - if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) != - 0) { - nni_aio_finish_error(ep->ep_tmo_aio, rv); - } - - ep->ep_tmo_run = true; -} - -static void -nni_ep_tmo_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_tmo_aio; - - nni_mtx_lock(&ep->ep_mtx); - if (nni_aio_result(aio) == NNG_ETIMEDOUT) { - if (ep->ep_mode == NNI_EP_MODE_DIAL) { - nni_ep_con_start(ep); - } else { - nni_ep_acc_start(ep); - } - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_con_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_con_aio; - int rv; - - if ((rv = nni_aio_result(aio)) == 0) { - rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); - } - nni_mtx_lock(&ep->ep_mtx); - switch (rv) { - case 0: - // Good connect, so reset the backoff timer. - // Note that a host that accepts the connect, but drops - // us immediately, is going to get hit pretty hard - // (depending on the initial backoff) with no - // exponential backoff. This can happen if we wind up - // trying to connect to some port that does not speak - // SP for example. - ep->ep_currtime = ep->ep_inirtime; - - // No further outgoing connects -- we will restart a - // connection from the pipe when the pipe is removed. - break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled/closed -- stop everything. - break; - default: - // Other errors involve the use of the backoff timer. - nni_ep_tmo_start(ep); - break; - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_con_start(nni_ep *ep) -{ - nni_aio *aio = ep->ep_con_aio; - - // Call with the Endpoint lock held. - if (ep->ep_closing) { - return; - } - - ep->ep_ops.ep_connect(ep->ep_data, aio); -} - -int -nni_ep_dial(nni_ep *ep, int flags) -{ - int rv = 0; - nni_aio *aio; - - nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); - ep->ep_currtime = ep->ep_inirtime; - - nni_mtx_lock(&ep->ep_mtx); - - if (ep->ep_mode != NNI_EP_MODE_DIAL) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ENOTSUP); - } - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ESTATE); - } - - if ((flags & NNG_FLAG_NONBLOCK) != 0) { - ep->ep_started = true; - nni_ep_con_start(ep); - nni_mtx_unlock(&ep->ep_mtx); - return (0); - } - - // Synchronous mode: so we have to wait for it to complete. - aio = ep->ep_con_syn; - ep->ep_ops.ep_connect(ep->ep_data, aio); - ep->ep_started = true; - nni_mtx_unlock(&ep->ep_mtx); - - nni_aio_wait(aio); - - // As we're synchronous, we also have to handle the completion. - if (((rv = nni_aio_result(aio)) != 0) || - ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) { - nni_mtx_lock(&ep->ep_mtx); - ep->ep_started = false; - nni_mtx_unlock(&ep->ep_mtx); - } - return (rv); -} - -static void -nni_ep_acc_cb(void *arg) -{ - nni_ep * ep = arg; - nni_aio *aio = ep->ep_acc_aio; - int rv; - - if ((rv = nni_aio_result(aio)) == 0) { - NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL); - rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0)); - } - - nni_mtx_lock(&ep->ep_mtx); - switch (rv) { - case 0: - nni_ep_acc_start(ep); - break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled or closed, no further action. - break; - case NNG_ECONNABORTED: - case NNG_ECONNRESET: - // These are remote conditions, no cool down. - nni_ep_acc_start(ep); - break; - default: - // We don't really know why we failed, but we backoff - // here. This is because errors here are probably due - // to system failures (resource exhaustion) and we hope - // by not thrashing we give the system a chance to - // recover. - nni_ep_tmo_start(ep); - break; - } - nni_mtx_unlock(&ep->ep_mtx); -} - -static void -nni_ep_acc_start(nni_ep *ep) -{ - nni_aio *aio = ep->ep_acc_aio; - - // Call with the Endpoint lock held. - if (ep->ep_closing) { - return; - } - ep->ep_ops.ep_accept(ep->ep_data, aio); -} - -int -nni_ep_listen(nni_ep *ep, int flags) -{ - int rv = 0; - NNI_ARG_UNUSED(flags); - - nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); - ep->ep_currtime = ep->ep_inirtime; - - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_mode != NNI_EP_MODE_LISTEN) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ENOTSUP); - } - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ESTATE); - } - - rv = ep->ep_ops.ep_bind(ep->ep_data); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - ep->ep_started = true; - nni_ep_acc_start(ep); - nni_mtx_unlock(&ep->ep_mtx); - - return (0); -} - -int -nni_ep_pipe_add(nni_ep *ep, nni_pipe *p) -{ - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closing) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); - } - nni_list_append(&ep->ep_pipes, p); - nni_mtx_unlock(&ep->ep_mtx); - return (0); -} - -void -nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) -{ - // Break up the relationship between the EP and the pipe. - nni_mtx_lock(&ep->ep_mtx); - // During early init, the pipe might not have this set. - if (nni_list_active(&ep->ep_pipes, pipe)) { - nni_list_remove(&ep->ep_pipes, pipe); - } - // Wake up the close thread if it is waiting. - if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) { - nni_cv_wake(&ep->ep_cv); - } - - // If this pipe closed, then lets restart the dial operation. - // Since the remote side seems to have closed, lets start with - // a backoff. This keeps us from pounding the crap out of the - // thing if a remote server accepts but then disconnects - // immediately. - if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) { - nni_ep_tmo_start(ep); - } - nni_mtx_unlock(&ep->ep_mtx); -} - -int -nni_ep_setopt( - nni_ep *ep, const char *name, const void *val, size_t sz, nni_opt_type t) -{ - nni_tran_option *o; - - if (strcmp(name, NNG_OPT_URL) == 0) { - return (NNG_EREADONLY); - } - - for (o = ep->ep_ops.ep_options; o && o->o_name; o++) { - int rv; - - if (strcmp(o->o_name, name) != 0) { - continue; - } - if (o->o_set == NULL) { - return (NNG_EREADONLY); - } - - nni_mtx_lock(&ep->ep_mtx); - rv = o->o_set(ep->ep_data, val, sz, t); - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - return (NNG_ENOTSUP); -} - -int -nni_ep_mode(nni_ep *ep) -{ - return (ep->ep_mode); -} - -int -nni_ep_getopt( - nni_ep *ep, const char *name, void *valp, size_t *szp, nni_opt_type t) -{ - nni_tran_option *o; - - for (o = ep->ep_ops.ep_options; o && o->o_name; o++) { - int rv; - if (strcmp(o->o_name, name) != 0) { - continue; - } - if (o->o_get == NULL) { - return (NNG_EWRITEONLY); - } - nni_mtx_lock(&ep->ep_mtx); - rv = o->o_get(ep->ep_data, valp, szp, t); - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - // We provide a fallback on the URL, but let the implementation - // override. This allows the URL to be created with wildcards, - // that are resolved later. - if (strcmp(name, NNG_OPT_URL) == 0) { - return (nni_copyout_str(ep->ep_url->u_rawurl, valp, szp, t)); - } - - return (nni_sock_getopt(ep->ep_sock, name, valp, szp, t)); -} - -void -nni_ep_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_ep, ep_node); -} - -nni_tran * -nni_ep_tran(nni_ep *ep) -{ - return (ep->ep_tran); -} - -nni_sock * -nni_ep_sock(nni_ep *ep) -{ - return (ep->ep_sock); -} diff --git a/src/core/endpt.h b/src/core/endpt.h deleted file mode 100644 index bf251d41..00000000 --- a/src/core/endpt.h +++ /dev/null @@ -1,45 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef CORE_ENDPT_H -#define CORE_ENDPT_H - -extern int nni_ep_sys_init(void); -extern void nni_ep_sys_fini(void); -extern nni_tran *nni_ep_tran(nni_ep *); -extern nni_sock *nni_ep_sock(nni_ep *); -extern int nni_ep_find(nni_ep **, uint32_t); -extern int nni_ep_hold(nni_ep *); -extern void nni_ep_rele(nni_ep *); -extern uint32_t nni_ep_id(nni_ep *); -extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *); -extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *); -extern int nni_ep_shutdown(nni_ep *); -extern void nni_ep_close(nni_ep *); -extern int nni_ep_dial(nni_ep *, int); -extern int nni_ep_listen(nni_ep *, int); -extern void nni_ep_list_init(nni_list *); -extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *); -extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); -extern int nni_ep_mode(nni_ep *); - -extern int nni_ep_setopt( - nni_ep *, const char *, const void *, size_t, nni_opt_type); -extern int nni_ep_getopt( - nni_ep *, const char *, void *, size_t *, nni_opt_type); - -// Endpoint modes. Currently used by transports. Remove this when we make -// transport dialers and listeners explicit. -enum nni_ep_mode { - NNI_EP_MODE_DIAL = 1, - NNI_EP_MODE_LISTEN = 2, -}; - -#endif // CORE_ENDPT_H diff --git a/src/core/init.c b/src/core/init.c index c1b7bbac..30a7a547 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -33,7 +33,8 @@ nni_init_helper(void) ((rv = nni_aio_sys_init()) != 0) || ((rv = nni_random_sys_init()) != 0) || ((rv = nni_sock_sys_init()) != 0) || - ((rv = nni_ep_sys_init()) != 0) || + ((rv = nni_listener_sys_init()) != 0) || + ((rv = nni_dialer_sys_init()) != 0) || ((rv = nni_pipe_sys_init()) != 0) || ((rv = nni_proto_sys_init()) != 0) || ((rv = nni_tran_sys_init()) != 0)) { @@ -71,7 +72,8 @@ nni_fini(void) nni_tran_sys_fini(); nni_proto_sys_fini(); nni_pipe_sys_fini(); - nni_ep_sys_fini(); + nni_dialer_sys_fini(); + nni_listener_sys_fini(); nni_sock_sys_fini(); nni_reap_sys_fini(); // must be before timer and aio (expire) nni_random_sys_fini(); diff --git a/src/core/listener.c b/src/core/listener.c new file mode 100644 index 00000000..6d580cd8 --- /dev/null +++ b/src/core/listener.c @@ -0,0 +1,443 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +struct nni_listener { + nni_tran_listener_ops l_ops; // transport ops + nni_tran * l_tran; // transport pointer + void * l_data; // transport private + uint64_t l_id; // endpoint id + nni_list_node l_node; // per socket list + nni_sock * l_sock; + nni_url * l_url; + int l_refcnt; + bool l_started; + bool l_closed; // full shutdown + bool l_closing; // close pending (waiting on refcnt) + nni_mtx l_mtx; + nni_cv l_cv; + nni_list l_pipes; + nni_aio * l_acc_aio; + nni_aio * l_tmo_aio; +}; + +// Functionality related to listeners. + +static void listener_accept_start(nni_listener *); +static void listener_accept_cb(void *); +static void listener_timer_cb(void *); + +static nni_idhash *listeners; +static nni_mtx listeners_lk; + +int +nni_listener_sys_init(void) +{ + int rv; + + if ((rv = nni_idhash_init(&listeners)) != 0) { + return (rv); + } + nni_mtx_init(&listeners_lk); + nni_idhash_set_limits( + listeners, 1, 0x7fffffff, nni_random() & 0x7fffffff); + + return (0); +} + +void +nni_listener_sys_fini(void) +{ + nni_mtx_fini(&listeners_lk); + nni_idhash_fini(listeners); + listeners = NULL; +} + +uint32_t +nni_listener_id(nni_listener *l) +{ + return ((uint32_t) l->l_id); +} + +static void +listener_destroy(nni_listener *l) +{ + if (l == NULL) { + return; + } + + // Remove us from the table so we cannot be found. + if (l->l_id != 0) { + nni_idhash_remove(listeners, l->l_id); + } + + nni_aio_stop(l->l_acc_aio); + + nni_sock_remove_listener(l->l_sock, l); + + nni_aio_fini(l->l_acc_aio); + + nni_mtx_lock(&l->l_mtx); + if (l->l_data != NULL) { + l->l_ops.l_fini(l->l_data); + } + nni_mtx_unlock(&l->l_mtx); + nni_cv_fini(&l->l_cv); + nni_mtx_fini(&l->l_mtx); + nni_url_free(l->l_url); + NNI_FREE_STRUCT(l); +} + +int +nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) +{ + nni_tran * tran; + nni_listener *l; + int rv; + nni_url * url; + + if ((rv = nni_url_parse(&url, urlstr)) != 0) { + return (rv); + } + if (((tran = nni_tran_find(url)) == NULL) || + (tran->tran_listener == NULL)) { + nni_url_free(url); + return (NNG_ENOTSUP); + } + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + nni_url_free(url); + return (NNG_ENOMEM); + } + l->l_url = url; + l->l_closed = false; + l->l_closing = false; + l->l_started = false; + l->l_data = NULL; + l->l_refcnt = 1; + l->l_sock = s; + l->l_tran = tran; + + // Make a copy of the endpoint operations. This allows us to + // modify them (to override NULLs for example), and avoids an extra + // dereference on hot paths. + l->l_ops = *tran->tran_listener; + + NNI_LIST_NODE_INIT(&l->l_node); + + nni_pipe_ep_list_init(&l->l_pipes); + + nni_mtx_init(&l->l_mtx); + nni_cv_init(&l->l_cv, &l->l_mtx); + + if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || + ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || + ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) || + ((rv = nni_idhash_alloc(listeners, &l->l_id, l)) != 0) || + ((rv = nni_sock_add_listener(s, l)) != 0)) { + listener_destroy(l); + return (rv); + } + + *lp = l; + return (0); +} + +int +nni_listener_find(nni_listener **lp, uint32_t id) +{ + int rv; + nni_listener *l; + + if ((rv = nni_init()) != 0) { + return (rv); + } + + nni_mtx_lock(&listeners_lk); + if ((rv = nni_idhash_find(listeners, id, (void **) &l)) == 0) { + if (l->l_closed) { + rv = NNG_ECLOSED; + } else { + l->l_refcnt++; + *lp = l; + } + } + nni_mtx_unlock(&listeners_lk); + return (rv); +} + +int +nni_listener_hold(nni_listener *l) +{ + int rv; + nni_mtx_lock(&listeners_lk); + if (l->l_closed) { + rv = NNG_ECLOSED; + } else { + l->l_refcnt++; + rv = 0; + } + nni_mtx_unlock(&listeners_lk); + return (rv); +} + +void +nni_listener_rele(nni_listener *l) +{ + nni_mtx_lock(&listeners_lk); + l->l_refcnt--; + if (l->l_closing) { + nni_cv_wake(&l->l_cv); + } + nni_mtx_unlock(&listeners_lk); +} + +int +nni_listener_shutdown(nni_listener *l) +{ + nni_mtx_lock(&l->l_mtx); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ECLOSED); + } + l->l_closing = true; + nni_mtx_unlock(&l->l_mtx); + + // Abort any remaining in-flight accepts. + nni_aio_close(l->l_acc_aio); + nni_aio_close(l->l_tmo_aio); + + // Stop the underlying transport. + l->l_ops.l_close(l->l_data); + + return (0); +} + +void +nni_listener_close(nni_listener *l) +{ + nni_pipe *p; + + nni_mtx_lock(&l->l_mtx); + if (l->l_closed) { + nni_mtx_unlock(&l->l_mtx); + nni_listener_rele(l); + return; + } + l->l_closed = true; + nni_mtx_unlock(&l->l_mtx); + + nni_listener_shutdown(l); + + nni_aio_stop(l->l_acc_aio); + nni_aio_stop(l->l_tmo_aio); + + nni_mtx_lock(&l->l_mtx); + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_stop(p); + } + while ((!nni_list_empty(&l->l_pipes)) || (l->l_refcnt != 1)) { + nni_cv_wait(&l->l_cv); + } + nni_mtx_unlock(&l->l_mtx); + + listener_destroy(l); +} + +static void +listener_timer_cb(void *arg) +{ + nni_listener *l = arg; + nni_aio * aio = l->l_tmo_aio; + + nni_mtx_lock(&l->l_mtx); + if (nni_aio_result(aio) == 0) { + listener_accept_start(l); + } + nni_mtx_unlock(&l->l_mtx); +} + +static void +listener_accept_cb(void *arg) +{ + nni_listener *l = arg; + nni_pipe * p; + nni_aio * aio = l->l_acc_aio; + int rv; + + if ((rv = nni_aio_result(aio)) == 0) { + void *data = nni_aio_get_output(aio, 0); + NNI_ASSERT(data != NULL); + rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data); + } + + if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { + nni_pipe_stop(p); + } + + nni_mtx_lock(&l->l_mtx); + switch (rv) { + case 0: + nni_pipe_set_listener(p, l); + nni_list_append(&l->l_pipes, p); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + nni_pipe_stop(p); + return; + } + listener_accept_start(l); + break; + case NNG_ECONNABORTED: // remote condition, no cooldown + case NNG_ECONNRESET: // remote condition, no cooldown + listener_accept_start(l); + break; + case NNG_ECLOSED: // no further action + case NNG_ECANCELED: // no further action + break; + default: + // We don't really know why we failed, but we backoff + // here. This is because errors here are probably due + // to system failures (resource exhaustion) and we hope + // by not thrashing we give the system a chance to + // recover. 100 msec is enough to cool down. + nni_sleep_aio(100, l->l_tmo_aio); + break; + } + nni_mtx_unlock(&l->l_mtx); +} + +static void +listener_accept_start(nni_listener *l) +{ + nni_aio *aio = l->l_acc_aio; + + // Call with the listener lock held. + if (l->l_closing) { + return; + } + l->l_ops.l_accept(l->l_data, aio); +} + +int +nni_listener_start(nni_listener *l, int flags) +{ + int rv = 0; + NNI_ARG_UNUSED(flags); + + nni_mtx_lock(&l->l_mtx); + if (l->l_closing) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ECLOSED); + } + if (l->l_started) { + nni_mtx_unlock(&l->l_mtx); + return (NNG_ESTATE); + } + + if ((rv = l->l_ops.l_bind(l->l_data)) != 0) { + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + l->l_started = true; + listener_accept_start(l); + nni_mtx_unlock(&l->l_mtx); + + return (0); +} + +void +nni_listener_remove_pipe(nni_listener *l, nni_pipe *p) +{ + if (l == NULL) { + return; + } + // Break up relationship between listener and pipe. + nni_mtx_lock(&l->l_mtx); + // During early init, the pipe might not have this set. + if (nni_list_active(&l->l_pipes, p)) { + nni_list_remove(&l->l_pipes, p); + } + // Wake up the closer if it is waiting. + if (l->l_closed && nni_list_empty(&l->l_pipes)) { + nni_cv_wake(&l->l_cv); + } + nni_mtx_unlock(&l->l_mtx); +} + +int +nni_listener_setopt(nni_listener *l, const char *name, const void *val, + size_t sz, nni_opt_type t) +{ + nni_tran_option *o; + + if (strcmp(name, NNG_OPT_URL) == 0) { + return (NNG_EREADONLY); + } + + for (o = l->l_ops.l_options; o && o->o_name; o++) { + int rv; + + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_set == NULL) { + return (NNG_EREADONLY); + } + + nni_mtx_lock(&l->l_mtx); + rv = o->o_set(l->l_data, val, sz, t); + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + return (NNG_ENOTSUP); +} + +int +nni_listener_getopt( + nni_listener *l, const char *name, void *valp, size_t *szp, nni_opt_type t) +{ + nni_tran_option *o; + + for (o = l->l_ops.l_options; o && o->o_name; o++) { + int rv; + if (strcmp(o->o_name, name) != 0) { + continue; + } + if (o->o_get == NULL) { + return (NNG_EWRITEONLY); + } + nni_mtx_lock(&l->l_mtx); + rv = o->o_get(l->l_data, valp, szp, t); + nni_mtx_unlock(&l->l_mtx); + return (rv); + } + + // We provide a fallback on the URL, but let the implementation + // override. This allows the URL to be created with wildcards, + // that are resolved later. + if (strcmp(name, NNG_OPT_URL) == 0) { + return (nni_copyout_str(l->l_url->u_rawurl, valp, szp, t)); + } + + return (nni_sock_getopt(l->l_sock, name, valp, szp, t)); +} + +void +nni_listener_list_init(nni_list *list) +{ + NNI_LIST_INIT(list, nni_listener, l_node); +} diff --git a/src/core/listener.h b/src/core/listener.h new file mode 100644 index 00000000..41b1a678 --- /dev/null +++ b/src/core/listener.h @@ -0,0 +1,33 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_LISTENER_H +#define CORE_LISTENER_H + +extern int nni_listener_sys_init(void); +extern void nni_listener_sys_fini(void); +extern int nni_listener_find(nni_listener **, uint32_t); +extern int nni_listener_hold(nni_listener *); +extern void nni_listener_rele(nni_listener *); +extern uint32_t nni_listener_id(nni_listener *); +extern int nni_listener_create(nni_listener **, nni_sock *, const char *); +extern int nni_listener_shutdown(nni_listener *); +extern void nni_listener_close(nni_listener *); +extern int nni_listener_start(nni_listener *, int); +extern void nni_listener_list_init(nni_list *); +extern int nni_listener_add_pipe(nni_listener *, nni_pipe *); +extern void nni_listener_remove_pipe(nni_listener *, nni_pipe *); + +extern int nni_listener_setopt( + nni_listener *, const char *, const void *, size_t, nni_opt_type); +extern int nni_listener_getopt( + nni_listener *, const char *, void *, size_t *, nni_opt_type); + +#endif // CORE_LISTENER_H diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index fdb2ce94..9af12720 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -52,7 +52,8 @@ // These have to come after the others - particularly transport.h -#include "core/endpt.h" +#include "core/dialer.h" +#include "core/listener.h" #include "core/pipe.h" #include "core/socket.h" diff --git a/src/core/pipe.c b/src/core/pipe.c index 93fbae99..a42cdeff 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -26,7 +26,8 @@ struct nni_pipe { nni_list_node p_sock_node; nni_list_node p_ep_node; nni_sock * p_sock; - nni_ep * p_ep; + nni_listener * p_listener; + nni_dialer * p_dialer; bool p_closed; bool p_stop; bool p_cbs; @@ -98,7 +99,7 @@ nni_pipe_sys_fini(void) } } -static void +void nni_pipe_destroy(nni_pipe *p) { bool cbs; @@ -126,9 +127,8 @@ nni_pipe_destroy(nni_pipe *p) // We have exclusive access at this point, so we can check if // we are still on any lists. - if (nni_list_node_active(&p->p_ep_node)) { - nni_ep_pipe_remove(p->p_ep, p); - } + nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL + nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL if (nni_list_node_active(&p->p_sock_node)) { nni_sock_pipe_remove(p->p_sock, p); @@ -303,12 +303,10 @@ nni_pipe_start_cb(void *arg) } int -nni_pipe_create(nni_ep *ep, void *tdata) +nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { nni_pipe * p; int rv; - nni_tran * tran = nni_ep_tran(ep); - nni_sock * sock = nni_ep_sock(ep); void * sdata = nni_sock_proto_data(sock); nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); @@ -324,7 +322,6 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_tran_data = tdata; p->p_proto_ops = *pops; p->p_proto_data = NULL; - p->p_ep = ep; p->p_sock = sock; p->p_closed = false; p->p_stop = false; @@ -348,16 +345,27 @@ nni_pipe_create(nni_ep *ep, void *tdata) } if ((rv != 0) || - ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) || - ((rv = nni_ep_pipe_add(ep, p)) != 0) || - ((rv = nni_sock_pipe_add(sock, p)) != 0)) { + ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { nni_pipe_destroy(p); return (rv); } + *pp = p; return (0); } +void +nni_pipe_set_listener(nni_pipe *p, nni_listener *l) +{ + p->p_listener = l; +} + +void +nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d) +{ + p->p_dialer = d; +} + int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) @@ -371,7 +379,13 @@ nni_pipe_getopt( return (o->o_get(p->p_tran_data, val, szp, t)); } // Maybe the endpoint knows? - return (nni_ep_getopt(p->p_ep, name, val, szp, t)); + if (p->p_dialer != NULL) { + return (nni_dialer_getopt(p->p_dialer, name, val, szp, t)); + } + if (p->p_listener != NULL) { + return (nni_listener_getopt(p->p_listener, name, val, szp, t)); + } + return (NNG_ENOTSUP); } void @@ -409,15 +423,20 @@ nni_pipe_sock_id(nni_pipe *p) } uint32_t -nni_pipe_ep_id(nni_pipe *p) +nni_pipe_listener_id(nni_pipe *p) { - return (nni_ep_id(p->p_ep)); + if (p->p_listener != NULL) { + return (nni_listener_id(p->p_listener)); + } + return (0); } - -int -nni_pipe_ep_mode(nni_pipe *p) +uint32_t +nni_pipe_dialer_id(nni_pipe *p) { - return (nni_ep_mode(p->p_ep)); + if (p->p_dialer != NULL) { + return (nni_dialer_id(p->p_dialer)); + } + return (0); } static void diff --git a/src/core/pipe.h b/src/core/pipe.h index bd66d4ed..5c505514 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -29,6 +29,10 @@ extern void nni_pipe_send(nni_pipe *, nni_aio *); // Pipe operations that protocols use. extern uint32_t nni_pipe_id(nni_pipe *); +// nni_pipe_destroy destroys a pipe -- there must not be any other +// references to it; this is used only during creation failures. +extern void nni_pipe_destroy(nni_pipe *); + // nni_pipe_close closes the underlying transport for the pipe. Further // operations against will return NNG_ECLOSED. extern void nni_pipe_close(nni_pipe *); @@ -48,7 +52,9 @@ extern void nni_pipe_stop(nni_pipe *); // endpoint, grabbing each of those locks. The function takes ownership of // the transport specific pipe (3rd argument), regardless of whether it // succeeds or not. The endpoint should be held when calling this. -extern int nni_pipe_create(nni_ep *, void *); +extern int nni_pipe_create2(nni_pipe **, nni_sock *, nni_tran *, void *); +extern void nni_pipe_set_dialer(nni_pipe *, nni_dialer *); +extern void nni_pipe_set_listener(nni_pipe *, nni_listener *); // nni_pipe_start is called by the socket to begin any startup activities // on the pipe before making it ready for use by protocols. For example, @@ -82,11 +88,11 @@ extern int nni_pipe_find(nni_pipe **, uint32_t); // nni_pipe_sock_id returns the socket id for the pipe (used by public API). extern uint32_t nni_pipe_sock_id(nni_pipe *); -// nni_pipe_ep_id returns the endpoint id for the pipe. -extern uint32_t nni_pipe_ep_id(nni_pipe *); +// nni_pipe_listener_id returns the listener id for the pipe (or 0 if none). +extern uint32_t nni_pipe_listener_id(nni_pipe *); -// nni_pipe_ep_mode returns the endpoint mode for the pipe. -extern int nni_pipe_ep_mode(nni_pipe *); +// nni_pipe_dialer_id returns the dialer id for the pipe (or 0 if none). +extern uint32_t nni_pipe_dialer_id(nni_pipe *); // nni_pipe_closed returns true if nni_pipe_close was called. // (This is used by the socket to determine if user closed the pipe diff --git a/src/core/socket.c b/src/core/socket.c index 4cf624e2..894e4fee 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -82,9 +82,10 @@ struct nni_socket { nni_list s_options; // opts not handled by sock/proto char s_name[64]; // socket name (legacy compat) - nni_list s_eps; // active endpoints - nni_list s_pipes; // active pipes - nni_list s_ctxs; // active contexts (protected by global sock_lk) + nni_list s_listeners; // active listeners + nni_list s_dialers; // active dialers + nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global sock_lk) bool s_closing; // Socket is closing bool s_closed; // Socket closed, protected by global lock @@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); nni_pipe_sock_list_init(&s->s_pipes); - nni_ep_list_init(&s->s_eps); + nni_listener_list_init(&s->s_listeners); + nni_dialer_list_init(&s->s_dialers); nni_mtx_init(&s->s_mx); nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); @@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) int nni_sock_shutdown(nni_sock *sock) { - nni_pipe *pipe; - nni_ep * ep; - nni_ep * nep; - nni_ctx * ctx; - nni_ctx * nctx; + nni_pipe * pipe; + nni_dialer * d; + nni_dialer * nd; + nni_listener *l; + nni_listener *nl; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming // but but allows existing ones to drain. - NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_shutdown(ep); + NNI_LIST_FOREACH (&sock->s_listeners, l) { + nni_listener_shutdown(l); } + NNI_LIST_FOREACH (&sock->s_dialers, d) { + nni_dialer_shutdown(d); + } + nni_mtx_unlock(&sock->s_mx); // We now mark any owned contexts as closing. @@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Go through the endpoint list, attempting to close them. + // Go through the dialers and listeners, attempting to close them. // We might already have a close in progress, in which case // we skip past it; it will be removed from another thread. - nep = nni_list_first(&sock->s_eps); - while ((ep = nep) != NULL) { - nep = nni_list_next(&sock->s_eps, nep); + nl = nni_list_first(&sock->s_listeners); + while ((l = nl) != NULL) { + nl = nni_list_next(&sock->s_listeners, nl); + + if (nni_listener_hold(l) == 0) { + nni_mtx_unlock(&sock->s_mx); + nni_listener_close(l); + nni_mtx_lock(&sock->s_mx); + } + } + nd = nni_list_first(&sock->s_dialers); + while ((d = nd) != NULL) { + nd = nni_list_next(&sock->s_dialers, nd); - if (nni_ep_hold(ep) == 0) { + if (nni_dialer_hold(d) == 0) { nni_mtx_unlock(&sock->s_mx); - nni_ep_close(ep); + nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } } @@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock) // We have to wait for *both* endpoints and pipes to be // removed. while ((!nni_list_empty(&sock->s_pipes)) || - (!nni_list_empty(&sock->s_eps))) { + (!nni_list_empty(&sock->s_listeners)) || + (!nni_list_empty(&sock->s_dialers))) { nni_cv_wait(&sock->s_cv); } @@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s) // Wait for pipes, eps, and contexts to finish closing. nni_mtx_lock(&s->s_mx); - while ( - (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) { + while ((!nni_list_empty(&s->s_pipes)) || + (!nni_list_empty(&s->s_dialers)) || + (!nni_list_empty(&s->s_listeners))) { nni_cv_wait(&s->s_cv); } nni_mtx_unlock(&s->s_mx); @@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) } int -nni_sock_ep_add(nni_sock *s, nni_ep *ep) +nni_sock_add_listener(nni_sock *s, nni_listener *l) +{ + nni_sockopt *sopt; + + nni_mtx_lock(&s->s_mx); + if (s->s_closing) { + nni_mtx_unlock(&s->s_mx); + return (NNG_ECLOSED); + } + + NNI_LIST_FOREACH (&s->s_options, sopt) { + int rv; + rv = nni_listener_setopt( + l, sopt->name, sopt->data, sopt->sz, sopt->typ); + if ((rv != 0) && (rv != NNG_ENOTSUP)) { + nni_mtx_unlock(&s->s_mx); + return (rv); + } + } + + nni_list_append(&s->s_listeners, l); + nni_mtx_unlock(&s->s_mx); + return (0); +} + +int +nni_sock_add_dialer(nni_sock *s, nni_dialer *d) { nni_sockopt *sopt; @@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep) NNI_LIST_FOREACH (&s->s_options, sopt) { int rv; - rv = nni_ep_setopt( - ep, sopt->name, sopt->data, sopt->sz, sopt->typ); + rv = nni_dialer_setopt( + d, sopt->name, sopt->data, sopt->sz, sopt->typ); if ((rv != 0) && (rv != NNG_ENOTSUP)) { nni_mtx_unlock(&s->s_mx); return (rv); } } - nni_list_append(&s->s_eps, ep); + nni_list_append(&s->s_dialers, d); nni_mtx_unlock(&s->s_mx); return (0); } void -nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) +nni_sock_remove_listener(nni_sock *s, nni_listener *l) { - nni_mtx_lock(&sock->s_mx); - if (nni_list_active(&sock->s_eps, ep)) { - nni_list_remove(&sock->s_eps, ep); - if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { - nni_cv_wake(&sock->s_cv); + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_listeners, l)) { + nni_list_remove(&s->s_listeners, l); + if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { + nni_cv_wake(&s->s_cv); } } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_sock_remove_dialer(nni_sock *s, nni_dialer *d) +{ + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_dialers, d)) { + nni_list_remove(&s->s_dialers, d); + if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { + nni_cv_wake(&s->s_cv); + } + } + nni_mtx_unlock(&s->s_mx); } int @@ -948,7 +1007,8 @@ nni_sock_setopt( nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t) { int rv = NNG_ENOTSUP; - nni_ep * ep; + nni_dialer * d; + nni_listener * l; nni_sockopt * optv; nni_sockopt * oldv = NULL; const sock_option * sso; @@ -1042,9 +1102,20 @@ nni_sock_setopt( // transport (other than ENOTSUP) stops the operation // altogether. Its important that transport wide checks // properly pre-validate. - NNI_LIST_FOREACH (&s->s_eps, ep) { + NNI_LIST_FOREACH (&s->s_listeners, l) { + int x; + x = nni_listener_setopt(l, optv->name, optv->data, sz, t); + if (x != NNG_ENOTSUP) { + if ((rv = x) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_free_opt(optv); + return (rv); + } + } + } + NNI_LIST_FOREACH (&s->s_dialers, d) { int x; - x = nni_ep_setopt(ep, optv->name, optv->data, sz, t); + x = nni_dialer_setopt(d, optv->name, optv->data, sz, t); if (x != NNG_ENOTSUP) { if ((rv = x) != 0) { nni_mtx_unlock(&s->s_mx); diff --git a/src/core/socket.h b/src/core/socket.h index 7c10b195..184cfb64 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -46,8 +46,11 @@ extern uint32_t nni_sock_id(nni_sock *); extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); -extern int nni_sock_ep_add(nni_sock *, nni_ep *); -extern void nni_sock_ep_remove(nni_sock *, nni_ep *); +extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); +extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); + +extern int nni_sock_add_listener(nni_sock *, nni_listener *); +extern void nni_sock_remove_listener(nni_sock *, nni_listener *); // These are socket methods that protocol operations can expect to call. // Note that each of these should be called without any locks held, since diff --git a/src/core/transport.c b/src/core/transport.c index 4733b6bd..8485d048 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -118,12 +118,28 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz, int typ) nni_mtx_lock(&nni_tran_lk); NNI_LIST_FOREACH (&nni_tran_list, t) { - const nni_tran_ep_ops *ep; - const nni_tran_option *o; + const nni_tran_dialer_ops * dops; + const nni_tran_listener_ops *lops; + const nni_tran_option * o; + + // Generally we look for endpoint options. We check both + // dialers and listeners. + dops = t->t_tran.tran_dialer; + for (o = dops->d_options; o && o->o_name != NULL; o++) { + if (strcmp(name, o->o_name) != 0) { + continue; + } + if (o->o_set == NULL) { + nni_mtx_unlock(&nni_tran_lk); + return (NNG_EREADONLY); + } - // Generally we look for endpoint options. - ep = t->t_tran.tran_ep; - for (o = ep->ep_options; o && o->o_name != NULL; o++) { + rv = (o->o_chk != NULL) ? o->o_chk(v, sz, typ) : 0; + nni_mtx_unlock(&nni_tran_lk); + return (rv); + } + lops = t->t_tran.tran_listener; + for (o = lops->l_options; o && o->o_name != NULL; o++) { if (strcmp(name, o->o_name) != 0) { continue; } diff --git a/src/core/transport.h b/src/core/transport.h index e45aa7ec..257d232d 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -11,30 +11,11 @@ #ifndef CORE_TRANSPORT_H #define CORE_TRANSPORT_H -// Transport implementation details. Transports must implement the -// interfaces in this file. -struct nni_tran { - // tran_version is the version of the transport ops that this - // transport implements. We only bother to version the main - // ops vector. - uint32_t tran_version; - - // tran_scheme is the transport scheme, such as "tcp" or "inproc". - const char *tran_scheme; - - // tran_ep links our endpoint-specific operations. - const nni_tran_ep_ops *tran_ep; - - // tran_pipe links our pipe-specific operations. - const nni_tran_pipe_ops *tran_pipe; - - // tran_init, if not NULL, is called once during library - // initialization. - int (*tran_init)(void); - - // tran_fini, if not NULL, is called during library deinitialization. - // It should release any global resources, close any open files, etc. - void (*tran_fini)(void); +// Endpoint modes. Currently used by transports. Remove this when we make +// transport dialers and listeners explicit. +enum nni_ep_mode { + NNI_EP_MODE_DIAL = 1, + NNI_EP_MODE_LISTEN = 2, }; // We quite intentionally use a signature where the upper word is nonzero, @@ -48,7 +29,8 @@ struct nni_tran { #define NNI_TRANSPORT_V0 0x54520000 #define NNI_TRANSPORT_V1 0x54520001 #define NNI_TRANSPORT_V2 0x54520002 -#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V2 +#define NNI_TRANSPORT_V3 0x54520003 +#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V3 // Option handlers. struct nni_tran_option { @@ -81,40 +63,60 @@ struct nni_tran_option { // For a given endpoint, the framework holds a lock so that each entry // point is run exclusively of the others. (Transports must still guard // against any asynchronous operations they manage themselves, though.) -struct nni_tran_ep_ops { - // ep_init creates a vanilla endpoint. The value created is - // used for the first argument for all other endpoint - // functions. - int (*ep_init)(void **, nni_url *, nni_sock *, int); - // ep_fini frees the resources associated with the endpoint. - // The endpoint will already have been closed. - void (*ep_fini)(void *); +struct nni_tran_dialer_ops { + // d_init creates a vanilla dialer. The value created is + // used for the first argument for all other dialer functions. + int (*d_init)(void **, nni_url *, nni_sock *); + + // d_fini frees the resources associated with the dialer. + // The dialer will already have been closed. + void (*d_fini)(void *); - // ep_connect establishes a connection. It can return errors + // d_connect establishes a connection. It can return errors // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, // NNG_ECONNFAILED, NNG_ETIMEDOUT, and NNG_EPROTO. - void (*ep_connect)(void *, nni_aio *); + void (*d_connect)(void *, nni_aio *); + + // d_close stops the dialer from operating altogether. It + // does not affect pipes that have already been created. It is + // nonblocking. + void (*d_close)(void *); + + // d_options is an array of dialer options. The final + // element must have a NULL name. If this member is NULL, then + // no dialer specific options are available. + nni_tran_option *d_options; +}; + +struct nni_tran_listener_ops { + // l_init creates a vanilla listener. The value created is + // used for the first argument for all other listener functions. + int (*l_init)(void **, nni_url *, nni_sock *); - // ep_bind just does the bind() and listen() work, + // l_fini frees the resources associated with the listener. + // The listener will already have been closed. + void (*l_fini)(void *); + + // l_bind just does the bind() and listen() work, // reserving the address but not creating any connections. // It should return NNG_EADDRINUSE if the address is already // taken. It can also return NNG_EBADADDR for an unsuitable // address, or NNG_EACCESS for permission problems. - int (*ep_bind)(void *); + int (*l_bind)(void *); - // ep_accept accepts an inbound connection. - void (*ep_accept)(void *, nni_aio *); + // l_accept accepts an inbound connection. + void (*l_accept)(void *, nni_aio *); - // ep_close stops the endpoint from operating altogether. It + // l_close stops the listener from operating altogether. It // does not affect pipes that have already been created. It is // nonblocking. - void (*ep_close)(void *); + void (*l_close)(void *); - // ep_options is an array of endpoint options. The final + // l_options is an array of listener options. The final // element must have a NULL name. If this member is NULL, then - // no transport specific options are available. - nni_tran_option *ep_options; + // no dialer specific options are available. + nni_tran_option *l_options; }; // Pipe operations are entry points called by the socket. These may be @@ -168,6 +170,35 @@ struct nni_tran_pipe_ops { nni_tran_option *p_options; }; +// Transport implementation details. Transports must implement the +// interfaces in this file. +struct nni_tran { + // tran_version is the version of the transport ops that this + // transport implements. We only bother to version the main + // ops vector. + uint32_t tran_version; + + // tran_scheme is the transport scheme, such as "tcp" or "inproc". + const char *tran_scheme; + + // tran_dialer links our dialer-specific operations. + const nni_tran_dialer_ops *tran_dialer; + + // tran_listener links our listener-specific operations. + const nni_tran_listener_ops *tran_listener; + + // tran_pipe links our pipe-specific operations. + const nni_tran_pipe_ops *tran_pipe; + + // tran_init, if not NULL, is called once during library + // initialization. + int (*tran_init)(void); + + // tran_fini, if not NULL, is called during library deinitialization. + // It should release any global resources, close any open files, etc. + void (*tran_fini)(void); +}; + // These APIs are used by the framework internally, and not for use by // transport implementations. extern nni_tran *nni_tran_find(nni_url *); |
