diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-08 20:34:26 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-08 21:16:30 -0800 |
| commit | b21d7805523a407a14567017edbdef57ca81781f (patch) | |
| tree | e07f08bdc047ee4dfb057b670766e3de5bf2f981 /src | |
| parent | 8479b4c8861c77cfd9eb64e724615605bdd1cbcb (diff) | |
| download | nng-b21d7805523a407a14567017edbdef57ca81781f.tar.gz nng-b21d7805523a407a14567017edbdef57ca81781f.tar.bz2 nng-b21d7805523a407a14567017edbdef57ca81781f.zip | |
fixes #1094 Consider in-lining task and aio
This only does it for rep, but it also has changes that should increase
the overall test coverage for the REP protocol
Diffstat (limited to 'src')
35 files changed, 395 insertions, 350 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 9f025f4c..a15bb47b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -61,107 +61,75 @@ static nni_aio *nni_aio_expire_aio; // operations from starting, without waiting for any existing one to // complete, call nni_aio_close. -// An nni_aio is an async I/O handle. -struct nng_aio { - int a_result; // Result code (nng_errno) - size_t a_count; // Bytes transferred (I/O only) - nni_time a_expire; // Absolute timeout - nni_duration a_timeout; // Relative timeout - - bool a_stop; // shutting down (no new operations) - bool a_sleep; // sleeping with no action - bool a_expire_ok; // expire from sleep is ok - nni_task a_task; - - // Read/write operations. - nni_iov a_iov[8]; - unsigned a_niov; - - // Message operations. - nni_msg *a_msg; - - // User scratch data. Consumers may store values here, which - // must be preserved by providers and the framework. - void *a_user_data[4]; - - // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be - // specified. The semantics of these will vary, and depend on the - // specific operation. - void *a_inputs[4]; - void *a_outputs[4]; - - // Provider-use fields. - nni_aio_cancelfn a_cancel_fn; - void * a_cancel_arg; - nni_list_node a_prov_node; - void * a_prov_extra[4]; // Extra data used by provider - - // Socket address. This turns out to be very useful, as we wind up - // needing socket addresses for numerous connection related routines. - // It would be cleaner to not have this and avoid burning the space, - // but having this hear dramatically simplifies lots of code. - nng_sockaddr a_sockaddr; - - // Expire node. - nni_list_node a_expire_node; -}; - static void nni_aio_expire_add(nni_aio *); -int -nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) +void +nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) { - nni_aio *aio; - - if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { - return (NNG_ENOMEM); - } + memset(aio, 0, sizeof(*aio)); nni_task_init(&aio->a_task, NULL, cb, arg); aio->a_expire = NNI_TIME_NEVER; aio->a_timeout = NNG_DURATION_INFINITE; - *aiop = aio; - return (0); } void nni_aio_fini(nni_aio *aio) { - if (aio != NULL) { + nni_aio_cancelfn fn; + void * arg; - nni_aio_cancelfn fn; - void * arg; + // TODO: This probably could just use nni_aio_stop. - // This is like aio_close, but we don't want to dispatch - // the task. And unlike aio_stop, we don't want to wait - // for the task. (Because we implicitly do task_fini.) - nni_mtx_lock(&nni_aio_lk); - fn = aio->a_cancel_fn; - arg = aio->a_cancel_arg; - aio->a_cancel_fn = NULL; - aio->a_cancel_arg = NULL; - aio->a_stop = true; - nni_mtx_unlock(&nni_aio_lk); + // This is like aio_close, but we don't want to dispatch + // the task. And unlike aio_stop, we don't want to wait + // for the task. (Because we implicitly do task_fini.) + nni_mtx_lock(&nni_aio_lk); + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; + nni_mtx_unlock(&nni_aio_lk); - if (fn != NULL) { - fn(aio, arg, NNG_ECLOSED); - } + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); + } - // Wait for the aio to be "done"; this ensures that we don't - // destroy an aio from a "normal" completion callback while - // the expiration thread is working. + // Wait for the aio to be "done"; this ensures that we don't + // destroy an aio from a "normal" completion callback while + // the expiration thread is working. - nni_mtx_lock(&nni_aio_lk); - while (nni_aio_expire_aio == aio) { - if (nni_thr_is_self(&nni_aio_expire_thr)) { - nni_aio_expire_aio = NULL; - break; - } - nni_cv_wait(&nni_aio_expire_cv); + nni_mtx_lock(&nni_aio_lk); + while (nni_aio_expire_aio == aio) { + // TODO: It should be possible to remove this check! + if (nni_thr_is_self(&nni_aio_expire_thr)) { + nni_aio_expire_aio = NULL; + break; } - nni_mtx_unlock(&nni_aio_lk); + nni_cv_wait(&nni_aio_expire_cv); + } + nni_mtx_unlock(&nni_aio_lk); + nni_task_fini(&aio->a_task); +} - nni_task_fini(&aio->a_task); +int +nni_aio_alloc(nni_aio **aiop, nni_cb cb, void *arg) +{ + nni_aio *aio; + + if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { + return (NNG_ENOMEM); + } + nni_aio_init(aio, cb, arg); + *aiop = aio; + return (0); +} +void +nni_aio_free(nni_aio *aio) +{ + if (aio != NULL) { + nni_aio_fini(aio); NNI_FREE_STRUCT(aio); } } @@ -189,7 +157,7 @@ nni_aio_set_iov(nni_aio *aio, unsigned niov, const nni_iov *iov) // callback to complete, if still running. It also marks the AIO as // stopped, preventing further calls to nni_aio_begin from succeeding. // To correctly tear down an AIO, call stop, and make sure any other -// callers are not also stopped, before calling nni_aio_fini to release +// callers are not also stopped, before calling nni_aio_free to release // actual memory. void nni_aio_stop(nni_aio *aio) diff --git a/src/core/aio.h b/src/core/aio.h index 83c068b5..c2776bc0 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -22,19 +22,29 @@ typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int); // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern int nni_aio_init(nni_aio **, nni_cb, void *); +extern void nni_aio_init(nni_aio *, nni_cb, void *arg); -// nni_aio_fini finalizes the aio, releasing resources (locks) -// associated with it. The caller is responsible for ensuring that any -// associated I/O is unscheduled or complete. This is safe to call -// on zero'd memory. +// nni_aio_fini finalizes an aio object, releasing associated resources. +// It waits for the callback to complete. extern void nni_aio_fini(nni_aio *); +// nni_aio_alloc allocates an aio object and initializes it. The callback +// is called with the supplied argument when the operation is complete. +// If NULL is supplied for the callback, then nni_aio_wake is used in its +// place, and the aio is used for the argument. +extern int nni_aio_alloc(nni_aio **, nni_cb, void *arg); + +// nni_aio_free frees the aio, releasing resources (locks) +// associated with it. This is safe to call on zero'd memory. +// This must only be called on an object that was allocated +// with nni_aio_allocate. +extern void nni_aio_free(nni_aio *aio); + // nni_aio_stop cancels any unfinished I/O, running completion callbacks, // but also prevents any new operations from starting (nni_aio_start will -// return NNG_ESTATE). This should be called before nni_aio_fini(). The +// return NNG_ESTATE). This should be called before nni_aio_free(). The // best pattern is to call nni_aio_stop on all linked aios, before calling -// nni_aio_fini on any of them. This function will block until any +// nni_aio_free on any of them. This function will block until any // callbacks are executed, and therefore it should never be executed // from a callback itself. (To abort operations without blocking // use nni_aio_cancel instead.) @@ -162,4 +172,54 @@ extern void nni_sleep_aio(nni_duration, nni_aio *); extern int nni_aio_sys_init(void); extern void nni_aio_sys_fini(void); + +// An nni_aio is an async I/O handle. The details of this aio structure +// are private to the AIO framework. The structure has the public name +// (nng_aio) so that we minimize the pollution in the public API namespace. +// It is a coding error for anything out side of the AIO framework to access +// any of these members -- the definition is provided here to facilitate +// inlining, but that should be the only use. +struct nng_aio { + size_t a_count; // Bytes transferred (I/O only) + nni_time a_expire; // Absolute timeout + nni_duration a_timeout; // Relative timeout + int a_result; // Result code (nng_errno) + bool a_stop; // Shutting down (no new operations) + bool a_sleep; // Sleeping with no action + bool a_expire_ok; // Expire from sleep is ok + nni_task a_task; + + // Read/write operations. + nni_iov a_iov[8]; + unsigned a_niov; + + // Message operations. + nni_msg *a_msg; + + // User scratch data. Consumers may store values here, which + // must be preserved by providers and the framework. + void *a_user_data[4]; + + // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be + // specified. The semantics of these will vary, and depend on the + // specific operation. + void *a_inputs[4]; + void *a_outputs[4]; + + // Provider-use fields. + nni_aio_cancelfn a_cancel_fn; + void * a_cancel_arg; + nni_list_node a_prov_node; // Linkage on provider list. + void * a_prov_extra[4]; // Extra data used by provider + + // Socket address. This turns out to be very useful, as we wind up + // needing socket addresses for numerous connection related routines. + // It would be cleaner to not have this and avoid burning the space, + // but having this hear dramatically simplifies lots of code. + nng_sockaddr a_sockaddr; + + // Expire node. + nni_list_node a_expire_node; +}; + #endif // CORE_AIO_H diff --git a/src/core/device.c b/src/core/device.c index fee108a8..71480bbc 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -94,7 +94,7 @@ nni_device_fini(nni_device_data *dd) } for (i = 0; i < dd->npath; i++) { nni_device_path *p = &dd->paths[i]; - nni_aio_fini(p->aio); + nni_aio_free(p->aio); } nni_mtx_fini(&dd->mtx); NNI_FREE_STRUCT(dd); @@ -172,7 +172,7 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) p->dst = i == 0 ? s2 : s1; p->state = NNI_DEVICE_STATE_INIT; - if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) { nni_device_fini(dd); return (rv); } @@ -221,11 +221,11 @@ nni_device(nni_sock *s1, nni_sock *s2) nni_aio * aio; int rv; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { return (rv); } if ((rv = nni_device_init(&dd, s1, s2)) != 0) { - nni_aio_fini(aio); + nni_aio_free(aio); return (rv); } nni_device_start(dd, aio); @@ -233,6 +233,6 @@ nni_device(nni_sock *s1, nni_sock *s2) rv = nni_aio_result(aio); nni_device_fini(dd); - nni_aio_fini(aio); + nni_aio_free(aio); return (rv); } diff --git a/src/core/dialer.c b/src/core/dialer.c index e1178783..8a463452 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -60,8 +60,8 @@ nni_dialer_destroy(nni_dialer *d) nni_aio_stop(d->d_con_aio); nni_aio_stop(d->d_tmo_aio); - nni_aio_fini(d->d_con_aio); - nni_aio_fini(d->d_tmo_aio); + nni_aio_free(d->d_con_aio); + nni_aio_free(d->d_tmo_aio); if (d->d_data != NULL) { d->d_ops.d_fini(d->d_data); @@ -204,8 +204,8 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_mtx_init(&d->d_mtx); dialer_stats_init(d); - if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || - ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || + if (((rv = nni_aio_alloc(&d->d_con_aio, dialer_connect_cb, d)) != 0) || + ((rv = nni_aio_alloc(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || ((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) || ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) || ((rv = nni_sock_add_dialer(s, d)) != 0)) { @@ -382,7 +382,7 @@ nni_dialer_start(nni_dialer *d, int flags) if ((flags & NNG_FLAG_NONBLOCK) != 0) { aio = NULL; } else { - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { nni_atomic_flag_reset(&d->d_started); return (rv); } @@ -397,7 +397,7 @@ nni_dialer_start(nni_dialer *d, int flags) if (aio != NULL) { nni_aio_wait(aio); rv = nni_aio_result(aio); - nni_aio_fini(aio); + nni_aio_free(aio); } return (rv); diff --git a/src/core/listener.c b/src/core/listener.c index d44a5456..87d2c532 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -61,8 +61,8 @@ nni_listener_destroy(nni_listener *l) nni_aio_stop(l->l_acc_aio); nni_aio_stop(l->l_tmo_aio); - nni_aio_fini(l->l_acc_aio); - nni_aio_fini(l->l_tmo_aio); + nni_aio_free(l->l_acc_aio); + nni_aio_free(l->l_tmo_aio); if (l->l_data != NULL) { l->l_ops.l_fini(l->l_data); @@ -196,8 +196,8 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); listener_stats_init(l); - if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || - ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || + if (((rv = nni_aio_alloc(&l->l_acc_aio, listener_accept_cb, l)) != 0) || + ((rv = nni_aio_alloc(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || ((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) || ((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) || ((rv = nni_sock_add_listener(s, l)) != 0)) { @@ -1116,7 +1116,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) if ((rv = nni_init()) != 0) { return (rv); } - if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) { + if ((rv = nni_aio_alloc(&aio, (nni_cb) cb, arg)) == 0) { nng_aio_set_timeout(aio, NNG_DURATION_DEFAULT); *app = aio; } @@ -1126,7 +1126,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) void nng_aio_free(nng_aio *aio) { - nni_aio_fini(aio); + nni_aio_free(aio); } void diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index afb12ef6..dea228a1 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -68,7 +68,7 @@ bus0_sock_fini(void *arg) { bus0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_mtx_fini(&s->mtx); } @@ -80,7 +80,7 @@ bus0_sock_init(void *arg, nni_sock *nsock) NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { bus0_sock_fini(s); return (rv); } @@ -99,7 +99,7 @@ bus0_sock_init_raw(void *arg, nni_sock *nsock) NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) { bus0_sock_fini(s); return (rv); } @@ -142,10 +142,10 @@ bus0_pipe_fini(void *arg) { bus0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); nni_mtx_fini(&p->mtx); } @@ -159,10 +159,10 @@ bus0_pipe_init(void *arg, nni_pipe *npipe, void *s) NNI_LIST_NODE_INIT(&p->node); nni_mtx_init(&p->mtx); if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { bus0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index 860ac17f..730e5f5e 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -88,10 +88,10 @@ pair0_pipe_fini(void *arg) { pair0_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); } static int @@ -100,10 +100,10 @@ pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock) pair0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_send, pair0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pair0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, pair0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, pair0_putq_cb, p)) != 0)) { pair0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index 2838cb5d..b3b64a79 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -69,7 +69,7 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); } @@ -88,7 +88,7 @@ pair1_sock_init_impl(void *arg, nni_sock *nsock, bool raw) // Raw mode uses this. nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) { pair1_sock_fini(s); return (rv); } @@ -147,10 +147,10 @@ pair1_pipe_fini(void *arg) { pair1_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); nni_msgq_fini(p->sendq); } @@ -161,10 +161,10 @@ pair1_pipe_init(void *arg, nni_pipe *npipe, void *psock) int rv; if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { pair1_pipe_fini(p); return (NNG_ENOMEM); } diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 64b47cef..8feb08b8 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -73,8 +73,8 @@ pull0_pipe_fini(void *arg) { pull0_pipe *p = arg; - nni_aio_fini(p->putq_aio); - nni_aio_fini(p->recv_aio); + nni_aio_free(p->putq_aio); + nni_aio_free(p->recv_aio); } static int @@ -83,8 +83,8 @@ pull0_pipe_init(void *arg, nni_pipe *pipe, void *s) pull0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->putq_aio, pull0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->recv_aio, pull0_recv_cb, p)) != 0)) { pull0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 5a932ece..90c94af9 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -89,9 +89,9 @@ push0_pipe_fini(void *arg) { push0_pipe *p = arg; - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_getq); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_getq); } static int @@ -100,9 +100,9 @@ push0_pipe_init(void *arg, nni_pipe *pipe, void *s) push0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, push0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, push0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, push0_getq_cb, p)) != 0)) { push0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index a42e95ff..9b995c33 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -111,8 +111,8 @@ pub0_pipe_fini(void *arg) { pub0_pipe *p = arg; - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_lmq_fini(&p->sendq); } @@ -130,8 +130,8 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) // XXX: consider making this depth tunable if (((rv = nni_lmq_init(&p->sendq, len)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { pub0_pipe_fini(p); return (rv); diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 56da98f8..c5b84313 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -265,7 +265,7 @@ sub0_pipe_fini(void *arg) { sub0_pipe *p = arg; - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_recv); } static int @@ -274,7 +274,7 @@ sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) sub0_pipe *p = arg; int rv; - if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio_recv, sub0_recv_cb, p)) != 0) { sub0_pipe_fini(p); return (rv); } diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c index be300df4..baa4f8eb 100644 --- a/src/protocol/pubsub0/xsub.c +++ b/src/protocol/pubsub0/xsub.c @@ -85,7 +85,7 @@ xsub0_pipe_fini(void *arg) { xsub0_pipe *p = arg; - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_recv); } static int @@ -94,7 +94,7 @@ xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s) xsub0_pipe *p = arg; int rv; - if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio_recv, xsub0_recv_cb, p)) != 0) { xsub0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index a715ab59..a29c3120 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -48,14 +48,14 @@ struct rep0_ctx { // rep0_sock is our per-socket protocol private structure. struct rep0_sock { - nni_mtx lk; - int ttl; - nni_idhash * pipes; - nni_list recvpipes; // list of pipes with data to receive - nni_list recvq; - rep0_ctx ctx; - nni_pollable readable; - nni_pollable writable; + nni_mtx lk; + int ttl; + nni_idhash * pipes; + nni_list recvpipes; // list of pipes with data to receive + nni_list recvq; + rep0_ctx ctx; + nni_pollable readable; + nni_pollable writable; }; // rep0_pipe is our per-pipe protocol private structure. @@ -63,8 +63,8 @@ struct rep0_pipe { nni_pipe * pipe; rep0_sock * rep; uint32_t id; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node rnode; // receivable list linkage nni_list sendq; // contexts waiting to send bool busy; @@ -193,8 +193,8 @@ rep0_ctx_send(void *arg, nni_aio *aio) if (!p->busy) { p->busy = true; len = nni_msg_len(msg); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&s->lk); nni_aio_set_msg(aio, NULL); @@ -273,8 +273,8 @@ rep0_pipe_stop(void *arg) { rep0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -283,26 +283,22 @@ rep0_pipe_fini(void *arg) rep0_pipe *p = arg; nng_msg * msg; - if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) { - nni_aio_set_msg(p->aio_recv, NULL); + if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) { + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); } static int rep0_pipe_init(void *arg, nni_pipe *pipe, void *s) { rep0_pipe *p = arg; - int rv; - if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) { - rep0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p); NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode); @@ -329,7 +325,7 @@ rep0_pipe_start(void *arg) } // By definition, we have not received a request yet on this pipe, // so it cannot cause us to become writable. - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -340,8 +336,8 @@ rep0_pipe_close(void *arg) rep0_sock *s = p->rep; rep0_ctx * ctx; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&s->lk); if (nni_list_active(&s->recvpipes, p)) { @@ -380,9 +376,9 @@ rep0_pipe_send_cb(void *arg) nni_msg * msg; size_t len; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -406,8 +402,8 @@ rep0_pipe_send_cb(void *arg) msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&s->lk); @@ -462,13 +458,13 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_unlock(&s->lk); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { nni_pollable_clear(&s->readable); } - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(&s->writable); } @@ -496,12 +492,12 @@ rep0_pipe_recv_cb(void *arg) size_t len; int hops; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - msg = nni_aio_get_msg(p->aio_recv); + msg = nni_aio_get_msg(&p->aio_recv); nni_msg_set_pipe(msg, p->id); @@ -521,7 +517,7 @@ rep0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_close(p->pipe); return; } @@ -552,13 +548,13 @@ rep0_pipe_recv_cb(void *arg) nni_list_remove(&s->recvq, ctx); aio = ctx->raio; ctx->raio = NULL; - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(&s->writable); } // schedule another receive - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); ctx->btrace_len = len; memcpy(ctx->btrace, nni_msg_header(msg), len); @@ -573,8 +569,8 @@ rep0_pipe_recv_cb(void *arg) drop: nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->pipe, &p->aio_recv); } static int diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 879d6ae4..f339e68d 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -273,6 +273,26 @@ test_rep_close_pipe_during_send(void) } void +test_rep_ctx_recv_aio_stopped(void) +{ + nng_socket rep; + nng_ctx ctx; + nng_aio * aio; + + TEST_NNG_PASS(nng_rep0_open(&rep)); + TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL)); + TEST_NNG_PASS(nng_ctx_open(&ctx, rep)); + + nng_aio_stop(aio); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + TEST_NNG_FAIL(nng_aio_result(aio), NNG_ECANCELED); + TEST_NNG_PASS(nng_ctx_close(ctx)); + TEST_NNG_PASS(nng_close(rep)); + nng_aio_free(aio); +} + +void test_rep_close_pipe_context_send(void) { nng_socket rep; @@ -424,6 +444,7 @@ TEST_LIST = { { "rep double recv", test_rep_double_recv }, { "rep close pipe before send", test_rep_close_pipe_before_send }, { "rep close pipe during send", test_rep_close_pipe_during_send }, + { "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped }, { "rep close pipe context send", test_rep_close_pipe_context_send }, { "rep close context send", test_rep_close_context_send }, { "rep recv garbage", test_rep_recv_garbage }, diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 33629abc..14da7143 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -184,8 +184,8 @@ req0_pipe_fini(void *arg) { req0_pipe *p = arg; - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); } static int @@ -194,8 +194,8 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s) req0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, req0_send_cb, p)) != 0)) { req0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index 48f74075..308c0f0e 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -62,7 +62,7 @@ xrep0_sock_fini(void *arg) { xrep0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->lk); } @@ -75,7 +75,7 @@ xrep0_sock_init(void *arg, nni_sock *sock) nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) { + ((rv = nni_aio_alloc(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) { xrep0_sock_fini(s); return (rv); } @@ -120,10 +120,10 @@ xrep0_pipe_fini(void *arg) { xrep0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); } @@ -146,10 +146,10 @@ xrep0_pipe_init(void *arg, nni_pipe *pipe, void *s) // willing to receive replies. Something to think about for the // future.) if (((rv = nni_msgq_init(&p->sendq, 64)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) { xrep0_pipe_fini(p); return (rv); } diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index 7455c986..15652f4f 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -96,10 +96,10 @@ xreq0_pipe_fini(void *arg) { xreq0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_send); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_send); } static int @@ -108,10 +108,10 @@ xreq0_pipe_init(void *arg, nni_pipe *pipe, void *s) xreq0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_getq, xreq0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xreq0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xreq0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xreq0_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_getq, xreq0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xreq0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xreq0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xreq0_send_cb, p)) != 0)) { xreq0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index b4ffc917..06010d99 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -281,8 +281,8 @@ resp0_pipe_fini(void *arg) nni_aio_set_msg(p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); } static int @@ -291,8 +291,8 @@ resp0_pipe_init(void *arg, nni_pipe *npipe, void *s) resp0_pipe *p = arg; int rv; - if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { resp0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 8aa05dd4..35a14de7 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -286,9 +286,9 @@ surv0_pipe_fini(void *arg) { surv0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_msgq_fini(p->sendq); } @@ -303,9 +303,9 @@ surv0_pipe_init(void *arg, nni_pipe *npipe, void *s) // is best effort, and a deep queue doesn't really do much for us. // Note that surveys can be *outstanding*, but not yet put on the wire. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { surv0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 66b340ee..6318fe8b 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -62,7 +62,7 @@ xresp0_sock_fini(void *arg) { xresp0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); } @@ -75,7 +75,7 @@ xresp0_sock_init(void *arg, nni_sock *nsock) nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) { + ((rv = nni_aio_alloc(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) { xresp0_sock_fini(s); return (rv); } @@ -119,10 +119,10 @@ xresp0_pipe_fini(void *arg) { xresp0_pipe *p = arg; - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_free(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); nni_msgq_fini(p->sendq); } @@ -133,10 +133,10 @@ xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s) int rv; if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xresp0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xresp0_send_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_putq, xresp0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xresp0_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_getq, xresp0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xresp0_send_cb, p)) != 0)) { xresp0_pipe_fini(p); return (rv); } diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index 43c83793..86f912a2 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -60,7 +60,7 @@ xsurv0_sock_fini(void *arg) { xsurv0_sock *s = arg; - nni_aio_fini(s->aio_getq); + nni_aio_free(s->aio_getq); nni_mtx_fini(&s->mtx); } @@ -70,7 +70,7 @@ xsurv0_sock_init(void *arg, nni_sock *nsock) xsurv0_sock *s = arg; int rv; - if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) { xsurv0_sock_fini(s); return (rv); } @@ -116,10 +116,10 @@ xsurv0_pipe_fini(void *arg) { xsurv0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); + nni_aio_free(p->aio_getq); + nni_aio_free(p->aio_send); + nni_aio_free(p->aio_recv); + nni_aio_free(p->aio_putq); nni_msgq_fini(p->sendq); } @@ -136,10 +136,10 @@ xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s) // an expiration with them, so that we could discard any that are // not delivered before their expiration date. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, xsurv0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->aio_getq, xsurv0_getq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_putq, xsurv0_putq_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_send, xsurv0_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) { xsurv0_pipe_fini(p); return (rv); } diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index 3c60bd46..ecba84ae 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -91,7 +91,7 @@ http_dial_cb(void *arg) void nni_http_client_fini(nni_http_client *c) { - nni_aio_fini(c->aio); + nni_aio_free(c->aio); nng_stream_dialer_free(c->dialer); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -132,7 +132,7 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) return (rv); } - if ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0) { + if ((rv = nni_aio_alloc(&c->aio, http_dial_cb, c)) != 0) { nni_http_client_fini(c); return (rv); } @@ -251,7 +251,7 @@ http_txn_reap(void *arg) } } nni_http_chunks_free(txn->chunks); - nni_aio_fini(txn->aio); + nni_aio_free(txn->aio); NNI_FREE_STRUCT(txn); } @@ -405,7 +405,7 @@ nni_http_transact_conn( nni_aio_finish_error(aio, NNG_ENOMEM); return; } - if ((rv = nni_aio_init(&txn->aio, http_txn_cb, txn)) != 0) { + if ((rv = nni_aio_alloc(&txn->aio, http_txn_cb, txn)) != 0) { NNI_FREE_STRUCT(txn); nni_aio_finish_error(aio, rv); return; @@ -450,7 +450,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req, nni_aio_finish_error(aio, NNG_ENOMEM); return; } - if ((rv = nni_aio_init(&txn->aio, http_txn_cb, txn)) != 0) { + if ((rv = nni_aio_alloc(&txn->aio, http_txn_cb, txn)) != 0) { NNI_FREE_STRUCT(txn); nni_aio_finish_error(aio, rv); return; diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 1fc2c34e..583f5eee 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -690,8 +690,8 @@ nni_http_conn_fini(nni_http_conn *conn) } nni_mtx_unlock(&conn->mtx); - nni_aio_fini(conn->wr_aio); - nni_aio_fini(conn->rd_aio); + nni_aio_free(conn->wr_aio); + nni_aio_free(conn->rd_aio); nni_free(conn->rd_buf, conn->rd_bufsz); nni_mtx_fini(&conn->mtx); NNI_FREE_STRUCT(conn); @@ -716,8 +716,8 @@ http_init(nni_http_conn **connp, nng_stream *data) } conn->rd_bufsz = HTTP_BUFSIZE; - if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) || - ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) { + if (((rv = nni_aio_alloc(&conn->wr_aio, http_wr_cb, conn)) != 0) || + ((rv = nni_aio_alloc(&conn->rd_aio, http_rd_cb, conn)) != 0)) { nni_http_conn_fini(conn); return (rv); } diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 06a93553..9a4ff2af 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -243,10 +243,10 @@ http_sconn_reap(void *arg) } nni_http_req_free(sc->req); nni_http_res_free(sc->res); - nni_aio_fini(sc->rxaio); - nni_aio_fini(sc->txaio); - nni_aio_fini(sc->txdataio); - nni_aio_fini(sc->cbaio); + nni_aio_free(sc->rxaio); + nni_aio_free(sc->txaio); + nni_aio_free(sc->txdataio); + nni_aio_free(sc->cbaio); // Now it is safe to release our reference on the server. nni_mtx_lock(&s->mtx); @@ -746,11 +746,11 @@ http_sconn_init(http_sconn **scp, nng_stream *stream) } if (((rv = nni_http_req_alloc(&sc->req, NULL)) != 0) || - ((rv = nni_aio_init(&sc->rxaio, http_sconn_rxdone, sc)) != 0) || - ((rv = nni_aio_init(&sc->txaio, http_sconn_txdone, sc)) != 0) || - ((rv = nni_aio_init(&sc->txdataio, http_sconn_txdatdone, sc)) != + ((rv = nni_aio_alloc(&sc->rxaio, http_sconn_rxdone, sc)) != 0) || + ((rv = nni_aio_alloc(&sc->txaio, http_sconn_txdone, sc)) != 0) || + ((rv = nni_aio_alloc(&sc->txdataio, http_sconn_txdatdone, sc)) != 0) || - ((rv = nni_aio_init(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) { + ((rv = nni_aio_alloc(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) { // Can't even accept the incoming request. Hard close. http_sconn_close(sc); return (rv); @@ -838,7 +838,7 @@ http_server_fini(nni_http_server *s) nni_mtx_unlock(&s->errors_mtx); nni_mtx_fini(&s->errors_mtx); - nni_aio_fini(s->accaio); + nni_aio_free(s->accaio); nni_mtx_fini(&s->mtx); nni_strfree(s->hostname); NNI_FREE_STRUCT(s); @@ -874,7 +874,7 @@ http_server_init(nni_http_server **serverp, const nni_url *url) nni_mtx_init(&s->errors_mtx); NNI_LIST_INIT(&s->errors, http_error, node); - if ((rv = nni_aio_init(&s->accaio, http_server_acccb, s)) != 0) { + if ((rv = nni_aio_alloc(&s->accaio, http_server_acccb, s)) != 0) { http_server_fini(s); return (rv); } diff --git a/src/supplemental/tcp/tcp.c b/src/supplemental/tcp/tcp.c index 02d5d6ce..dd6f28ff 100644 --- a/src/supplemental/tcp/tcp.c +++ b/src/supplemental/tcp/tcp.c @@ -157,8 +157,8 @@ tcp_dialer_free(void *arg) nni_aio_stop(d->resaio); nni_aio_stop(d->conaio); - nni_aio_fini(d->resaio); - nni_aio_fini(d->conaio); + nni_aio_free(d->resaio); + nni_aio_free(d->conaio); if (d->d != NULL) { nni_tcp_dialer_close(d->d); @@ -234,8 +234,8 @@ tcp_dialer_alloc(tcp_dialer **dp) nni_aio_list_init(&d->resaios); nni_aio_list_init(&d->conaios); - if (((rv = nni_aio_init(&d->resaio, tcp_dial_res_cb, d)) != 0) || - ((rv = nni_aio_init(&d->conaio, tcp_dial_con_cb, d)) != 0) || + if (((rv = nni_aio_alloc(&d->resaio, tcp_dial_res_cb, d)) != 0) || + ((rv = nni_aio_alloc(&d->conaio, tcp_dial_con_cb, d)) != 0) || ((rv = nni_tcp_dialer_init(&d->d)) != 0)) { tcp_dialer_free(d); return (rv); @@ -446,11 +446,11 @@ nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url) nni_aio_wait(aio); if ((rv = nni_aio_result(aio)) != 0) { - nni_aio_fini(aio); + nni_aio_free(aio); return (rv); } nni_aio_get_sockaddr(aio, &sa); - nni_aio_fini(aio); + nni_aio_free(aio); return (tcp_listener_alloc_addr(lp, &sa)); } diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index d49c9de5..b0ac6ec5 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -290,12 +290,12 @@ tls_reap(void *arg) } nni_aio_stop(tls->tcp_send); nni_aio_stop(tls->tcp_recv); - nni_aio_fini(tls->com.aio); + nni_aio_free(tls->com.aio); // And finalize / free everything. nng_stream_free(tls->tcp); - nni_aio_fini(tls->tcp_send); - nni_aio_fini(tls->tcp_recv); + nni_aio_free(tls->tcp_send); + nni_aio_free(tls->tcp_recv); mbedtls_ssl_free(&tls->ctx); nng_tls_config_free(tls->com.cfg); @@ -349,8 +349,8 @@ nni_tls_start(nng_stream *arg, nng_stream *tcp) tp->tcp = tcp; - if (((rv = nni_aio_init(&tp->tcp_send, tls_send_cb, tp)) != 0) || - ((rv = nni_aio_init(&tp->tcp_recv, tls_recv_cb, tp)) != 0)) { + if (((rv = nni_aio_alloc(&tp->tcp_send, tls_send_cb, tp)) != 0) || + ((rv = nni_aio_alloc(&tp->tcp_recv, tls_recv_cb, tp)) != 0)) { return (rv); } diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c index 39a4bff0..7cccc1e9 100644 --- a/src/supplemental/tls/tls_common.c +++ b/src/supplemental/tls/tls_common.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -110,7 +110,7 @@ tls_dialer_dial(void *arg, nng_aio *aio) } com = (void *) tls; - if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) { + if ((rv = nni_aio_alloc(&com->aio, tls_conn_cb, tls)) != 0) { nni_aio_finish_error(aio, rv); nng_stream_free(tls); return; @@ -394,7 +394,7 @@ tls_listener_accept(void *arg, nng_aio *aio) return; } com = (void *) tls; - if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) { + if ((rv = nni_aio_alloc(&com->aio, tls_conn_cb, tls)) != 0) { nni_aio_finish_error(aio, rv); nng_stream_free(tls); return; diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index c7d3622c..df4a0834 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1230,11 +1230,11 @@ ws_fini(void *arg) nni_strfree(ws->reqhdrs); nni_strfree(ws->reshdrs); - nni_aio_fini(ws->rxaio); - nni_aio_fini(ws->txaio); - nni_aio_fini(ws->closeaio); - nni_aio_fini(ws->httpaio); - nni_aio_fini(ws->connaio); + nni_aio_free(ws->rxaio); + nni_aio_free(ws->txaio); + nni_aio_free(ws->closeaio); + nni_aio_free(ws->httpaio); + nni_aio_free(ws->connaio); nni_mtx_fini(&ws->mtx); NNI_FREE_STRUCT(ws); } @@ -1411,11 +1411,11 @@ ws_init(nni_ws **wsp) nni_aio_list_init(&ws->sendq); nni_aio_list_init(&ws->recvq); - if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->connaio, ws_conn_cb, ws)) != 0)) { + if (((rv = nni_aio_alloc(&ws->closeaio, ws_close_cb, ws)) != 0) || + ((rv = nni_aio_alloc(&ws->txaio, ws_write_cb, ws)) != 0) || + ((rv = nni_aio_alloc(&ws->rxaio, ws_read_cb, ws)) != 0) || + ((rv = nni_aio_alloc(&ws->httpaio, ws_http_cb, ws)) != 0) || + ((rv = nni_aio_alloc(&ws->connaio, ws_conn_cb, ws)) != 0)) { ws_fini(ws); return (rv); } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 0d8f12ae..4d5db82d 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -145,9 +145,9 @@ ipctran_pipe_fini(void *arg) } nni_mtx_unlock(&ep->mtx); } - nni_aio_fini(p->rxaio); - nni_aio_fini(p->txaio); - nni_aio_fini(p->negoaio); + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); + nni_aio_free(p->negoaio); nng_stream_free(p->conn); if (p->rxmsg) { nni_msg_free(p->rxmsg); @@ -177,9 +177,9 @@ ipctran_pipe_alloc(ipctran_pipe **pipep) return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, ipctran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negoaio, ipctran_pipe_nego_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->txaio, ipctran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->negoaio, ipctran_pipe_nego_cb, p)) != 0)) { ipctran_pipe_fini(p); return (rv); } @@ -699,8 +699,8 @@ ipctran_ep_fini(void *arg) nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_fini(ep->timeaio); - nni_aio_fini(ep->connaio); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } @@ -840,7 +840,7 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) } ep->ndialer = ndialer; - if (((rv = nni_aio_init(&ep->connaio, ipctran_dial_cb, ep)) != 0) || + if (((rv = nni_aio_alloc(&ep->connaio, ipctran_dial_cb, ep)) != 0) || ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) { ipctran_ep_fini(ep); return (rv); @@ -863,8 +863,8 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener) } ep->nlistener = nlistener; - if (((rv = nni_aio_init(&ep->connaio, ipctran_accept_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->timeaio, ipctran_timer_cb, ep)) != 0) || + if (((rv = nni_aio_alloc(&ep->connaio, ipctran_accept_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->timeaio, ipctran_timer_cb, ep)) != 0) || ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { ipctran_ep_fini(ep); return (rv); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 7748cd8f..32df5102 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -146,9 +146,9 @@ tcptran_pipe_fini(void *arg) nni_mtx_unlock(&ep->mtx); } - nni_aio_fini(p->rxaio); - nni_aio_fini(p->txaio); - nni_aio_fini(p->negoaio); + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); + nni_aio_free(p->negoaio); nng_stream_free(p->conn); nni_msg_free(p->rxmsg); nni_mtx_fini(&p->mtx); @@ -176,9 +176,9 @@ tcptran_pipe_alloc(tcptran_pipe **pipep) return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) != 0)) { tcptran_pipe_fini(p); return (rv); } @@ -652,8 +652,8 @@ tcptran_ep_fini(void *arg) nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_fini(ep->timeaio); - nni_aio_fini(ep->connaio); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); @@ -736,7 +736,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl) memcpy(src, surl->u_hostname, len); src[len] = '\0'; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { nni_free(src, len + 1); return (rv); } @@ -746,7 +746,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl) if ((rv = nni_aio_result(aio)) == 0) { nni_aio_get_sockaddr(aio, sa); } - nni_aio_fini(aio); + nni_aio_free(aio); nni_free(src, len + 1); return (rv); } @@ -904,7 +904,7 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) ep->ndialer = ndialer; if ((rv != 0) || - ((rv = nni_aio_init(&ep->connaio, tcptran_dial_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->connaio, tcptran_dial_cb, ep)) != 0) || ((rv = nng_stream_dialer_alloc_url(&ep->dialer, &myurl)) != 0)) { tcptran_ep_fini(ep); return (rv); @@ -942,8 +942,8 @@ tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) } ep->nlistener = nlistener; - if (((rv = nni_aio_init(&ep->connaio, tcptran_accept_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep)) != 0) || + if (((rv = nni_aio_alloc(&ep->connaio, tcptran_accept_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->timeaio, tcptran_timer_cb, ep)) != 0) || ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { tcptran_ep_fini(ep); return (rv); diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 58b43f2c..7869c380 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -151,9 +151,9 @@ tlstran_pipe_fini(void *arg) } nni_mtx_unlock(&ep->mtx); } - nni_aio_fini(p->rxaio); - nni_aio_fini(p->txaio); - nni_aio_fini(p->negoaio); + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); + nni_aio_free(p->negoaio); nng_stream_free(p->tls); nni_msg_free(p->rxmsg); NNI_FREE_STRUCT(p); @@ -170,9 +170,9 @@ tlstran_pipe_alloc(tlstran_pipe **pipep) } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->txaio, tlstran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negoaio, tlstran_pipe_nego_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->txaio, tlstran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->negoaio, tlstran_pipe_nego_cb, p)) != 0)) { tlstran_pipe_fini(p); return (rv); } @@ -626,8 +626,8 @@ tlstran_ep_fini(void *arg) nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_fini(ep->timeaio); - nni_aio_fini(ep->connaio); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); @@ -709,7 +709,7 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl) memcpy(src, surl->u_hostname, len); src[len] = '\0'; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { nni_free(src, len + 1); return (rv); } @@ -719,7 +719,7 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl) if ((rv = nni_aio_result(aio)) == 0) { nni_aio_get_sockaddr(aio, sa); } - nni_aio_fini(aio); + nni_aio_free(aio); nni_free(src, len + 1); return (rv); } @@ -872,7 +872,7 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) } if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) || - ((rv = nni_aio_init(&ep->connaio, tlstran_dial_cb, ep)) != 0)) { + ((rv = nni_aio_alloc(&ep->connaio, tlstran_dial_cb, ep)) != 0)) { return (rv); } ep->authmode = NNG_TLS_AUTH_MODE_REQUIRED; @@ -923,8 +923,8 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) return (NNG_EADDRINVAL); } if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) || - ((rv = nni_aio_init(&ep->connaio, tlstran_accept_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->timeaio, tlstran_timer_cb, ep)) != 0)) { + ((rv = nni_aio_alloc(&ep->connaio, tlstran_accept_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->timeaio, tlstran_timer_cb, ep)) != 0)) { return (rv); } @@ -942,7 +942,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) // be worse than the cost of just waiting here. We always recommend // using local IP addresses rather than names when possible. - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { tlstran_ep_fini(ep); return (rv); } @@ -950,7 +950,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) nni_tcp_resolv(host, url->u_port, af, 1, aio); nni_aio_wait(aio); rv = nni_aio_result(aio); - nni_aio_fini(aio); + nni_aio_free(aio); if ((rv != 0) || ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) || diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 3424480a..e70b1fd4 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -205,8 +205,8 @@ wstran_pipe_fini(void *arg) { ws_pipe *p = arg; - nni_aio_fini(p->rxaio); - nni_aio_fini(p->txaio); + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); nng_stream_free(p->ws); nni_mtx_fini(&p->mtx); @@ -238,8 +238,8 @@ wstran_pipe_alloc(ws_pipe **pipep, void *ws) nni_mtx_init(&p->mtx); // Initialize AIOs. - if (((rv = nni_aio_init(&p->txaio, wstran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) { + if (((rv = nni_aio_alloc(&p->txaio, wstran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) { wstran_pipe_fini(p); return (rv); } @@ -381,7 +381,7 @@ wstran_dialer_fini(void *arg) nni_aio_stop(d->connaio); nng_stream_dialer_free(d->dialer); - nni_aio_fini(d->connaio); + nni_aio_free(d->connaio); nni_mtx_fini(&d->mtx); NNI_FREE_STRUCT(d); } @@ -393,7 +393,7 @@ wstran_listener_fini(void *arg) nni_aio_stop(l->accaio); nng_stream_listener_free(l->listener); - nni_aio_fini(l->accaio); + nni_aio_free(l->accaio); nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } @@ -515,7 +515,7 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) nni_sock_peer_name(s)); if (((rv = nni_ws_dialer_alloc(&d->dialer, url)) != 0) || - ((rv = nni_aio_init(&d->connaio, wstran_connect_cb, d)) != 0) || + ((rv = nni_aio_alloc(&d->connaio, wstran_connect_cb, d)) != 0) || ((rv = nng_stream_dialer_set_bool( d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) || ((rv = nng_stream_dialer_set_string( @@ -551,7 +551,7 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) nni_sock_proto_name(s)); if (((rv = nni_ws_listener_alloc(&l->listener, url)) != 0) || - ((rv = nni_aio_init(&l->accaio, wstran_accept_cb, l)) != 0) || + ((rv = nni_aio_alloc(&l->accaio, wstran_accept_cb, l)) != 0) || ((rv = nng_stream_listener_set_bool( l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) || ((rv = nng_stream_listener_set_string( diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 027d46c2..9d166ad2 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1327,12 +1327,12 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, return (-1); } - if (nni_aio_init(&aio, NULL, NULL) != 0) { + if (nni_aio_alloc(&aio, NULL, NULL) != 0) { // Out of memory return (-1); } if ((buf = nni_alloc(sizeof(*hdr) + len)) == NULL) { - nni_aio_fini(aio); + nni_aio_free(aio); return (-1); } @@ -1359,7 +1359,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, // care which. (There may be a few thread context switches, but // none of them are going to have to wait for some unbounded time.) nni_aio_wait(aio); - nni_aio_fini(aio); + nni_aio_free(aio); nni_free(hdr, hdr->len + sizeof(*hdr)); return (0); @@ -1406,8 +1406,8 @@ zt_node_destroy(zt_node *ztn) if (ztn->zn_flock != NULL) { nni_file_unlock(ztn->zn_flock); } - nni_aio_fini(ztn->zn_rcv4_aio); - nni_aio_fini(ztn->zn_rcv6_aio); + nni_aio_free(ztn->zn_rcv4_aio); + nni_aio_free(ztn->zn_rcv6_aio); nni_idhash_fini(ztn->zn_eps); nni_idhash_fini(ztn->zn_lpipes); nni_idhash_fini(ztn->zn_rpipes); @@ -1440,8 +1440,8 @@ zt_node_create(zt_node **ztnp, const char *path) NNI_LIST_INIT(&ztn->zn_eplist, zt_ep, ze_link); NNI_LIST_INIT(&ztn->zn_plist, zt_pipe, zp_link); nni_cv_init(&ztn->zn_bgcv, &zt_lk); - nni_aio_init(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn); - nni_aio_init(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn); + nni_aio_alloc(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn); + nni_aio_alloc(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn); if (((ztn->zn_rcv4_buf = nni_alloc(zt_rcv_bufsize)) == NULL) || ((ztn->zn_rcv6_buf = nni_alloc(zt_rcv_bufsize)) == NULL)) { @@ -1642,7 +1642,7 @@ zt_pipe_fini(void *arg) zt_pipe *p = arg; zt_node *ztn = p->zp_ztn; - nni_aio_fini(p->zp_ping_aio); + nni_aio_free(p->zp_ping_aio); // This tosses the connection details and all state. nni_mtx_lock(&zt_lk); @@ -1705,7 +1705,7 @@ zt_pipe_alloc( rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p); } if ((rv != 0) || - ((rv = nni_aio_init(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) { + ((rv = nni_aio_alloc(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) { zt_pipe_reap(p); return (rv); } @@ -2078,7 +2078,7 @@ zt_ep_fini(void *arg) { zt_ep *ep = arg; nni_aio_stop(ep->ze_creq_aio); - nni_aio_fini(ep->ze_creq_aio); + nni_aio_free(ep->ze_creq_aio); NNI_FREE_STRUCT(ep); } @@ -2154,7 +2154,7 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, nni_dialer *ndialer, nni_aio_list_init(&ep->ze_aios); - rv = nni_aio_init(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep); + rv = nni_aio_alloc(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep); if (rv != 0) { zt_ep_fini(ep); return (rv); |
