summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-08-09 22:39:46 -0700
committerGarrett D'Amore <garrett@damore.org>2021-08-09 22:39:46 -0700
commiteca7cb3ac170e159f5a5b79f68b1890409f7179e (patch)
tree1ae94d6ed3ec2b66af3c745dd89e626bfcc07b7c
parent22f089b6b5ebe3be380c33c926990fe891ae0da9 (diff)
downloadnng-eca7cb3ac170e159f5a5b79f68b1890409f7179e.tar.gz
nng-eca7cb3ac170e159f5a5b79f68b1890409f7179e.tar.bz2
nng-eca7cb3ac170e159f5a5b79f68b1890409f7179e.zip
fixes #1488 aio expiration list performance work needed
There were several problems with the array implementation, both from performance and from correctness. This corrects those errors (hopefully) and restores the expiration lists as linked lists.
-rw-r--r--src/core/aio.c198
-rw-r--r--src/core/aio.h11
-rw-r--r--src/core/defs.h7
3 files changed, 102 insertions, 114 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 8ca00dd6..dfab8f60 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,14 +12,13 @@
#include <string.h>
struct nni_aio_expire_q {
- nni_mtx eq_mtx;
- nni_cv eq_cv;
- nni_aio **eq_list;
- uint32_t eq_len;
- uint32_t eq_cap;
- nni_aio * eq_aio; // currently expiring (task dispatch)
- nni_thr eq_thr;
- bool eq_exit;
+ nni_mtx eq_mtx;
+ nni_cv eq_cv;
+ nni_list eq_list;
+ uint32_t eq_len;
+ nni_thr eq_thr;
+ nni_time eq_next; // next expiration
+ bool eq_exit;
};
static nni_aio_expire_q **nni_aio_expire_q_list;
@@ -117,7 +116,7 @@ void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
// This is like aio_close, but we don't want to dispatch
@@ -126,10 +125,10 @@ nni_aio_fini(nni_aio *aio)
// We also wait if the aio is being expired.
nni_mtx_lock(&eq->eq_mtx);
aio->a_stop = true;
- nni_aio_expire_rm(aio);
- while (eq->eq_aio == aio) {
+ while (aio->a_expiring) {
nni_cv_wait(&eq->eq_cv);
}
+ nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
@@ -203,7 +202,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -228,7 +227,7 @@ nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -407,7 +406,7 @@ void
nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -508,125 +507,114 @@ static void
nni_aio_expire_add(nni_aio *aio)
{
nni_aio_expire_q *eq = aio->a_expire_q;
- if (eq->eq_len >= eq->eq_cap) {
- nni_aio **new_list =
- nni_zalloc(eq->eq_cap * 2 * sizeof(nni_aio *));
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- new_list[i] = eq->eq_list[i];
- }
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
- eq->eq_list = new_list;
- eq->eq_cap *= 2;
- }
- eq->eq_list[eq->eq_len++] = aio;
- // Fire the latest aio, but it cames with performance punishment
- nni_cv_wake(&eq->eq_cv);
+ nni_list_append(&eq->eq_list, aio);
+
+ if (eq->eq_next > aio->a_expire) {
+ eq->eq_next = aio->a_expire;
+ nni_cv_wake(&eq->eq_cv);
+ }
}
static void
nni_aio_expire_rm(nni_aio *aio)
{
- nni_aio_expire_q *eq = aio->a_expire_q;
-
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- if (aio == eq->eq_list[i]) {
- eq->eq_list[i] = eq->eq_list[eq->eq_len - 1];
- eq->eq_len--;
- break;
- }
- }
+ nni_list_node_remove(&aio->a_expire_node);
- if (eq->eq_len < eq->eq_cap / 4 && eq->eq_cap > NNI_EXPIRE_Q_SIZE) {
- nni_aio **new_list =
- nni_zalloc(eq->eq_cap * sizeof(nni_aio *) / 4);
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- new_list[i] = eq->eq_list[i];
- }
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
- eq->eq_list = new_list;
- eq->eq_cap /= 4;
- }
+ // If this item is the one that is going to wake the loop,
+ // don't worry about it. It will wake up normally, or when we
+ // add a new aio to it. Worst case is just one spurious wake up,
+ // which we'd need to do anyway.
}
static void
nni_aio_expire_loop(void *arg)
{
nni_aio_expire_q *q = arg;
- nni_mtx * mtx = &q->eq_mtx;
- nni_cv * cv = &q->eq_cv;
- nni_aio ** list;
+ nni_mtx *mtx = &q->eq_mtx;
+ nni_cv *cv = &q->eq_cv;
nni_time now;
- uint32_t aio_idx;
+ uint32_t exp_idx;
+ nni_aio *expires[NNI_EXPIRE_BATCH];
nni_thr_set_name(NULL, "nng:aio:expire");
- now = nni_clock();
nni_mtx_lock(mtx);
for (;;) {
nni_aio *aio;
int rv;
-
- if (q->eq_len == 0) {
-
- if (q->eq_exit) {
- nni_mtx_unlock(mtx);
- return;
- }
-
- nni_cv_wait(cv);
-
- now = nni_clock();
+ nni_time next;
+
+ next = q->eq_next;
+ now = nni_clock();
+
+ // Each time we wake up, we scan the entire list of elements.
+ // We scan forward, moving up to NNI_EXPIRE_Q_SIZE elements
+ // (a batch) to a saved array of things we are going to cancel.
+ // This mostly runs in O(n), provided you don't have many
+ // elements (> NNI_EXPIRE_Q_SIZE) all expiring simultaneously.
+ aio = nni_list_first(&q->eq_list);
+ if ((aio == NULL) && (q->eq_exit)) {
+ nni_mtx_unlock(mtx);
+ return;
+ }
+ if (now < next) {
+ // Early wake up (just to reschedule), no need to
+ // rescan the list. This is an optimization.
+ nni_cv_until(cv, next);
continue;
}
-
- // Find the timer with min expire time.
- list = q->eq_list;
- aio_idx = 0;
- aio = list[aio_idx];
- for (uint32_t i = 0; i < q->eq_len; i++) {
- if (list[i]->a_expire < aio->a_expire) {
- aio = list[i];
- aio_idx = i;
+ q->eq_next = NNI_TIME_NEVER;
+ exp_idx = 0;
+ while (aio != NULL) {
+ if ((aio->a_expire < now) &&
+ (exp_idx < NNI_EXPIRE_BATCH)) {
+ nni_aio *nxt;
+
+ // This one is expiring.
+ expires[exp_idx++] = aio;
+ // save the next node
+ nxt = nni_list_next(&q->eq_list, aio);
+ nni_list_remove(&q->eq_list, aio);
+ // Place a temporary hold on the aio.
+ // This prevents it from being destroyed.
+ aio->a_expiring = true;
+ aio = nxt;
+ continue;
}
- }
- if (now < aio->a_expire) {
- // Unexpired; we just wait for the next expired aio.
- nni_cv_until(cv, aio->a_expire);
- now = nni_clock();
- continue;
+ if (aio->a_expire < q->eq_next) {
+ q->eq_next = aio->a_expire;
+ }
+ aio = nni_list_next(&q->eq_list, aio);
}
- // The time has come for this aio. Expire it, canceling any
- // outstanding I/O.
- list[aio_idx] = list[q->eq_len - 1];
- q->eq_len--;
- rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ for (uint32_t i = 0; i < exp_idx; i++) {
+ aio = expires[i];
+ rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
- void * cancel_arg = aio->a_cancel_arg;
+ nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
+ void *cancel_arg = aio->a_cancel_arg;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- // Place a temporary hold on the aio. This prevents it
- // from being destroyed.
- q->eq_aio = aio;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
- // We let the cancel function handle the completion.
- // If there is no cancellation function, then we cannot
- // terminate the aio - we've tried, but it has to run
- // to it's natural conclusion.
- nni_mtx_unlock(mtx);
- cancel_fn(aio, cancel_arg, rv);
-
- // Get updated time before reacquiring lock.
- now = nni_clock();
-
- nni_mtx_lock(mtx);
-
- q->eq_aio = NULL;
+ // We let the cancel function handle the completion.
+ // If there is no cancellation function, then we cannot
+ // terminate the aio - we've tried, but it has to run
+ // to its natural conclusion.
+ if (cancel_fn != NULL) {
+ nni_mtx_unlock(mtx);
+ cancel_fn(aio, cancel_arg, rv);
+ nni_mtx_lock(mtx);
+ }
+ aio->a_expiring = false;
+ }
nni_cv_wake(cv);
+
+ if (now < q->eq_next) {
+ nni_cv_until(cv, q->eq_next);
+ }
}
}
@@ -756,7 +744,6 @@ nni_aio_expire_q_free(nni_aio_expire_q *eq)
nni_mtx_unlock(&eq->eq_mtx);
}
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
nni_thr_fini(&eq->eq_thr);
nni_cv_fini(&eq->eq_cv);
nni_mtx_fini(&eq->eq_mtx);
@@ -773,9 +760,8 @@ nni_aio_expire_q_alloc(void)
}
nni_mtx_init(&eq->eq_mtx);
nni_cv_init(&eq->eq_cv, &eq->eq_mtx);
- eq->eq_cap = NNI_EXPIRE_Q_SIZE;
- eq->eq_len = 0;
- eq->eq_list = nni_zalloc(eq->eq_cap * sizeof(nni_aio *));
+ NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node);
+ eq->eq_next = NNI_TIME_NEVER;
eq->eq_exit = false;
if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) {
diff --git a/src/core/aio.h b/src/core/aio.h
index ba231bd1..9ef5f63d 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -167,10 +167,10 @@ extern void nni_aio_sys_fini(void);
typedef struct nni_aio_expire_q nni_aio_expire_q;
-// An nni_aio is an async I/O handle. The details of this aio structure
+// nng_aio is an async I/O handle. The details of this aio structure
// are private to the AIO framework. The structure has the public name
// (nng_aio) so that we minimize the pollution in the public API namespace.
-// It is a coding error for anything out side of the AIO framework to access
+// It is a coding error for anything outside the AIO framework to access
// any of these members -- the definition is provided here to facilitate
// inlining, but that should be the only use.
struct nng_aio {
@@ -181,6 +181,7 @@ struct nng_aio {
bool a_stop; // Shutting down (no new operations)
bool a_sleep; // Sleeping with no action
bool a_expire_ok; // Expire from sleep is ok
+ bool a_expiring; // Expiration in progress
nni_task a_task;
// Read/write operations.
@@ -198,9 +199,9 @@ struct nng_aio {
// Provider-use fields.
nni_aio_cancel_fn a_cancel_fn;
- void * a_cancel_arg;
+ void *a_cancel_arg;
nni_list_node a_prov_node; // Linkage on provider list.
- void * a_prov_extra[2]; // Extra data used by provider
+ void *a_prov_extra[2]; // Extra data used by provider
nni_aio_expire_q *a_expire_q;
nni_list_node a_expire_node; // Expiration node
diff --git a/src/core/defs.h b/src/core/defs.h
index b1dbed18..fb9c7447 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -165,9 +165,10 @@ typedef nni_type nni_opt_type;
// NNI_MAX_HEADER_SIZE is our header size.
#define NNI_MAX_HEADER_SIZE ((NNI_MAX_MAX_TTL + 1) * sizeof(uint32_t))
-// NNI_EXPIRE_Q_SIZE is the default size of aio expire queue
-#ifndef NNI_EXPIRE_Q_SIZE
-#define NNI_EXPIRE_Q_SIZE 256
+// NNI_EXPIRE_BATCH lets us handle expiration in batches,
+// reducing the number of traverses of the expiration list we perform.
+#ifndef NNI_EXPIRE_BATCH
+#define NNI_EXPIRE_BATCH 100
#endif
#endif // CORE_DEFS_H