diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-07 10:03:30 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-07 11:20:14 -0800 |
| commit | 8fa3b2aa8e9191669f137be39ba61ad39243483a (patch) | |
| tree | 8c9e39ab5bbcce38a1bbe78a2badb1145e481aae | |
| parent | a02b1c7040c77f2549bfee16af36688f6b20ae63 (diff) | |
| download | nng-8fa3b2aa8e9191669f137be39ba61ad39243483a.tar.gz nng-8fa3b2aa8e9191669f137be39ba61ad39243483a.tar.bz2 nng-8fa3b2aa8e9191669f137be39ba61ad39243483a.zip | |
aio: separate stop / shutdown from fini (deallocate)
Probably other subsystems should get the same treatment. We need
to basically start the process of shutting down so that subsystems
know to cease operation before we rip memory out from underneath them.
This ensures that no new operations can be started as well, once we
have begun the process of teardown.
We also enhanced the completion of sleep to avoid some extra locking
contention, since the expiration *is* the completion.
Includes a test for this case.
| -rw-r--r-- | src/core/aio.c | 56 | ||||
| -rw-r--r-- | src/core/aio.h | 1 | ||||
| -rw-r--r-- | src/core/aio_test.c | 42 | ||||
| -rw-r--r-- | src/core/init.c | 6 |
4 files changed, 91 insertions, 14 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 91b5c85a..5807869b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "core/taskq.h" #include <string.h> struct nni_aio_expire_q { @@ -18,6 +19,7 @@ struct nni_aio_expire_q { nni_thr eq_thr; nni_time eq_next; // next expiration bool eq_exit; + bool eq_stop; }; static nni_aio_expire_q **nni_aio_expire_q_list; @@ -343,7 +345,7 @@ nni_aio_begin(nni_aio *aio) aio->a_cancel_fn = NULL; // We should not reschedule anything at this point. - if (aio->a_stop || eq->eq_exit) { + if (aio->a_stop || eq->eq_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; @@ -380,7 +382,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } nni_mtx_lock(&eq->eq_mtx); - if (aio->a_stop || eq->eq_exit) { + if (aio->a_stop || eq->eq_stop) { nni_task_abort(&aio->a_task); nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECLOSED); @@ -595,16 +597,15 @@ nni_aio_expire_loop(void *arg) nni_mtx_unlock(mtx); return; } - if (now < next) { - // Early wake up (just to reschedule), no need to - // rescan the list. This is an optimization. + if (now < next && !(q->eq_stop && aio != NULL)) { + // nothing to do! nni_cv_until(cv, next); continue; } q->eq_next = NNI_TIME_NEVER; exp_idx = 0; while (aio != NULL) { - if ((aio->a_expire < now) && + if ((q->eq_stop || aio->a_expire < now) && (exp_idx < NNI_EXPIRE_BATCH)) { nni_aio *nxt; @@ -627,7 +628,14 @@ nni_aio_expire_loop(void *arg) for (uint32_t i = 0; i < exp_idx; i++) { aio = expires[i]; - rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + if (q->eq_stop) { + rv = NNG_ECANCELED; + } else if (aio->a_expire_ok) { + aio->a_expire_ok = false; + rv = 0; + } else { + rv = NNG_ETIMEDOUT; + } nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; void *cancel_arg = aio->a_cancel_arg; @@ -639,7 +647,15 @@ nni_aio_expire_loop(void *arg) // If there is no cancellation function, then we cannot // terminate the aio - we've tried, but it has to run // to its natural conclusion. - if (cancel_fn != NULL) { + // + // For the special case of sleeping, we don't need to + // drop the lock and call the cancel function, we are + // already doing it right here! + if (aio->a_sleep) { + aio->a_result = rv; + aio->a_sleep = false; + nni_task_dispatch(&aio->a_task); + } else if (cancel_fn != NULL) { nni_mtx_unlock(mtx); cancel_fn(aio, cancel_arg, rv); nni_mtx_lock(mtx); @@ -647,10 +663,6 @@ nni_aio_expire_loop(void *arg) aio->a_expiring = false; } nni_cv_wake(cv); - - if (now < q->eq_next) { - nni_cv_until(cv, q->eq_next); - } } } @@ -769,11 +781,23 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) } static void +nni_aio_expire_q_stop(nni_aio_expire_q *eq) +{ + if (eq != NULL && !eq->eq_stop) { + nni_mtx_lock(&eq->eq_mtx); + eq->eq_stop = true; + nni_cv_wake(&eq->eq_cv); + nni_mtx_unlock(&eq->eq_mtx); + } +} + +static void nni_aio_expire_q_free(nni_aio_expire_q *eq) { if (eq == NULL) { return; } + NNI_ASSERT(eq->eq_stop); if (!eq->eq_exit) { nni_mtx_lock(&eq->eq_mtx); eq->eq_exit = true; @@ -811,6 +835,14 @@ nni_aio_expire_q_alloc(void) } void +nni_aio_sys_stop(void) +{ + for (int i = 0; i < nni_aio_expire_q_cnt; i++) { + nni_aio_expire_q_stop(nni_aio_expire_q_list[i]); + } +} + +void nni_aio_sys_fini(void) { for (int i = 0; i < nni_aio_expire_q_cnt; i++) { diff --git a/src/core/aio.h b/src/core/aio.h index 50cb266b..f8c6730f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -193,6 +193,7 @@ extern void nni_aio_completions_add( nni_aio_completions *, nni_aio *, int, size_t); extern int nni_aio_sys_init(nng_init_params *); +extern void nni_aio_sys_stop(void); extern void nni_aio_sys_fini(void); typedef struct nni_aio_expire_q nni_aio_expire_q; diff --git a/src/core/aio_test.c b/src/core/aio_test.c index 3e040fe1..f18eb5ed 100644 --- a/src/core/aio_test.c +++ b/src/core/aio_test.c @@ -72,6 +72,46 @@ test_sleep_timeout(void) nng_aio_free(aio); } +static void +sleep_reap(void *arg) +{ + nng_aio *aio = *(nng_aio **) arg; + if (nng_aio_result(aio) != NNG_ECANCELED) { + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + } + nng_aio_reap(aio); +} + +static void +test_sleep_fini(void) +{ + static nng_aio *aio; + NUTS_TRUE(nng_aio_alloc(&aio, sleep_reap, &aio) == 0); + nng_sleep_aio(20000, aio); + nng_msleep(1); + // intentionally we do not free the aio here. reap should clean it. + nng_fini(); + nng_init(NULL); // so that TEST_FINI will reap +} + +static void +test_sleep_fini_many(void) +{ +#define NIOS 2000 + static nng_aio *aios[NIOS]; + for (int i = 0; i < NIOS; i++) { + int rv = nng_aio_alloc(&(aios[i]), sleep_reap, &(aios[i])); + if (rv != 0) { + NUTS_ASSERT(rv == 0); + } + } + for (int i = 0; i < NIOS; i++) { + nng_sleep_aio(20000, aios[i]); + } + nng_fini(); + nng_init(NULL); +} + void test_insane_nio(void) { @@ -400,6 +440,8 @@ test_aio_busy(void) NUTS_TESTS = { { "sleep", test_sleep }, { "sleep timeout", test_sleep_timeout }, + { "sleep fini", test_sleep_fini }, + { "sleep fini many", test_sleep_fini_many }, { "insane nio", test_insane_nio }, { "provider cancel", test_provider_cancel }, { "consumer cancel", test_consumer_cancel }, diff --git a/src/core/init.c b/src/core/init.c index 6672beb1..fa07919e 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -133,14 +133,16 @@ nng_fini(void) nni_atomic_flag_reset(&init_busy); return; } + nni_aio_sys_stop(); // no more scheduling allowed! nni_sock_closeall(); nni_sp_tran_sys_fini(); nni_tls_sys_fini(); nni_reap_drain(); - nni_aio_sys_fini(); nni_taskq_sys_fini(); - nni_reap_sys_fini(); // must be before timer and aio (expire) + nni_reap_drain(); + nni_aio_sys_fini(); nni_id_map_sys_fini(); + nni_reap_sys_fini(); // must be near the end nni_plat_fini(); nni_atomic_flag_reset(&init_busy); } |
