aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c198
-rw-r--r--src/core/aio.h11
-rw-r--r--src/core/defs.h7
3 files changed, 102 insertions, 114 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 8ca00dd6..dfab8f60 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,14 +12,13 @@
#include <string.h>
struct nni_aio_expire_q {
- nni_mtx eq_mtx;
- nni_cv eq_cv;
- nni_aio **eq_list;
- uint32_t eq_len;
- uint32_t eq_cap;
- nni_aio * eq_aio; // currently expiring (task dispatch)
- nni_thr eq_thr;
- bool eq_exit;
+ nni_mtx eq_mtx;
+ nni_cv eq_cv;
+ nni_list eq_list;
+ uint32_t eq_len;
+ nni_thr eq_thr;
+ nni_time eq_next; // next expiration
+ bool eq_exit;
};
static nni_aio_expire_q **nni_aio_expire_q_list;
@@ -117,7 +116,7 @@ void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
// This is like aio_close, but we don't want to dispatch
@@ -126,10 +125,10 @@ nni_aio_fini(nni_aio *aio)
// We also wait if the aio is being expired.
nni_mtx_lock(&eq->eq_mtx);
aio->a_stop = true;
- nni_aio_expire_rm(aio);
- while (eq->eq_aio == aio) {
+ while (aio->a_expiring) {
nni_cv_wait(&eq->eq_cv);
}
+ nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
@@ -203,7 +202,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -228,7 +227,7 @@ nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -407,7 +406,7 @@ void
nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
@@ -508,125 +507,114 @@ static void
nni_aio_expire_add(nni_aio *aio)
{
nni_aio_expire_q *eq = aio->a_expire_q;
- if (eq->eq_len >= eq->eq_cap) {
- nni_aio **new_list =
- nni_zalloc(eq->eq_cap * 2 * sizeof(nni_aio *));
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- new_list[i] = eq->eq_list[i];
- }
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
- eq->eq_list = new_list;
- eq->eq_cap *= 2;
- }
- eq->eq_list[eq->eq_len++] = aio;
- // Fire the latest aio, but it cames with performance punishment
- nni_cv_wake(&eq->eq_cv);
+ nni_list_append(&eq->eq_list, aio);
+
+ if (eq->eq_next > aio->a_expire) {
+ eq->eq_next = aio->a_expire;
+ nni_cv_wake(&eq->eq_cv);
+ }
}
static void
nni_aio_expire_rm(nni_aio *aio)
{
- nni_aio_expire_q *eq = aio->a_expire_q;
-
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- if (aio == eq->eq_list[i]) {
- eq->eq_list[i] = eq->eq_list[eq->eq_len - 1];
- eq->eq_len--;
- break;
- }
- }
+ nni_list_node_remove(&aio->a_expire_node);
- if (eq->eq_len < eq->eq_cap / 4 && eq->eq_cap > NNI_EXPIRE_Q_SIZE) {
- nni_aio **new_list =
- nni_zalloc(eq->eq_cap * sizeof(nni_aio *) / 4);
- for (uint32_t i = 0; i < eq->eq_len; i++) {
- new_list[i] = eq->eq_list[i];
- }
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
- eq->eq_list = new_list;
- eq->eq_cap /= 4;
- }
+ // If this item is the one that is going to wake the loop,
+ // don't worry about it. It will wake up normally, or when we
+ // add a new aio to it. Worst case is just one spurious wake up,
+ // which we'd need to do anyway.
}
static void
nni_aio_expire_loop(void *arg)
{
nni_aio_expire_q *q = arg;
- nni_mtx * mtx = &q->eq_mtx;
- nni_cv * cv = &q->eq_cv;
- nni_aio ** list;
+ nni_mtx *mtx = &q->eq_mtx;
+ nni_cv *cv = &q->eq_cv;
nni_time now;
- uint32_t aio_idx;
+ uint32_t exp_idx;
+ nni_aio *expires[NNI_EXPIRE_BATCH];
nni_thr_set_name(NULL, "nng:aio:expire");
- now = nni_clock();
nni_mtx_lock(mtx);
for (;;) {
nni_aio *aio;
int rv;
-
- if (q->eq_len == 0) {
-
- if (q->eq_exit) {
- nni_mtx_unlock(mtx);
- return;
- }
-
- nni_cv_wait(cv);
-
- now = nni_clock();
+ nni_time next;
+
+ next = q->eq_next;
+ now = nni_clock();
+
+ // Each time we wake up, we scan the entire list of elements.
+ // We scan forward, moving up to NNI_EXPIRE_Q_SIZE elements
+ // (a batch) to a saved array of things we are going to cancel.
+ // This mostly runs in O(n), provided you don't have many
+ // elements (> NNI_EXPIRE_Q_SIZE) all expiring simultaneously.
+ aio = nni_list_first(&q->eq_list);
+ if ((aio == NULL) && (q->eq_exit)) {
+ nni_mtx_unlock(mtx);
+ return;
+ }
+ if (now < next) {
+ // Early wake up (just to reschedule), no need to
+ // rescan the list. This is an optimization.
+ nni_cv_until(cv, next);
continue;
}
-
- // Find the timer with min expire time.
- list = q->eq_list;
- aio_idx = 0;
- aio = list[aio_idx];
- for (uint32_t i = 0; i < q->eq_len; i++) {
- if (list[i]->a_expire < aio->a_expire) {
- aio = list[i];
- aio_idx = i;
+ q->eq_next = NNI_TIME_NEVER;
+ exp_idx = 0;
+ while (aio != NULL) {
+ if ((aio->a_expire < now) &&
+ (exp_idx < NNI_EXPIRE_BATCH)) {
+ nni_aio *nxt;
+
+ // This one is expiring.
+ expires[exp_idx++] = aio;
+ // save the next node
+ nxt = nni_list_next(&q->eq_list, aio);
+ nni_list_remove(&q->eq_list, aio);
+ // Place a temporary hold on the aio.
+ // This prevents it from being destroyed.
+ aio->a_expiring = true;
+ aio = nxt;
+ continue;
}
- }
- if (now < aio->a_expire) {
- // Unexpired; we just wait for the next expired aio.
- nni_cv_until(cv, aio->a_expire);
- now = nni_clock();
- continue;
+ if (aio->a_expire < q->eq_next) {
+ q->eq_next = aio->a_expire;
+ }
+ aio = nni_list_next(&q->eq_list, aio);
}
- // The time has come for this aio. Expire it, canceling any
- // outstanding I/O.
- list[aio_idx] = list[q->eq_len - 1];
- q->eq_len--;
- rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ for (uint32_t i = 0; i < exp_idx; i++) {
+ aio = expires[i];
+ rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
- void * cancel_arg = aio->a_cancel_arg;
+ nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
+ void *cancel_arg = aio->a_cancel_arg;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- // Place a temporary hold on the aio. This prevents it
- // from being destroyed.
- q->eq_aio = aio;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
- // We let the cancel function handle the completion.
- // If there is no cancellation function, then we cannot
- // terminate the aio - we've tried, but it has to run
- // to it's natural conclusion.
- nni_mtx_unlock(mtx);
- cancel_fn(aio, cancel_arg, rv);
-
- // Get updated time before reacquiring lock.
- now = nni_clock();
-
- nni_mtx_lock(mtx);
-
- q->eq_aio = NULL;
+ // We let the cancel function handle the completion.
+ // If there is no cancellation function, then we cannot
+ // terminate the aio - we've tried, but it has to run
+ // to its natural conclusion.
+ if (cancel_fn != NULL) {
+ nni_mtx_unlock(mtx);
+ cancel_fn(aio, cancel_arg, rv);
+ nni_mtx_lock(mtx);
+ }
+ aio->a_expiring = false;
+ }
nni_cv_wake(cv);
+
+ if (now < q->eq_next) {
+ nni_cv_until(cv, q->eq_next);
+ }
}
}
@@ -756,7 +744,6 @@ nni_aio_expire_q_free(nni_aio_expire_q *eq)
nni_mtx_unlock(&eq->eq_mtx);
}
- nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
nni_thr_fini(&eq->eq_thr);
nni_cv_fini(&eq->eq_cv);
nni_mtx_fini(&eq->eq_mtx);
@@ -773,9 +760,8 @@ nni_aio_expire_q_alloc(void)
}
nni_mtx_init(&eq->eq_mtx);
nni_cv_init(&eq->eq_cv, &eq->eq_mtx);
- eq->eq_cap = NNI_EXPIRE_Q_SIZE;
- eq->eq_len = 0;
- eq->eq_list = nni_zalloc(eq->eq_cap * sizeof(nni_aio *));
+ NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node);
+ eq->eq_next = NNI_TIME_NEVER;
eq->eq_exit = false;
if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) {
diff --git a/src/core/aio.h b/src/core/aio.h
index ba231bd1..9ef5f63d 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -167,10 +167,10 @@ extern void nni_aio_sys_fini(void);
typedef struct nni_aio_expire_q nni_aio_expire_q;
-// An nni_aio is an async I/O handle. The details of this aio structure
+// nng_aio is an async I/O handle. The details of this aio structure
// are private to the AIO framework. The structure has the public name
// (nng_aio) so that we minimize the pollution in the public API namespace.
-// It is a coding error for anything out side of the AIO framework to access
+// It is a coding error for anything outside the AIO framework to access
// any of these members -- the definition is provided here to facilitate
// inlining, but that should be the only use.
struct nng_aio {
@@ -181,6 +181,7 @@ struct nng_aio {
bool a_stop; // Shutting down (no new operations)
bool a_sleep; // Sleeping with no action
bool a_expire_ok; // Expire from sleep is ok
+ bool a_expiring; // Expiration in progress
nni_task a_task;
// Read/write operations.
@@ -198,9 +199,9 @@ struct nng_aio {
// Provider-use fields.
nni_aio_cancel_fn a_cancel_fn;
- void * a_cancel_arg;
+ void *a_cancel_arg;
nni_list_node a_prov_node; // Linkage on provider list.
- void * a_prov_extra[2]; // Extra data used by provider
+ void *a_prov_extra[2]; // Extra data used by provider
nni_aio_expire_q *a_expire_q;
nni_list_node a_expire_node; // Expiration node
diff --git a/src/core/defs.h b/src/core/defs.h
index b1dbed18..fb9c7447 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -165,9 +165,10 @@ typedef nni_type nni_opt_type;
// NNI_MAX_HEADER_SIZE is our header size.
#define NNI_MAX_HEADER_SIZE ((NNI_MAX_MAX_TTL + 1) * sizeof(uint32_t))
-// NNI_EXPIRE_Q_SIZE is the default size of aio expire queue
-#ifndef NNI_EXPIRE_Q_SIZE
-#define NNI_EXPIRE_Q_SIZE 256
+// NNI_EXPIRE_BATCH lets us handle expiration in batches,
+// reducing the number of traverses of the expiration list we perform.
+#ifndef NNI_EXPIRE_BATCH
+#define NNI_EXPIRE_BATCH 100
#endif
#endif // CORE_DEFS_H