aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-08-09 22:39:46 -0700
committerGarrett D'Amore <garrett@damore.org>2021-08-09 22:39:46 -0700
commiteca7cb3ac170e159f5a5b79f68b1890409f7179e (patch)
tree1ae94d6ed3ec2b66af3c745dd89e626bfcc07b7c /src/core/aio.c
parent22f089b6b5ebe3be380c33c926990fe891ae0da9 (diff)
downloadnng-eca7cb3ac170e159f5a5b79f68b1890409f7179e.tar.gz
nng-eca7cb3ac170e159f5a5b79f68b1890409f7179e.tar.bz2
nng-eca7cb3ac170e159f5a5b79f68b1890409f7179e.zip
fixes #1488 aio expiration list performance work needed
There were several problems with the array implementation, both from performance and from correctness. This corrects those errors (hopefully) and restores the expiration lists as linked lists.
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c198
1 files changed, 92 insertions, 106 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 8ca00dd6..dfab8f60 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,14 +12,13 @@
#include <string.h>
struct nni_aio_expire_q {
- nni_mtx eq_mtx;
- nni_cv eq_cv;
- nni_aio **eq_list;
- uint32_t eq_len;
- uint32_t eq_cap;
- nni_aio * eq_aio; // currently expiring (task dispatch)
- nni_thr eq_thr;
- bool eq_exit;
+ nni_mtx eq_mtx;
+ nni_cv eq_cv;
+ nni_list eq_list;
+ uint32_t eq_len;
+ nni_thr eq_thr;
+ nni_time eq_next; // next expiration
+ bool eq_exit;
};
static nni_aio_expire_q **nni_aio_expire_q_list;
@@ -117,7 +116,7 @@ void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
// This is like aio_close, but we don't want to dispatch
@@ -126,10 +125,10 @@ nni_aio_fini(nni_aio *aio)
// We also wait if the aio is being expired.
nni_mtx_lock(&eq->eq_mtx);
aio->a_stop = true;
- nni_aio_expire_rm(aio);
- while (eq->eq_aio == aio) {
+ while (aio->a_expiring) {
nni_cv_wait(&eq->eq_cv);
}
+ nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
@@ -203,7 +202,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -228,7 +227,7 @@ nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -407,7 +406,7 @@ void
nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -508,125 +507,114 @@ static void
nni_aio_expire_add(nni_aio *aio)
{
nni_aio_expire_q *eq = aio->a_expire_q;
- if (eq->eq_len >= eq->eq_cap) {
- nni_aio **new_list =
- nni_zalloc(eq->eq_cap * 2 * sizeof(nni_aio *));
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- new_list[i] = eq->eq_list[i];
- }
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
- eq->eq_list = new_list;
- eq->eq_cap *= 2;
- }
- eq->eq_list[eq->eq_len++] = aio;
- // Fire the latest aio, but it cames with performance punishment
- nni_cv_wake(&eq->eq_cv);
+ nni_list_append(&eq->eq_list, aio);
+
+ if (eq->eq_next > aio->a_expire) {
+ eq->eq_next = aio->a_expire;
+ nni_cv_wake(&eq->eq_cv);
+ }
}
static void
nni_aio_expire_rm(nni_aio *aio)
{
- nni_aio_expire_q *eq = aio->a_expire_q;
-
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- if (aio == eq->eq_list[i]) {
- eq->eq_list[i] = eq->eq_list[eq->eq_len - 1];
- eq->eq_len--;
- break;
- }
- }
+ nni_list_node_remove(&aio->a_expire_node);
- if (eq->eq_len < eq->eq_cap / 4 && eq->eq_cap > NNI_EXPIRE_Q_SIZE) {
- nni_aio **new_list =
- nni_zalloc(eq->eq_cap * sizeof(nni_aio *) / 4);
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- new_list[i] = eq->eq_list[i];
- }
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
- eq->eq_list = new_list;
- eq->eq_cap /= 4;
- }
+ // If this item is the one that is going to wake the loop,
+ // don't worry about it. It will wake up normally, or when we
+ // add a new aio to it. Worst case is just one spurious wake up,
+ // which we'd need to do anyway.
}
static void
nni_aio_expire_loop(void *arg)
{
nni_aio_expire_q *q = arg;
- nni_mtx * mtx = &q->eq_mtx;
- nni_cv * cv = &q->eq_cv;
- nni_aio ** list;
+ nni_mtx *mtx = &q->eq_mtx;
+ nni_cv *cv = &q->eq_cv;
nni_time now;
- uint32_t aio_idx;
+ uint32_t exp_idx;
+ nni_aio *expires[NNI_EXPIRE_BATCH];
nni_thr_set_name(NULL, "nng:aio:expire");
- now = nni_clock();
nni_mtx_lock(mtx);
for (;;) {
nni_aio *aio;
int rv;
-
- if (q->eq_len == 0) {
-
- if (q->eq_exit) {
- nni_mtx_unlock(mtx);
- return;
- }
-
- nni_cv_wait(cv);
-
- now = nni_clock();
+ nni_time next;
+
+ next = q->eq_next;
+ now = nni_clock();
+
+ // Each time we wake up, we scan the entire list of elements.
+ // We scan forward, moving up to NNI_EXPIRE_Q_SIZE elements
+ // (a batch) to a saved array of things we are going to cancel.
+ // This mostly runs in O(n), provided you don't have many
+ // elements (> NNI_EXPIRE_Q_SIZE) all expiring simultaneously.
+ aio = nni_list_first(&q->eq_list);
+ if ((aio == NULL) && (q->eq_exit)) {
+ nni_mtx_unlock(mtx);
+ return;
+ }
+ if (now < next) {
+ // Early wake up (just to reschedule), no need to
+ // rescan the list. This is an optimization.
+ nni_cv_until(cv, next);
continue;
}
-
- // Find the timer with min expire time.
- list = q->eq_list;
- aio_idx = 0;
- aio = list[aio_idx];
- for (uint32_t i = 0; i < q->eq_len; i++) {
- if (list[i]->a_expire < aio->a_expire) {
- aio = list[i];
- aio_idx = i;
+ q->eq_next = NNI_TIME_NEVER;
+ exp_idx = 0;
+ while (aio != NULL) {
+ if ((aio->a_expire < now) &&
+ (exp_idx < NNI_EXPIRE_BATCH)) {
+ nni_aio *nxt;
+
+ // This one is expiring.
+ expires[exp_idx++] = aio;
+ // save the next node
+ nxt = nni_list_next(&q->eq_list, aio);
+ nni_list_remove(&q->eq_list, aio);
+ // Place a temporary hold on the aio.
+ // This prevents it from being destroyed.
+ aio->a_expiring = true;
+ aio = nxt;
+ continue;
}
- }
- if (now < aio->a_expire) {
- // Unexpired; we just wait for the next expired aio.
- nni_cv_until(cv, aio->a_expire);
- now = nni_clock();
- continue;
+ if (aio->a_expire < q->eq_next) {
+ q->eq_next = aio->a_expire;
+ }
+ aio = nni_list_next(&q->eq_list, aio);
}
- // The time has come for this aio. Expire it, canceling any
- // outstanding I/O.
- list[aio_idx] = list[q->eq_len - 1];
- q->eq_len--;
- rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ for (uint32_t i = 0; i < exp_idx; i++) {
+ aio = expires[i];
+ rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
- void * cancel_arg = aio->a_cancel_arg;
+ nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
+ void *cancel_arg = aio->a_cancel_arg;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- // Place a temporary hold on the aio. This prevents it
- // from being destroyed.
- q->eq_aio = aio;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
- // We let the cancel function handle the completion.
- // If there is no cancellation function, then we cannot
- // terminate the aio - we've tried, but it has to run
- // to it's natural conclusion.
- nni_mtx_unlock(mtx);
- cancel_fn(aio, cancel_arg, rv);
-
- // Get updated time before reacquiring lock.
- now = nni_clock();
-
- nni_mtx_lock(mtx);
-
- q->eq_aio = NULL;
+ // We let the cancel function handle the completion.
+ // If there is no cancellation function, then we cannot
+ // terminate the aio - we've tried, but it has to run
+ // to its natural conclusion.
+ if (cancel_fn != NULL) {
+ nni_mtx_unlock(mtx);
+ cancel_fn(aio, cancel_arg, rv);
+ nni_mtx_lock(mtx);
+ }
+ aio->a_expiring = false;
+ }
nni_cv_wake(cv);
+
+ if (now < q->eq_next) {
+ nni_cv_until(cv, q->eq_next);
+ }
}
}
@@ -756,7 +744,6 @@ nni_aio_expire_q_free(nni_aio_expire_q *eq)
nni_mtx_unlock(&eq->eq_mtx);
}
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
nni_thr_fini(&eq->eq_thr);
nni_cv_fini(&eq->eq_cv);
nni_mtx_fini(&eq->eq_mtx);
@@ -773,9 +760,8 @@ nni_aio_expire_q_alloc(void)
}
nni_mtx_init(&eq->eq_mtx);
nni_cv_init(&eq->eq_cv, &eq->eq_mtx);
- eq->eq_cap = NNI_EXPIRE_Q_SIZE;
- eq->eq_len = 0;
- eq->eq_list = nni_zalloc(eq->eq_cap * sizeof(nni_aio *));
+ NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node);
+ eq->eq_next = NNI_TIME_NEVER;
eq->eq_exit = false;
if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) {