diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/device.c | 258 | ||||
| -rw-r--r-- | src/core/socket.c | 69 |
2 files changed, 163 insertions, 164 deletions
diff --git a/src/core/device.c b/src/core/device.c index 22ec086e..2e000d7e 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -12,136 +12,204 @@ #include <string.h> -struct nni_device_pair { - nni_thr thrs[2]; - nni_sock *socks[2]; - int err[2]; -}; +typedef struct nni_device_path { + nni_aio * user; // user aio + nni_aio * aio; + nni_sock *src; + nni_sock *dst; + int state; +} nni_device_path; + +#define NNI_DEVICE_STATE_INIT 0 +#define NNI_DEVICE_STATE_RECV 1 +#define NNI_DEVICE_STATE_SEND 2 +#define NNI_DEVICE_STATE_FINI 3 + +typedef struct nni_device_data { + nni_aio * user; + int npath; + nni_device_path paths[2]; + nni_mtx mtx; + int running; +} nni_device_data; typedef struct nni_device_pair nni_device_pair; -static int -nni_device_loop(nni_sock *from, nni_sock *to) +static void +nni_device_cancel(nni_aio *aio, int rv) { - nni_msg *msg; - int rv = 0; - - for (;;) { - // Take messages sock[0], and send to sock[1]. - // If an error occurs, we close both sockets. - if ((rv = nni_sock_recvmsg(from, &msg, 0)) != 0) { - break; - } - if ((rv = nni_sock_sendmsg(to, msg, 0)) != 0) { - nni_msg_free(msg); - break; - } + nni_device_data *dd = aio->a_prov_data; + // cancellation is the only path to shutting it down. + + nni_mtx_lock(&dd->mtx); + if (dd->running == 0) { + nni_mtx_unlock(&dd->mtx); + return; } + dd->running = 0; + nni_mtx_unlock(&dd->mtx); - return (rv); + nni_sock_shutdown(dd->paths[0].src); + nni_sock_shutdown(dd->paths[0].dst); + nni_aio_finish_error(dd->user, rv); } static void -nni_device_fwd(void *p) +nni_device_cb(void *arg) { - nni_device_pair *pair = p; + nni_device_path *p = arg; + nni_aio * aio = p->aio; + int rv; + + if ((rv = nni_aio_result(aio)) != 0) { + p->state = NNI_DEVICE_STATE_FINI; + nni_aio_cancel(p->user, rv); + return; + } - pair->err[0] = nni_device_loop(pair->socks[0], pair->socks[1]); - nni_sock_shutdown(pair->socks[0]); - nni_sock_shutdown(pair->socks[1]); + switch (p->state) { + case NNI_DEVICE_STATE_INIT: + case NNI_DEVICE_STATE_SEND: + p->state = NNI_DEVICE_STATE_RECV; + nni_sock_recv(p->src, aio); + break; + case NNI_DEVICE_STATE_RECV: + // Leave the message where it is. + p->state = NNI_DEVICE_STATE_SEND; + nni_sock_send(p->dst, aio); + break; + case NNI_DEVICE_STATE_FINI: + break; + } } -static void -nni_device_rev(void *p) +void +nni_device_fini(nni_device_data *dd) { - nni_device_pair *pair = p; - - pair->err[1] = nni_device_loop(pair->socks[1], pair->socks[0]); - nni_sock_shutdown(pair->socks[0]); - nni_sock_shutdown(pair->socks[1]); + int i; + for (i = 0; i < dd->npath; i++) { + nni_device_path *p = &dd->paths[i]; + nni_aio_stop(p->aio); + } + for (i = 0; i < dd->npath; i++) { + nni_device_path *p = &dd->paths[i]; + nni_aio_fini(p->aio); + } + nni_mtx_fini(&dd->mtx); + NNI_FREE_STRUCT(dd); } int -nni_device(nni_sock *sock1, nni_sock *sock2) +nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) { - nni_device_pair pair; - int rv; - nni_duration never = -1; - size_t sz; - - memset(&pair, 0, sizeof(pair)); - pair.socks[0] = sock1; - pair.socks[1] = sock2; - - if (sock1 == NULL) { - sock1 = sock2; + nni_device_data *dd; + int npath = 2; + int i; + + // Specifying either of these as null turns the device into + // a loopback reflector. + if (s1 == NULL) { + s1 = s2; } - if (sock2 == NULL) { - sock2 = sock1; + if (s2 == NULL) { + s2 = s1; } - if ((sock1 == NULL) || (sock2 == NULL)) { - rv = NNG_EINVAL; - goto out; + // At least one of the sockets must be valid. + if ((s1 == NULL) || (s2 == NULL)) { + return (NNG_EINVAL); } - if ((nni_sock_peer(sock1) != nni_sock_proto(sock2)) || - (nni_sock_peer(sock2) != nni_sock_proto(sock1))) { - rv = NNG_EINVAL; - goto out; + if ((nni_sock_peer(s1) != nni_sock_proto(s2)) || + (nni_sock_peer(s2) != nni_sock_proto(s1))) { + return (NNG_EINVAL); } - // No timeouts. - sz = sizeof(never); - if ((nni_sock_setopt(sock1, NNG_OPT_RECVTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock2, NNG_OPT_RECVTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock1, NNG_OPT_SENDTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock2, NNG_OPT_SENDTIMEO, &never, sz) != 0)) { - // This should never happen. - rv = NNG_EINVAL; - goto out; + // Note we assume that since they peers, we only need to look + // at the receive flags -- the other side is assumed to be able + // to send. + if ((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) == 0) { + nni_sock *temp = s1; + s1 = s2; + s2 = temp; } - pair.socks[0] = sock1; - pair.socks[1] = sock2; + NNI_ASSERT((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) != 0); - if ((rv = nni_thr_init(&pair.thrs[0], nni_device_fwd, &pair)) != 0) { - goto out; + // Only run one forwarder if the protocols are not bidirectional, or + // if the source and destination sockets are identical. (The latter is + // not strictly necessary, but it saves resources and minimizes any + // extra reordering.) + if (((nni_sock_flags(s2) & NNI_PROTO_FLAG_RCV) == 0) || (s1 == s2)) { + npath = 1; } - if ((rv = nni_thr_init(&pair.thrs[1], nni_device_rev, &pair)) != 0) { - nni_thr_fini(&pair.thrs[0]); - goto out; - } - if (((nni_sock_flags(sock1) & NNI_PROTO_FLAG_RCV) != 0) && - ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_SND) != 0)) { - nni_thr_run(&pair.thrs[0]); + + if ((dd = NNI_ALLOC_STRUCT(dd)) == NULL) { + return (NNG_ENOMEM); } - // If the sockets are the same, then its a simple one way forwarder, - // and we don't need two workers (but would be harmless if we did it). - if ((sock1 != sock2) && - ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_RCV) != 0) && - ((nni_sock_flags(sock1) & NNI_PROTO_FLAG_SND) != 0)) { - nni_thr_run(&pair.thrs[1]); + nni_mtx_init(&dd->mtx); + + for (i = 0; i < npath; i++) { + int rv; + nni_device_path *p = &dd->paths[i]; + p->src = i == 0 ? s1 : s2; + p->dst = i == 0 ? s2 : s1; + p->state = NNI_DEVICE_STATE_INIT; + + if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) { + nni_device_fini(dd); + return (rv); + } + + nni_aio_set_timeout(p->aio, NNI_TIME_NEVER); } + dd->npath = npath; + *dp = dd; + return (0); +} - // This blocks on both threads (though if we didn't start one, that - // will return immediately.) - nni_thr_fini(&pair.thrs[0]); - nni_thr_fini(&pair.thrs[1]); +void +nni_device_start(nni_device_data *dd, nni_aio *user) +{ + int i; - nni_sock_rele(sock1); - if (sock1 != sock2) { - nni_sock_rele(sock2); + nni_mtx_lock(&dd->mtx); + dd->user = user; + if (nni_aio_start(user, nni_device_cancel, dd) != 0) { + nni_mtx_unlock(&dd->mtx); + return; } + for (i = 0; i < dd->npath; i++) { + nni_device_path *p = &dd->paths[i]; + p->user = user; + p->state = NNI_DEVICE_STATE_INIT; + } + for (i = 0; i < dd->npath; i++) { + nni_device_path *p = &dd->paths[i]; + p->state = NNI_DEVICE_STATE_RECV; + nni_sock_recv(p->src, p->aio); + } + dd->running = 1; + nni_mtx_unlock(&dd->mtx); +} + +int +nni_device(nni_sock *s1, nni_sock *s2) +{ + nni_device_data *dd; + nni_aio * aio; + int rv; - rv = pair.err[0]; - if (rv == 0) { - rv = pair.err[1]; + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); } - if (rv == 0) { - // This can happen if neither thread ran. Shouldn't happen - // really. - rv = NNG_EINVAL; + if (nni_device_init(&dd, s1, s2) != 0) { + nni_aio_fini(aio); + return (rv); } + nni_device_start(dd, aio); + nni_aio_wait(aio); -out: + rv = nni_aio_result(aio); + nni_device_fini(dd); + nni_aio_fini(aio); return (rv); } diff --git a/src/core/socket.c b/src/core/socket.c index 8fb8e67b..c9b70ccb 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -798,38 +798,6 @@ nni_sock_send(nni_sock *sock, nni_aio *aio) sock->s_sock_ops.sock_send(sock->s_data, aio); } -int -nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) -{ - int rv; - nni_time expire; - nni_time timeo = sock->s_sndtimeo; - nni_aio *aio; - - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - - if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { - expire = NNI_TIME_ZERO; - } else if (timeo == NNI_TIME_NEVER) { - expire = NNI_TIME_NEVER; - } else { - expire = nni_clock(); - expire += timeo; - } - nni_aio_set_timeout(aio, expire); - nni_aio_set_msg(aio, msg); - - nni_sock_send(sock, aio); - nni_aio_wait(aio); - - rv = nni_aio_result(aio); - nni_aio_fini(aio); - - return (rv); -} - void nni_sock_recv(nni_sock *sock, nni_aio *aio) { @@ -837,43 +805,6 @@ nni_sock_recv(nni_sock *sock, nni_aio *aio) sock->s_sock_ops.sock_recv(sock->s_data, aio); } -int -nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) -{ - int rv; - nni_msg *msg; - nni_time expire; - nni_time timeo = sock->s_rcvtimeo; - nni_aio *aio; - - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - - if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { - expire = NNI_TIME_ZERO; - } else if (timeo == NNI_TIME_NEVER) { - expire = NNI_TIME_NEVER; - } else { - expire = nni_clock(); - expire += timeo; - } - - nni_aio_set_timeout(aio, expire); - nni_sock_recv(sock, aio); - nni_aio_wait(aio); - - if ((rv = nni_aio_result(aio)) == 0) { - msg = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - - *msgp = msg; - } - - nni_aio_fini(aio); - return (rv); -} - // nni_sock_protocol returns the socket's 16-bit protocol number. uint16_t nni_sock_proto(nni_sock *sock) |
