diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 56 |
1 files changed, 44 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 91b5c85a..5807869b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "core/taskq.h" #include <string.h> struct nni_aio_expire_q { @@ -18,6 +19,7 @@ struct nni_aio_expire_q { nni_thr eq_thr; nni_time eq_next; // next expiration bool eq_exit; + bool eq_stop; }; static nni_aio_expire_q **nni_aio_expire_q_list; @@ -343,7 +345,7 @@ nni_aio_begin(nni_aio *aio) aio->a_cancel_fn = NULL; // We should not reschedule anything at this point. - if (aio->a_stop || eq->eq_exit) { + if (aio->a_stop || eq->eq_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; @@ -380,7 +382,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } nni_mtx_lock(&eq->eq_mtx); - if (aio->a_stop || eq->eq_exit) { + if (aio->a_stop || eq->eq_stop) { nni_task_abort(&aio->a_task); nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECLOSED); @@ -595,16 +597,15 @@ nni_aio_expire_loop(void *arg) nni_mtx_unlock(mtx); return; } - if (now < next) { - // Early wake up (just to reschedule), no need to - // rescan the list. This is an optimization. + if (now < next && !(q->eq_stop && aio != NULL)) { + // nothing to do! nni_cv_until(cv, next); continue; } q->eq_next = NNI_TIME_NEVER; exp_idx = 0; while (aio != NULL) { - if ((aio->a_expire < now) && + if ((q->eq_stop || aio->a_expire < now) && (exp_idx < NNI_EXPIRE_BATCH)) { nni_aio *nxt; @@ -627,7 +628,14 @@ nni_aio_expire_loop(void *arg) for (uint32_t i = 0; i < exp_idx; i++) { aio = expires[i]; - rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + if (q->eq_stop) { + rv = NNG_ECANCELED; + } else if (aio->a_expire_ok) { + aio->a_expire_ok = false; + rv = 0; + } else { + rv = NNG_ETIMEDOUT; + } nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; void *cancel_arg = aio->a_cancel_arg; @@ -639,7 +647,15 @@ nni_aio_expire_loop(void *arg) // If there is no cancellation function, then we cannot // terminate the aio - we've tried, but it has to run // to its natural conclusion. - if (cancel_fn != NULL) { + // + // For the special case of sleeping, we don't need to + // drop the lock and call the cancel function, we are + // already doing it right here! + if (aio->a_sleep) { + aio->a_result = rv; + aio->a_sleep = false; + nni_task_dispatch(&aio->a_task); + } else if (cancel_fn != NULL) { nni_mtx_unlock(mtx); cancel_fn(aio, cancel_arg, rv); nni_mtx_lock(mtx); @@ -647,10 +663,6 @@ nni_aio_expire_loop(void *arg) aio->a_expiring = false; } nni_cv_wake(cv); - - if (now < q->eq_next) { - nni_cv_until(cv, q->eq_next); - } } } @@ -769,11 +781,23 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) } static void +nni_aio_expire_q_stop(nni_aio_expire_q *eq) +{ + if (eq != NULL && !eq->eq_stop) { + nni_mtx_lock(&eq->eq_mtx); + eq->eq_stop = true; + nni_cv_wake(&eq->eq_cv); + nni_mtx_unlock(&eq->eq_mtx); + } +} + +static void nni_aio_expire_q_free(nni_aio_expire_q *eq) { if (eq == NULL) { return; } + NNI_ASSERT(eq->eq_stop); if (!eq->eq_exit) { nni_mtx_lock(&eq->eq_mtx); eq->eq_exit = true; @@ -811,6 +835,14 @@ nni_aio_expire_q_alloc(void) } void +nni_aio_sys_stop(void) +{ + for (int i = 0; i < nni_aio_expire_q_cnt; i++) { + nni_aio_expire_q_stop(nni_aio_expire_q_list[i]); + } +} + +void nni_aio_sys_fini(void) { for (int i = 0; i < nni_aio_expire_q_cnt; i++) { |
