aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c219
-rw-r--r--src/core/aio.h7
2 files changed, 147 insertions, 79 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index add51897..cda46671 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -11,13 +11,17 @@
#include "core/nng_impl.h"
#include <string.h>
-static nni_mtx nni_aio_lk;
-// These are used for expiration.
-static nni_cv nni_aio_expire_cv;
-static bool nni_aio_expire_exit;
-static nni_thr nni_aio_expire_thr;
-static nni_list nni_aio_expire_list;
-static nni_aio *nni_aio_expire_aio;
+struct nni_aio_expire_q {
+ nni_mtx eq_mtx;
+ nni_cv eq_cv;
+ nni_list eq_list;
+ nni_aio *eq_aio; // currently expiring (task dispatch)
+ nni_thr eq_thr;
+ bool eq_exit;
+};
+
+static nni_aio_expire_q **nni_aio_expire_q_list;
+static int nni_aio_expire_q_cnt;
// Design notes.
//
@@ -33,8 +37,13 @@ static nni_aio *nni_aio_expire_aio;
// free to examine the aio for list membership, etc. The provider must
// not call finish more than once though.
//
-// A single lock, nni_aio_lk, is used to protect the flags on the AIO,
-// as well as the expire list on the AIOs. We will not permit an AIO
+// We use an array of expiration queues, each with it's own lock and
+// condition variable, and expiration thread. By default this is one
+// per CPU core present -- the goal being to reduce overall pressure
+// caused by a single lock. The number of queues (and threads) can
+// be tuned using the NNG_EXPIRE_THREADS tunable.
+//
+// We will not permit an AIO
// to be marked done if an expiration is outstanding.
//
// In order to synchronize with the expiration, we record the aio as
@@ -97,6 +106,8 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
nni_task_init(&aio->a_task, NULL, cb, arg);
aio->a_expire = NNI_TIME_NEVER;
aio->a_timeout = NNG_DURATION_INFINITE;
+ aio->a_expire_q =
+ nni_aio_expire_q_list[nni_random() % nni_aio_expire_q_cnt];
}
void
@@ -104,22 +115,23 @@ nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
void * arg;
+ nni_aio_expire_q *eq = aio->a_expire_q;
// This is like aio_close, but we don't want to dispatch
// the task. And unlike aio_stop, we don't want to wait
// for the task. (Because we implicitly do task_fini.)
// We also wait if the aio is being expired.
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(&eq->eq_mtx);
aio->a_stop = true;
nni_list_node_remove(&aio->a_expire_node);
- while (nni_aio_expire_aio == aio) {
- nni_cv_wait(&nni_aio_expire_cv);
+ while (eq->eq_aio == aio) {
+ nni_cv_wait(&eq->eq_cv);
}
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
fn(aio, arg, NNG_ECLOSED);
@@ -189,15 +201,16 @@ nni_aio_stop(nni_aio *aio)
if (aio != NULL) {
nni_aio_cancel_fn fn;
void * arg;
+ nni_aio_expire_q *eq = aio->a_expire_q;
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(&eq->eq_mtx);
nni_list_node_remove(&aio->a_expire_node);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
aio->a_stop = true;
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
fn(aio, arg, NNG_ECANCELED);
@@ -213,15 +226,16 @@ nni_aio_close(nni_aio *aio)
if (aio != NULL) {
nni_aio_cancel_fn fn;
void * arg;
+ nni_aio_expire_q *eq = aio->a_expire_q;
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(&eq->eq_mtx);
nni_list_node_remove(&aio->a_expire_node);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
aio->a_stop = true;
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
fn(aio, arg, NNG_ECLOSED);
@@ -307,8 +321,9 @@ nni_aio_begin(nni_aio *aio)
// a bug in the caller. These checks are not technically thread
// safe in the event that they are false. Users of race detectors
// checks may wish ignore or suppress these checks.
+ nni_aio_expire_q *eq = aio->a_expire_q;
- aio_safe_lock(&nni_aio_lk);
+ aio_safe_lock(&eq->eq_mtx);
NNI_ASSERT(!nni_aio_list_active(aio));
NNI_ASSERT(aio->a_cancel_fn == NULL);
@@ -323,9 +338,9 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_cancel_fn = NULL;
- aio_safe_unlock(&nni_aio_lk);
+ aio_safe_unlock(&eq->eq_mtx);
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(&eq->eq_mtx);
// We should not reschedule anything at this point.
if (aio->a_stop) {
aio->a_result = NNG_ECANCELED;
@@ -333,19 +348,21 @@ nni_aio_begin(nni_aio *aio)
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_expire_ok = false;
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (NNG_ECANCELED);
}
nni_task_prep(&aio->a_task);
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
return (0);
}
int
nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
+ nni_aio_expire_q *eq = aio->a_expire_q;
+
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
@@ -362,10 +379,10 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
}
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop) {
nni_task_abort(&aio->a_task);
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECLOSED);
}
@@ -378,7 +395,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
nni_aio_expire_add(aio);
}
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
return (0);
}
@@ -389,13 +406,15 @@ nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
void * arg;
+ nni_aio_expire_q *eq = aio->a_expire_q;
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(&eq->eq_mtx);
+ nni_list_node_remove(&aio->a_expire_node);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
// Stop any I/O at the provider level.
if (fn != NULL) {
@@ -409,10 +428,11 @@ static void
nni_aio_finish_impl(
nni_aio *aio, int rv, size_t count, nni_msg *msg, bool sync)
{
- nni_mtx_lock(&nni_aio_lk);
+ nni_aio_expire_q *eq = aio->a_expire_q;
- nni_list_node_remove(&aio->a_expire_node);
+ nni_mtx_lock(&eq->eq_mtx);
+ nni_list_node_remove(&aio->a_expire_node);
aio->a_result = rv;
aio->a_count = count;
aio->a_cancel_fn = NULL;
@@ -423,7 +443,7 @@ nni_aio_finish_impl(
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
if (sync) {
nni_task_exec(&aio->a_task);
@@ -485,7 +505,7 @@ nni_aio_list_active(nni_aio *aio)
static void
nni_aio_expire_add(nni_aio *aio)
{
- nni_list *list = &nni_aio_expire_list;
+ nni_list *list = &aio->a_expire_q->eq_list;
nni_aio * prev;
// This is a reverse walk of the list. We're more likely to find
@@ -501,22 +521,23 @@ nni_aio_expire_add(nni_aio *aio)
// This has the shortest time, so insert at the start.
nni_list_prepend(list, aio);
// And, as we are the latest, kick the thing.
- nni_cv_wake(&nni_aio_expire_cv);
+ nni_cv_wake(&aio->a_expire_q->eq_cv);
}
}
static void
-nni_aio_expire_loop(void *unused)
+nni_aio_expire_loop(void *arg)
{
- nni_list *list = &nni_aio_expire_list;
- nni_time now;
-
- NNI_ARG_UNUSED(unused);
+ nni_aio_expire_q *q = arg;
+ nni_list * list = &q->eq_list;
+ nni_mtx * mtx = &q->eq_mtx;
+ nni_cv * cv = &q->eq_cv;
+ nni_time now;
nni_thr_set_name(NULL, "nng:aio:expire");
now = nni_clock();
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(mtx);
for (;;) {
nni_aio *aio;
@@ -524,12 +545,12 @@ nni_aio_expire_loop(void *unused)
if ((aio = nni_list_first(list)) == NULL) {
- if (nni_aio_expire_exit) {
- nni_mtx_unlock(&nni_aio_lk);
+ if (q->eq_exit) {
+ nni_mtx_unlock(mtx);
return;
}
- nni_cv_wait(&nni_aio_expire_cv);
+ nni_cv_wait(cv);
now = nni_clock();
continue;
@@ -537,7 +558,7 @@ nni_aio_expire_loop(void *unused)
if (now < aio->a_expire) {
// Unexpired; the list is ordered, so we just wait.
- nni_cv_until(&nni_aio_expire_cv, aio->a_expire);
+ nni_cv_until(cv, aio->a_expire);
now = nni_clock();
continue;
}
@@ -547,29 +568,29 @@ nni_aio_expire_loop(void *unused)
nni_list_remove(list, aio);
rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- nni_aio_cancel_fn fn = aio->a_cancel_fn;
- void * arg = aio->a_cancel_arg;
+ nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
+ void * cancel_arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
aio->a_cancel_arg = NULL;
// Place a temporary hold on the aio. This prevents it
// from being destroyed.
- nni_aio_expire_aio = aio;
+ q->eq_aio = aio;
// We let the cancel function handle the completion.
// If there is no cancellation function, then we cannot
// terminate the aio - we've tried, but it has to run
// to it's natural conclusion.
- nni_mtx_unlock(&nni_aio_lk);
- fn(aio, arg, rv);
+ nni_mtx_unlock(mtx);
+ cancel_fn(aio, cancel_arg, rv);
// Get updated time before reacquiring lock.
now = nni_clock();
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(mtx);
- nni_aio_expire_aio = NULL;
- nni_cv_wake(&nni_aio_expire_cv);
+ q->eq_aio = NULL;
+ nni_cv_wake(cv);
}
}
@@ -642,16 +663,17 @@ static void
nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
{
NNI_ARG_UNUSED(arg);
+ nni_aio_expire_q *eq = aio->a_expire_q;
- nni_mtx_lock(&nni_aio_lk);
+ nni_mtx_lock(&eq->eq_mtx);
if (!aio->a_sleep) {
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
return;
}
aio->a_sleep = false;
nni_list_node_remove(&aio->a_expire_node);
- nni_mtx_unlock(&nni_aio_lk);
+ nni_mtx_unlock(&eq->eq_mtx);
nni_aio_finish_error(aio, rv);
}
@@ -685,43 +707,86 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
}
}
-void
-nni_aio_sys_fini(void)
+static void
+nni_aio_expire_q_free(nni_aio_expire_q *eq)
{
- nni_mtx *mtx = &nni_aio_lk;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_thr *thr = &nni_aio_expire_thr;
+ if (eq == NULL) {
+ return;
+ }
+ if (!eq->eq_exit) {
+ nni_mtx_lock(&eq->eq_mtx);
+ eq->eq_exit = true;
+ nni_cv_wake(&eq->eq_cv);
+ nni_mtx_unlock(&eq->eq_mtx);
+ }
- if (!nni_aio_expire_exit) {
- nni_mtx_lock(mtx);
- nni_aio_expire_exit = true;
- nni_cv_wake(cv);
- nni_mtx_unlock(mtx);
+ nni_thr_fini(&eq->eq_thr);
+ nni_cv_fini(&eq->eq_cv);
+ nni_mtx_fini(&eq->eq_mtx);
+ NNI_FREE_STRUCT(eq);
+}
+
+static nni_aio_expire_q *
+nni_aio_expire_q_alloc(void)
+{
+ nni_aio_expire_q *eq;
+
+ if ((eq = NNI_ALLOC_STRUCT(eq)) == NULL) {
+ return (NULL);
+ }
+ nni_mtx_init(&eq->eq_mtx);
+ nni_cv_init(&eq->eq_cv, &eq->eq_mtx);
+ NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node);
+ eq->eq_exit = false;
+
+ if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) {
+ nni_aio_expire_q_free(eq);
+ return (NULL);
}
- nni_thr_fini(thr);
- nni_cv_fini(cv);
- nni_mtx_fini(mtx);
+ nni_thr_run(&eq->eq_thr);
+ return (eq);
+}
+
+void
+nni_aio_sys_fini(void)
+{
+ for (int i = 0; i < nni_aio_expire_q_cnt; i++) {
+ nni_aio_expire_q_free(nni_aio_expire_q_list[i]);
+ }
+ nni_free(nni_aio_expire_q_list,
+ sizeof(nni_aio_expire_q *) * nni_aio_expire_q_cnt);
+ nni_aio_expire_q_cnt = 0;
+ nni_aio_expire_q_list = NULL;
}
int
nni_aio_sys_init(void)
{
- int rv;
- nni_thr *thr = &nni_aio_expire_thr;
+ int num_thr;
- NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node);
- nni_mtx_init(&nni_aio_lk);
- nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk);
+ // We create a thread per CPU core for expiration by default.
+#ifndef NNG_EXPIRE_THREADS
+ num_thr = nni_plat_ncpu();
+#else
+ num_thr = NNG_EXPIRE_THREADS;
+#endif
+ if (num_thr > 256) {
+ num_thr = 256;
+ }
- nni_aio_expire_exit = false;
- rv = nni_thr_init(thr, nni_aio_expire_loop, NULL);
- if (rv != 0) {
- nni_aio_sys_fini();
- return (rv);
+ nni_aio_expire_q_list =
+ nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr);
+ nni_aio_expire_q_cnt = num_thr;
+ for (int i = 0; i < num_thr; i++) {
+ nni_aio_expire_q *eq;
+ if ((eq = nni_aio_expire_q_alloc()) == NULL) {
+ nni_aio_sys_fini();
+ return (NNG_ENOMEM);
+ }
+ nni_aio_expire_q_list[i] = eq;
}
- nni_thr_run(thr);
return (0);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index e108e4b8..ba231bd1 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -165,6 +165,8 @@ extern void nni_sleep_aio(nni_duration, nni_aio *);
extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);
+typedef struct nni_aio_expire_q nni_aio_expire_q;
+
// An nni_aio is an async I/O handle. The details of this aio structure
// are private to the AIO framework. The structure has the public name
// (nng_aio) so that we minimize the pollution in the public API namespace.
@@ -200,8 +202,9 @@ struct nng_aio {
nni_list_node a_prov_node; // Linkage on provider list.
void * a_prov_extra[2]; // Extra data used by provider
- nni_list_node a_expire_node; // Expiration node
- nni_reap_node a_reap_node;
+ nni_aio_expire_q *a_expire_q;
+ nni_list_node a_expire_node; // Expiration node
+ nni_reap_node a_reap_node;
};
#endif // CORE_AIO_H