aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-10-26 11:14:25 -0700
committerGarrett D'Amore <garrett@damore.org>2017-10-26 13:00:21 -0700
commit46ca4756a09d015298b310cd482f2e39d9a034db (patch)
tree2a18c46a1511505edb8386ac75878e984021a8cd
parent9cbdeda1d0a9074bd65da2aaf9c87b79545a1590 (diff)
downloadnng-46ca4756a09d015298b310cd482f2e39d9a034db.tar.gz
nng-46ca4756a09d015298b310cd482f2e39d9a034db.tar.bz2
nng-46ca4756a09d015298b310cd482f2e39d9a034db.zip
fixes #46 make device() use aios directly
This eliminates the separate threads used for devices, letting them benefit from the new aio framework. It also eliminates the legacy nni_sock_sendmsg and nni_sock_recvmsg internal APIs. It would appear that there is an opportunity here to provide asynchronous device support out to userland as well, exposing an aio to them. That work is deferred to later.
-rw-r--r--src/core/device.c258
-rw-r--r--src/core/socket.c69
-rw-r--r--src/nng.c6
3 files changed, 169 insertions, 164 deletions
diff --git a/src/core/device.c b/src/core/device.c
index 22ec086e..2e000d7e 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -12,136 +12,204 @@
#include <string.h>
-struct nni_device_pair {
- nni_thr thrs[2];
- nni_sock *socks[2];
- int err[2];
-};
+typedef struct nni_device_path {
+ nni_aio * user; // user aio
+ nni_aio * aio;
+ nni_sock *src;
+ nni_sock *dst;
+ int state;
+} nni_device_path;
+
+#define NNI_DEVICE_STATE_INIT 0
+#define NNI_DEVICE_STATE_RECV 1
+#define NNI_DEVICE_STATE_SEND 2
+#define NNI_DEVICE_STATE_FINI 3
+
+typedef struct nni_device_data {
+ nni_aio * user;
+ int npath;
+ nni_device_path paths[2];
+ nni_mtx mtx;
+ int running;
+} nni_device_data;
typedef struct nni_device_pair nni_device_pair;
-static int
-nni_device_loop(nni_sock *from, nni_sock *to)
+static void
+nni_device_cancel(nni_aio *aio, int rv)
{
- nni_msg *msg;
- int rv = 0;
-
- for (;;) {
- // Take messages sock[0], and send to sock[1].
- // If an error occurs, we close both sockets.
- if ((rv = nni_sock_recvmsg(from, &msg, 0)) != 0) {
- break;
- }
- if ((rv = nni_sock_sendmsg(to, msg, 0)) != 0) {
- nni_msg_free(msg);
- break;
- }
+ nni_device_data *dd = aio->a_prov_data;
+ // cancellation is the only path to shutting it down.
+
+ nni_mtx_lock(&dd->mtx);
+ if (dd->running == 0) {
+ nni_mtx_unlock(&dd->mtx);
+ return;
}
+ dd->running = 0;
+ nni_mtx_unlock(&dd->mtx);
- return (rv);
+ nni_sock_shutdown(dd->paths[0].src);
+ nni_sock_shutdown(dd->paths[0].dst);
+ nni_aio_finish_error(dd->user, rv);
}
static void
-nni_device_fwd(void *p)
+nni_device_cb(void *arg)
{
- nni_device_pair *pair = p;
+ nni_device_path *p = arg;
+ nni_aio * aio = p->aio;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ p->state = NNI_DEVICE_STATE_FINI;
+ nni_aio_cancel(p->user, rv);
+ return;
+ }
- pair->err[0] = nni_device_loop(pair->socks[0], pair->socks[1]);
- nni_sock_shutdown(pair->socks[0]);
- nni_sock_shutdown(pair->socks[1]);
+ switch (p->state) {
+ case NNI_DEVICE_STATE_INIT:
+ case NNI_DEVICE_STATE_SEND:
+ p->state = NNI_DEVICE_STATE_RECV;
+ nni_sock_recv(p->src, aio);
+ break;
+ case NNI_DEVICE_STATE_RECV:
+ // Leave the message where it is.
+ p->state = NNI_DEVICE_STATE_SEND;
+ nni_sock_send(p->dst, aio);
+ break;
+ case NNI_DEVICE_STATE_FINI:
+ break;
+ }
}
-static void
-nni_device_rev(void *p)
+void
+nni_device_fini(nni_device_data *dd)
{
- nni_device_pair *pair = p;
-
- pair->err[1] = nni_device_loop(pair->socks[1], pair->socks[0]);
- nni_sock_shutdown(pair->socks[0]);
- nni_sock_shutdown(pair->socks[1]);
+ int i;
+ for (i = 0; i < dd->npath; i++) {
+ nni_device_path *p = &dd->paths[i];
+ nni_aio_stop(p->aio);
+ }
+ for (i = 0; i < dd->npath; i++) {
+ nni_device_path *p = &dd->paths[i];
+ nni_aio_fini(p->aio);
+ }
+ nni_mtx_fini(&dd->mtx);
+ NNI_FREE_STRUCT(dd);
}
int
-nni_device(nni_sock *sock1, nni_sock *sock2)
+nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
{
- nni_device_pair pair;
- int rv;
- nni_duration never = -1;
- size_t sz;
-
- memset(&pair, 0, sizeof(pair));
- pair.socks[0] = sock1;
- pair.socks[1] = sock2;
-
- if (sock1 == NULL) {
- sock1 = sock2;
+ nni_device_data *dd;
+ int npath = 2;
+ int i;
+
+ // Specifying either of these as null turns the device into
+ // a loopback reflector.
+ if (s1 == NULL) {
+ s1 = s2;
}
- if (sock2 == NULL) {
- sock2 = sock1;
+ if (s2 == NULL) {
+ s2 = s1;
}
- if ((sock1 == NULL) || (sock2 == NULL)) {
- rv = NNG_EINVAL;
- goto out;
+ // At least one of the sockets must be valid.
+ if ((s1 == NULL) || (s2 == NULL)) {
+ return (NNG_EINVAL);
}
- if ((nni_sock_peer(sock1) != nni_sock_proto(sock2)) ||
- (nni_sock_peer(sock2) != nni_sock_proto(sock1))) {
- rv = NNG_EINVAL;
- goto out;
+ if ((nni_sock_peer(s1) != nni_sock_proto(s2)) ||
+ (nni_sock_peer(s2) != nni_sock_proto(s1))) {
+ return (NNG_EINVAL);
}
- // No timeouts.
- sz = sizeof(never);
- if ((nni_sock_setopt(sock1, NNG_OPT_RECVTIMEO, &never, sz) != 0) ||
- (nni_sock_setopt(sock2, NNG_OPT_RECVTIMEO, &never, sz) != 0) ||
- (nni_sock_setopt(sock1, NNG_OPT_SENDTIMEO, &never, sz) != 0) ||
- (nni_sock_setopt(sock2, NNG_OPT_SENDTIMEO, &never, sz) != 0)) {
- // This should never happen.
- rv = NNG_EINVAL;
- goto out;
+ // Note we assume that since they peers, we only need to look
+ // at the receive flags -- the other side is assumed to be able
+ // to send.
+ if ((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) == 0) {
+ nni_sock *temp = s1;
+ s1 = s2;
+ s2 = temp;
}
- pair.socks[0] = sock1;
- pair.socks[1] = sock2;
+ NNI_ASSERT((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) != 0);
- if ((rv = nni_thr_init(&pair.thrs[0], nni_device_fwd, &pair)) != 0) {
- goto out;
+ // Only run one forwarder if the protocols are not bidirectional, or
+ // if the source and destination sockets are identical. (The latter is
+ // not strictly necessary, but it saves resources and minimizes any
+ // extra reordering.)
+ if (((nni_sock_flags(s2) & NNI_PROTO_FLAG_RCV) == 0) || (s1 == s2)) {
+ npath = 1;
}
- if ((rv = nni_thr_init(&pair.thrs[1], nni_device_rev, &pair)) != 0) {
- nni_thr_fini(&pair.thrs[0]);
- goto out;
- }
- if (((nni_sock_flags(sock1) & NNI_PROTO_FLAG_RCV) != 0) &&
- ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_SND) != 0)) {
- nni_thr_run(&pair.thrs[0]);
+
+ if ((dd = NNI_ALLOC_STRUCT(dd)) == NULL) {
+ return (NNG_ENOMEM);
}
- // If the sockets are the same, then its a simple one way forwarder,
- // and we don't need two workers (but would be harmless if we did it).
- if ((sock1 != sock2) &&
- ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_RCV) != 0) &&
- ((nni_sock_flags(sock1) & NNI_PROTO_FLAG_SND) != 0)) {
- nni_thr_run(&pair.thrs[1]);
+ nni_mtx_init(&dd->mtx);
+
+ for (i = 0; i < npath; i++) {
+ int rv;
+ nni_device_path *p = &dd->paths[i];
+ p->src = i == 0 ? s1 : s2;
+ p->dst = i == 0 ? s2 : s1;
+ p->state = NNI_DEVICE_STATE_INIT;
+
+ if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) {
+ nni_device_fini(dd);
+ return (rv);
+ }
+
+ nni_aio_set_timeout(p->aio, NNI_TIME_NEVER);
}
+ dd->npath = npath;
+ *dp = dd;
+ return (0);
+}
- // This blocks on both threads (though if we didn't start one, that
- // will return immediately.)
- nni_thr_fini(&pair.thrs[0]);
- nni_thr_fini(&pair.thrs[1]);
+void
+nni_device_start(nni_device_data *dd, nni_aio *user)
+{
+ int i;
- nni_sock_rele(sock1);
- if (sock1 != sock2) {
- nni_sock_rele(sock2);
+ nni_mtx_lock(&dd->mtx);
+ dd->user = user;
+ if (nni_aio_start(user, nni_device_cancel, dd) != 0) {
+ nni_mtx_unlock(&dd->mtx);
+ return;
}
+ for (i = 0; i < dd->npath; i++) {
+ nni_device_path *p = &dd->paths[i];
+ p->user = user;
+ p->state = NNI_DEVICE_STATE_INIT;
+ }
+ for (i = 0; i < dd->npath; i++) {
+ nni_device_path *p = &dd->paths[i];
+ p->state = NNI_DEVICE_STATE_RECV;
+ nni_sock_recv(p->src, p->aio);
+ }
+ dd->running = 1;
+ nni_mtx_unlock(&dd->mtx);
+}
+
+int
+nni_device(nni_sock *s1, nni_sock *s2)
+{
+ nni_device_data *dd;
+ nni_aio * aio;
+ int rv;
- rv = pair.err[0];
- if (rv == 0) {
- rv = pair.err[1];
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
}
- if (rv == 0) {
- // This can happen if neither thread ran. Shouldn't happen
- // really.
- rv = NNG_EINVAL;
+ if (nni_device_init(&dd, s1, s2) != 0) {
+ nni_aio_fini(aio);
+ return (rv);
}
+ nni_device_start(dd, aio);
+ nni_aio_wait(aio);
-out:
+ rv = nni_aio_result(aio);
+ nni_device_fini(dd);
+ nni_aio_fini(aio);
return (rv);
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 8fb8e67b..c9b70ccb 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -798,38 +798,6 @@ nni_sock_send(nni_sock *sock, nni_aio *aio)
sock->s_sock_ops.sock_send(sock->s_data, aio);
}
-int
-nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
-{
- int rv;
- nni_time expire;
- nni_time timeo = sock->s_sndtimeo;
- nni_aio *aio;
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
-
- if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
- expire = NNI_TIME_ZERO;
- } else if (timeo == NNI_TIME_NEVER) {
- expire = NNI_TIME_NEVER;
- } else {
- expire = nni_clock();
- expire += timeo;
- }
- nni_aio_set_timeout(aio, expire);
- nni_aio_set_msg(aio, msg);
-
- nni_sock_send(sock, aio);
- nni_aio_wait(aio);
-
- rv = nni_aio_result(aio);
- nni_aio_fini(aio);
-
- return (rv);
-}
-
void
nni_sock_recv(nni_sock *sock, nni_aio *aio)
{
@@ -837,43 +805,6 @@ nni_sock_recv(nni_sock *sock, nni_aio *aio)
sock->s_sock_ops.sock_recv(sock->s_data, aio);
}
-int
-nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
-{
- int rv;
- nni_msg *msg;
- nni_time expire;
- nni_time timeo = sock->s_rcvtimeo;
- nni_aio *aio;
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
-
- if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
- expire = NNI_TIME_ZERO;
- } else if (timeo == NNI_TIME_NEVER) {
- expire = NNI_TIME_NEVER;
- } else {
- expire = nni_clock();
- expire += timeo;
- }
-
- nni_aio_set_timeout(aio, expire);
- nni_sock_recv(sock, aio);
- nni_aio_wait(aio);
-
- if ((rv = nni_aio_result(aio)) == 0) {
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
-
- *msgp = msg;
- }
-
- nni_aio_fini(aio);
- return (rv);
-}
-
// nni_sock_protocol returns the socket's 16-bit protocol number.
uint16_t
nni_sock_proto(nni_sock *sock)
diff --git a/src/nng.c b/src/nng.c
index 39206941..bd403896 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -679,6 +679,12 @@ nng_device(nng_socket s1, nng_socket s2)
}
rv = nni_device(sock1, sock2);
+ if (sock1 != NULL) {
+ nni_sock_rele(sock1);
+ }
+ if (sock2 != NULL) {
+ nni_sock_rele(sock2);
+ }
return (rv);
}