diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-08 20:34:26 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-08 21:16:30 -0800 |
| commit | b21d7805523a407a14567017edbdef57ca81781f (patch) | |
| tree | e07f08bdc047ee4dfb057b670766e3de5bf2f981 /src/core | |
| parent | 8479b4c8861c77cfd9eb64e724615605bdd1cbcb (diff) | |
| download | nng-b21d7805523a407a14567017edbdef57ca81781f.tar.gz nng-b21d7805523a407a14567017edbdef57ca81781f.tar.bz2 nng-b21d7805523a407a14567017edbdef57ca81781f.zip | |
fixes #1094 Consider in-lining task and aio
This only does it for rep, but it also has changes that should increase
the overall test coverage for the REP protocol
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 134 | ||||
| -rw-r--r-- | src/core/aio.h | 74 | ||||
| -rw-r--r-- | src/core/device.c | 10 | ||||
| -rw-r--r-- | src/core/dialer.c | 12 | ||||
| -rw-r--r-- | src/core/listener.c | 8 |
5 files changed, 133 insertions, 105 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 9f025f4c..a15bb47b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -61,107 +61,75 @@ static nni_aio *nni_aio_expire_aio; // operations from starting, without waiting for any existing one to // complete, call nni_aio_close. -// An nni_aio is an async I/O handle. -struct nng_aio { - int a_result; // Result code (nng_errno) - size_t a_count; // Bytes transferred (I/O only) - nni_time a_expire; // Absolute timeout - nni_duration a_timeout; // Relative timeout - - bool a_stop; // shutting down (no new operations) - bool a_sleep; // sleeping with no action - bool a_expire_ok; // expire from sleep is ok - nni_task a_task; - - // Read/write operations. - nni_iov a_iov[8]; - unsigned a_niov; - - // Message operations. - nni_msg *a_msg; - - // User scratch data. Consumers may store values here, which - // must be preserved by providers and the framework. - void *a_user_data[4]; - - // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be - // specified. The semantics of these will vary, and depend on the - // specific operation. - void *a_inputs[4]; - void *a_outputs[4]; - - // Provider-use fields. - nni_aio_cancelfn a_cancel_fn; - void * a_cancel_arg; - nni_list_node a_prov_node; - void * a_prov_extra[4]; // Extra data used by provider - - // Socket address. This turns out to be very useful, as we wind up - // needing socket addresses for numerous connection related routines. - // It would be cleaner to not have this and avoid burning the space, - // but having this hear dramatically simplifies lots of code. - nng_sockaddr a_sockaddr; - - // Expire node. - nni_list_node a_expire_node; -}; - static void nni_aio_expire_add(nni_aio *); -int -nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) +void +nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) { - nni_aio *aio; - - if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { - return (NNG_ENOMEM); - } + memset(aio, 0, sizeof(*aio)); nni_task_init(&aio->a_task, NULL, cb, arg); aio->a_expire = NNI_TIME_NEVER; aio->a_timeout = NNG_DURATION_INFINITE; - *aiop = aio; - return (0); } void nni_aio_fini(nni_aio *aio) { - if (aio != NULL) { + nni_aio_cancelfn fn; + void * arg; - nni_aio_cancelfn fn; - void * arg; + // TODO: This probably could just use nni_aio_stop. - // This is like aio_close, but we don't want to dispatch - // the task. And unlike aio_stop, we don't want to wait - // for the task. (Because we implicitly do task_fini.) - nni_mtx_lock(&nni_aio_lk); - fn = aio->a_cancel_fn; - arg = aio->a_cancel_arg; - aio->a_cancel_fn = NULL; - aio->a_cancel_arg = NULL; - aio->a_stop = true; - nni_mtx_unlock(&nni_aio_lk); + // This is like aio_close, but we don't want to dispatch + // the task. And unlike aio_stop, we don't want to wait + // for the task. (Because we implicitly do task_fini.) + nni_mtx_lock(&nni_aio_lk); + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; + nni_mtx_unlock(&nni_aio_lk); - if (fn != NULL) { - fn(aio, arg, NNG_ECLOSED); - } + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); + } - // Wait for the aio to be "done"; this ensures that we don't - // destroy an aio from a "normal" completion callback while - // the expiration thread is working. + // Wait for the aio to be "done"; this ensures that we don't + // destroy an aio from a "normal" completion callback while + // the expiration thread is working. - nni_mtx_lock(&nni_aio_lk); - while (nni_aio_expire_aio == aio) { - if (nni_thr_is_self(&nni_aio_expire_thr)) { - nni_aio_expire_aio = NULL; - break; - } - nni_cv_wait(&nni_aio_expire_cv); + nni_mtx_lock(&nni_aio_lk); + while (nni_aio_expire_aio == aio) { + // TODO: It should be possible to remove this check! + if (nni_thr_is_self(&nni_aio_expire_thr)) { + nni_aio_expire_aio = NULL; + break; } - nni_mtx_unlock(&nni_aio_lk); + nni_cv_wait(&nni_aio_expire_cv); + } + nni_mtx_unlock(&nni_aio_lk); + nni_task_fini(&aio->a_task); +} - nni_task_fini(&aio->a_task); +int +nni_aio_alloc(nni_aio **aiop, nni_cb cb, void *arg) +{ + nni_aio *aio; + + if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { + return (NNG_ENOMEM); + } + nni_aio_init(aio, cb, arg); + *aiop = aio; + return (0); +} +void +nni_aio_free(nni_aio *aio) +{ + if (aio != NULL) { + nni_aio_fini(aio); NNI_FREE_STRUCT(aio); } } @@ -189,7 +157,7 @@ nni_aio_set_iov(nni_aio *aio, unsigned niov, const nni_iov *iov) // callback to complete, if still running. It also marks the AIO as // stopped, preventing further calls to nni_aio_begin from succeeding. // To correctly tear down an AIO, call stop, and make sure any other -// callers are not also stopped, before calling nni_aio_fini to release +// callers are not also stopped, before calling nni_aio_free to release // actual memory. void nni_aio_stop(nni_aio *aio) diff --git a/src/core/aio.h b/src/core/aio.h index 83c068b5..c2776bc0 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -22,19 +22,29 @@ typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int); // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern int nni_aio_init(nni_aio **, nni_cb, void *); +extern void nni_aio_init(nni_aio *, nni_cb, void *arg); -// nni_aio_fini finalizes the aio, releasing resources (locks) -// associated with it. The caller is responsible for ensuring that any -// associated I/O is unscheduled or complete. This is safe to call -// on zero'd memory. +// nni_aio_fini finalizes an aio object, releasing associated resources. +// It waits for the callback to complete. extern void nni_aio_fini(nni_aio *); +// nni_aio_alloc allocates an aio object and initializes it. The callback +// is called with the supplied argument when the operation is complete. +// If NULL is supplied for the callback, then nni_aio_wake is used in its +// place, and the aio is used for the argument. +extern int nni_aio_alloc(nni_aio **, nni_cb, void *arg); + +// nni_aio_free frees the aio, releasing resources (locks) +// associated with it. This is safe to call on zero'd memory. +// This must only be called on an object that was allocated +// with nni_aio_allocate. +extern void nni_aio_free(nni_aio *aio); + // nni_aio_stop cancels any unfinished I/O, running completion callbacks, // but also prevents any new operations from starting (nni_aio_start will -// return NNG_ESTATE). This should be called before nni_aio_fini(). The +// return NNG_ESTATE). This should be called before nni_aio_free(). The // best pattern is to call nni_aio_stop on all linked aios, before calling -// nni_aio_fini on any of them. This function will block until any +// nni_aio_free on any of them. This function will block until any // callbacks are executed, and therefore it should never be executed // from a callback itself. (To abort operations without blocking // use nni_aio_cancel instead.) @@ -162,4 +172,54 @@ extern void nni_sleep_aio(nni_duration, nni_aio *); extern int nni_aio_sys_init(void); extern void nni_aio_sys_fini(void); + +// An nni_aio is an async I/O handle. The details of this aio structure +// are private to the AIO framework. The structure has the public name +// (nng_aio) so that we minimize the pollution in the public API namespace. +// It is a coding error for anything out side of the AIO framework to access +// any of these members -- the definition is provided here to facilitate +// inlining, but that should be the only use. +struct nng_aio { + size_t a_count; // Bytes transferred (I/O only) + nni_time a_expire; // Absolute timeout + nni_duration a_timeout; // Relative timeout + int a_result; // Result code (nng_errno) + bool a_stop; // Shutting down (no new operations) + bool a_sleep; // Sleeping with no action + bool a_expire_ok; // Expire from sleep is ok + nni_task a_task; + + // Read/write operations. + nni_iov a_iov[8]; + unsigned a_niov; + + // Message operations. + nni_msg *a_msg; + + // User scratch data. Consumers may store values here, which + // must be preserved by providers and the framework. + void *a_user_data[4]; + + // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be + // specified. The semantics of these will vary, and depend on the + // specific operation. + void *a_inputs[4]; + void *a_outputs[4]; + + // Provider-use fields. + nni_aio_cancelfn a_cancel_fn; + void * a_cancel_arg; + nni_list_node a_prov_node; // Linkage on provider list. + void * a_prov_extra[4]; // Extra data used by provider + + // Socket address. This turns out to be very useful, as we wind up + // needing socket addresses for numerous connection related routines. + // It would be cleaner to not have this and avoid burning the space, + // but having this hear dramatically simplifies lots of code. + nng_sockaddr a_sockaddr; + + // Expire node. + nni_list_node a_expire_node; +}; + #endif // CORE_AIO_H diff --git a/src/core/device.c b/src/core/device.c index fee108a8..71480bbc 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -94,7 +94,7 @@ nni_device_fini(nni_device_data *dd) } for (i = 0; i < dd->npath; i++) { nni_device_path *p = &dd->paths[i]; - nni_aio_fini(p->aio); + nni_aio_free(p->aio); } nni_mtx_fini(&dd->mtx); NNI_FREE_STRUCT(dd); @@ -172,7 +172,7 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) p->dst = i == 0 ? s2 : s1; p->state = NNI_DEVICE_STATE_INIT; - if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) { + if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) { nni_device_fini(dd); return (rv); } @@ -221,11 +221,11 @@ nni_device(nni_sock *s1, nni_sock *s2) nni_aio * aio; int rv; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { return (rv); } if ((rv = nni_device_init(&dd, s1, s2)) != 0) { - nni_aio_fini(aio); + nni_aio_free(aio); return (rv); } nni_device_start(dd, aio); @@ -233,6 +233,6 @@ nni_device(nni_sock *s1, nni_sock *s2) rv = nni_aio_result(aio); nni_device_fini(dd); - nni_aio_fini(aio); + nni_aio_free(aio); return (rv); } diff --git a/src/core/dialer.c b/src/core/dialer.c index e1178783..8a463452 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -60,8 +60,8 @@ nni_dialer_destroy(nni_dialer *d) nni_aio_stop(d->d_con_aio); nni_aio_stop(d->d_tmo_aio); - nni_aio_fini(d->d_con_aio); - nni_aio_fini(d->d_tmo_aio); + nni_aio_free(d->d_con_aio); + nni_aio_free(d->d_tmo_aio); if (d->d_data != NULL) { d->d_ops.d_fini(d->d_data); @@ -204,8 +204,8 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_mtx_init(&d->d_mtx); dialer_stats_init(d); - if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || - ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || + if (((rv = nni_aio_alloc(&d->d_con_aio, dialer_connect_cb, d)) != 0) || + ((rv = nni_aio_alloc(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || ((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) || ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) || ((rv = nni_sock_add_dialer(s, d)) != 0)) { @@ -382,7 +382,7 @@ nni_dialer_start(nni_dialer *d, int flags) if ((flags & NNG_FLAG_NONBLOCK) != 0) { aio = NULL; } else { - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) { nni_atomic_flag_reset(&d->d_started); return (rv); } @@ -397,7 +397,7 @@ nni_dialer_start(nni_dialer *d, int flags) if (aio != NULL) { nni_aio_wait(aio); rv = nni_aio_result(aio); - nni_aio_fini(aio); + nni_aio_free(aio); } return (rv); diff --git a/src/core/listener.c b/src/core/listener.c index d44a5456..87d2c532 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -61,8 +61,8 @@ nni_listener_destroy(nni_listener *l) nni_aio_stop(l->l_acc_aio); nni_aio_stop(l->l_tmo_aio); - nni_aio_fini(l->l_acc_aio); - nni_aio_fini(l->l_tmo_aio); + nni_aio_free(l->l_acc_aio); + nni_aio_free(l->l_tmo_aio); if (l->l_data != NULL) { l->l_ops.l_fini(l->l_data); @@ -196,8 +196,8 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); listener_stats_init(l); - if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || - ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || + if (((rv = nni_aio_alloc(&l->l_acc_aio, listener_accept_cb, l)) != 0) || + ((rv = nni_aio_alloc(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || ((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) || ((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) || ((rv = nni_sock_add_listener(s, l)) != 0)) { |
