From 2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 27 Dec 2021 23:10:29 -0800 Subject: Introduce nng_device_aio(). This function is like nng_device(), but runs asynchronously. Also, this fixes #1503 nng_device causes nng_close to blocking --- docs/man/nng_device.3.adoc | 17 +++- include/nng/nng.h | 10 ++ src/core/device.c | 248 +++++++++++++++++++++++---------------------- src/core/device.h | 4 +- src/core/socket.c | 8 ++ src/core/socket.h | 1 + src/nng.c | 31 +++++- 7 files changed, 184 insertions(+), 135 deletions(-) diff --git a/docs/man/nng_device.3.adoc b/docs/man/nng_device.3.adoc index b36d2e80..7151239f 100644 --- a/docs/man/nng_device.3.adoc +++ b/docs/man/nng_device.3.adoc @@ -1,6 +1,6 @@ = nng_device(3) // -// Copyright 2018 Staysail Systems, Inc. +// Copyright 2021 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This document is supplied under the terms of the MIT License, a @@ -20,14 +20,16 @@ nng_device - message forwarding device #include int nng_device(nng_socket s1, nng_socket s2); + +void nng_device_aio(nng_aio *aio, nng_socket s1, nng_socket s2); ---- == DESCRIPTION -The `nng_device()` function forwards messages received from one +The `nng_device()` and `nng_device_aio()` functions forward messages received from one xref:nng_socket.5.adoc[socket] _s1_ to another socket _s2_, and vice versa. -This function is used to create forwarders, which can be used to create +These functions are used to create forwarders, which can be used to create complex network topologies to provide for improved ((horizontal scalability)), reliability, and isolation. @@ -38,6 +40,8 @@ such as xref:nng_req_open.3.adoc[`nng_req0_open_raw()`]. The `nng_device()` function does not return until one of the sockets is closed. +The `nng_device_aio()` function returns immediately, and operates completely in +the background. === Reflectors @@ -49,7 +53,7 @@ xref:nng_pair.7.adoc[_pair_] or xref:nng_bus.7.adoc[_bus_].) In this case the device acts as a ((reflector)) or loop-back device, where messages received from the valid socket are merely returned -back to the sender. +to the sender. === Forwarders @@ -75,6 +79,9 @@ adjustments to add or remove routing headers as needed. This allows replies to be returned to requesters, and responses to be routed back to surveyors. +The caller of these functions is required to close the sockets when the +device is stopped. + Additionally, some protocols have a maximum ((time-to-live)) to protect against forwarding loops and especially amplification loops. In these cases, the default limit (usually 8), ensures that messages will @@ -88,7 +95,7 @@ IMPORTANT: Not all protocols have support for guarding against forwarding loops, and even for those that do, forwarding loops can be extremely detrimental to network performance. -NOTE: Devices (forwarders and reflectors) act in best effort delivery mode only. +NOTE: Devices (forwarders and reflectors) act in best-effort delivery mode only. If a message is received from one socket that cannot be accepted by the other (due to backpressure or other issues), then the message is discarded. diff --git a/include/nng/nng.h b/include/nng/nng.h index d40f5135..bb3b6b14 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -973,8 +973,18 @@ NNG_DECL uint64_t nng_stat_timestamp(nng_stat *); // Device functionality. This connects two sockets together in a device, // which means that messages from one side are forwarded to the other. +// This version is synchronous, which means the caller will block until +// one of the sockets is closed. Note that caller is responsible for +// finally closing both sockets when this function returns. NNG_DECL int nng_device(nng_socket, nng_socket); +// Asynchronous form of nng_device. When this succeeds, the device is +// left intact and functioning in the background, until one of the sockets +// is closed or the application exits. The sockets may be shut down if +// the device fails, but the caller is responsible for ultimately closing +// the sockets properly after the device is torn down. +NNG_DECL void nng_device_aio(nng_aio *, nng_socket, nng_socket); + // Symbol name and visibility. TBD. The only symbols that really should // be directly exported to runtimes IMO are the option symbols. And frankly // they have enough special logic around them that it might be best not to diff --git a/src/core/device.c b/src/core/device.c index 71480bbc..c7266271 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. +// Copyright 2021 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -10,107 +10,126 @@ #include "core/nng_impl.h" -#include +typedef struct device_data_s device_data; +typedef struct device_path_s device_path; -typedef struct nni_device_path { - nni_aio * user; // user aio - nni_aio * aio; - nni_sock *src; - nni_sock *dst; - int state; -} nni_device_path; +struct device_path_s { + int state; + device_data *d; + nni_sock *src; + nni_sock *dst; + nni_aio aio; +}; #define NNI_DEVICE_STATE_INIT 0 #define NNI_DEVICE_STATE_RECV 1 #define NNI_DEVICE_STATE_SEND 2 #define NNI_DEVICE_STATE_FINI 3 -typedef struct nni_device_data { - nni_aio * user; - int npath; - nni_device_path paths[2]; - nni_mtx mtx; - bool running; -} nni_device_data; +struct device_data_s { + nni_aio *user; + int num_paths; + int running; + int rv; + device_path paths[2]; + nni_reap_node reap; +}; -typedef struct nni_device_pair nni_device_pair; +static void device_fini(void *); + +static nni_mtx device_mtx = NNI_MTX_INITIALIZER; +static nni_reap_list device_reap = { + .rl_offset = offsetof(device_data, reap), + .rl_func = device_fini, +}; static void -nni_device_cancel(nni_aio *aio, void *arg, int rv) +device_fini(void *arg) { - nni_device_data *dd = arg; - // cancellation is the only path to shutting it down. + device_data *d = arg; - nni_mtx_lock(&dd->mtx); - if ((!dd->running) || (dd->user != aio)) { - nni_mtx_unlock(&dd->mtx); - return; + for (int i = 0; i < d->num_paths; i++) { + nni_aio_stop(&d->paths[i].aio); } - dd->running = false; - dd->user = NULL; - nni_mtx_unlock(&dd->mtx); + NNI_FREE_STRUCT(d); +} - nni_sock_shutdown(dd->paths[0].src); - nni_sock_shutdown(dd->paths[0].dst); - nni_aio_finish_error(aio, rv); +static void +device_cancel(nni_aio *aio, void *arg, int rv) +{ + device_data *d = arg; + // cancellation is the only path to shutting it down. + + nni_mtx_lock(&device_mtx); + if (d->user == aio) { + for (int i = 0; i < d->num_paths; i++) { + nni_aio_abort(&d->paths[i].aio, rv); + } + } + nni_mtx_unlock(&device_mtx); } static void -nni_device_cb(void *arg) +device_cb(void *arg) { - nni_device_path *p = arg; - nni_aio * aio = p->aio; - int rv; + device_path *p = arg; + device_data *d = p->d; + int rv; - if ((rv = nni_aio_result(aio)) != 0) { + if ((rv = nni_aio_result(&p->aio)) != 0) { + nni_mtx_lock(&device_mtx); p->state = NNI_DEVICE_STATE_FINI; - nni_aio_abort(p->user, rv); + d->running--; + if (d->rv == 0) { + d->rv = rv; + } + for (int i = 0; i < d->num_paths; i++) { + if (p != &d->paths[i]) { + nni_aio_abort(&d->paths[i].aio, rv); + } + } + if (d->running == 0) { + if (d->user != NULL) { + nni_aio_finish_error(d->user, d->rv); + d->user = NULL; + } + nni_sock_rele(d->paths[0].src); + nni_sock_rele(d->paths[0].dst); + + nni_reap(&device_reap, d); + } + nni_mtx_unlock(&device_mtx); return; } switch (p->state) { case NNI_DEVICE_STATE_INIT: + break; case NNI_DEVICE_STATE_SEND: p->state = NNI_DEVICE_STATE_RECV; - nni_sock_recv(p->src, aio); + nni_sock_recv(p->src, &p->aio); break; case NNI_DEVICE_STATE_RECV: // Leave the message where it is. p->state = NNI_DEVICE_STATE_SEND; - nni_sock_send(p->dst, aio); + nni_sock_send(p->dst, &p->aio); break; case NNI_DEVICE_STATE_FINI: break; } } -void -nni_device_fini(nni_device_data *dd) +static int +device_init(device_data **dp, nni_sock *s1, nni_sock *s2) { - int i; - for (i = 0; i < dd->npath; i++) { - nni_device_path *p = &dd->paths[i]; - nni_aio_stop(p->aio); - } - for (i = 0; i < dd->npath; i++) { - nni_device_path *p = &dd->paths[i]; - nni_aio_free(p->aio); - } - nni_mtx_fini(&dd->mtx); - NNI_FREE_STRUCT(dd); -} - -int -nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) -{ - nni_device_data *dd; - int npath = 2; - int i; - bool raw; - size_t rsz; + int num_paths = 2; + int i; + bool raw; + size_t rsz; + device_data *d; // Specifying either of these as null turns the device into - // a loopback reflector. + // a reflector. if (s1 == NULL) { s1 = s2; } @@ -141,8 +160,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) return (NNG_EINVAL); } - // Note we assume that since they peers, we only need to look - // at the receive flags -- the other side is assumed to be able + // Note we assume that since they are peers, we only need to look + // at the recv flags -- the other side is assumed to be able // to send. if ((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) == 0) { nni_sock *temp = s1; @@ -157,82 +176,65 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) // not strictly necessary, but it saves resources and minimizes any // extra reordering.) if (((nni_sock_flags(s2) & NNI_PROTO_FLAG_RCV) == 0) || (s1 == s2)) { - npath = 1; + num_paths = 1; } - if ((dd = NNI_ALLOC_STRUCT(dd)) == NULL) { + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&dd->mtx); - for (i = 0; i < npath; i++) { - int rv; - nni_device_path *p = &dd->paths[i]; - p->src = i == 0 ? s1 : s2; - p->dst = i == 0 ? s2 : s1; - p->state = NNI_DEVICE_STATE_INIT; + d->num_paths = 0; + for (i = 0; i < num_paths; i++) { + device_path *p = &d->paths[i]; + p->src = i == 0 ? s1 : s2; + p->dst = i == 0 ? s2 : s1; + p->d = d; + p->state = NNI_DEVICE_STATE_INIT; - if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) { - nni_device_fini(dd); - return (rv); - } + nni_aio_init(&p->aio, device_cb, p); - nni_aio_set_timeout(p->aio, NNG_DURATION_INFINITE); + nni_aio_set_timeout(&p->aio, NNG_DURATION_INFINITE); } - dd->npath = npath; - *dp = dd; + nni_sock_hold(d->paths[0].src); + nni_sock_hold(d->paths[0].dst); + + d->num_paths = num_paths; + *dp = d; return (0); } -void -nni_device_start(nni_device_data *dd, nni_aio *user) +static void +device_start(device_data *d, nni_aio *user) { - int i; - int rv; - - if (nni_aio_begin(user) != 0) { - return; + d->user = user; + for (int i = 0; i < d->num_paths; i++) { + device_path *p = &d->paths[i]; + p->state = NNI_DEVICE_STATE_RECV; + nni_sock_recv(p->src, &p->aio); + d->running++; } - nni_mtx_lock(&dd->mtx); - if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) { - nni_mtx_unlock(&dd->mtx); - nni_aio_finish_error(user, rv); - return; - } - dd->user = user; - for (i = 0; i < dd->npath; i++) { - nni_device_path *p = &dd->paths[i]; - p->user = user; - p->state = NNI_DEVICE_STATE_INIT; - } - for (i = 0; i < dd->npath; i++) { - nni_device_path *p = &dd->paths[i]; - p->state = NNI_DEVICE_STATE_RECV; - nni_sock_recv(p->src, p->aio); - } - dd->running = true; - nni_mtx_unlock(&dd->mtx); } -int -nni_device(nni_sock *s1, nni_sock *s2) +void +nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2) { - nni_device_data *dd; - nni_aio * aio; - int rv; + device_data *d; + int rv; - if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { - return (rv); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&device_mtx); + if ((rv = device_init(&d, s1, s2)) != 0) { + nni_mtx_unlock(&device_mtx); + nni_aio_finish_error(aio, rv); + return; } - if ((rv = nni_device_init(&dd, s1, s2)) != 0) { - nni_aio_free(aio); - return (rv); + if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) { + nni_mtx_unlock(&device_mtx); + nni_aio_finish_error(aio, rv); + nni_reap(&device_reap, d); } - nni_device_start(dd, aio); - nni_aio_wait(aio); - - rv = nni_aio_result(aio); - nni_device_fini(dd); - nni_aio_free(aio); - return (rv); + device_start(d, aio); + nni_mtx_unlock(&device_mtx); } diff --git a/src/core/device.h b/src/core/device.h index 973a4cb7..8d11aeb8 100644 --- a/src/core/device.h +++ b/src/core/device.h @@ -1,5 +1,5 @@ // -// Copyright 2017 Garrett D'Amore +// Copyright 2021 Staysail Systems, Inc. // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -14,6 +14,6 @@ // It works in both directions. Arguably we should build versions of this // that are unidirectional, and we could extend this API with user-defined // filtering functions. -extern int nni_device(nni_sock *, nni_sock *); +extern void nni_device(nni_aio *aio, nni_sock *, nni_sock *); #endif // CORE_DEVICE_H diff --git a/src/core/socket.c b/src/core/socket.c index 425499ae..75c9eedc 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -378,6 +378,14 @@ nni_sock_find(nni_sock **sockp, uint32_t id) return (rv); } +void +nni_sock_hold(nni_sock *s) +{ + nni_mtx_lock(&sock_lk); + s->s_ref++; + nni_mtx_unlock(&sock_lk); +} + void nni_sock_rele(nni_sock *s) { diff --git a/src/core/socket.h b/src/core/socket.h index beebfbce..c4037e96 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -12,6 +12,7 @@ #define CORE_SOCKET_H extern int nni_sock_find(nni_sock **, uint32_t); +extern void nni_sock_hold(nni_sock *); extern void nni_sock_rele(nni_sock *); extern int nni_sock_open(nni_sock **, const nni_proto *); extern void nni_sock_close(nni_sock *); diff --git a/src/nng.c b/src/nng.c index bc46fea5..c48ccd0b 100644 --- a/src/nng.c +++ b/src/nng.c @@ -1158,8 +1158,8 @@ nng_pipe_notify(nng_socket s, nng_pipe_ev ev, nng_pipe_cb cb, void *arg) return (0); } -int -nng_device(nng_socket s1, nng_socket s2) +void +nng_device_aio(nng_aio *aio, nng_socket s1, nng_socket s2) { int rv; nni_sock *sock1 = NULL; @@ -1167,23 +1167,44 @@ nng_device(nng_socket s1, nng_socket s2) if ((s1.id > 0) && (s1.id != (uint32_t) -1)) { if ((rv = nni_sock_find(&sock1, s1.id)) != 0) { - return (rv); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, rv); + } + return; } } if (((s2.id > 0) && (s2.id != (uint32_t) -1)) && (s2.id != s1.id)) { if ((rv = nni_sock_find(&sock2, s2.id)) != 0) { nni_sock_rele(sock1); - return (rv); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, rv); + } + return; } } - rv = nni_device(sock1, sock2); + nni_device(aio, sock1, sock2); if (sock1 != NULL) { nni_sock_rele(sock1); } if (sock2 != NULL) { nni_sock_rele(sock2); } +} + +int +nng_device(nng_socket s1, nng_socket s2) +{ + nni_aio aio; + int rv; + if ((rv = nni_init()) != 0) { + return (rv); + } + nni_aio_init(&aio, NULL, NULL); + nng_device_aio(&aio, s1, s2); + nni_aio_wait(&aio); + rv = nni_aio_result(&aio); + nni_aio_fini(&aio); return (rv); } -- cgit v1.2.3-70-g09d2