diff options
Diffstat (limited to 'src/core/device.c')
| -rw-r--r-- | src/core/device.c | 248 |
1 files changed, 125 insertions, 123 deletions
diff --git a/src/core/device.c b/src/core/device.c index 71480bbc..c7266271 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.com> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.com> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -10,107 +10,126 @@ #include "core/nng_impl.h" -#include <string.h> +typedef struct device_data_s device_data; +typedef struct device_path_s device_path; -typedef struct nni_device_path { - nni_aio * user; // user aio - nni_aio * aio; - nni_sock *src; - nni_sock *dst; - int state; -} nni_device_path; +struct device_path_s { + int state; + device_data *d; + nni_sock *src; + nni_sock *dst; + nni_aio aio; +}; #define NNI_DEVICE_STATE_INIT 0 #define NNI_DEVICE_STATE_RECV 1 #define NNI_DEVICE_STATE_SEND 2 #define NNI_DEVICE_STATE_FINI 3 -typedef struct nni_device_data { - nni_aio * user; - int npath; - nni_device_path paths[2]; - nni_mtx mtx; - bool running; -} nni_device_data; +struct device_data_s { + nni_aio *user; + int num_paths; + int running; + int rv; + device_path paths[2]; + nni_reap_node reap; +}; -typedef struct nni_device_pair nni_device_pair; +static void device_fini(void *); + +static nni_mtx device_mtx = NNI_MTX_INITIALIZER; +static nni_reap_list device_reap = { + .rl_offset = offsetof(device_data, reap), + .rl_func = device_fini, +}; static void -nni_device_cancel(nni_aio *aio, void *arg, int rv) +device_fini(void *arg) { - nni_device_data *dd = arg; - // cancellation is the only path to shutting it down. + device_data *d = arg; - nni_mtx_lock(&dd->mtx); - if ((!dd->running) || (dd->user != aio)) { - nni_mtx_unlock(&dd->mtx); - return; + for (int i = 0; i < d->num_paths; i++) { + nni_aio_stop(&d->paths[i].aio); } - dd->running = false; - dd->user = NULL; - nni_mtx_unlock(&dd->mtx); + NNI_FREE_STRUCT(d); +} - nni_sock_shutdown(dd->paths[0].src); - nni_sock_shutdown(dd->paths[0].dst); - nni_aio_finish_error(aio, rv); +static void +device_cancel(nni_aio *aio, void *arg, int rv) +{ + device_data *d = arg; + // cancellation is the only path to shutting it down. + + nni_mtx_lock(&device_mtx); + if (d->user == aio) { + for (int i = 0; i < d->num_paths; i++) { + nni_aio_abort(&d->paths[i].aio, rv); + } + } + nni_mtx_unlock(&device_mtx); } static void -nni_device_cb(void *arg) +device_cb(void *arg) { - nni_device_path *p = arg; - nni_aio * aio = p->aio; - int rv; + device_path *p = arg; + device_data *d = p->d; + int rv; - if ((rv = nni_aio_result(aio)) != 0) { + if ((rv = nni_aio_result(&p->aio)) != 0) { + nni_mtx_lock(&device_mtx); p->state = NNI_DEVICE_STATE_FINI; - nni_aio_abort(p->user, rv); + d->running--; + if (d->rv == 0) { + d->rv = rv; + } + for (int i = 0; i < d->num_paths; i++) { + if (p != &d->paths[i]) { + nni_aio_abort(&d->paths[i].aio, rv); + } + } + if (d->running == 0) { + if (d->user != NULL) { + nni_aio_finish_error(d->user, d->rv); + d->user = NULL; + } + nni_sock_rele(d->paths[0].src); + nni_sock_rele(d->paths[0].dst); + + nni_reap(&device_reap, d); + } + nni_mtx_unlock(&device_mtx); return; } switch (p->state) { case NNI_DEVICE_STATE_INIT: + break; case NNI_DEVICE_STATE_SEND: p->state = NNI_DEVICE_STATE_RECV; - nni_sock_recv(p->src, aio); + nni_sock_recv(p->src, &p->aio); break; case NNI_DEVICE_STATE_RECV: // Leave the message where it is. p->state = NNI_DEVICE_STATE_SEND; - nni_sock_send(p->dst, aio); + nni_sock_send(p->dst, &p->aio); break; case NNI_DEVICE_STATE_FINI: break; } } -void -nni_device_fini(nni_device_data *dd) +static int +device_init(device_data **dp, nni_sock *s1, nni_sock *s2) { - int i; - for (i = 0; i < dd->npath; i++) { - nni_device_path *p = &dd->paths[i]; - nni_aio_stop(p->aio); - } - for (i = 0; i < dd->npath; i++) { - nni_device_path *p = &dd->paths[i]; - nni_aio_free(p->aio); - } - nni_mtx_fini(&dd->mtx); - NNI_FREE_STRUCT(dd); -} - -int -nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) -{ - nni_device_data *dd; - int npath = 2; - int i; - bool raw; - size_t rsz; + int num_paths = 2; + int i; + bool raw; + size_t rsz; + device_data *d; // Specifying either of these as null turns the device into - // a loopback reflector. + // a reflector. if (s1 == NULL) { s1 = s2; } @@ -141,8 +160,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) return (NNG_EINVAL); } - // Note we assume that since they peers, we only need to look - // at the receive flags -- the other side is assumed to be able + // Note we assume that since they are peers, we only need to look + // at the recv flags -- the other side is assumed to be able // to send. if ((nni_sock_flags(s1) & NNI_PROTO_FLAG_RCV) == 0) { nni_sock *temp = s1; @@ -157,82 +176,65 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) // not strictly necessary, but it saves resources and minimizes any // extra reordering.) if (((nni_sock_flags(s2) & NNI_PROTO_FLAG_RCV) == 0) || (s1 == s2)) { - npath = 1; + num_paths = 1; } - if ((dd = NNI_ALLOC_STRUCT(dd)) == NULL) { + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&dd->mtx); - for (i = 0; i < npath; i++) { - int rv; - nni_device_path *p = &dd->paths[i]; - p->src = i == 0 ? s1 : s2; - p->dst = i == 0 ? s2 : s1; - p->state = NNI_DEVICE_STATE_INIT; + d->num_paths = 0; + for (i = 0; i < num_paths; i++) { + device_path *p = &d->paths[i]; + p->src = i == 0 ? s1 : s2; + p->dst = i == 0 ? s2 : s1; + p->d = d; + p->state = NNI_DEVICE_STATE_INIT; - if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) { - nni_device_fini(dd); - return (rv); - } + nni_aio_init(&p->aio, device_cb, p); - nni_aio_set_timeout(p->aio, NNG_DURATION_INFINITE); + nni_aio_set_timeout(&p->aio, NNG_DURATION_INFINITE); } - dd->npath = npath; - *dp = dd; + nni_sock_hold(d->paths[0].src); + nni_sock_hold(d->paths[0].dst); + + d->num_paths = num_paths; + *dp = d; return (0); } -void -nni_device_start(nni_device_data *dd, nni_aio *user) +static void +device_start(device_data *d, nni_aio *user) { - int i; - int rv; - - if (nni_aio_begin(user) != 0) { - return; + d->user = user; + for (int i = 0; i < d->num_paths; i++) { + device_path *p = &d->paths[i]; + p->state = NNI_DEVICE_STATE_RECV; + nni_sock_recv(p->src, &p->aio); + d->running++; } - nni_mtx_lock(&dd->mtx); - if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) { - nni_mtx_unlock(&dd->mtx); - nni_aio_finish_error(user, rv); - return; - } - dd->user = user; - for (i = 0; i < dd->npath; i++) { - nni_device_path *p = &dd->paths[i]; - p->user = user; - p->state = NNI_DEVICE_STATE_INIT; - } - for (i = 0; i < dd->npath; i++) { - nni_device_path *p = &dd->paths[i]; - p->state = NNI_DEVICE_STATE_RECV; - nni_sock_recv(p->src, p->aio); - } - dd->running = true; - nni_mtx_unlock(&dd->mtx); } -int -nni_device(nni_sock *s1, nni_sock *s2) +void +nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2) { - nni_device_data *dd; - nni_aio * aio; - int rv; + device_data *d; + int rv; - if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { - return (rv); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&device_mtx); + if ((rv = device_init(&d, s1, s2)) != 0) { + nni_mtx_unlock(&device_mtx); + nni_aio_finish_error(aio, rv); + return; } - if ((rv = nni_device_init(&dd, s1, s2)) != 0) { - nni_aio_free(aio); - return (rv); + if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) { + nni_mtx_unlock(&device_mtx); + nni_aio_finish_error(aio, rv); + nni_reap(&device_reap, d); } - nni_device_start(dd, aio); - nni_aio_wait(aio); - - rv = nni_aio_result(aio); - nni_device_fini(dd); - nni_aio_free(aio); - return (rv); + device_start(d, aio); + nni_mtx_unlock(&device_mtx); } |
