aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-12-27 23:10:29 -0800
committerGarrett D'Amore <garrett@damore.org>2021-12-27 23:10:29 -0800
commit2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739 (patch)
tree2907ec3f298b2b38827060623d3f6e2577dda011
parent7b02ddc2d7077439992a10bb69553f89b5ee5903 (diff)
downloadnng-2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739.tar.gz
nng-2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739.tar.bz2
nng-2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739.zip
Introduce nng_device_aio().
This function is like nng_device(), but runs asynchronously. Also, this fixes #1503 nng_device causes nng_close to blocking
-rw-r--r--docs/man/nng_device.3.adoc17
-rw-r--r--include/nng/nng.h10
-rw-r--r--src/core/device.c248
-rw-r--r--src/core/device.h4
-rw-r--r--src/core/socket.c8
-rw-r--r--src/core/socket.h1
-rw-r--r--src/nng.c31
7 files changed, 184 insertions, 135 deletions
diff --git a/docs/man/nng_device.3.adoc b/docs/man/nng_device.3.adoc
index b36d2e80..7151239f 100644
--- a/docs/man/nng_device.3.adoc
+++ b/docs/man/nng_device.3.adoc
@@ -1,6 +1,6 @@
= nng_device(3)
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This document is supplied under the terms of the MIT License, a
@@ -20,14 +20,16 @@ nng_device - message forwarding device
#include <nng/nng.h>
int nng_device(nng_socket s1, nng_socket s2);
+
+void nng_device_aio(nng_aio *aio, nng_socket s1, nng_socket s2);
----
== DESCRIPTION
-The `nng_device()` function forwards messages received from one
+The `nng_device()` and `nng_device_aio()` functions forward messages received from one
xref:nng_socket.5.adoc[socket] _s1_ to another socket _s2_, and vice versa.
-This function is used to create forwarders, which can be used to create
+These functions are used to create forwarders, which can be used to create
complex network topologies to provide for improved ((horizontal scalability)),
reliability, and isolation.
@@ -38,6 +40,8 @@ such as xref:nng_req_open.3.adoc[`nng_req0_open_raw()`].
The `nng_device()` function does not return until one of the sockets
is closed.
+The `nng_device_aio()` function returns immediately, and operates completely in
+the background.
=== Reflectors
@@ -49,7 +53,7 @@ xref:nng_pair.7.adoc[_pair_] or
xref:nng_bus.7.adoc[_bus_].)
In this case the device acts as a ((reflector)) or loop-back device,
where messages received from the valid socket are merely returned
-back to the sender.
+to the sender.
=== Forwarders
@@ -75,6 +79,9 @@ adjustments to add or remove routing headers as needed.
This allows replies to be
returned to requesters, and responses to be routed back to surveyors.
+The caller of these functions is required to close the sockets when the
+device is stopped.
+
Additionally, some protocols have a maximum ((time-to-live)) to protect
against forwarding loops and especially amplification loops.
In these cases, the default limit (usually 8), ensures that messages will
@@ -88,7 +95,7 @@ IMPORTANT: Not all protocols have support for guarding against forwarding loops,
and even for those that do, forwarding loops can be extremely detrimental
to network performance.
-NOTE: Devices (forwarders and reflectors) act in best effort delivery mode only.
+NOTE: Devices (forwarders and reflectors) act in best-effort delivery mode only.
If a message is received from one socket that cannot be accepted by the
other (due to backpressure or other issues), then the message is discarded.
diff --git a/include/nng/nng.h b/include/nng/nng.h
index d40f5135..bb3b6b14 100644
--- a/include/nng/nng.h
+++ b/include/nng/nng.h
@@ -973,8 +973,18 @@ NNG_DECL uint64_t nng_stat_timestamp(nng_stat *);
// Device functionality. This connects two sockets together in a device,
// which means that messages from one side are forwarded to the other.
+// This version is synchronous, which means the caller will block until
+// one of the sockets is closed. Note that caller is responsible for
+// finally closing both sockets when this function returns.
NNG_DECL int nng_device(nng_socket, nng_socket);
+// Asynchronous form of nng_device. When this succeeds, the device is
+// left intact and functioning in the background, until one of the sockets
+// is closed or the application exits. The sockets may be shut down if
+// the device fails, but the caller is responsible for ultimately closing
+// the sockets properly after the device is torn down.
+NNG_DECL void nng_device_aio(nng_aio *, nng_socket, nng_socket);
+
// Symbol name and visibility. TBD. The only symbols that really should
// be directly exported to runtimes IMO are the option symbols. And frankly
// they have enough special logic around them that it might be best not to
diff --git a/src/core/device.c b/src/core/device.c
index 71480bbc..c7266271 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.com>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.com>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -10,107 +10,126 @@
#include "core/nng_impl.h"
-#include <string.h>
+typedef struct device_data_s device_data;
+typedef struct device_path_s device_path;
-typedef struct nni_device_path {
- nni_aio * user; // user aio
- nni_aio * aio;
- nni_sock *src;
- nni_sock *dst;
- int state;
-} nni_device_path;
+struct device_path_s {
+ int state;
+ device_data *d;
+ nni_sock *src;
+ nni_sock *dst;
+ nni_aio aio;
+};
#define NNI_DEVICE_STATE_INIT 0
#define NNI_DEVICE_STATE_RECV 1
#define NNI_DEVICE_STATE_SEND 2
#define NNI_DEVICE_STATE_FINI 3
-typedef struct nni_device_data {
- nni_aio * user;
- int npath;
- nni_device_path paths[2];
- nni_mtx mtx;
- bool running;
-} nni_device_data;
+struct device_data_s {
+ nni_aio *user;
+ int num_paths;
+ int running;
+ int rv;
+ device_path paths[2];
+ nni_reap_node reap;
+};
-typedef struct nni_device_pair nni_device_pair;
+static void device_fini(void *);
+
+static nni_mtx device_mtx = NNI_MTX_INITIALIZER;
+static nni_reap_list device_reap = {
+ .rl_offset = offsetof(device_data, reap),
+ .rl_func = device_fini,
+};
static void
-nni_device_cancel(nni_aio *aio, void *arg, int rv)
+device_fini(void *arg)
{
- nni_device_data *dd = arg;
- // cancellation is the only path to shutting it down.
+ device_data *d = arg;
- nni_mtx_lock(&dd->mtx);
- if ((!dd->running) || (dd->user != aio)) {
- nni_mtx_unlock(&dd->mtx);
- return;
+ for (int i = 0; i < d->num_paths; i++) {
+ nni_aio_stop(&d->paths[i].aio);
}
- dd->running = false;
- dd->user = NULL;
- nni_mtx_unlock(&dd->mtx);
+ NNI_FREE_STRUCT(d);
+}
- nni_sock_shutdown(dd->paths[0].src);
- nni_sock_shutdown(dd->paths[0].dst);
- nni_aio_finish_error(aio, rv);
+static void
+device_cancel(nni_aio *aio, void *arg, int rv)
+{
+ device_data *d = arg;
+ // cancellation is the only path to shutting it down.
+
+ nni_mtx_lock(&device_mtx);
+ if (d->user == aio) {
+ for (int i = 0; i < d->num_paths; i++) {
+ nni_aio_abort(&d->paths[i].aio, rv);
+ }
+ }
+ nni_mtx_unlock(&device_mtx);
}
static void
-nni_device_cb(void *arg)
+device_cb(void *arg)
{
- nni_device_path *p = arg;
- nni_aio * aio = p->aio;
- int rv;
+ device_path *p = arg;
+ device_data *d = p->d;
+ int rv;
- if ((rv = nni_aio_result(aio)) != 0) {
+ if ((rv = nni_aio_result(&p->aio)) != 0) {
+ nni_mtx_lock(&device_mtx);
p->state = NNI_DEVICE_STATE_FINI;
- nni_aio_abort(p->user, rv);
+ d->running--;
+ if (d->rv == 0) {
+ d->rv = rv;
+ }
+ for (int i = 0; i < d->num_paths; i++) {
+ if (p != &d->paths[i]) {
+ nni_aio_abort(&d->paths[i].aio, rv);
+ }
+ }
+ if (d->running == 0) {
+ if (d->user != NULL) {
+ nni_aio_finish_error(d->user, d->rv);
+ d->user = NULL;
+ }
+ nni_sock_rele(d->paths[0].src);
+ nni_sock_rele(d->paths[0].dst);
+
+ nni_reap(&device_reap, d);
+ }
+ nni_mtx_unlock(&device_mtx);
return;
}
switch (p->state) {
case NNI_DEVICE_STATE_INIT:
+ break;
case NNI_DEVICE_STATE_SEND:
p->state = NNI_DEVICE_STATE_RECV;
- nni_sock_recv(p->src, aio);
+ nni_sock_recv(p->src, &p->aio);
break;
case NNI_DEVICE_STATE_RECV:
// Leave the message where it is.
p->state = NNI_DEVICE_STATE_SEND;
- nni_sock_send(p->dst, aio);
+ nni_sock_send(p->dst, &p->aio);
break;
case NNI_DEVICE_STATE_FINI:
break;
}
}
-void
-nni_device_fini(nni_device_data *dd)
+static int
+device_init(device_data **dp, nni_sock *s1, nni_sock *s2)
{
- int i;
- for (i = 0; i < dd->npath; i++) {
- nni_device_path *p = &dd->paths[i];
- nni_aio_stop(p->aio);
- }
- for (i = 0; i < dd->npath; i++) {
- nni_device_path *p = &dd->paths[i];
- nni_aio_free(p->aio);
- }
- nni_mtx_fini(&dd->mtx);
- NNI_FREE_STRUCT(dd);
-}
-
-int
-nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
-{
- nni_device_data *dd;
- int npath = 2;
- int i;
- bool raw;
- size_t rsz;
+ int num_paths = 2;
+ int i;
+ bool raw;
+ size_t rsz;
+ device_data *d;
// Specifying either of these as null turns the device into
- // a loopback reflector.
+ // a reflector.
if (s1 == NULL) {
s1 = s2;
}
@@ -141,8 +160,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
return (NNG_EINVAL);
}
- // Note we assume that since they peers, we only need to look
- // at the receive flags -- the other side is assumed to be able
+ // Note we assume that since they are peers, we only need to look
+ // at the recv flags -- the other side is assumed to be able
// to send.
if ((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) == 0) {
nni_sock *temp = s1;
@@ -157,82 +176,65 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
// not strictly necessary, but it saves resources and minimizes any
// extra reordering.)
if (((nni_sock_flags(s2) & NNI_PROTO_FLAG_RCV) == 0) || (s1 == s2)) {
- npath = 1;
+ num_paths = 1;
}
- if ((dd = NNI_ALLOC_STRUCT(dd)) == NULL) {
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&dd->mtx);
- for (i = 0; i < npath; i++) {
- int rv;
- nni_device_path *p = &dd->paths[i];
- p->src = i == 0 ? s1 : s2;
- p->dst = i == 0 ? s2 : s1;
- p->state = NNI_DEVICE_STATE_INIT;
+ d->num_paths = 0;
+ for (i = 0; i < num_paths; i++) {
+ device_path *p = &d->paths[i];
+ p->src = i == 0 ? s1 : s2;
+ p->dst = i == 0 ? s2 : s1;
+ p->d = d;
+ p->state = NNI_DEVICE_STATE_INIT;
- if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) {
- nni_device_fini(dd);
- return (rv);
- }
+ nni_aio_init(&p->aio, device_cb, p);
- nni_aio_set_timeout(p->aio, NNG_DURATION_INFINITE);
+ nni_aio_set_timeout(&p->aio, NNG_DURATION_INFINITE);
}
- dd->npath = npath;
- *dp = dd;
+ nni_sock_hold(d->paths[0].src);
+ nni_sock_hold(d->paths[0].dst);
+
+ d->num_paths = num_paths;
+ *dp = d;
return (0);
}
-void
-nni_device_start(nni_device_data *dd, nni_aio *user)
+static void
+device_start(device_data *d, nni_aio *user)
{
- int i;
- int rv;
-
- if (nni_aio_begin(user) != 0) {
- return;
+ d->user = user;
+ for (int i = 0; i < d->num_paths; i++) {
+ device_path *p = &d->paths[i];
+ p->state = NNI_DEVICE_STATE_RECV;
+ nni_sock_recv(p->src, &p->aio);
+ d->running++;
}
- nni_mtx_lock(&dd->mtx);
- if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) {
- nni_mtx_unlock(&dd->mtx);
- nni_aio_finish_error(user, rv);
- return;
- }
- dd->user = user;
- for (i = 0; i < dd->npath; i++) {
- nni_device_path *p = &dd->paths[i];
- p->user = user;
- p->state = NNI_DEVICE_STATE_INIT;
- }
- for (i = 0; i < dd->npath; i++) {
- nni_device_path *p = &dd->paths[i];
- p->state = NNI_DEVICE_STATE_RECV;
- nni_sock_recv(p->src, p->aio);
- }
- dd->running = true;
- nni_mtx_unlock(&dd->mtx);
}
-int
-nni_device(nni_sock *s1, nni_sock *s2)
+void
+nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2)
{
- nni_device_data *dd;
- nni_aio * aio;
- int rv;
+ device_data *d;
+ int rv;
- if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
- return (rv);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&device_mtx);
+ if ((rv = device_init(&d, s1, s2)) != 0) {
+ nni_mtx_unlock(&device_mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- if ((rv = nni_device_init(&dd, s1, s2)) != 0) {
- nni_aio_free(aio);
- return (rv);
+ if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) {
+ nni_mtx_unlock(&device_mtx);
+ nni_aio_finish_error(aio, rv);
+ nni_reap(&device_reap, d);
}
- nni_device_start(dd, aio);
- nni_aio_wait(aio);
-
- rv = nni_aio_result(aio);
- nni_device_fini(dd);
- nni_aio_free(aio);
- return (rv);
+ device_start(d, aio);
+ nni_mtx_unlock(&device_mtx);
}
diff --git a/src/core/device.h b/src/core/device.h
index 973a4cb7..8d11aeb8 100644
--- a/src/core/device.h
+++ b/src/core/device.h
@@ -1,5 +1,5 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -14,6 +14,6 @@
// It works in both directions. Arguably we should build versions of this
// that are unidirectional, and we could extend this API with user-defined
// filtering functions.
-extern int nni_device(nni_sock *, nni_sock *);
+extern void nni_device(nni_aio *aio, nni_sock *, nni_sock *);
#endif // CORE_DEVICE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 425499ae..75c9eedc 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -379,6 +379,14 @@ nni_sock_find(nni_sock **sockp, uint32_t id)
}
void
+nni_sock_hold(nni_sock *s)
+{
+ nni_mtx_lock(&sock_lk);
+ s->s_ref++;
+ nni_mtx_unlock(&sock_lk);
+}
+
+void
nni_sock_rele(nni_sock *s)
{
nni_mtx_lock(&sock_lk);
diff --git a/src/core/socket.h b/src/core/socket.h
index beebfbce..c4037e96 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -12,6 +12,7 @@
#define CORE_SOCKET_H
extern int nni_sock_find(nni_sock **, uint32_t);
+extern void nni_sock_hold(nni_sock *);
extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, const nni_proto *);
extern void nni_sock_close(nni_sock *);
diff --git a/src/nng.c b/src/nng.c
index bc46fea5..c48ccd0b 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1158,8 +1158,8 @@ nng_pipe_notify(nng_socket s, nng_pipe_ev ev, nng_pipe_cb cb, void *arg)
return (0);
}
-int
-nng_device(nng_socket s1, nng_socket s2)
+void
+nng_device_aio(nng_aio *aio, nng_socket s1, nng_socket s2)
{
int rv;
nni_sock *sock1 = NULL;
@@ -1167,23 +1167,44 @@ nng_device(nng_socket s1, nng_socket s2)
if ((s1.id > 0) && (s1.id != (uint32_t) -1)) {
if ((rv = nni_sock_find(&sock1, s1.id)) != 0) {
- return (rv);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
+ return;
}
}
if (((s2.id > 0) && (s2.id != (uint32_t) -1)) && (s2.id != s1.id)) {
if ((rv = nni_sock_find(&sock2, s2.id)) != 0) {
nni_sock_rele(sock1);
- return (rv);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
+ return;
}
}
- rv = nni_device(sock1, sock2);
+ nni_device(aio, sock1, sock2);
if (sock1 != NULL) {
nni_sock_rele(sock1);
}
if (sock2 != NULL) {
nni_sock_rele(sock2);
}
+}
+
+int
+nng_device(nng_socket s1, nng_socket s2)
+{
+ nni_aio aio;
+ int rv;
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_aio_init(&aio, NULL, NULL);
+ nng_device_aio(&aio, s1, s2);
+ nni_aio_wait(&aio);
+ rv = nni_aio_result(&aio);
+ nni_aio_fini(&aio);
return (rv);
}