diff options
| author | wangha <68215275+wanghaEMQ@users.noreply.github.com> | 2021-07-07 06:35:12 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-07-06 15:35:12 -0700 |
| commit | f37a72f3697125092baa510f26d2e2f9f0b854e0 (patch) | |
| tree | 4bf1210da8d7e286d164e6389c2f7d4dcfedf4cb /src/core/aio.c | |
| parent | f9fa485df941c9336c49fa5be4f9346754218128 (diff) | |
| download | nng-f37a72f3697125092baa510f26d2e2f9f0b854e0.tar.gz nng-f37a72f3697125092baa510f26d2e2f9f0b854e0.tar.bz2 nng-f37a72f3697125092baa510f26d2e2f9f0b854e0.zip | |
Turn aio expire queue from nni list to array for efficiency. (#1449)
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 101 |
1 files changed, 70 insertions, 31 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index cda46671..6512a78e 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -12,12 +12,14 @@ #include <string.h> struct nni_aio_expire_q { - nni_mtx eq_mtx; - nni_cv eq_cv; - nni_list eq_list; - nni_aio *eq_aio; // currently expiring (task dispatch) - nni_thr eq_thr; - bool eq_exit; + nni_mtx eq_mtx; + nni_cv eq_cv; + nni_aio **eq_list; + uint32_t eq_len; + uint32_t eq_cap; + nni_aio * eq_aio; // currently expiring (task dispatch) + nni_thr eq_thr; + bool eq_exit; }; static nni_aio_expire_q **nni_aio_expire_q_list; @@ -98,6 +100,7 @@ static nni_reap_list aio_reap_list = { }; static void nni_aio_expire_add(nni_aio *); +static void nni_aio_expire_rm(nni_aio *); void nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) @@ -123,7 +126,7 @@ nni_aio_fini(nni_aio *aio) // We also wait if the aio is being expired. nni_mtx_lock(&eq->eq_mtx); aio->a_stop = true; - nni_list_node_remove(&aio->a_expire_node); + nni_aio_expire_rm(aio); while (eq->eq_aio == aio) { nni_cv_wait(&eq->eq_cv); } @@ -204,7 +207,7 @@ nni_aio_stop(nni_aio *aio) nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); - nni_list_node_remove(&aio->a_expire_node); + nni_aio_expire_rm(aio); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; @@ -229,7 +232,7 @@ nni_aio_close(nni_aio *aio) nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); - nni_list_node_remove(&aio->a_expire_node); + nni_aio_expire_rm(aio); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; @@ -409,7 +412,7 @@ nni_aio_abort(nni_aio *aio, int rv) nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); - nni_list_node_remove(&aio->a_expire_node); + nni_aio_expire_rm(aio); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; @@ -432,7 +435,7 @@ nni_aio_finish_impl( nni_mtx_lock(&eq->eq_mtx); - nni_list_node_remove(&aio->a_expire_node); + nni_aio_expire_rm(aio); aio->a_result = rv; aio->a_count = count; aio->a_cancel_fn = NULL; @@ -505,23 +508,45 @@ nni_aio_list_active(nni_aio *aio) static void nni_aio_expire_add(nni_aio *aio) { - nni_list *list = &aio->a_expire_q->eq_list; - nni_aio * prev; + nni_aio_expire_q *eq = aio->a_expire_q; + if (eq->eq_len >= eq->eq_cap) { + nni_aio **new_list = + nni_zalloc(eq->eq_cap * 2 * sizeof(nni_aio *)); + for (uint32_t i = 0; i < eq->eq_len; i++) { + new_list[i] = eq->eq_list[i]; + } + nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *)); + eq->eq_list = new_list; + eq->eq_cap *= 2; + } + eq->eq_list[eq->eq_len++] = aio; + if (eq->eq_len == 1) { + nni_cv_wake(&eq->eq_cv); + } +} - // This is a reverse walk of the list. We're more likely to find - // a match at the end of the list. - for (prev = nni_list_last(list); prev != NULL; - prev = nni_list_prev(list, prev)) { - if (aio->a_expire >= prev->a_expire) { - nni_list_insert_after(list, aio, prev); +static void +nni_aio_expire_rm(nni_aio *aio) +{ + uint32_t aio_idx = 0; + nni_aio_expire_q *eq = aio->a_expire_q; + for (aio_idx = 0; aio_idx < eq->eq_len; aio_idx++) { + if (aio == eq->eq_list[aio_idx]) { + eq->eq_list[aio_idx] = eq->eq_list[eq->eq_len - 1]; + eq->eq_len--; break; } } - if (prev == NULL) { - // This has the shortest time, so insert at the start. - nni_list_prepend(list, aio); - // And, as we are the latest, kick the thing. - nni_cv_wake(&aio->a_expire_q->eq_cv); + + if (eq->eq_len < eq->eq_cap / 4 && eq->eq_cap > NNI_EXPIRE_Q_SIZE) { + nni_aio **new_list = + nni_zalloc(eq->eq_cap * sizeof(nni_aio *) / 4); + for (uint32_t i = 0; i < eq->eq_len; i++) { + new_list[i] = eq->eq_list[i]; + } + nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *)); + eq->eq_list = new_list; + eq->eq_cap /= 2; } } @@ -529,10 +554,11 @@ static void nni_aio_expire_loop(void *arg) { nni_aio_expire_q *q = arg; - nni_list * list = &q->eq_list; + nni_aio ** list = q->eq_list; nni_mtx * mtx = &q->eq_mtx; nni_cv * cv = &q->eq_cv; nni_time now; + uint32_t aio_idx; nni_thr_set_name(NULL, "nng:aio:expire"); @@ -543,7 +569,7 @@ nni_aio_expire_loop(void *arg) nni_aio *aio; int rv; - if ((aio = nni_list_first(list)) == NULL) { + if (q->eq_len == 0) { if (q->eq_exit) { nni_mtx_unlock(mtx); @@ -556,8 +582,18 @@ nni_aio_expire_loop(void *arg) continue; } + // Find the timer with min expire time. + list = q->eq_list; + aio_idx = 0; + aio = list[aio_idx]; + for (uint32_t i = 0; i < q->eq_len; i++) { + if (list[i]->a_expire < aio->a_expire) { + aio = list[i]; + aio_idx = i; + } + } if (now < aio->a_expire) { - // Unexpired; the list is ordered, so we just wait. + // Unexpired; we just wait for the next expired aio. nni_cv_until(cv, aio->a_expire); now = nni_clock(); continue; @@ -565,7 +601,8 @@ nni_aio_expire_loop(void *arg) // The time has come for this aio. Expire it, canceling any // outstanding I/O. - nni_list_remove(list, aio); + list[aio_idx] = list[q->eq_len - 1]; + q->eq_len--; rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; @@ -672,7 +709,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) } aio->a_sleep = false; - nni_list_node_remove(&aio->a_expire_node); + nni_aio_expire_rm(aio); nni_mtx_unlock(&eq->eq_mtx); nni_aio_finish_error(aio, rv); @@ -720,6 +757,7 @@ nni_aio_expire_q_free(nni_aio_expire_q *eq) nni_mtx_unlock(&eq->eq_mtx); } + nni_free(eq->eq_list, eq->eq_cap * sizeof(nni_aio *)); nni_thr_fini(&eq->eq_thr); nni_cv_fini(&eq->eq_cv); nni_mtx_fini(&eq->eq_mtx); @@ -736,7 +774,9 @@ nni_aio_expire_q_alloc(void) } nni_mtx_init(&eq->eq_mtx); nni_cv_init(&eq->eq_cv, &eq->eq_mtx); - NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node); + eq->eq_cap = NNI_EXPIRE_Q_SIZE; + eq->eq_len = 0; + eq->eq_list = nni_zalloc(eq->eq_cap * sizeof(nni_aio *)); eq->eq_exit = false; if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) { @@ -775,7 +815,6 @@ nni_aio_sys_init(void) num_thr = 256; } - nni_aio_expire_q_list = nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr); nni_aio_expire_q_cnt = num_thr; |
