diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 219 | ||||
| -rw-r--r-- | src/core/aio.h | 7 |
2 files changed, 147 insertions, 79 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index add51897..cda46671 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -11,13 +11,17 @@ #include "core/nng_impl.h" #include <string.h> -static nni_mtx nni_aio_lk; -// These are used for expiration. -static nni_cv nni_aio_expire_cv; -static bool nni_aio_expire_exit; -static nni_thr nni_aio_expire_thr; -static nni_list nni_aio_expire_list; -static nni_aio *nni_aio_expire_aio; +struct nni_aio_expire_q { + nni_mtx eq_mtx; + nni_cv eq_cv; + nni_list eq_list; + nni_aio *eq_aio; // currently expiring (task dispatch) + nni_thr eq_thr; + bool eq_exit; +}; + +static nni_aio_expire_q **nni_aio_expire_q_list; +static int nni_aio_expire_q_cnt; // Design notes. // @@ -33,8 +37,13 @@ static nni_aio *nni_aio_expire_aio; // free to examine the aio for list membership, etc. The provider must // not call finish more than once though. // -// A single lock, nni_aio_lk, is used to protect the flags on the AIO, -// as well as the expire list on the AIOs. We will not permit an AIO +// We use an array of expiration queues, each with it's own lock and +// condition variable, and expiration thread. By default this is one +// per CPU core present -- the goal being to reduce overall pressure +// caused by a single lock. The number of queues (and threads) can +// be tuned using the NNG_EXPIRE_THREADS tunable. +// +// We will not permit an AIO // to be marked done if an expiration is outstanding. // // In order to synchronize with the expiration, we record the aio as @@ -97,6 +106,8 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) nni_task_init(&aio->a_task, NULL, cb, arg); aio->a_expire = NNI_TIME_NEVER; aio->a_timeout = NNG_DURATION_INFINITE; + aio->a_expire_q = + nni_aio_expire_q_list[nni_random() % nni_aio_expire_q_cnt]; } void @@ -104,22 +115,23 @@ nni_aio_fini(nni_aio *aio) { nni_aio_cancel_fn fn; void * arg; + nni_aio_expire_q *eq = aio->a_expire_q; // This is like aio_close, but we don't want to dispatch // the task. And unlike aio_stop, we don't want to wait // for the task. (Because we implicitly do task_fini.) // We also wait if the aio is being expired. - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(&eq->eq_mtx); aio->a_stop = true; nni_list_node_remove(&aio->a_expire_node); - while (nni_aio_expire_aio == aio) { - nni_cv_wait(&nni_aio_expire_cv); + while (eq->eq_aio == aio) { + nni_cv_wait(&eq->eq_cv); } fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { fn(aio, arg, NNG_ECLOSED); @@ -189,15 +201,16 @@ nni_aio_stop(nni_aio *aio) if (aio != NULL) { nni_aio_cancel_fn fn; void * arg; + nni_aio_expire_q *eq = aio->a_expire_q; - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(&eq->eq_mtx); nni_list_node_remove(&aio->a_expire_node); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; aio->a_stop = true; - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { fn(aio, arg, NNG_ECANCELED); @@ -213,15 +226,16 @@ nni_aio_close(nni_aio *aio) if (aio != NULL) { nni_aio_cancel_fn fn; void * arg; + nni_aio_expire_q *eq = aio->a_expire_q; - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(&eq->eq_mtx); nni_list_node_remove(&aio->a_expire_node); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; aio->a_stop = true; - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { fn(aio, arg, NNG_ECLOSED); @@ -307,8 +321,9 @@ nni_aio_begin(nni_aio *aio) // a bug in the caller. These checks are not technically thread // safe in the event that they are false. Users of race detectors // checks may wish ignore or suppress these checks. + nni_aio_expire_q *eq = aio->a_expire_q; - aio_safe_lock(&nni_aio_lk); + aio_safe_lock(&eq->eq_mtx); NNI_ASSERT(!nni_aio_list_active(aio)); NNI_ASSERT(aio->a_cancel_fn == NULL); @@ -323,9 +338,9 @@ nni_aio_begin(nni_aio *aio) aio->a_count = 0; aio->a_cancel_fn = NULL; - aio_safe_unlock(&nni_aio_lk); + aio_safe_unlock(&eq->eq_mtx); - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(&eq->eq_mtx); // We should not reschedule anything at this point. if (aio->a_stop) { aio->a_result = NNG_ECANCELED; @@ -333,19 +348,21 @@ nni_aio_begin(nni_aio *aio) aio->a_expire = NNI_TIME_NEVER; aio->a_sleep = false; aio->a_expire_ok = false; - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (NNG_ECANCELED); } nni_task_prep(&aio->a_task); - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); return (0); } int nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) { + nni_aio_expire_q *eq = aio->a_expire_q; + if (!aio->a_sleep) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { @@ -362,10 +379,10 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } } - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(&eq->eq_mtx); if (aio->a_stop) { nni_task_abort(&aio->a_task); - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECLOSED); } @@ -378,7 +395,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) { nni_aio_expire_add(aio); } - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); return (0); } @@ -389,13 +406,15 @@ nni_aio_abort(nni_aio *aio, int rv) { nni_aio_cancel_fn fn; void * arg; + nni_aio_expire_q *eq = aio->a_expire_q; - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(&eq->eq_mtx); + nni_list_node_remove(&aio->a_expire_node); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); // Stop any I/O at the provider level. if (fn != NULL) { @@ -409,10 +428,11 @@ static void nni_aio_finish_impl( nni_aio *aio, int rv, size_t count, nni_msg *msg, bool sync) { - nni_mtx_lock(&nni_aio_lk); + nni_aio_expire_q *eq = aio->a_expire_q; - nni_list_node_remove(&aio->a_expire_node); + nni_mtx_lock(&eq->eq_mtx); + nni_list_node_remove(&aio->a_expire_node); aio->a_result = rv; aio->a_count = count; aio->a_cancel_fn = NULL; @@ -423,7 +443,7 @@ nni_aio_finish_impl( aio->a_expire = NNI_TIME_NEVER; aio->a_sleep = false; - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); if (sync) { nni_task_exec(&aio->a_task); @@ -485,7 +505,7 @@ nni_aio_list_active(nni_aio *aio) static void nni_aio_expire_add(nni_aio *aio) { - nni_list *list = &nni_aio_expire_list; + nni_list *list = &aio->a_expire_q->eq_list; nni_aio * prev; // This is a reverse walk of the list. We're more likely to find @@ -501,22 +521,23 @@ nni_aio_expire_add(nni_aio *aio) // This has the shortest time, so insert at the start. nni_list_prepend(list, aio); // And, as we are the latest, kick the thing. - nni_cv_wake(&nni_aio_expire_cv); + nni_cv_wake(&aio->a_expire_q->eq_cv); } } static void -nni_aio_expire_loop(void *unused) +nni_aio_expire_loop(void *arg) { - nni_list *list = &nni_aio_expire_list; - nni_time now; - - NNI_ARG_UNUSED(unused); + nni_aio_expire_q *q = arg; + nni_list * list = &q->eq_list; + nni_mtx * mtx = &q->eq_mtx; + nni_cv * cv = &q->eq_cv; + nni_time now; nni_thr_set_name(NULL, "nng:aio:expire"); now = nni_clock(); - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(mtx); for (;;) { nni_aio *aio; @@ -524,12 +545,12 @@ nni_aio_expire_loop(void *unused) if ((aio = nni_list_first(list)) == NULL) { - if (nni_aio_expire_exit) { - nni_mtx_unlock(&nni_aio_lk); + if (q->eq_exit) { + nni_mtx_unlock(mtx); return; } - nni_cv_wait(&nni_aio_expire_cv); + nni_cv_wait(cv); now = nni_clock(); continue; @@ -537,7 +558,7 @@ nni_aio_expire_loop(void *unused) if (now < aio->a_expire) { // Unexpired; the list is ordered, so we just wait. - nni_cv_until(&nni_aio_expire_cv, aio->a_expire); + nni_cv_until(cv, aio->a_expire); now = nni_clock(); continue; } @@ -547,29 +568,29 @@ nni_aio_expire_loop(void *unused) nni_list_remove(list, aio); rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; - nni_aio_cancel_fn fn = aio->a_cancel_fn; - void * arg = aio->a_cancel_arg; + nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; + void * cancel_arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; // Place a temporary hold on the aio. This prevents it // from being destroyed. - nni_aio_expire_aio = aio; + q->eq_aio = aio; // We let the cancel function handle the completion. // If there is no cancellation function, then we cannot // terminate the aio - we've tried, but it has to run // to it's natural conclusion. - nni_mtx_unlock(&nni_aio_lk); - fn(aio, arg, rv); + nni_mtx_unlock(mtx); + cancel_fn(aio, cancel_arg, rv); // Get updated time before reacquiring lock. now = nni_clock(); - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(mtx); - nni_aio_expire_aio = NULL; - nni_cv_wake(&nni_aio_expire_cv); + q->eq_aio = NULL; + nni_cv_wake(cv); } } @@ -642,16 +663,17 @@ static void nni_sleep_cancel(nng_aio *aio, void *arg, int rv) { NNI_ARG_UNUSED(arg); + nni_aio_expire_q *eq = aio->a_expire_q; - nni_mtx_lock(&nni_aio_lk); + nni_mtx_lock(&eq->eq_mtx); if (!aio->a_sleep) { - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); return; } aio->a_sleep = false; nni_list_node_remove(&aio->a_expire_node); - nni_mtx_unlock(&nni_aio_lk); + nni_mtx_unlock(&eq->eq_mtx); nni_aio_finish_error(aio, rv); } @@ -685,43 +707,86 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) } } -void -nni_aio_sys_fini(void) +static void +nni_aio_expire_q_free(nni_aio_expire_q *eq) { - nni_mtx *mtx = &nni_aio_lk; - nni_cv * cv = &nni_aio_expire_cv; - nni_thr *thr = &nni_aio_expire_thr; + if (eq == NULL) { + return; + } + if (!eq->eq_exit) { + nni_mtx_lock(&eq->eq_mtx); + eq->eq_exit = true; + nni_cv_wake(&eq->eq_cv); + nni_mtx_unlock(&eq->eq_mtx); + } - if (!nni_aio_expire_exit) { - nni_mtx_lock(mtx); - nni_aio_expire_exit = true; - nni_cv_wake(cv); - nni_mtx_unlock(mtx); + nni_thr_fini(&eq->eq_thr); + nni_cv_fini(&eq->eq_cv); + nni_mtx_fini(&eq->eq_mtx); + NNI_FREE_STRUCT(eq); +} + +static nni_aio_expire_q * +nni_aio_expire_q_alloc(void) +{ + nni_aio_expire_q *eq; + + if ((eq = NNI_ALLOC_STRUCT(eq)) == NULL) { + return (NULL); + } + nni_mtx_init(&eq->eq_mtx); + nni_cv_init(&eq->eq_cv, &eq->eq_mtx); + NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node); + eq->eq_exit = false; + + if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) { + nni_aio_expire_q_free(eq); + return (NULL); } - nni_thr_fini(thr); - nni_cv_fini(cv); - nni_mtx_fini(mtx); + nni_thr_run(&eq->eq_thr); + return (eq); +} + +void +nni_aio_sys_fini(void) +{ + for (int i = 0; i < nni_aio_expire_q_cnt; i++) { + nni_aio_expire_q_free(nni_aio_expire_q_list[i]); + } + nni_free(nni_aio_expire_q_list, + sizeof(nni_aio_expire_q *) * nni_aio_expire_q_cnt); + nni_aio_expire_q_cnt = 0; + nni_aio_expire_q_list = NULL; } int nni_aio_sys_init(void) { - int rv; - nni_thr *thr = &nni_aio_expire_thr; + int num_thr; - NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node); - nni_mtx_init(&nni_aio_lk); - nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk); + // We create a thread per CPU core for expiration by default. +#ifndef NNG_EXPIRE_THREADS + num_thr = nni_plat_ncpu(); +#else + num_thr = NNG_EXPIRE_THREADS; +#endif + if (num_thr > 256) { + num_thr = 256; + } - nni_aio_expire_exit = false; - rv = nni_thr_init(thr, nni_aio_expire_loop, NULL); - if (rv != 0) { - nni_aio_sys_fini(); - return (rv); + nni_aio_expire_q_list = + nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr); + nni_aio_expire_q_cnt = num_thr; + for (int i = 0; i < num_thr; i++) { + nni_aio_expire_q *eq; + if ((eq = nni_aio_expire_q_alloc()) == NULL) { + nni_aio_sys_fini(); + return (NNG_ENOMEM); + } + nni_aio_expire_q_list[i] = eq; } - nni_thr_run(thr); return (0); } diff --git a/src/core/aio.h b/src/core/aio.h index e108e4b8..ba231bd1 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -165,6 +165,8 @@ extern void nni_sleep_aio(nni_duration, nni_aio *); extern int nni_aio_sys_init(void); extern void nni_aio_sys_fini(void); +typedef struct nni_aio_expire_q nni_aio_expire_q; + // An nni_aio is an async I/O handle. The details of this aio structure // are private to the AIO framework. The structure has the public name // (nng_aio) so that we minimize the pollution in the public API namespace. @@ -200,8 +202,9 @@ struct nng_aio { nni_list_node a_prov_node; // Linkage on provider list. void * a_prov_extra[2]; // Extra data used by provider - nni_list_node a_expire_node; // Expiration node - nni_reap_node a_reap_node; + nni_aio_expire_q *a_expire_q; + nni_list_node a_expire_node; // Expiration node + nni_reap_node a_reap_node; }; #endif // CORE_AIO_H |
