From b21d7805523a407a14567017edbdef57ca81781f Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 8 Jan 2020 20:34:26 -0800 Subject: fixes #1094 Consider in-lining task and aio This only does it for rep, but it also has changes that should increase the overall test coverage for the REP protocol --- src/core/aio.c | 134 +++++++++++++-------------------- src/core/aio.h | 74 ++++++++++++++++-- src/core/device.c | 10 +-- src/core/dialer.c | 12 +-- src/core/listener.c | 8 +- src/nng.c | 4 +- src/protocol/bus0/bus.c | 22 +++--- src/protocol/pair0/pair.c | 16 ++-- src/protocol/pair1/pair.c | 20 ++--- src/protocol/pipeline0/pull.c | 8 +- src/protocol/pipeline0/push.c | 12 +-- src/protocol/pubsub0/pub.c | 8 +- src/protocol/pubsub0/sub.c | 4 +- src/protocol/pubsub0/xsub.c | 4 +- src/protocol/reqrep0/rep.c | 80 ++++++++++---------- src/protocol/reqrep0/rep_test.c | 21 ++++++ src/protocol/reqrep0/req.c | 8 +- src/protocol/reqrep0/xrep.c | 20 ++--- src/protocol/reqrep0/xreq.c | 16 ++-- src/protocol/survey0/respond.c | 8 +- src/protocol/survey0/survey.c | 12 +-- src/protocol/survey0/xrespond.c | 20 ++--- src/protocol/survey0/xsurvey.c | 20 ++--- src/supplemental/http/http_client.c | 10 +-- src/supplemental/http/http_conn.c | 8 +- src/supplemental/http/http_server.c | 20 ++--- src/supplemental/tcp/tcp.c | 12 +-- src/supplemental/tls/mbedtls/tls.c | 10 +-- src/supplemental/tls/tls_common.c | 6 +- src/supplemental/websocket/websocket.c | 20 ++--- src/transport/ipc/ipc.c | 22 +++--- src/transport/tcp/tcp.c | 26 +++---- src/transport/tls/tls.c | 32 ++++---- src/transport/ws/websocket.c | 16 ++-- src/transport/zerotier/zerotier.c | 22 +++--- 35 files changed, 395 insertions(+), 350 deletions(-) diff --git a/src/core/aio.c b/src/core/aio.c index 9f025f4c..a15bb47b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -61,107 +61,75 @@ static nni_aio *nni_aio_expire_aio; // operations from starting, without waiting for any existing one to // complete, call nni_aio_close. -// An nni_aio is an async I/O handle. -struct nng_aio { - int a_result; // Result code (nng_errno) - size_t a_count; // Bytes transferred (I/O only) - nni_time a_expire; // Absolute timeout - nni_duration a_timeout; // Relative timeout - - bool a_stop; // shutting down (no new operations) - bool a_sleep; // sleeping with no action - bool a_expire_ok; // expire from sleep is ok - nni_task a_task; - - // Read/write operations. - nni_iov a_iov[8]; - unsigned a_niov; - - // Message operations. - nni_msg *a_msg; - - // User scratch data. Consumers may store values here, which - // must be preserved by providers and the framework. - void *a_user_data[4]; - - // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be - // specified. The semantics of these will vary, and depend on the - // specific operation. - void *a_inputs[4]; - void *a_outputs[4]; - - // Provider-use fields. - nni_aio_cancelfn a_cancel_fn; - void * a_cancel_arg; - nni_list_node a_prov_node; - void * a_prov_extra[4]; // Extra data used by provider - - // Socket address. This turns out to be very useful, as we wind up - // needing socket addresses for numerous connection related routines. - // It would be cleaner to not have this and avoid burning the space, - // but having this hear dramatically simplifies lots of code. - nng_sockaddr a_sockaddr; - - // Expire node. - nni_list_node a_expire_node; -}; - static void nni_aio_expire_add(nni_aio *); -int -nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) +void +nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) { - nni_aio *aio; - - if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { - return (NNG_ENOMEM); - } + memset(aio, 0, sizeof(*aio)); nni_task_init(&aio->a_task, NULL, cb, arg); aio->a_expire = NNI_TIME_NEVER; aio->a_timeout = NNG_DURATION_INFINITE; - *aiop = aio; - return (0); } void nni_aio_fini(nni_aio *aio) { - if (aio != NULL) { + nni_aio_cancelfn fn; + void * arg; - nni_aio_cancelfn fn; - void * arg; + // TODO: This probably could just use nni_aio_stop. - // This is like aio_close, but we don't want to dispatch - // the task. And unlike aio_stop, we don't want to wait - // for the task. (Because we implicitly do task_fini.) - nni_mtx_lock(&nni_aio_lk); - fn = aio->a_cancel_fn; - arg = aio->a_cancel_arg; - aio->a_cancel_fn = NULL; - aio->a_cancel_arg = NULL; - aio->a_stop = true; - nni_mtx_unlock(&nni_aio_lk); + // This is like aio_close, but we don't want to dispatch + // the task. And unlike aio_stop, we don't want to wait + // for the task. (Because we implicitly do task_fini.) + nni_mtx_lock(&nni_aio_lk); + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; + nni_mtx_unlock(&nni_aio_lk); - if (fn != NULL) { - fn(aio, arg, NNG_ECLOSED); - } + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); + } - // Wait for the aio to be "done"; this ensures that we don't - // destroy an aio from a "normal" completion callback while - // the expiration thread is working. + // Wait for the aio to be "done"; this ensures that we don't + // destroy an aio from a "normal" completion callback while + // the expiration thread is working. - nni_mtx_lock(&nni_aio_lk); - while (nni_aio_expire_aio == aio) { - if (nni_thr_is_self(&nni_aio_expire_thr)) { - nni_aio_expire_aio = NULL; - break; - } - nni_cv_wait(&nni_aio_expire_cv); + nni_mtx_lock(&nni_aio_lk); + while (nni_aio_expire_aio == aio) { + // TODO: It should be possible to remove this check! + if (nni_thr_is_self(&nni_aio_expire_thr)) { + nni_aio_expire_aio = NULL; + break; } - nni_mtx_unlock(&nni_aio_lk); + nni_cv_wait(&nni_aio_expire_cv); + } + nni_mtx_unlock(&nni_aio_lk); + nni_task_fini(&aio->a_task); +} - nni_task_fini(&aio->a_task); +int +nni_aio_alloc(nni_aio **aiop, nni_cb cb, void *arg) +{ + nni_aio *aio; + + if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { + return (NNG_ENOMEM); + } + nni_aio_init(aio, cb, arg); + *aiop = aio; + return (0); +} +void +nni_aio_free(nni_aio *aio) +{ + if (aio != NULL) { + nni_aio_fini(aio); NNI_FREE_STRUCT(aio); } } @@ -189,7 +157,7 @@ nni_aio_set_iov(nni_aio *aio, unsigned niov, const nni_iov *iov) // callback to complete, if still running. It also marks the AIO as // stopped, preventing further calls to nni_aio_begin from succeeding. // To correctly tear down an AIO, call stop, and make sure any other -// callers are not also stopped, before calling nni_aio_fini to release +// callers are not also stopped, before calling nni_aio_free to release // actual memory. void nni_aio_stop(nni_aio *aio) diff --git a/src/core/aio.h b/src/core/aio.h index 83c068b5..c2776bc0 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -22,19 +22,29 @@ typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int); // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern int nni_aio_init(nni_aio **, nni_cb, void *); +extern void nni_aio_init(nni_aio *, nni_cb, void *arg); -// nni_aio_fini finalizes the aio, releasing resources (locks) -// associated with it. The caller is responsible for ensuring that any -// associated I/O is unscheduled or complete. This is safe to call -// on zero'd memory. +// nni_aio_fini finalizes an aio object, releasing associated resources. +// It waits for the callback to complete. extern void nni_aio_fini(nni_aio *); +// nni_aio_alloc allocates an aio object and initializes it. The callback +// is called with the supplied argument when the operation is complete. +// If NULL is supplied for the callback, then nni_aio_wake is used in its +// place, and the aio is used for the argument. +extern int nni_aio_alloc(nni_aio **, nni_cb, void *arg); + +// nni_aio_free frees the aio, releasing resources (locks) +// associated with it. This is safe to call on zero'd memory. +// This must only be called on an object that was allocated +// with nni_aio_allocate. +extern void nni_aio_free(nni_aio *aio); + // nni_aio_stop cancels any unfinished I/O, running completion callbacks, // but also prevents any new operations from starting (nni_aio_start will -// return NNG_ESTATE). This should be called before nni_aio_fini(). The +// return NNG_ESTATE). This should be called before nni_aio_free(). The // best pattern is to call nni_aio_stop on all linked aios, before calling -// nni_aio_fini on any of them. This function will block until any +// nni_aio_free on any of them. This function will block until any // callbacks are executed, and therefore it should never be executed // from a callback itself. (To abort operations without blocking // use nni_aio_cancel instead.) @@ -162,4 +172,54 @@ extern void nni_sleep_aio(nni_duration, nni_aio *); extern int nni_aio_sys_init(void); extern void nni_aio_sys_fini(void); + +// An nni_aio is an async I/O handle. The details of this aio structure +// are private to the AIO framework. The structure has the public name +// (nng_aio) so that we minimize the pollution in the public API namespace. +// It is a coding error for anything out side of the AIO framework to access +// any of these members -- the definition is provided here to facilitate +// inlining, but that should be the only use. +struct nng_aio { + size_t a_count; // Bytes transferred (I/O only) + nni_time a_expire; // Absolute timeout + nni_duration a_timeout; // Relative timeout + int a_result; // Result code (nng_errno) + bool a_stop; // Shutting down (no new operations) + bool a_sleep; // Sleeping with no action + bool a_expire_ok; // Expire from sleep is ok + nni_task a_task; + + // Read/write operations. + nni_iov a_iov[8]; + unsigned a_niov; + + // Message operations. + nni_msg *a_msg; + + // User scratch data. Consumers may store values here, which + // must be preserved by providers and the framework. + void *a_user_data[4]; + + // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be + // specified. The semantics of these will vary, and depend on the + // specific operation. + void *a_inputs[4]; + void *a_outputs[4]; + + // Provider-use fields. + nni_aio_cancelfn a_cancel_fn; + void * a_cancel_arg; + nni_list_node a_prov_node; // Linkage on provider list. + void * a_prov_extra[4]; // Extra data used by provider + + // Socket address. This turns out to be very useful, as we wind up + // needing socket addresses for numerous connection related routines. + // It would be cleaner to not have this and avoid burning the space, + // but having this hear dramatically simplifies lots of code. + nng_sockaddr a_sockaddr; + + // Expire node. + nni_list_node a_expire_node; +}; + #endif // CORE_AIO_H diff --git a/src/core/device.c b/src/core/device.c index fee108a8..71480bbc 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -94,7 +94,7 @@ nni_device_fini(nni_device_data *dd) } for (i = 0; i < dd->npath; i++) { nni_device_path *p = &dd->paths[i]; - nni_aio_fini(p->aio); + nni_aio_free(p->aio); } nni_mtx_fini(&dd->mtx); NNI_FREE_STRUCT(dd); @@ -172,7 +172,7 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) p->dst = i == 0 ? s2 : s1; p->state = NNI_DEVICE_STATE_INIT; - if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) { nni_device_fini(dd); return (rv); } @@ -221,11 +221,11 @@ nni_device(nni_sock *s1, nni_sock *s2) nni_aio * aio; int rv; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { return (rv); } if ((rv = nni_device_init(&dd, s1, s2)) != 0) { - nni_aio_fini(aio); + nni_aio_free(aio); return (rv); } nni_device_start(dd, aio); @@ -233,6 +233,6 @@ nni_device(nni_sock *s1, nni_sock *s2) rv = nni_aio_result(aio); nni_device_fini(dd); - nni_aio_fini(aio); + nni_aio_free(aio); return (rv); } diff --git a/src/core/dialer.c b/src/core/dialer.c index e1178783..8a463452 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -60,8 +60,8 @@ nni_dialer_destroy(nni_dialer *d) nni_aio_stop(d->d_con_aio); nni_aio_stop(d->d_tmo_aio); - nni_aio_fini(d->d_con_aio); - nni_aio_fini(d->d_tmo_aio); + nni_aio_free(d->d_con_aio); + nni_aio_free(d->d_tmo_aio); if (d->d_data != NULL) { d->d_ops.d_fini(d->d_data); @@ -204,8 +204,8 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_mtx_init(&d->d_mtx); dialer_stats_init(d); - if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || - ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || + if (((rv = nni_aio_alloc(&d->d_con_aio, dialer_connect_cb, d)) != 0) || + ((rv = nni_aio_alloc(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || ((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) || ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) || ((rv = nni_sock_add_dialer(s, d)) != 0)) { @@ -382,7 +382,7 @@ nni_dialer_start(nni_dialer *d, int flags) if ((flags & NNG_FLAG_NONBLOCK) != 0) { aio = NULL; } else { - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { nni_atomic_flag_reset(&d->d_started); return (rv); } @@ -397,7 +397,7 @@ nni_dialer_start(nni_dialer *d, int flags) if (aio != NULL) { nni_aio_wait(aio); rv = nni_aio_result(aio); - nni_aio_fini(aio); + nni_aio_free(aio); } return (rv); diff --git a/src/core/listener.c b/src/core/listener.c index d44a5456..87d2c532 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -61,8 +61,8 @@ nni_listener_destroy(nni_listener *l) nni_aio_stop(l->l_acc_aio); nni_aio_stop(l->l_tmo_aio); - nni_aio_fini(l->l_acc_aio); - nni_aio_fini(l->l_tmo_aio); + nni_aio_free(l->l_acc_aio); + nni_aio_free(l->l_tmo_aio); if (l->l_data != NULL) { l->l_ops.l_fini(l->l_data); @@ -196,8 +196,8 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); listener_stats_init(l); - if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || - ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || + if (((rv = nni_aio_alloc(&l->l_acc_aio, listener_accept_cb, l)) != 0) || + ((rv = nni_aio_alloc(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || ((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) || ((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) || ((rv = nni_sock_add_listener(s, l)) != 0)) { diff --git a/src/nng.c b/src/nng.c index e601f785..b3179988 100644 --- a/src/nng.c +++ b/src/nng.c @@ -1116,7 +1116,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) if ((rv = nni_init()) != 0) { return (rv); } - if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) { + if ((rv = nni_aio_alloc(&aio, (nni_cb) cb, arg)) == 0) { nng_aio_set_timeout(aio, NNG_DURATION_DEFAULT); *app = aio; } @@ -1126,7 +1126,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) void nng_aio_free(nng_aio *aio) { - nni_aio_fini(aio); + nni_aio_free(aio); } void diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index afb12ef6..dea228a1 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -68,7 +68,7 @@ bus0_sock_fini(void *arg) { bus0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_mtx_fini(&s->mtx); } @@ -80,7 +80,7 @@ bus0_sock_init(void *arg, nni_sock *nsock) NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { bus0_sock_fini(s); return (rv); } @@ -99,7 +99,7 @@ bus0_sock_init_raw(void *arg, nni_sock *nsock) NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) { bus0_sock_fini(s); return (rv); } @@ -142,10 +142,10 @@ bus0_pipe_fini(void *arg) { bus0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); nni_mtx_fini(&p->mtx); } @@ -159,10 +159,10 @@ bus0_pipe_init(void *arg, nni_pipe *npipe, void *s) NNI_LIST_NODE_INIT(&p->node); nni_mtx_init(&p->mtx); if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { bus0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index 860ac17f..730e5f5e 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -88,10 +88,10 @@ pair0_pipe_fini(void *arg) { pair0_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); } static int @@ -100,10 +100,10 @@ pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock) pair0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_send, pair0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pair0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, pair0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, pair0_putq_cb, p)) != 0)) { pair0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index 2838cb5d..b3b64a79 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -69,7 +69,7 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); } @@ -88,7 +88,7 @@ pair1_sock_init_impl(void *arg, nni_sock *nsock, bool raw) // Raw mode uses this. nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) { pair1_sock_fini(s); return (rv); } @@ -147,10 +147,10 @@ pair1_pipe_fini(void *arg) { pair1_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); nni_msgq_fini(p->sendq); } @@ -161,10 +161,10 @@ pair1_pipe_init(void *arg, nni_pipe *npipe, void *psock) int rv; if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { pair1_pipe_fini(p); return (NNG_ENOMEM); } diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 64b47cef..8feb08b8 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -73,8 +73,8 @@ pull0_pipe_fini(void *arg) { pull0_pipe *p = arg; - nni_aio_fini(p->putq_aio); - nni_aio_fini(p->recv_aio); + nni_aio_free(p->putq_aio); + nni_aio_free(p->recv_aio); } static int @@ -83,8 +83,8 @@ pull0_pipe_init(void *arg, nni_pipe *pipe, void *s) pull0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->putq_aio, pull0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->recv_aio, pull0_recv_cb, p)) != 0)) { pull0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 5a932ece..90c94af9 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -89,9 +89,9 @@ push0_pipe_fini(void *arg) { push0_pipe *p = arg; - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_getq); } static int @@ -100,9 +100,9 @@ push0_pipe_init(void *arg, nni_pipe *pipe, void *s) push0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, push0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, push0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, push0_getq_cb, p)) != 0)) { push0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index a42e95ff..9b995c33 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -111,8 +111,8 @@ pub0_pipe_fini(void *arg) { pub0_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_lmq_fini(&p->sendq); } @@ -130,8 +130,8 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) // XXX: consider making this depth tunable if (((rv = nni_lmq_init(&p->sendq, len)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { pub0_pipe_fini(p); return (rv); diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 56da98f8..c5b84313 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -265,7 +265,7 @@ sub0_pipe_fini(void *arg) { sub0_pipe *p = arg; - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_recv); } static int @@ -274,7 +274,7 @@ sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) sub0_pipe *p = arg; int rv; - if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio_recv, sub0_recv_cb, p)) != 0) { sub0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c index be300df4..baa4f8eb 100644 --- a/src/protocol/pubsub0/xsub.c +++ b/src/protocol/pubsub0/xsub.c @@ -85,7 +85,7 @@ xsub0_pipe_fini(void *arg) { xsub0_pipe *p = arg; - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_recv); } static int @@ -94,7 +94,7 @@ xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s) xsub0_pipe *p = arg; int rv; - if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio_recv, xsub0_recv_cb, p)) != 0) { xsub0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index a715ab59..a29c3120 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -48,14 +48,14 @@ struct rep0_ctx { // rep0_sock is our per-socket protocol private structure. struct rep0_sock { - nni_mtx lk; - int ttl; - nni_idhash * pipes; - nni_list recvpipes; // list of pipes with data to receive - nni_list recvq; - rep0_ctx ctx; - nni_pollable readable; - nni_pollable writable; + nni_mtx lk; + int ttl; + nni_idhash * pipes; + nni_list recvpipes; // list of pipes with data to receive + nni_list recvq; + rep0_ctx ctx; + nni_pollable readable; + nni_pollable writable; }; // rep0_pipe is our per-pipe protocol private structure. @@ -63,8 +63,8 @@ struct rep0_pipe { nni_pipe * pipe; rep0_sock * rep; uint32_t id; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node rnode; // receivable list linkage nni_list sendq; // contexts waiting to send bool busy; @@ -193,8 +193,8 @@ rep0_ctx_send(void *arg, nni_aio *aio) if (!p->busy) { p->busy = true; len = nni_msg_len(msg); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&s->lk); nni_aio_set_msg(aio, NULL); @@ -273,8 +273,8 @@ rep0_pipe_stop(void *arg) { rep0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -283,26 +283,22 @@ rep0_pipe_fini(void *arg) rep0_pipe *p = arg; nng_msg * msg; - if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) { - nni_aio_set_msg(p->aio_recv, NULL); + if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) { + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); } static int rep0_pipe_init(void *arg, nni_pipe *pipe, void *s) { rep0_pipe *p = arg; - int rv; - if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) { - rep0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p); NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode); @@ -329,7 +325,7 @@ rep0_pipe_start(void *arg) } // By definition, we have not received a request yet on this pipe, // so it cannot cause us to become writable. - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -340,8 +336,8 @@ rep0_pipe_close(void *arg) rep0_sock *s = p->rep; rep0_ctx * ctx; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&s->lk); if (nni_list_active(&s->recvpipes, p)) { @@ -380,9 +376,9 @@ rep0_pipe_send_cb(void *arg) nni_msg * msg; size_t len; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -406,8 +402,8 @@ rep0_pipe_send_cb(void *arg) msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&s->lk); @@ -462,13 +458,13 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_unlock(&s->lk); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { nni_pollable_clear(&s->readable); } - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(&s->writable); } @@ -496,12 +492,12 @@ rep0_pipe_recv_cb(void *arg) size_t len; int hops; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - msg = nni_aio_get_msg(p->aio_recv); + msg = nni_aio_get_msg(&p->aio_recv); nni_msg_set_pipe(msg, p->id); @@ -521,7 +517,7 @@ rep0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_close(p->pipe); return; } @@ -552,13 +548,13 @@ rep0_pipe_recv_cb(void *arg) nni_list_remove(&s->recvq, ctx); aio = ctx->raio; ctx->raio = NULL; - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(&s->writable); } // schedule another receive - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); ctx->btrace_len = len; memcpy(ctx->btrace, nni_msg_header(msg), len); @@ -573,8 +569,8 @@ rep0_pipe_recv_cb(void *arg) drop: nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->pipe, &p->aio_recv); } static int diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 879d6ae4..f339e68d 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -272,6 +272,26 @@ test_rep_close_pipe_during_send(void) TEST_NNG_PASS(nng_close(rep)); } +void +test_rep_ctx_recv_aio_stopped(void) +{ + nng_socket rep; + nng_ctx ctx; + nng_aio * aio; + + TEST_NNG_PASS(nng_rep0_open(&rep)); + TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL)); + TEST_NNG_PASS(nng_ctx_open(&ctx, rep)); + + nng_aio_stop(aio); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + TEST_NNG_FAIL(nng_aio_result(aio), NNG_ECANCELED); + TEST_NNG_PASS(nng_ctx_close(ctx)); + TEST_NNG_PASS(nng_close(rep)); + nng_aio_free(aio); +} + void test_rep_close_pipe_context_send(void) { @@ -424,6 +444,7 @@ TEST_LIST = { { "rep double recv", test_rep_double_recv }, { "rep close pipe before send", test_rep_close_pipe_before_send }, { "rep close pipe during send", test_rep_close_pipe_during_send }, + { "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped }, { "rep close pipe context send", test_rep_close_pipe_context_send }, { "rep close context send", test_rep_close_context_send }, { "rep recv garbage", test_rep_recv_garbage }, diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 33629abc..14da7143 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -184,8 +184,8 @@ req0_pipe_fini(void *arg) { req0_pipe *p = arg; - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); } static int @@ -194,8 +194,8 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s) req0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, req0_send_cb, p)) != 0)) { req0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index 48f74075..308c0f0e 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -62,7 +62,7 @@ xrep0_sock_fini(void *arg) { xrep0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->lk); } @@ -75,7 +75,7 @@ xrep0_sock_init(void *arg, nni_sock *sock) nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) { + ((rv = nni_aio_alloc(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) { xrep0_sock_fini(s); return (rv); } @@ -120,10 +120,10 @@ xrep0_pipe_fini(void *arg) { xrep0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); } @@ -146,10 +146,10 @@ xrep0_pipe_init(void *arg, nni_pipe *pipe, void *s) // willing to receive replies. Something to think about for the // future.) if (((rv = nni_msgq_init(&p->sendq, 64)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) { xrep0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index 7455c986..15652f4f 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -96,10 +96,10 @@ xreq0_pipe_fini(void *arg) { xreq0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); } static int @@ -108,10 +108,10 @@ xreq0_pipe_init(void *arg, nni_pipe *pipe, void *s) xreq0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_getq, xreq0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xreq0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xreq0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xreq0_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_getq, xreq0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xreq0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xreq0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xreq0_send_cb, p)) != 0)) { xreq0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index b4ffc917..06010d99 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -281,8 +281,8 @@ resp0_pipe_fini(void *arg) nni_aio_set_msg(p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); } static int @@ -291,8 +291,8 @@ resp0_pipe_init(void *arg, nni_pipe *npipe, void *s) resp0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { resp0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 8aa05dd4..35a14de7 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -286,9 +286,9 @@ surv0_pipe_fini(void *arg) { surv0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_msgq_fini(p->sendq); } @@ -303,9 +303,9 @@ surv0_pipe_init(void *arg, nni_pipe *npipe, void *s) // is best effort, and a deep queue doesn't really do much for us. // Note that surveys can be *outstanding*, but not yet put on the wire. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { surv0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 66b340ee..6318fe8b 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -62,7 +62,7 @@ xresp0_sock_fini(void *arg) { xresp0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); } @@ -75,7 +75,7 @@ xresp0_sock_init(void *arg, nni_sock *nsock) nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) { + ((rv = nni_aio_alloc(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) { xresp0_sock_fini(s); return (rv); } @@ -119,10 +119,10 @@ xresp0_pipe_fini(void *arg) { xresp0_pipe *p = arg; - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_msgq_fini(p->sendq); } @@ -133,10 +133,10 @@ xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s) int rv; if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xresp0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xresp0_send_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_putq, xresp0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xresp0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, xresp0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xresp0_send_cb, p)) != 0)) { xresp0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index 43c83793..86f912a2 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -60,7 +60,7 @@ xsurv0_sock_fini(void *arg) { xsurv0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_mtx_fini(&s->mtx); } @@ -70,7 +70,7 @@ xsurv0_sock_init(void *arg, nni_sock *nsock) xsurv0_sock *s = arg; int rv; - if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) { xsurv0_sock_fini(s); return (rv); } @@ -116,10 +116,10 @@ xsurv0_pipe_fini(void *arg) { xsurv0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); } @@ -136,10 +136,10 @@ xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s) // an expiration with them, so that we could discard any that are // not delivered before their expiration date. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xsurv0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, xsurv0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xsurv0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xsurv0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) { xsurv0_pipe_fini(p); return (rv); } diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index 3c60bd46..ecba84ae 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -91,7 +91,7 @@ http_dial_cb(void *arg) void nni_http_client_fini(nni_http_client *c) { - nni_aio_fini(c->aio); + nni_aio_free(c->aio); nng_stream_dialer_free(c->dialer); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -132,7 +132,7 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) return (rv); } - if ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0) { + if ((rv = nni_aio_alloc(&c->aio, http_dial_cb, c)) != 0) { nni_http_client_fini(c); return (rv); } @@ -251,7 +251,7 @@ http_txn_reap(void *arg) } } nni_http_chunks_free(txn->chunks); - nni_aio_fini(txn->aio); + nni_aio_free(txn->aio); NNI_FREE_STRUCT(txn); } @@ -405,7 +405,7 @@ nni_http_transact_conn( nni_aio_finish_error(aio, NNG_ENOMEM); return; } - if ((rv = nni_aio_init(&txn->aio, http_txn_cb, txn)) != 0) { + if ((rv = nni_aio_alloc(&txn->aio, http_txn_cb, txn)) != 0) { NNI_FREE_STRUCT(txn); nni_aio_finish_error(aio, rv); return; @@ -450,7 +450,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req, nni_aio_finish_error(aio, NNG_ENOMEM); return; } - if ((rv = nni_aio_init(&txn->aio, http_txn_cb, txn)) != 0) { + if ((rv = nni_aio_alloc(&txn->aio, http_txn_cb, txn)) != 0) { NNI_FREE_STRUCT(txn); nni_aio_finish_error(aio, rv); return; diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 1fc2c34e..583f5eee 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -690,8 +690,8 @@ nni_http_conn_fini(nni_http_conn *conn) } nni_mtx_unlock(&conn->mtx); - nni_aio_fini(conn->wr_aio); - nni_aio_fini(conn->rd_aio); + nni_aio_free(conn->wr_aio); + nni_aio_free(conn->rd_aio); nni_free(conn->rd_buf, conn->rd_bufsz); nni_mtx_fini(&conn->mtx); NNI_FREE_STRUCT(conn); @@ -716,8 +716,8 @@ http_init(nni_http_conn **connp, nng_stream *data) } conn->rd_bufsz = HTTP_BUFSIZE; - if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) || - ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) { + if (((rv = nni_aio_alloc(&conn->wr_aio, http_wr_cb, conn)) != 0) || + ((rv = nni_aio_alloc(&conn->rd_aio, http_rd_cb, conn)) != 0)) { nni_http_conn_fini(conn); return (rv); } diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 06a93553..9a4ff2af 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -243,10 +243,10 @@ http_sconn_reap(void *arg) } nni_http_req_free(sc->req); nni_http_res_free(sc->res); - nni_aio_fini(sc->rxaio); - nni_aio_fini(sc->txaio); - nni_aio_fini(sc->txdataio); - nni_aio_fini(sc->cbaio); + nni_aio_free(sc->rxaio); + nni_aio_free(sc->txaio); + nni_aio_free(sc->txdataio); + nni_aio_free(sc->cbaio); // Now it is safe to release our reference on the server. nni_mtx_lock(&s->mtx); @@ -746,11 +746,11 @@ http_sconn_init(http_sconn **scp, nng_stream *stream) } if (((rv = nni_http_req_alloc(&sc->req, NULL)) != 0) || - ((rv = nni_aio_init(&sc->rxaio, http_sconn_rxdone, sc)) != 0) || - ((rv = nni_aio_init(&sc->txaio, http_sconn_txdone, sc)) != 0) || - ((rv = nni_aio_init(&sc->txdataio, http_sconn_txdatdone, sc)) != + ((rv = nni_aio_alloc(&sc->rxaio, http_sconn_rxdone, sc)) != 0) || + ((rv = nni_aio_alloc(&sc->txaio, http_sconn_txdone, sc)) != 0) || + ((rv = nni_aio_alloc(&sc->txdataio, http_sconn_txdatdone, sc)) != 0) || - ((rv = nni_aio_init(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) { + ((rv = nni_aio_alloc(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) { // Can't even accept the incoming request. Hard close. http_sconn_close(sc); return (rv); @@ -838,7 +838,7 @@ http_server_fini(nni_http_server *s) nni_mtx_unlock(&s->errors_mtx); nni_mtx_fini(&s->errors_mtx); - nni_aio_fini(s->accaio); + nni_aio_free(s->accaio); nni_mtx_fini(&s->mtx); nni_strfree(s->hostname); NNI_FREE_STRUCT(s); @@ -874,7 +874,7 @@ http_server_init(nni_http_server **serverp, const nni_url *url) nni_mtx_init(&s->errors_mtx); NNI_LIST_INIT(&s->errors, http_error, node); - if ((rv = nni_aio_init(&s->accaio, http_server_acccb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->accaio, http_server_acccb, s)) != 0) { http_server_fini(s); return (rv); } diff --git a/src/supplemental/tcp/tcp.c b/src/supplemental/tcp/tcp.c index 02d5d6ce..dd6f28ff 100644 --- a/src/supplemental/tcp/tcp.c +++ b/src/supplemental/tcp/tcp.c @@ -157,8 +157,8 @@ tcp_dialer_free(void *arg) nni_aio_stop(d->resaio); nni_aio_stop(d->conaio); - nni_aio_fini(d->resaio); - nni_aio_fini(d->conaio); + nni_aio_free(d->resaio); + nni_aio_free(d->conaio); if (d->d != NULL) { nni_tcp_dialer_close(d->d); @@ -234,8 +234,8 @@ tcp_dialer_alloc(tcp_dialer **dp) nni_aio_list_init(&d->resaios); nni_aio_list_init(&d->conaios); - if (((rv = nni_aio_init(&d->resaio, tcp_dial_res_cb, d)) != 0) || - ((rv = nni_aio_init(&d->conaio, tcp_dial_con_cb, d)) != 0) || + if (((rv = nni_aio_alloc(&d->resaio, tcp_dial_res_cb, d)) != 0) || + ((rv = nni_aio_alloc(&d->conaio, tcp_dial_con_cb, d)) != 0) || ((rv = nni_tcp_dialer_init(&d->d)) != 0)) { tcp_dialer_free(d); return (rv); @@ -446,11 +446,11 @@ nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url) nni_aio_wait(aio); if ((rv = nni_aio_result(aio)) != 0) { - nni_aio_fini(aio); + nni_aio_free(aio); return (rv); } nni_aio_get_sockaddr(aio, &sa); - nni_aio_fini(aio); + nni_aio_free(aio); return (tcp_listener_alloc_addr(lp, &sa)); } diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index d49c9de5..b0ac6ec5 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -290,12 +290,12 @@ tls_reap(void *arg) } nni_aio_stop(tls->tcp_send); nni_aio_stop(tls->tcp_recv); - nni_aio_fini(tls->com.aio); + nni_aio_free(tls->com.aio); // And finalize / free everything. nng_stream_free(tls->tcp); - nni_aio_fini(tls->tcp_send); - nni_aio_fini(tls->tcp_recv); + nni_aio_free(tls->tcp_send); + nni_aio_free(tls->tcp_recv); mbedtls_ssl_free(&tls->ctx); nng_tls_config_free(tls->com.cfg); @@ -349,8 +349,8 @@ nni_tls_start(nng_stream *arg, nng_stream *tcp) tp->tcp = tcp; - if (((rv = nni_aio_init(&tp->tcp_send, tls_send_cb, tp)) != 0) || - ((rv = nni_aio_init(&tp->tcp_recv, tls_recv_cb, tp)) != 0)) { + if (((rv = nni_aio_alloc(&tp->tcp_send, tls_send_cb, tp)) != 0) || + ((rv = nni_aio_alloc(&tp->tcp_recv, tls_recv_cb, tp)) != 0)) { return (rv); } diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c index 39a4bff0..7cccc1e9 100644 --- a/src/supplemental/tls/tls_common.c +++ b/src/supplemental/tls/tls_common.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. +// Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2019 Devolutions // @@ -110,7 +110,7 @@ tls_dialer_dial(void *arg, nng_aio *aio) } com = (void *) tls; - if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) { + if ((rv = nni_aio_alloc(&com->aio, tls_conn_cb, tls)) != 0) { nni_aio_finish_error(aio, rv); nng_stream_free(tls); return; @@ -394,7 +394,7 @@ tls_listener_accept(void *arg, nng_aio *aio) return; } com = (void *) tls; - if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) { + if ((rv = nni_aio_alloc(&com->aio, tls_conn_cb, tls)) != 0) { nni_aio_finish_error(aio, rv); nng_stream_free(tls); return; diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index c7d3622c..df4a0834 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1230,11 +1230,11 @@ ws_fini(void *arg) nni_strfree(ws->reqhdrs); nni_strfree(ws->reshdrs); - nni_aio_fini(ws->rxaio); - nni_aio_fini(ws->txaio); - nni_aio_fini(ws->closeaio); - nni_aio_fini(ws->httpaio); - nni_aio_fini(ws->connaio); + nni_aio_free(ws->rxaio); + nni_aio_free(ws->txaio); + nni_aio_free(ws->closeaio); + nni_aio_free(ws->httpaio); + nni_aio_free(ws->connaio); nni_mtx_fini(&ws->mtx); NNI_FREE_STRUCT(ws); } @@ -1411,11 +1411,11 @@ ws_init(nni_ws **wsp) nni_aio_list_init(&ws->sendq); nni_aio_list_init(&ws->recvq); - if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->connaio, ws_conn_cb, ws)) != 0)) { + if (((rv = nni_aio_alloc(&ws->closeaio, ws_close_cb, ws)) != 0) || + ((rv = nni_aio_alloc(&ws->txaio, ws_write_cb, ws)) != 0) || + ((rv = nni_aio_alloc(&ws->rxaio, ws_read_cb, ws)) != 0) || + ((rv = nni_aio_alloc(&ws->httpaio, ws_http_cb, ws)) != 0) || + ((rv = nni_aio_alloc(&ws->connaio, ws_conn_cb, ws)) != 0)) { ws_fini(ws); return (rv); } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 0d8f12ae..4d5db82d 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -145,9 +145,9 @@ ipctran_pipe_fini(void *arg) } nni_mtx_unlock(&ep->mtx); } - nni_aio_fini(p->rxaio); - nni_aio_fini(p->txaio); - nni_aio_fini(p->negoaio); + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); + nni_aio_free(p->negoaio); nng_stream_free(p->conn); if (p->rxmsg) { nni_msg_free(p->rxmsg); @@ -177,9 +177,9 @@ ipctran_pipe_alloc(ipctran_pipe **pipep) return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, ipctran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negoaio, ipctran_pipe_nego_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->txaio, ipctran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->negoaio, ipctran_pipe_nego_cb, p)) != 0)) { ipctran_pipe_fini(p); return (rv); } @@ -699,8 +699,8 @@ ipctran_ep_fini(void *arg) nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_fini(ep->timeaio); - nni_aio_fini(ep->connaio); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } @@ -840,7 +840,7 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) } ep->ndialer = ndialer; - if (((rv = nni_aio_init(&ep->connaio, ipctran_dial_cb, ep)) != 0) || + if (((rv = nni_aio_alloc(&ep->connaio, ipctran_dial_cb, ep)) != 0) || ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) { ipctran_ep_fini(ep); return (rv); @@ -863,8 +863,8 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener) } ep->nlistener = nlistener; - if (((rv = nni_aio_init(&ep->connaio, ipctran_accept_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->timeaio, ipctran_timer_cb, ep)) != 0) || + if (((rv = nni_aio_alloc(&ep->connaio, ipctran_accept_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->timeaio, ipctran_timer_cb, ep)) != 0) || ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { ipctran_ep_fini(ep); return (rv); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 7748cd8f..32df5102 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -146,9 +146,9 @@ tcptran_pipe_fini(void *arg) nni_mtx_unlock(&ep->mtx); } - nni_aio_fini(p->rxaio); - nni_aio_fini(p->txaio); - nni_aio_fini(p->negoaio); + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); + nni_aio_free(p->negoaio); nng_stream_free(p->conn); nni_msg_free(p->rxmsg); nni_mtx_fini(&p->mtx); @@ -176,9 +176,9 @@ tcptran_pipe_alloc(tcptran_pipe **pipep) return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) != 0)) { tcptran_pipe_fini(p); return (rv); } @@ -652,8 +652,8 @@ tcptran_ep_fini(void *arg) nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_fini(ep->timeaio); - nni_aio_fini(ep->connaio); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); @@ -736,7 +736,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl) memcpy(src, surl->u_hostname, len); src[len] = '\0'; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { nni_free(src, len + 1); return (rv); } @@ -746,7 +746,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl) if ((rv = nni_aio_result(aio)) == 0) { nni_aio_get_sockaddr(aio, sa); } - nni_aio_fini(aio); + nni_aio_free(aio); nni_free(src, len + 1); return (rv); } @@ -904,7 +904,7 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) ep->ndialer = ndialer; if ((rv != 0) || - ((rv = nni_aio_init(&ep->connaio, tcptran_dial_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->connaio, tcptran_dial_cb, ep)) != 0) || ((rv = nng_stream_dialer_alloc_url(&ep->dialer, &myurl)) != 0)) { tcptran_ep_fini(ep); return (rv); @@ -942,8 +942,8 @@ tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) } ep->nlistener = nlistener; - if (((rv = nni_aio_init(&ep->connaio, tcptran_accept_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep)) != 0) || + if (((rv = nni_aio_alloc(&ep->connaio, tcptran_accept_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->timeaio, tcptran_timer_cb, ep)) != 0) || ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { tcptran_ep_fini(ep); return (rv); diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 58b43f2c..7869c380 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. +// Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2019 Devolutions // @@ -151,9 +151,9 @@ tlstran_pipe_fini(void *arg) } nni_mtx_unlock(&ep->mtx); } - nni_aio_fini(p->rxaio); - nni_aio_fini(p->txaio); - nni_aio_fini(p->negoaio); + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); + nni_aio_free(p->negoaio); nng_stream_free(p->tls); nni_msg_free(p->rxmsg); NNI_FREE_STRUCT(p); @@ -170,9 +170,9 @@ tlstran_pipe_alloc(tlstran_pipe **pipep) } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, tlstran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negoaio, tlstran_pipe_nego_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->txaio, tlstran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->negoaio, tlstran_pipe_nego_cb, p)) != 0)) { tlstran_pipe_fini(p); return (rv); } @@ -626,8 +626,8 @@ tlstran_ep_fini(void *arg) nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_fini(ep->timeaio); - nni_aio_fini(ep->connaio); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); @@ -709,7 +709,7 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl) memcpy(src, surl->u_hostname, len); src[len] = '\0'; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { nni_free(src, len + 1); return (rv); } @@ -719,7 +719,7 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl) if ((rv = nni_aio_result(aio)) == 0) { nni_aio_get_sockaddr(aio, sa); } - nni_aio_fini(aio); + nni_aio_free(aio); nni_free(src, len + 1); return (rv); } @@ -872,7 +872,7 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) } if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) || - ((rv = nni_aio_init(&ep->connaio, tlstran_dial_cb, ep)) != 0)) { + ((rv = nni_aio_alloc(&ep->connaio, tlstran_dial_cb, ep)) != 0)) { return (rv); } ep->authmode = NNG_TLS_AUTH_MODE_REQUIRED; @@ -923,8 +923,8 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) return (NNG_EADDRINVAL); } if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) || - ((rv = nni_aio_init(&ep->connaio, tlstran_accept_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->timeaio, tlstran_timer_cb, ep)) != 0)) { + ((rv = nni_aio_alloc(&ep->connaio, tlstran_accept_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->timeaio, tlstran_timer_cb, ep)) != 0)) { return (rv); } @@ -942,7 +942,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) // be worse than the cost of just waiting here. We always recommend // using local IP addresses rather than names when possible. - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { tlstran_ep_fini(ep); return (rv); } @@ -950,7 +950,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) nni_tcp_resolv(host, url->u_port, af, 1, aio); nni_aio_wait(aio); rv = nni_aio_result(aio); - nni_aio_fini(aio); + nni_aio_free(aio); if ((rv != 0) || ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) || diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 3424480a..e70b1fd4 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -205,8 +205,8 @@ wstran_pipe_fini(void *arg) { ws_pipe *p = arg; - nni_aio_fini(p->rxaio); - nni_aio_fini(p->txaio); + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); nng_stream_free(p->ws); nni_mtx_fini(&p->mtx); @@ -238,8 +238,8 @@ wstran_pipe_alloc(ws_pipe **pipep, void *ws) nni_mtx_init(&p->mtx); // Initialize AIOs. - if (((rv = nni_aio_init(&p->txaio, wstran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->txaio, wstran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) { wstran_pipe_fini(p); return (rv); } @@ -381,7 +381,7 @@ wstran_dialer_fini(void *arg) nni_aio_stop(d->connaio); nng_stream_dialer_free(d->dialer); - nni_aio_fini(d->connaio); + nni_aio_free(d->connaio); nni_mtx_fini(&d->mtx); NNI_FREE_STRUCT(d); } @@ -393,7 +393,7 @@ wstran_listener_fini(void *arg) nni_aio_stop(l->accaio); nng_stream_listener_free(l->listener); - nni_aio_fini(l->accaio); + nni_aio_free(l->accaio); nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } @@ -515,7 +515,7 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) nni_sock_peer_name(s)); if (((rv = nni_ws_dialer_alloc(&d->dialer, url)) != 0) || - ((rv = nni_aio_init(&d->connaio, wstran_connect_cb, d)) != 0) || + ((rv = nni_aio_alloc(&d->connaio, wstran_connect_cb, d)) != 0) || ((rv = nng_stream_dialer_set_bool( d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) || ((rv = nng_stream_dialer_set_string( @@ -551,7 +551,7 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) nni_sock_proto_name(s)); if (((rv = nni_ws_listener_alloc(&l->listener, url)) != 0) || - ((rv = nni_aio_init(&l->accaio, wstran_accept_cb, l)) != 0) || + ((rv = nni_aio_alloc(&l->accaio, wstran_accept_cb, l)) != 0) || ((rv = nng_stream_listener_set_bool( l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) || ((rv = nng_stream_listener_set_string( diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 027d46c2..9d166ad2 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1327,12 +1327,12 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, return (-1); } - if (nni_aio_init(&aio, NULL, NULL) != 0) { + if (nni_aio_alloc(&aio, NULL, NULL) != 0) { // Out of memory return (-1); } if ((buf = nni_alloc(sizeof(*hdr) + len)) == NULL) { - nni_aio_fini(aio); + nni_aio_free(aio); return (-1); } @@ -1359,7 +1359,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, // care which. (There may be a few thread context switches, but // none of them are going to have to wait for some unbounded time.) nni_aio_wait(aio); - nni_aio_fini(aio); + nni_aio_free(aio); nni_free(hdr, hdr->len + sizeof(*hdr)); return (0); @@ -1406,8 +1406,8 @@ zt_node_destroy(zt_node *ztn) if (ztn->zn_flock != NULL) { nni_file_unlock(ztn->zn_flock); } - nni_aio_fini(ztn->zn_rcv4_aio); - nni_aio_fini(ztn->zn_rcv6_aio); + nni_aio_free(ztn->zn_rcv4_aio); + nni_aio_free(ztn->zn_rcv6_aio); nni_idhash_fini(ztn->zn_eps); nni_idhash_fini(ztn->zn_lpipes); nni_idhash_fini(ztn->zn_rpipes); @@ -1440,8 +1440,8 @@ zt_node_create(zt_node **ztnp, const char *path) NNI_LIST_INIT(&ztn->zn_eplist, zt_ep, ze_link); NNI_LIST_INIT(&ztn->zn_plist, zt_pipe, zp_link); nni_cv_init(&ztn->zn_bgcv, &zt_lk); - nni_aio_init(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn); - nni_aio_init(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn); + nni_aio_alloc(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn); + nni_aio_alloc(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn); if (((ztn->zn_rcv4_buf = nni_alloc(zt_rcv_bufsize)) == NULL) || ((ztn->zn_rcv6_buf = nni_alloc(zt_rcv_bufsize)) == NULL)) { @@ -1642,7 +1642,7 @@ zt_pipe_fini(void *arg) zt_pipe *p = arg; zt_node *ztn = p->zp_ztn; - nni_aio_fini(p->zp_ping_aio); + nni_aio_free(p->zp_ping_aio); // This tosses the connection details and all state. nni_mtx_lock(&zt_lk); @@ -1705,7 +1705,7 @@ zt_pipe_alloc( rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p); } if ((rv != 0) || - ((rv = nni_aio_init(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) { zt_pipe_reap(p); return (rv); } @@ -2078,7 +2078,7 @@ zt_ep_fini(void *arg) { zt_ep *ep = arg; nni_aio_stop(ep->ze_creq_aio); - nni_aio_fini(ep->ze_creq_aio); + nni_aio_free(ep->ze_creq_aio); NNI_FREE_STRUCT(ep); } @@ -2154,7 +2154,7 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, nni_dialer *ndialer, nni_aio_list_init(&ep->ze_aios); - rv = nni_aio_init(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep); + rv = nni_aio_alloc(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep); if (rv != 0) { zt_ep_fini(ep); return (rv); -- cgit v1.2.3-70-g09d2