aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c101
-rw-r--r--src/core/defs.h5
2 files changed, 75 insertions, 31 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index cda46671..6512a78e 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,12 +12,14 @@
#include <string.h>
struct nni_aio_expire_q {
- nni_mtx eq_mtx;
- nni_cv eq_cv;
- nni_list eq_list;
- nni_aio *eq_aio; // currently expiring (task dispatch)
- nni_thr eq_thr;
- bool eq_exit;
+ nni_mtx eq_mtx;
+ nni_cv eq_cv;
+ nni_aio **eq_list;
+ uint32_t eq_len;
+ uint32_t eq_cap;
+ nni_aio * eq_aio; // currently expiring (task dispatch)
+ nni_thr eq_thr;
+ bool eq_exit;
};
static nni_aio_expire_q **nni_aio_expire_q_list;
@@ -98,6 +100,7 @@ static nni_reap_list aio_reap_list = {
};
static void nni_aio_expire_add(nni_aio *);
+static void nni_aio_expire_rm(nni_aio *);
void
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
@@ -123,7 +126,7 @@ nni_aio_fini(nni_aio *aio)
// We also wait if the aio is being expired.
nni_mtx_lock(&eq->eq_mtx);
aio->a_stop = true;
- nni_list_node_remove(&aio->a_expire_node);
+ nni_aio_expire_rm(aio);
while (eq->eq_aio == aio) {
nni_cv_wait(&eq->eq_cv);
}
@@ -204,7 +207,7 @@ nni_aio_stop(nni_aio *aio)
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
- nni_list_node_remove(&aio->a_expire_node);
+ nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
@@ -229,7 +232,7 @@ nni_aio_close(nni_aio *aio)
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
- nni_list_node_remove(&aio->a_expire_node);
+ nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
@@ -409,7 +412,7 @@ nni_aio_abort(nni_aio *aio, int rv)
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
- nni_list_node_remove(&aio->a_expire_node);
+ nni_aio_expire_rm(aio);
fn = aio->a_cancel_fn;
arg = aio->a_cancel_arg;
aio->a_cancel_fn = NULL;
@@ -432,7 +435,7 @@ nni_aio_finish_impl(
nni_mtx_lock(&eq->eq_mtx);
- nni_list_node_remove(&aio->a_expire_node);
+ nni_aio_expire_rm(aio);
aio->a_result = rv;
aio->a_count = count;
aio->a_cancel_fn = NULL;
@@ -505,23 +508,45 @@ nni_aio_list_active(nni_aio *aio)
static void
nni_aio_expire_add(nni_aio *aio)
{
- nni_list *list = &aio->a_expire_q->eq_list;
- nni_aio * prev;
+ nni_aio_expire_q *eq = aio->a_expire_q;
+ if (eq->eq_len >= eq->eq_cap) {
+ nni_aio **new_list =
+ nni_zalloc(eq->eq_cap * 2 * sizeof(nni_aio *));
+ for (uint32_t i = 0; i < eq->eq_len; i++) {
+ new_list[i] = eq->eq_list[i];
+ }
+ nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
+ eq->eq_list = new_list;
+ eq->eq_cap *= 2;
+ }
+ eq->eq_list[eq->eq_len++] = aio;
+ if (eq->eq_len == 1) {
+ nni_cv_wake(&eq->eq_cv);
+ }
+}
- // This is a reverse walk of the list. We're more likely to find
- // a match at the end of the list.
- for (prev = nni_list_last(list); prev != NULL;
- prev = nni_list_prev(list, prev)) {
- if (aio->a_expire >= prev->a_expire) {
- nni_list_insert_after(list, aio, prev);
+static void
+nni_aio_expire_rm(nni_aio *aio)
+{
+ uint32_t aio_idx = 0;
+ nni_aio_expire_q *eq = aio->a_expire_q;
+ for (aio_idx = 0; aio_idx < eq->eq_len; aio_idx++) {
+ if (aio == eq->eq_list[aio_idx]) {
+ eq->eq_list[aio_idx] = eq->eq_list[eq->eq_len - 1];
+ eq->eq_len--;
break;
}
}
- if (prev == NULL) {
- // This has the shortest time, so insert at the start.
- nni_list_prepend(list, aio);
- // And, as we are the latest, kick the thing.
- nni_cv_wake(&aio->a_expire_q->eq_cv);
+
+ if (eq->eq_len < eq->eq_cap / 4 && eq->eq_cap > NNI_EXPIRE_Q_SIZE) {
+ nni_aio **new_list =
+ nni_zalloc(eq->eq_cap * sizeof(nni_aio *) / 4);
+ for (uint32_t i = 0; i < eq->eq_len; i++) {
+ new_list[i] = eq->eq_list[i];
+ }
+ nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
+ eq->eq_list = new_list;
+ eq->eq_cap /= 2;
}
}
@@ -529,10 +554,11 @@ static void
nni_aio_expire_loop(void *arg)
{
nni_aio_expire_q *q = arg;
- nni_list * list = &q->eq_list;
+ nni_aio ** list = q->eq_list;
nni_mtx * mtx = &q->eq_mtx;
nni_cv * cv = &q->eq_cv;
nni_time now;
+ uint32_t aio_idx;
nni_thr_set_name(NULL, "nng:aio:expire");
@@ -543,7 +569,7 @@ nni_aio_expire_loop(void *arg)
nni_aio *aio;
int rv;
- if ((aio = nni_list_first(list)) == NULL) {
+ if (q->eq_len == 0) {
if (q->eq_exit) {
nni_mtx_unlock(mtx);
@@ -556,8 +582,18 @@ nni_aio_expire_loop(void *arg)
continue;
}
+ // Find the timer with min expire time.
+ list = q->eq_list;
+ aio_idx = 0;
+ aio = list[aio_idx];
+ for (uint32_t i = 0; i < q->eq_len; i++) {
+ if (list[i]->a_expire < aio->a_expire) {
+ aio = list[i];
+ aio_idx = i;
+ }
+ }
if (now < aio->a_expire) {
- // Unexpired; the list is ordered, so we just wait.
+ // Unexpired; we just wait for the next expired aio.
nni_cv_until(cv, aio->a_expire);
now = nni_clock();
continue;
@@ -565,7 +601,8 @@ nni_aio_expire_loop(void *arg)
// The time has come for this aio. Expire it, canceling any
// outstanding I/O.
- nni_list_remove(list, aio);
+ list[aio_idx] = list[q->eq_len - 1];
+ q->eq_len--;
rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
@@ -672,7 +709,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
}
aio->a_sleep = false;
- nni_list_node_remove(&aio->a_expire_node);
+ nni_aio_expire_rm(aio);
nni_mtx_unlock(&eq->eq_mtx);
nni_aio_finish_error(aio, rv);
@@ -720,6 +757,7 @@ nni_aio_expire_q_free(nni_aio_expire_q *eq)
nni_mtx_unlock(&eq->eq_mtx);
}
+ nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *));
nni_thr_fini(&eq->eq_thr);
nni_cv_fini(&eq->eq_cv);
nni_mtx_fini(&eq->eq_mtx);
@@ -736,7 +774,9 @@ nni_aio_expire_q_alloc(void)
}
nni_mtx_init(&eq->eq_mtx);
nni_cv_init(&eq->eq_cv, &eq->eq_mtx);
- NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node);
+ eq->eq_cap = NNI_EXPIRE_Q_SIZE;
+ eq->eq_len = 0;
+ eq->eq_list = nni_zalloc(eq->eq_cap * sizeof(nni_aio *));
eq->eq_exit = false;
if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) {
@@ -775,7 +815,6 @@ nni_aio_sys_init(void)
num_thr = 256;
}
-
nni_aio_expire_q_list =
nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr);
nni_aio_expire_q_cnt = num_thr;
diff --git a/src/core/defs.h b/src/core/defs.h
index 1dffdad0..a313c248 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -175,4 +175,9 @@ typedef nni_type nni_opt_type;
// NNI_MAX_HEADER_SIZE is our header size.
#define NNI_MAX_HEADER_SIZE ((NNI_MAX_MAX_TTL + 1) * sizeof(uint32_t))
+// NNI_EXPIRE_Q_SIZE is the default size of aio expire queue
+#ifndef NNI_EXPIRE_Q_SIZE
+#define NNI_EXPIRE_Q_SIZE 256
+#endif
+
#endif // CORE_DEFS_H