aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/device.c258
-rw-r--r--src/core/socket.c69
-rw-r--r--src/nng.c6
3 files changed, 169 insertions, 164 deletions
diff --git a/src/core/device.c b/src/core/device.c
index 22ec086e..2e000d7e 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -12,136 +12,204 @@
#include <string.h>
-struct nni_device_pair {
- nni_thr thrs[2];
- nni_sock *socks[2];
- int err[2];
-};
+typedef struct nni_device_path {
+ nni_aio * user; // user aio
+ nni_aio * aio;
+ nni_sock *src;
+ nni_sock *dst;
+ int state;
+} nni_device_path;
+
+#define NNI_DEVICE_STATE_INIT 0
+#define NNI_DEVICE_STATE_RECV 1
+#define NNI_DEVICE_STATE_SEND 2
+#define NNI_DEVICE_STATE_FINI 3
+
+typedef struct nni_device_data {
+ nni_aio * user;
+ int npath;
+ nni_device_path paths[2];
+ nni_mtx mtx;
+ int running;
+} nni_device_data;
typedef struct nni_device_pair nni_device_pair;
-static int
-nni_device_loop(nni_sock *from, nni_sock *to)
+static void
+nni_device_cancel(nni_aio *aio, int rv)
{
- nni_msg *msg;
- int rv = 0;
-
- for (;;) {
- // Take messages sock[0], and send to sock[1].
- // If an error occurs, we close both sockets.
- if ((rv = nni_sock_recvmsg(from, &msg, 0)) != 0) {
- break;
- }
- if ((rv = nni_sock_sendmsg(to, msg, 0)) != 0) {
- nni_msg_free(msg);
- break;
- }
+ nni_device_data *dd = aio->a_prov_data;
+ // cancellation is the only path to shutting it down.
+
+ nni_mtx_lock(&dd->mtx);
+ if (dd->running == 0) {
+ nni_mtx_unlock(&dd->mtx);
+ return;
}
+ dd->running = 0;
+ nni_mtx_unlock(&dd->mtx);
- return (rv);
+ nni_sock_shutdown(dd->paths[0].src);
+ nni_sock_shutdown(dd->paths[0].dst);
+ nni_aio_finish_error(dd->user, rv);
}
static void
-nni_device_fwd(void *p)
+nni_device_cb(void *arg)
{
- nni_device_pair *pair = p;
+ nni_device_path *p = arg;
+ nni_aio * aio = p->aio;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ p->state = NNI_DEVICE_STATE_FINI;
+ nni_aio_cancel(p->user, rv);
+ return;
+ }
- pair->err[0] = nni_device_loop(pair->socks[0], pair->socks[1]);
- nni_sock_shutdown(pair->socks[0]);
- nni_sock_shutdown(pair->socks[1]);
+ switch (p->state) {
+ case NNI_DEVICE_STATE_INIT:
+ case NNI_DEVICE_STATE_SEND:
+ p->state = NNI_DEVICE_STATE_RECV;
+ nni_sock_recv(p->src, aio);
+ break;
+ case NNI_DEVICE_STATE_RECV:
+ // Leave the message where it is.
+ p->state = NNI_DEVICE_STATE_SEND;
+ nni_sock_send(p->dst, aio);
+ break;
+ case NNI_DEVICE_STATE_FINI:
+ break;
+ }
}
-static void
-nni_device_rev(void *p)
+void
+nni_device_fini(nni_device_data *dd)
{
- nni_device_pair *pair = p;
-
- pair->err[1] = nni_device_loop(pair->socks[1], pair->socks[0]);
- nni_sock_shutdown(pair->socks[0]);
- nni_sock_shutdown(pair->socks[1]);
+ int i;
+ for (i = 0; i < dd->npath; i++) {
+ nni_device_path *p = &dd->paths[i];
+ nni_aio_stop(p->aio);
+ }
+ for (i = 0; i < dd->npath; i++) {
+ nni_device_path *p = &dd->paths[i];
+ nni_aio_fini(p->aio);
+ }
+ nni_mtx_fini(&dd->mtx);
+ NNI_FREE_STRUCT(dd);
}
int
-nni_device(nni_sock *sock1, nni_sock *sock2)
+nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
{
- nni_device_pair pair;
- int rv;
- nni_duration never = -1;
- size_t sz;
-
- memset(&pair, 0, sizeof(pair));
- pair.socks[0] = sock1;
- pair.socks[1] = sock2;
-
- if (sock1 == NULL) {
- sock1 = sock2;
+ nni_device_data *dd;
+ int npath = 2;
+ int i;
+
+ // Specifying either of these as null turns the device into
+ // a loopback reflector.
+ if (s1 == NULL) {
+ s1 = s2;
}
- if (sock2 == NULL) {
- sock2 = sock1;
+ if (s2 == NULL) {
+ s2 = s1;
}
- if ((sock1 == NULL) || (sock2 == NULL)) {
- rv = NNG_EINVAL;
- goto out;
+ // At least one of the sockets must be valid.
+ if ((s1 == NULL) || (s2 == NULL)) {
+ return (NNG_EINVAL);
}
- if ((nni_sock_peer(sock1) != nni_sock_proto(sock2)) ||
- (nni_sock_peer(sock2) != nni_sock_proto(sock1))) {
- rv = NNG_EINVAL;
- goto out;
+ if ((nni_sock_peer(s1) != nni_sock_proto(s2)) ||
+ (nni_sock_peer(s2) != nni_sock_proto(s1))) {
+ return (NNG_EINVAL);
}
- // No timeouts.
- sz = sizeof(never);
- if ((nni_sock_setopt(sock1, NNG_OPT_RECVTIMEO, &never, sz) != 0) ||
- (nni_sock_setopt(sock2, NNG_OPT_RECVTIMEO, &never, sz) != 0) ||
- (nni_sock_setopt(sock1, NNG_OPT_SENDTIMEO, &never, sz) != 0) ||
- (nni_sock_setopt(sock2, NNG_OPT_SENDTIMEO, &never, sz) != 0)) {
- // This should never happen.
- rv = NNG_EINVAL;
- goto out;
+ // Note we assume that since they peers, we only need to look
+ // at the receive flags -- the other side is assumed to be able
+ // to send.
+ if ((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) == 0) {
+ nni_sock *temp = s1;
+ s1 = s2;
+ s2 = temp;
}
- pair.socks[0] = sock1;
- pair.socks[1] = sock2;
+ NNI_ASSERT((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) != 0);
- if ((rv = nni_thr_init(&pair.thrs[0], nni_device_fwd, &pair)) != 0) {
- goto out;
+ // Only run one forwarder if the protocols are not bidirectional, or
+ // if the source and destination sockets are identical. (The latter is
+ // not strictly necessary, but it saves resources and minimizes any
+ // extra reordering.)
+ if (((nni_sock_flags(s2) & NNI_PROTO_FLAG_RCV) == 0) || (s1 == s2)) {
+ npath = 1;
}
- if ((rv = nni_thr_init(&pair.thrs[1], nni_device_rev, &pair)) != 0) {
- nni_thr_fini(&pair.thrs[0]);
- goto out;
- }
- if (((nni_sock_flags(sock1) & NNI_PROTO_FLAG_RCV) != 0) &&
- ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_SND) != 0)) {
- nni_thr_run(&pair.thrs[0]);
+
+ if ((dd = NNI_ALLOC_STRUCT(dd)) == NULL) {
+ return (NNG_ENOMEM);
}
- // If the sockets are the same, then its a simple one way forwarder,
- // and we don't need two workers (but would be harmless if we did it).
- if ((sock1 != sock2) &&
- ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_RCV) != 0) &&
- ((nni_sock_flags(sock1) & NNI_PROTO_FLAG_SND) != 0)) {
- nni_thr_run(&pair.thrs[1]);
+ nni_mtx_init(&dd->mtx);
+
+ for (i = 0; i < npath; i++) {
+ int rv;
+ nni_device_path *p = &dd->paths[i];
+ p->src = i == 0 ? s1 : s2;
+ p->dst = i == 0 ? s2 : s1;
+ p->state = NNI_DEVICE_STATE_INIT;
+
+ if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) {
+ nni_device_fini(dd);
+ return (rv);
+ }
+
+ nni_aio_set_timeout(p->aio, NNI_TIME_NEVER);
}
+ dd->npath = npath;
+ *dp = dd;
+ return (0);
+}
- // This blocks on both threads (though if we didn't start one, that
- // will return immediately.)
- nni_thr_fini(&pair.thrs[0]);
- nni_thr_fini(&pair.thrs[1]);
+void
+nni_device_start(nni_device_data *dd, nni_aio *user)
+{
+ int i;
- nni_sock_rele(sock1);
- if (sock1 != sock2) {
- nni_sock_rele(sock2);
+ nni_mtx_lock(&dd->mtx);
+ dd->user = user;
+ if (nni_aio_start(user, nni_device_cancel, dd) != 0) {
+ nni_mtx_unlock(&dd->mtx);
+ return;
}
+ for (i = 0; i < dd->npath; i++) {
+ nni_device_path *p = &dd->paths[i];
+ p->user = user;
+ p->state = NNI_DEVICE_STATE_INIT;
+ }
+ for (i = 0; i < dd->npath; i++) {
+ nni_device_path *p = &dd->paths[i];
+ p->state = NNI_DEVICE_STATE_RECV;
+ nni_sock_recv(p->src, p->aio);
+ }
+ dd->running = 1;
+ nni_mtx_unlock(&dd->mtx);
+}
+
+int
+nni_device(nni_sock *s1, nni_sock *s2)
+{
+ nni_device_data *dd;
+ nni_aio * aio;
+ int rv;
- rv = pair.err[0];
- if (rv == 0) {
- rv = pair.err[1];
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
}
- if (rv == 0) {
- // This can happen if neither thread ran. Shouldn't happen
- // really.
- rv = NNG_EINVAL;
+ if (nni_device_init(&dd, s1, s2) != 0) {
+ nni_aio_fini(aio);
+ return (rv);
}
+ nni_device_start(dd, aio);
+ nni_aio_wait(aio);
-out:
+ rv = nni_aio_result(aio);
+ nni_device_fini(dd);
+ nni_aio_fini(aio);
return (rv);
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 8fb8e67b..c9b70ccb 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -798,38 +798,6 @@ nni_sock_send(nni_sock *sock, nni_aio *aio)
sock->s_sock_ops.sock_send(sock->s_data, aio);
}
-int
-nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
-{
- int rv;
- nni_time expire;
- nni_time timeo = sock->s_sndtimeo;
- nni_aio *aio;
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
-
- if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
- expire = NNI_TIME_ZERO;
- } else if (timeo == NNI_TIME_NEVER) {
- expire = NNI_TIME_NEVER;
- } else {
- expire = nni_clock();
- expire += timeo;
- }
- nni_aio_set_timeout(aio, expire);
- nni_aio_set_msg(aio, msg);
-
- nni_sock_send(sock, aio);
- nni_aio_wait(aio);
-
- rv = nni_aio_result(aio);
- nni_aio_fini(aio);
-
- return (rv);
-}
-
void
nni_sock_recv(nni_sock *sock, nni_aio *aio)
{
@@ -837,43 +805,6 @@ nni_sock_recv(nni_sock *sock, nni_aio *aio)
sock->s_sock_ops.sock_recv(sock->s_data, aio);
}
-int
-nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
-{
- int rv;
- nni_msg *msg;
- nni_time expire;
- nni_time timeo = sock->s_rcvtimeo;
- nni_aio *aio;
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
-
- if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
- expire = NNI_TIME_ZERO;
- } else if (timeo == NNI_TIME_NEVER) {
- expire = NNI_TIME_NEVER;
- } else {
- expire = nni_clock();
- expire += timeo;
- }
-
- nni_aio_set_timeout(aio, expire);
- nni_sock_recv(sock, aio);
- nni_aio_wait(aio);
-
- if ((rv = nni_aio_result(aio)) == 0) {
- msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
-
- *msgp = msg;
- }
-
- nni_aio_fini(aio);
- return (rv);
-}
-
// nni_sock_protocol returns the socket's 16-bit protocol number.
uint16_t
nni_sock_proto(nni_sock *sock)
diff --git a/src/nng.c b/src/nng.c
index 39206941..bd403896 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -679,6 +679,12 @@ nng_device(nng_socket s1, nng_socket s2)
}
rv = nni_device(sock1, sock2);
+ if (sock1 != NULL) {
+ nni_sock_rele(sock1);
+ }
+ if (sock2 != NULL) {
+ nni_sock_rele(sock2);
+ }
return (rv);
}