diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 56 | ||||
| -rw-r--r-- | src/core/aio.h | 1 | ||||
| -rw-r--r-- | src/core/aio_test.c | 42 | ||||
| -rw-r--r-- | src/core/init.c | 6 |
4 files changed, 91 insertions, 14 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 91b5c85a..5807869b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "core/taskq.h" #include <string.h> struct nni_aio_expire_q { @@ -18,6 +19,7 @@ struct nni_aio_expire_q { nni_thr eq_thr; nni_time eq_next; // next expiration bool eq_exit; + bool eq_stop; }; static nni_aio_expire_q **nni_aio_expire_q_list; @@ -343,7 +345,7 @@ nni_aio_begin(nni_aio *aio) aio->a_cancel_fn = NULL; // We should not reschedule anything at this point. - if (aio->a_stop || eq->eq_exit) { + if (aio->a_stop || eq->eq_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; @@ -380,7 +382,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } nni_mtx_lock(&eq->eq_mtx); - if (aio->a_stop || eq->eq_exit) { + if (aio->a_stop || eq->eq_stop) { nni_task_abort(&aio->a_task); nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECLOSED); @@ -595,16 +597,15 @@ nni_aio_expire_loop(void *arg) nni_mtx_unlock(mtx); return; } - if (now < next) { - // Early wake up (just to reschedule), no need to - // rescan the list. This is an optimization. + if (now < next && !(q->eq_stop && aio != NULL)) { + // nothing to do! nni_cv_until(cv, next); continue; } q->eq_next = NNI_TIME_NEVER; exp_idx = 0; while (aio != NULL) { - if ((aio->a_expire < now) && + if ((q->eq_stop || aio->a_expire < now) && (exp_idx < NNI_EXPIRE_BATCH)) { nni_aio *nxt; @@ -627,7 +628,14 @@ nni_aio_expire_loop(void *arg) for (uint32_t i = 0; i < exp_idx; i++) { aio = expires[i]; - rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + if (q->eq_stop) { + rv = NNG_ECANCELED; + } else if (aio->a_expire_ok) { + aio->a_expire_ok = false; + rv = 0; + } else { + rv = NNG_ETIMEDOUT; + } nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; void *cancel_arg = aio->a_cancel_arg; @@ -639,7 +647,15 @@ nni_aio_expire_loop(void *arg) // If there is no cancellation function, then we cannot // terminate the aio - we've tried, but it has to run // to its natural conclusion. - if (cancel_fn != NULL) { + // + // For the special case of sleeping, we don't need to + // drop the lock and call the cancel function, we are + // already doing it right here! + if (aio->a_sleep) { + aio->a_result = rv; + aio->a_sleep = false; + nni_task_dispatch(&aio->a_task); + } else if (cancel_fn != NULL) { nni_mtx_unlock(mtx); cancel_fn(aio, cancel_arg, rv); nni_mtx_lock(mtx); @@ -647,10 +663,6 @@ nni_aio_expire_loop(void *arg) aio->a_expiring = false; } nni_cv_wake(cv); - - if (now < q->eq_next) { - nni_cv_until(cv, q->eq_next); - } } } @@ -769,11 +781,23 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) } static void +nni_aio_expire_q_stop(nni_aio_expire_q *eq) +{ + if (eq != NULL && !eq->eq_stop) { + nni_mtx_lock(&eq->eq_mtx); + eq->eq_stop = true; + nni_cv_wake(&eq->eq_cv); + nni_mtx_unlock(&eq->eq_mtx); + } +} + +static void nni_aio_expire_q_free(nni_aio_expire_q *eq) { if (eq == NULL) { return; } + NNI_ASSERT(eq->eq_stop); if (!eq->eq_exit) { nni_mtx_lock(&eq->eq_mtx); eq->eq_exit = true; @@ -811,6 +835,14 @@ nni_aio_expire_q_alloc(void) } void +nni_aio_sys_stop(void) +{ + for (int i = 0; i < nni_aio_expire_q_cnt; i++) { + nni_aio_expire_q_stop(nni_aio_expire_q_list[i]); + } +} + +void nni_aio_sys_fini(void) { for (int i = 0; i < nni_aio_expire_q_cnt; i++) { diff --git a/src/core/aio.h b/src/core/aio.h index 50cb266b..f8c6730f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -193,6 +193,7 @@ extern void nni_aio_completions_add( nni_aio_completions *, nni_aio *, int, size_t); extern int nni_aio_sys_init(nng_init_params *); +extern void nni_aio_sys_stop(void); extern void nni_aio_sys_fini(void); typedef struct nni_aio_expire_q nni_aio_expire_q; diff --git a/src/core/aio_test.c b/src/core/aio_test.c index 3e040fe1..f18eb5ed 100644 --- a/src/core/aio_test.c +++ b/src/core/aio_test.c @@ -72,6 +72,46 @@ test_sleep_timeout(void) nng_aio_free(aio); } +static void +sleep_reap(void *arg) +{ + nng_aio *aio = *(nng_aio **) arg; + if (nng_aio_result(aio) != NNG_ECANCELED) { + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + } + nng_aio_reap(aio); +} + +static void +test_sleep_fini(void) +{ + static nng_aio *aio; + NUTS_TRUE(nng_aio_alloc(&aio, sleep_reap, &aio) == 0); + nng_sleep_aio(20000, aio); + nng_msleep(1); + // intentionally we do not free the aio here. reap should clean it. + nng_fini(); + nng_init(NULL); // so that TEST_FINI will reap +} + +static void +test_sleep_fini_many(void) +{ +#define NIOS 2000 + static nng_aio *aios[NIOS]; + for (int i = 0; i < NIOS; i++) { + int rv = nng_aio_alloc(&(aios[i]), sleep_reap, &(aios[i])); + if (rv != 0) { + NUTS_ASSERT(rv == 0); + } + } + for (int i = 0; i < NIOS; i++) { + nng_sleep_aio(20000, aios[i]); + } + nng_fini(); + nng_init(NULL); +} + void test_insane_nio(void) { @@ -400,6 +440,8 @@ test_aio_busy(void) NUTS_TESTS = { { "sleep", test_sleep }, { "sleep timeout", test_sleep_timeout }, + { "sleep fini", test_sleep_fini }, + { "sleep fini many", test_sleep_fini_many }, { "insane nio", test_insane_nio }, { "provider cancel", test_provider_cancel }, { "consumer cancel", test_consumer_cancel }, diff --git a/src/core/init.c b/src/core/init.c index 6672beb1..fa07919e 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -133,14 +133,16 @@ nng_fini(void) nni_atomic_flag_reset(&init_busy); return; } + nni_aio_sys_stop(); // no more scheduling allowed! nni_sock_closeall(); nni_sp_tran_sys_fini(); nni_tls_sys_fini(); nni_reap_drain(); - nni_aio_sys_fini(); nni_taskq_sys_fini(); - nni_reap_sys_fini(); // must be before timer and aio (expire) + nni_reap_drain(); + nni_aio_sys_fini(); nni_id_map_sys_fini(); + nni_reap_sys_fini(); // must be near the end nni_plat_fini(); nni_atomic_flag_reset(&init_busy); } |
