From 46ca4756a09d015298b310cd482f2e39d9a034db Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 26 Oct 2017 11:14:25 -0700 Subject: fixes #46 make device() use aios directly This eliminates the separate threads used for devices, letting them benefit from the new aio framework. It also eliminates the legacy nni_sock_sendmsg and nni_sock_recvmsg internal APIs. It would appear that there is an opportunity here to provide asynchronous device support out to userland as well, exposing an aio to them. That work is deferred to later. --- src/core/device.c | 258 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 163 insertions(+), 95 deletions(-) (limited to 'src/core/device.c') diff --git a/src/core/device.c b/src/core/device.c index 22ec086e..2e000d7e 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -12,136 +12,204 @@ #include -struct nni_device_pair { - nni_thr thrs[2]; - nni_sock *socks[2]; - int err[2]; -}; +typedef struct nni_device_path { + nni_aio * user; // user aio + nni_aio * aio; + nni_sock *src; + nni_sock *dst; + int state; +} nni_device_path; + +#define NNI_DEVICE_STATE_INIT 0 +#define NNI_DEVICE_STATE_RECV 1 +#define NNI_DEVICE_STATE_SEND 2 +#define NNI_DEVICE_STATE_FINI 3 + +typedef struct nni_device_data { + nni_aio * user; + int npath; + nni_device_path paths[2]; + nni_mtx mtx; + int running; +} nni_device_data; typedef struct nni_device_pair nni_device_pair; -static int -nni_device_loop(nni_sock *from, nni_sock *to) +static void +nni_device_cancel(nni_aio *aio, int rv) { - nni_msg *msg; - int rv = 0; - - for (;;) { - // Take messages sock[0], and send to sock[1]. - // If an error occurs, we close both sockets. - if ((rv = nni_sock_recvmsg(from, &msg, 0)) != 0) { - break; - } - if ((rv = nni_sock_sendmsg(to, msg, 0)) != 0) { - nni_msg_free(msg); - break; - } + nni_device_data *dd = aio->a_prov_data; + // cancellation is the only path to shutting it down. + + nni_mtx_lock(&dd->mtx); + if (dd->running == 0) { + nni_mtx_unlock(&dd->mtx); + return; } + dd->running = 0; + nni_mtx_unlock(&dd->mtx); - return (rv); + nni_sock_shutdown(dd->paths[0].src); + nni_sock_shutdown(dd->paths[0].dst); + nni_aio_finish_error(dd->user, rv); } static void -nni_device_fwd(void *p) +nni_device_cb(void *arg) { - nni_device_pair *pair = p; + nni_device_path *p = arg; + nni_aio * aio = p->aio; + int rv; + + if ((rv = nni_aio_result(aio)) != 0) { + p->state = NNI_DEVICE_STATE_FINI; + nni_aio_cancel(p->user, rv); + return; + } - pair->err[0] = nni_device_loop(pair->socks[0], pair->socks[1]); - nni_sock_shutdown(pair->socks[0]); - nni_sock_shutdown(pair->socks[1]); + switch (p->state) { + case NNI_DEVICE_STATE_INIT: + case NNI_DEVICE_STATE_SEND: + p->state = NNI_DEVICE_STATE_RECV; + nni_sock_recv(p->src, aio); + break; + case NNI_DEVICE_STATE_RECV: + // Leave the message where it is. + p->state = NNI_DEVICE_STATE_SEND; + nni_sock_send(p->dst, aio); + break; + case NNI_DEVICE_STATE_FINI: + break; + } } -static void -nni_device_rev(void *p) +void +nni_device_fini(nni_device_data *dd) { - nni_device_pair *pair = p; - - pair->err[1] = nni_device_loop(pair->socks[1], pair->socks[0]); - nni_sock_shutdown(pair->socks[0]); - nni_sock_shutdown(pair->socks[1]); + int i; + for (i = 0; i < dd->npath; i++) { + nni_device_path *p = &dd->paths[i]; + nni_aio_stop(p->aio); + } + for (i = 0; i < dd->npath; i++) { + nni_device_path *p = &dd->paths[i]; + nni_aio_fini(p->aio); + } + nni_mtx_fini(&dd->mtx); + NNI_FREE_STRUCT(dd); } int -nni_device(nni_sock *sock1, nni_sock *sock2) +nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) { - nni_device_pair pair; - int rv; - nni_duration never = -1; - size_t sz; - - memset(&pair, 0, sizeof(pair)); - pair.socks[0] = sock1; - pair.socks[1] = sock2; - - if (sock1 == NULL) { - sock1 = sock2; + nni_device_data *dd; + int npath = 2; + int i; + + // Specifying either of these as null turns the device into + // a loopback reflector. + if (s1 == NULL) { + s1 = s2; } - if (sock2 == NULL) { - sock2 = sock1; + if (s2 == NULL) { + s2 = s1; } - if ((sock1 == NULL) || (sock2 == NULL)) { - rv = NNG_EINVAL; - goto out; + // At least one of the sockets must be valid. + if ((s1 == NULL) || (s2 == NULL)) { + return (NNG_EINVAL); } - if ((nni_sock_peer(sock1) != nni_sock_proto(sock2)) || - (nni_sock_peer(sock2) != nni_sock_proto(sock1))) { - rv = NNG_EINVAL; - goto out; + if ((nni_sock_peer(s1) != nni_sock_proto(s2)) || + (nni_sock_peer(s2) != nni_sock_proto(s1))) { + return (NNG_EINVAL); } - // No timeouts. - sz = sizeof(never); - if ((nni_sock_setopt(sock1, NNG_OPT_RECVTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock2, NNG_OPT_RECVTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock1, NNG_OPT_SENDTIMEO, &never, sz) != 0) || - (nni_sock_setopt(sock2, NNG_OPT_SENDTIMEO, &never, sz) != 0)) { - // This should never happen. - rv = NNG_EINVAL; - goto out; + // Note we assume that since they peers, we only need to look + // at the receive flags -- the other side is assumed to be able + // to send. + if ((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) == 0) { + nni_sock *temp = s1; + s1 = s2; + s2 = temp; } - pair.socks[0] = sock1; - pair.socks[1] = sock2; + NNI_ASSERT((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) != 0); - if ((rv = nni_thr_init(&pair.thrs[0], nni_device_fwd, &pair)) != 0) { - goto out; + // Only run one forwarder if the protocols are not bidirectional, or + // if the source and destination sockets are identical. (The latter is + // not strictly necessary, but it saves resources and minimizes any + // extra reordering.) + if (((nni_sock_flags(s2) & NNI_PROTO_FLAG_RCV) == 0) || (s1 == s2)) { + npath = 1; } - if ((rv = nni_thr_init(&pair.thrs[1], nni_device_rev, &pair)) != 0) { - nni_thr_fini(&pair.thrs[0]); - goto out; - } - if (((nni_sock_flags(sock1) & NNI_PROTO_FLAG_RCV) != 0) && - ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_SND) != 0)) { - nni_thr_run(&pair.thrs[0]); + + if ((dd = NNI_ALLOC_STRUCT(dd)) == NULL) { + return (NNG_ENOMEM); } - // If the sockets are the same, then its a simple one way forwarder, - // and we don't need two workers (but would be harmless if we did it). - if ((sock1 != sock2) && - ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_RCV) != 0) && - ((nni_sock_flags(sock1) & NNI_PROTO_FLAG_SND) != 0)) { - nni_thr_run(&pair.thrs[1]); + nni_mtx_init(&dd->mtx); + + for (i = 0; i < npath; i++) { + int rv; + nni_device_path *p = &dd->paths[i]; + p->src = i == 0 ? s1 : s2; + p->dst = i == 0 ? s2 : s1; + p->state = NNI_DEVICE_STATE_INIT; + + if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) { + nni_device_fini(dd); + return (rv); + } + + nni_aio_set_timeout(p->aio, NNI_TIME_NEVER); } + dd->npath = npath; + *dp = dd; + return (0); +} - // This blocks on both threads (though if we didn't start one, that - // will return immediately.) - nni_thr_fini(&pair.thrs[0]); - nni_thr_fini(&pair.thrs[1]); +void +nni_device_start(nni_device_data *dd, nni_aio *user) +{ + int i; - nni_sock_rele(sock1); - if (sock1 != sock2) { - nni_sock_rele(sock2); + nni_mtx_lock(&dd->mtx); + dd->user = user; + if (nni_aio_start(user, nni_device_cancel, dd) != 0) { + nni_mtx_unlock(&dd->mtx); + return; } + for (i = 0; i < dd->npath; i++) { + nni_device_path *p = &dd->paths[i]; + p->user = user; + p->state = NNI_DEVICE_STATE_INIT; + } + for (i = 0; i < dd->npath; i++) { + nni_device_path *p = &dd->paths[i]; + p->state = NNI_DEVICE_STATE_RECV; + nni_sock_recv(p->src, p->aio); + } + dd->running = 1; + nni_mtx_unlock(&dd->mtx); +} + +int +nni_device(nni_sock *s1, nni_sock *s2) +{ + nni_device_data *dd; + nni_aio * aio; + int rv; - rv = pair.err[0]; - if (rv == 0) { - rv = pair.err[1]; + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); } - if (rv == 0) { - // This can happen if neither thread ran. Shouldn't happen - // really. - rv = NNG_EINVAL; + if (nni_device_init(&dd, s1, s2) != 0) { + nni_aio_fini(aio); + return (rv); } + nni_device_start(dd, aio); + nni_aio_wait(aio); -out: + rv = nni_aio_result(aio); + nni_device_fini(dd); + nni_aio_fini(aio); return (rv); } -- cgit v1.2.3-70-g09d2