aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-12-27 23:10:29 -0800
committerGarrett D'Amore <garrett@damore.org>2021-12-27 23:10:29 -0800
commit2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739 (patch)
tree2907ec3f298b2b38827060623d3f6e2577dda011 /src/core
parent7b02ddc2d7077439992a10bb69553f89b5ee5903 (diff)
downloadnng-2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739.tar.gz
nng-2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739.tar.bz2
nng-2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739.zip
Introduce nng_device_aio().
This function is like nng_device(), but runs asynchronously. Also, this fixes #1503 nng_device causes nng_close to blocking
Diffstat (limited to 'src/core')
-rw-r--r--src/core/device.c248
-rw-r--r--src/core/device.h4
-rw-r--r--src/core/socket.c8
-rw-r--r--src/core/socket.h1
4 files changed, 136 insertions, 125 deletions
diff --git a/src/core/device.c b/src/core/device.c
index 71480bbc..c7266271 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.com>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.com>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -10,107 +10,126 @@
#include "core/nng_impl.h"
-#include <string.h>
+typedef struct device_data_s device_data;
+typedef struct device_path_s device_path;
-typedef struct nni_device_path {
- nni_aio * user; // user aio
- nni_aio * aio;
- nni_sock *src;
- nni_sock *dst;
- int state;
-} nni_device_path;
+struct device_path_s {
+ int state;
+ device_data *d;
+ nni_sock *src;
+ nni_sock *dst;
+ nni_aio aio;
+};
#define NNI_DEVICE_STATE_INIT 0
#define NNI_DEVICE_STATE_RECV 1
#define NNI_DEVICE_STATE_SEND 2
#define NNI_DEVICE_STATE_FINI 3
-typedef struct nni_device_data {
- nni_aio * user;
- int npath;
- nni_device_path paths[2];
- nni_mtx mtx;
- bool running;
-} nni_device_data;
+struct device_data_s {
+ nni_aio *user;
+ int num_paths;
+ int running;
+ int rv;
+ device_path paths[2];
+ nni_reap_node reap;
+};
-typedef struct nni_device_pair nni_device_pair;
+static void device_fini(void *);
+
+static nni_mtx device_mtx = NNI_MTX_INITIALIZER;
+static nni_reap_list device_reap = {
+ .rl_offset = offsetof(device_data, reap),
+ .rl_func = device_fini,
+};
static void
-nni_device_cancel(nni_aio *aio, void *arg, int rv)
+device_fini(void *arg)
{
- nni_device_data *dd = arg;
- // cancellation is the only path to shutting it down.
+ device_data *d = arg;
- nni_mtx_lock(&dd->mtx);
- if ((!dd->running) || (dd->user != aio)) {
- nni_mtx_unlock(&dd->mtx);
- return;
+ for (int i = 0; i < d->num_paths; i++) {
+ nni_aio_stop(&d->paths[i].aio);
}
- dd->running = false;
- dd->user = NULL;
- nni_mtx_unlock(&dd->mtx);
+ NNI_FREE_STRUCT(d);
+}
- nni_sock_shutdown(dd->paths[0].src);
- nni_sock_shutdown(dd->paths[0].dst);
- nni_aio_finish_error(aio, rv);
+static void
+device_cancel(nni_aio *aio, void *arg, int rv)
+{
+ device_data *d = arg;
+ // cancellation is the only path to shutting it down.
+
+ nni_mtx_lock(&device_mtx);
+ if (d->user == aio) {
+ for (int i = 0; i < d->num_paths; i++) {
+ nni_aio_abort(&d->paths[i].aio, rv);
+ }
+ }
+ nni_mtx_unlock(&device_mtx);
}
static void
-nni_device_cb(void *arg)
+device_cb(void *arg)
{
- nni_device_path *p = arg;
- nni_aio * aio = p->aio;
- int rv;
+ device_path *p = arg;
+ device_data *d = p->d;
+ int rv;
- if ((rv = nni_aio_result(aio)) != 0) {
+ if ((rv = nni_aio_result(&p->aio)) != 0) {
+ nni_mtx_lock(&device_mtx);
p->state = NNI_DEVICE_STATE_FINI;
- nni_aio_abort(p->user, rv);
+ d->running--;
+ if (d->rv == 0) {
+ d->rv = rv;
+ }
+ for (int i = 0; i < d->num_paths; i++) {
+ if (p != &d->paths[i]) {
+ nni_aio_abort(&d->paths[i].aio, rv);
+ }
+ }
+ if (d->running == 0) {
+ if (d->user != NULL) {
+ nni_aio_finish_error(d->user, d->rv);
+ d->user = NULL;
+ }
+ nni_sock_rele(d->paths[0].src);
+ nni_sock_rele(d->paths[0].dst);
+
+ nni_reap(&device_reap, d);
+ }
+ nni_mtx_unlock(&device_mtx);
return;
}
switch (p->state) {
case NNI_DEVICE_STATE_INIT:
+ break;
case NNI_DEVICE_STATE_SEND:
p->state = NNI_DEVICE_STATE_RECV;
- nni_sock_recv(p->src, aio);
+ nni_sock_recv(p->src, &p->aio);
break;
case NNI_DEVICE_STATE_RECV:
// Leave the message where it is.
p->state = NNI_DEVICE_STATE_SEND;
- nni_sock_send(p->dst, aio);
+ nni_sock_send(p->dst, &p->aio);
break;
case NNI_DEVICE_STATE_FINI:
break;
}
}
-void
-nni_device_fini(nni_device_data *dd)
+static int
+device_init(device_data **dp, nni_sock *s1, nni_sock *s2)
{
- int i;
- for (i = 0; i < dd->npath; i++) {
- nni_device_path *p = &dd->paths[i];
- nni_aio_stop(p->aio);
- }
- for (i = 0; i < dd->npath; i++) {
- nni_device_path *p = &dd->paths[i];
- nni_aio_free(p->aio);
- }
- nni_mtx_fini(&dd->mtx);
- NNI_FREE_STRUCT(dd);
-}
-
-int
-nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
-{
- nni_device_data *dd;
- int npath = 2;
- int i;
- bool raw;
- size_t rsz;
+ int num_paths = 2;
+ int i;
+ bool raw;
+ size_t rsz;
+ device_data *d;
// Specifying either of these as null turns the device into
- // a loopback reflector.
+ // a reflector.
if (s1 == NULL) {
s1 = s2;
}
@@ -141,8 +160,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
return (NNG_EINVAL);
}
- // Note we assume that since they peers, we only need to look
- // at the receive flags -- the other side is assumed to be able
+ // Note we assume that since they are peers, we only need to look
+ // at the recv flags -- the other side is assumed to be able
// to send.
if ((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) == 0) {
nni_sock *temp = s1;
@@ -157,82 +176,65 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
// not strictly necessary, but it saves resources and minimizes any
// extra reordering.)
if (((nni_sock_flags(s2) & NNI_PROTO_FLAG_RCV) == 0) || (s1 == s2)) {
- npath = 1;
+ num_paths = 1;
}
- if ((dd = NNI_ALLOC_STRUCT(dd)) == NULL) {
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&dd->mtx);
- for (i = 0; i < npath; i++) {
- int rv;
- nni_device_path *p = &dd->paths[i];
- p->src = i == 0 ? s1 : s2;
- p->dst = i == 0 ? s2 : s1;
- p->state = NNI_DEVICE_STATE_INIT;
+ d->num_paths = 0;
+ for (i = 0; i < num_paths; i++) {
+ device_path *p = &d->paths[i];
+ p->src = i == 0 ? s1 : s2;
+ p->dst = i == 0 ? s2 : s1;
+ p->d = d;
+ p->state = NNI_DEVICE_STATE_INIT;
- if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) {
- nni_device_fini(dd);
- return (rv);
- }
+ nni_aio_init(&p->aio, device_cb, p);
- nni_aio_set_timeout(p->aio, NNG_DURATION_INFINITE);
+ nni_aio_set_timeout(&p->aio, NNG_DURATION_INFINITE);
}
- dd->npath = npath;
- *dp = dd;
+ nni_sock_hold(d->paths[0].src);
+ nni_sock_hold(d->paths[0].dst);
+
+ d->num_paths = num_paths;
+ *dp = d;
return (0);
}
-void
-nni_device_start(nni_device_data *dd, nni_aio *user)
+static void
+device_start(device_data *d, nni_aio *user)
{
- int i;
- int rv;
-
- if (nni_aio_begin(user) != 0) {
- return;
+ d->user = user;
+ for (int i = 0; i < d->num_paths; i++) {
+ device_path *p = &d->paths[i];
+ p->state = NNI_DEVICE_STATE_RECV;
+ nni_sock_recv(p->src, &p->aio);
+ d->running++;
}
- nni_mtx_lock(&dd->mtx);
- if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) {
- nni_mtx_unlock(&dd->mtx);
- nni_aio_finish_error(user, rv);
- return;
- }
- dd->user = user;
- for (i = 0; i < dd->npath; i++) {
- nni_device_path *p = &dd->paths[i];
- p->user = user;
- p->state = NNI_DEVICE_STATE_INIT;
- }
- for (i = 0; i < dd->npath; i++) {
- nni_device_path *p = &dd->paths[i];
- p->state = NNI_DEVICE_STATE_RECV;
- nni_sock_recv(p->src, p->aio);
- }
- dd->running = true;
- nni_mtx_unlock(&dd->mtx);
}
-int
-nni_device(nni_sock *s1, nni_sock *s2)
+void
+nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2)
{
- nni_device_data *dd;
- nni_aio * aio;
- int rv;
+ device_data *d;
+ int rv;
- if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
- return (rv);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&device_mtx);
+ if ((rv = device_init(&d, s1, s2)) != 0) {
+ nni_mtx_unlock(&device_mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- if ((rv = nni_device_init(&dd, s1, s2)) != 0) {
- nni_aio_free(aio);
- return (rv);
+ if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) {
+ nni_mtx_unlock(&device_mtx);
+ nni_aio_finish_error(aio, rv);
+ nni_reap(&device_reap, d);
}
- nni_device_start(dd, aio);
- nni_aio_wait(aio);
-
- rv = nni_aio_result(aio);
- nni_device_fini(dd);
- nni_aio_free(aio);
- return (rv);
+ device_start(d, aio);
+ nni_mtx_unlock(&device_mtx);
}
diff --git a/src/core/device.h b/src/core/device.h
index 973a4cb7..8d11aeb8 100644
--- a/src/core/device.h
+++ b/src/core/device.h
@@ -1,5 +1,5 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -14,6 +14,6 @@
// It works in both directions. Arguably we should build versions of this
// that are unidirectional, and we could extend this API with user-defined
// filtering functions.
-extern int nni_device(nni_sock *, nni_sock *);
+extern void nni_device(nni_aio *aio, nni_sock *, nni_sock *);
#endif // CORE_DEVICE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 425499ae..75c9eedc 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -379,6 +379,14 @@ nni_sock_find(nni_sock **sockp, uint32_t id)
}
void
+nni_sock_hold(nni_sock *s)
+{
+ nni_mtx_lock(&sock_lk);
+ s->s_ref++;
+ nni_mtx_unlock(&sock_lk);
+}
+
+void
nni_sock_rele(nni_sock *s)
{
nni_mtx_lock(&sock_lk);
diff --git a/src/core/socket.h b/src/core/socket.h
index beebfbce..c4037e96 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -12,6 +12,7 @@
#define CORE_SOCKET_H
extern int nni_sock_find(nni_sock **, uint32_t);
+extern void nni_sock_hold(nni_sock *);
extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, const nni_proto *);
extern void nni_sock_close(nni_sock *);