From 8fa3b2aa8e9191669f137be39ba61ad39243483a Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 7 Dec 2024 10:03:30 -0800 Subject: aio: separate stop / shutdown from fini (deallocate) Probably other subsystems should get the same treatment. We need to basically start the process of shutting down so that subsystems know to cease operation before we rip memory out from underneath them. This ensures that no new operations can be started as well, once we have begun the process of teardown. We also enhanced the completion of sleep to avoid some extra locking contention, since the expiration *is* the completion. Includes a test for this case. --- src/core/aio.c | 56 +++++++++++++++++++++++++++++++++++++++++------------ src/core/aio.h | 1 + src/core/aio_test.c | 42 ++++++++++++++++++++++++++++++++++++++++ src/core/init.c | 6 ++++-- 4 files changed, 91 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/core/aio.c b/src/core/aio.c index 91b5c85a..5807869b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "core/taskq.h" #include <string.h> struct nni_aio_expire_q { @@ -18,6 +19,7 @@ struct nni_aio_expire_q { nni_thr eq_thr; nni_time eq_next; // next expiration bool eq_exit; + bool eq_stop; }; static nni_aio_expire_q **nni_aio_expire_q_list; @@ -343,7 +345,7 @@ nni_aio_begin(nni_aio *aio) aio->a_cancel_fn = NULL; // We should not reschedule anything at this point. 
- if (aio->a_stop || eq->eq_exit) { + if (aio->a_stop || eq->eq_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; @@ -380,7 +382,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } nni_mtx_lock(&eq->eq_mtx); - if (aio->a_stop || eq->eq_exit) { + if (aio->a_stop || eq->eq_stop) { nni_task_abort(&aio->a_task); nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECLOSED); @@ -595,16 +597,15 @@ nni_aio_expire_loop(void *arg) nni_mtx_unlock(mtx); return; } - if (now < next) { - // Early wake up (just to reschedule), no need to - // rescan the list. This is an optimization. + if (now < next && !(q->eq_stop && aio != NULL)) { + // nothing to do! nni_cv_until(cv, next); continue; } q->eq_next = NNI_TIME_NEVER; exp_idx = 0; while (aio != NULL) { - if ((aio->a_expire < now) && + if ((q->eq_stop || aio->a_expire < now) && (exp_idx < NNI_EXPIRE_BATCH)) { nni_aio *nxt; @@ -627,7 +628,14 @@ nni_aio_expire_loop(void *arg) for (uint32_t i = 0; i < exp_idx; i++) { aio = expires[i]; - rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + if (q->eq_stop) { + rv = NNG_ECANCELED; + } else if (aio->a_expire_ok) { + aio->a_expire_ok = false; + rv = 0; + } else { + rv = NNG_ETIMEDOUT; + } nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; void *cancel_arg = aio->a_cancel_arg; @@ -639,7 +647,15 @@ nni_aio_expire_loop(void *arg) // If there is no cancellation function, then we cannot // terminate the aio - we've tried, but it has to run // to its natural conclusion. - if (cancel_fn != NULL) { + // + // For the special case of sleeping, we don't need to + // drop the lock and call the cancel function, we are + // already doing it right here! 
+ if (aio->a_sleep) { + aio->a_result = rv; + aio->a_sleep = false; + nni_task_dispatch(&aio->a_task); + } else if (cancel_fn != NULL) { nni_mtx_unlock(mtx); cancel_fn(aio, cancel_arg, rv); nni_mtx_lock(mtx); @@ -647,10 +663,6 @@ nni_aio_expire_loop(void *arg) aio->a_expiring = false; } nni_cv_wake(cv); - - if (now < q->eq_next) { - nni_cv_until(cv, q->eq_next); - } } } @@ -768,12 +780,24 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) } } +static void +nni_aio_expire_q_stop(nni_aio_expire_q *eq) +{ + if (eq != NULL && !eq->eq_stop) { + nni_mtx_lock(&eq->eq_mtx); + eq->eq_stop = true; + nni_cv_wake(&eq->eq_cv); + nni_mtx_unlock(&eq->eq_mtx); + } +} + static void nni_aio_expire_q_free(nni_aio_expire_q *eq) { if (eq == NULL) { return; } + NNI_ASSERT(eq->eq_stop); if (!eq->eq_exit) { nni_mtx_lock(&eq->eq_mtx); eq->eq_exit = true; @@ -810,6 +834,14 @@ nni_aio_expire_q_alloc(void) return (eq); } +void +nni_aio_sys_stop(void) +{ + for (int i = 0; i < nni_aio_expire_q_cnt; i++) { + nni_aio_expire_q_stop(nni_aio_expire_q_list[i]); + } +} + void nni_aio_sys_fini(void) { diff --git a/src/core/aio.h b/src/core/aio.h index 50cb266b..f8c6730f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -193,6 +193,7 @@ extern void nni_aio_completions_add( nni_aio_completions *, nni_aio *, int, size_t); extern int nni_aio_sys_init(nng_init_params *); +extern void nni_aio_sys_stop(void); extern void nni_aio_sys_fini(void); typedef struct nni_aio_expire_q nni_aio_expire_q; diff --git a/src/core/aio_test.c b/src/core/aio_test.c index 3e040fe1..f18eb5ed 100644 --- a/src/core/aio_test.c +++ b/src/core/aio_test.c @@ -72,6 +72,46 @@ test_sleep_timeout(void) nng_aio_free(aio); } +static void +sleep_reap(void *arg) +{ + nng_aio *aio = *(nng_aio **) arg; + if (nng_aio_result(aio) != NNG_ECANCELED) { + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + } + nng_aio_reap(aio); +} + +static void +test_sleep_fini(void) +{ + static nng_aio *aio; + NUTS_TRUE(nng_aio_alloc(&aio, sleep_reap, &aio) == 0); + 
nng_sleep_aio(20000, aio); + nng_msleep(1); + // intentionally we do not free the aio here. reap should clean it. + nng_fini(); + nng_init(NULL); // so that TEST_FINI will reap +} + +static void +test_sleep_fini_many(void) +{ +#define NIOS 2000 + static nng_aio *aios[NIOS]; + for (int i = 0; i < NIOS; i++) { + int rv = nng_aio_alloc(&(aios[i]), sleep_reap, &(aios[i])); + if (rv != 0) { + NUTS_ASSERT(rv == 0); + } + } + for (int i = 0; i < NIOS; i++) { + nng_sleep_aio(20000, aios[i]); + } + nng_fini(); + nng_init(NULL); +} + void test_insane_nio(void) { @@ -400,6 +440,8 @@ test_aio_busy(void) NUTS_TESTS = { { "sleep", test_sleep }, { "sleep timeout", test_sleep_timeout }, + { "sleep fini", test_sleep_fini }, + { "sleep fini many", test_sleep_fini_many }, { "insane nio", test_insane_nio }, { "provider cancel", test_provider_cancel }, { "consumer cancel", test_consumer_cancel }, diff --git a/src/core/init.c b/src/core/init.c index 6672beb1..fa07919e 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -133,14 +133,16 @@ nng_fini(void) nni_atomic_flag_reset(&init_busy); return; } + nni_aio_sys_stop(); // no more scheduling allowed! nni_sock_closeall(); nni_sp_tran_sys_fini(); nni_tls_sys_fini(); nni_reap_drain(); - nni_aio_sys_fini(); nni_taskq_sys_fini(); - nni_reap_sys_fini(); // must be before timer and aio (expire) + nni_reap_drain(); + nni_aio_sys_fini(); nni_id_map_sys_fini(); + nni_reap_sys_fini(); // must be near the end nni_plat_fini(); nni_atomic_flag_reset(&init_busy); } -- cgit v1.2.3-70-g09d2