From 2033988343bce413763d3e9664e3e8372da48591 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 10 Dec 2020 22:17:23 -0800 Subject: fixes #1313 support deferred nng_aio destruction --- src/core/aio.c | 134 ++++++++++++++++++++++++++++++++++++++++------------ src/core/aio.h | 8 +++- src/core/aio_test.c | 21 +++++++- 3 files changed, 129 insertions(+), 34 deletions(-) (limited to 'src/core') diff --git a/src/core/aio.c b/src/core/aio.c index 1a8739e4..b910a600 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -14,11 +14,18 @@ static nni_mtx nni_aio_lk; // These are used for expiration. static nni_cv nni_aio_expire_cv; -static int nni_aio_expire_run; +static bool nni_aio_expire_exit; static nni_thr nni_aio_expire_thr; static nni_list nni_aio_expire_list; static nni_aio *nni_aio_expire_aio; +// Reaping items. +static nni_thr nni_aio_reap_thr; +static nni_aio *nni_aio_reap_list; +static nni_mtx nni_aio_reap_lk; +static nni_cv nni_aio_reap_cv; +static bool nni_aio_reap_exit; + // Design notes. // // AIOs are only ever "completed" by the provider, which must call @@ -76,7 +83,7 @@ void nni_aio_fini(nni_aio *aio) { nni_aio_cancel_fn fn; - void * arg; + void * arg; // TODO: This probably could just use nni_aio_stop. @@ -134,6 +141,18 @@ nni_aio_free(nni_aio *aio) } } +void +nni_aio_reap(nni_aio *aio) +{ + if (aio != NULL) { + nni_mtx_lock(&nni_aio_reap_lk); + aio->a_reap_next = nni_aio_reap_list; + nni_aio_reap_list = aio; + nni_cv_wake1(&nni_aio_reap_cv); + nni_mtx_unlock(&nni_aio_reap_lk); + } +} + int nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov) { @@ -164,7 +183,7 @@ nni_aio_stop(nni_aio *aio) { if (aio != NULL) { nni_aio_cancel_fn fn; - void * arg; + void * arg; nni_mtx_lock(&nni_aio_lk); fn = aio->a_cancel_fn; @@ -187,7 +206,7 @@ nni_aio_close(nni_aio *aio) { if (aio != NULL) { nni_aio_cancel_fn fn; - void * arg; + void * arg; nni_mtx_lock(&nni_aio_lk); fn = aio->a_cancel_fn; @@ -347,7 +366,7 @@ void nni_aio_abort(nni_aio *aio, int rv) { nni_aio_cancel_fn fn; - void * arg; + void * arg; nni_mtx_lock(&nni_aio_lk); fn = aio->a_cancel_fn; @@ -471,13 +490,13 @@ nni_aio_expire_loop(void *unused) NNI_ARG_UNUSED(unused); - nni_thr_set_name(NULL, "nng:aio:expire"); + nni_thr_set_name(NULL, "nng:aio:expire"); for (;;) { nni_aio_cancel_fn fn; - nni_time now; - nni_aio * aio; - int rv; + nni_time now; + nni_aio * aio; + int rv; now = nni_clock(); @@ -485,7 +504,7 @@ nni_aio_expire_loop(void *unused) if ((aio = nni_list_first(list)) == NULL) { - if (nni_aio_expire_run == 0) { + if (nni_aio_expire_exit) { nni_mtx_unlock(&nni_aio_lk); return; } @@ -530,6 +549,41 @@ nni_aio_expire_loop(void *unused) } } +static void +nni_aio_reap_loop(void *unused) +{ + NNI_ARG_UNUSED(unused); + + nni_thr_set_name(NULL, "nng:aio:reap"); + + nni_mtx_lock(&nni_aio_reap_lk); + + for (;;) { + nni_aio *aio; + + if ((aio = nni_aio_reap_list) == NULL) { + if (nni_aio_reap_exit) { + break; + } + + nni_cv_wait(&nni_aio_reap_cv); + continue; + } + nni_aio_reap_list = NULL; + nni_mtx_unlock(&nni_aio_reap_lk); + + while (aio != NULL) { + nni_aio *old = aio; + aio = aio->a_reap_next; + nni_aio_free(old); + } + + nni_mtx_lock(&nni_aio_reap_lk); + } + + nni_mtx_unlock(&nni_aio_reap_lk); +} + void * nni_aio_get_prov_extra(nni_aio *aio, unsigned index) { @@ -645,40 +699,60 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) void nni_aio_sys_fini(void) { - nni_mtx *mtx = &nni_aio_lk; - nni_cv * cv = &nni_aio_expire_cv; - nni_thr *thr = &nni_aio_expire_thr; + nni_mtx *mtx1 = &nni_aio_lk; + nni_cv * cv1 = &nni_aio_expire_cv; + nni_thr *thr1 = &nni_aio_expire_thr; + nni_mtx *mtx2 = &nni_aio_reap_lk; + nni_cv * cv2 = &nni_aio_reap_cv; + nni_thr *thr2 = &nni_aio_reap_thr; - if (nni_aio_expire_run) { - nni_mtx_lock(mtx); - nni_aio_expire_run = 0; - nni_cv_wake(cv); - nni_mtx_unlock(mtx); + if (!nni_aio_expire_exit) { + nni_mtx_lock(mtx1); + nni_aio_expire_exit = true; + nni_cv_wake(cv1); + nni_mtx_unlock(mtx1); } - nni_thr_fini(thr); - nni_cv_fini(cv); - nni_mtx_fini(mtx); + if (!nni_aio_reap_exit) { + nni_mtx_lock(mtx2); + nni_aio_reap_exit = true; + nni_cv_wake(cv2); + nni_mtx_unlock(mtx2); + } + + nni_thr_fini(thr1); + nni_cv_fini(cv1); + nni_mtx_fini(mtx1); + + nni_thr_fini(thr2); + nni_cv_fini(cv2); + nni_mtx_fini(mtx2); } int nni_aio_sys_init(void) { - int rv; - nni_mtx *mtx = &nni_aio_lk; - nni_cv * cv = &nni_aio_expire_cv; - nni_thr *thr = &nni_aio_expire_thr; + int rv, rv1, rv2; + nni_thr *thr1 = &nni_aio_expire_thr; + nni_thr *thr2 = &nni_aio_reap_thr; NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node); - nni_mtx_init(mtx); - nni_cv_init(cv, mtx); + nni_mtx_init(&nni_aio_lk); + nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk); + nni_mtx_init(&nni_aio_reap_lk); + nni_cv_init(&nni_aio_reap_cv, &nni_aio_reap_lk); + + nni_aio_expire_exit = false; + nni_aio_reap_exit = false; - if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) { + rv1 = nni_thr_init(thr1, nni_aio_expire_loop, NULL); + rv2 = nni_thr_init(thr2, nni_aio_reap_loop, NULL); + if (((rv = rv1) != 0) || ((rv = rv2) != 0)) { nni_aio_sys_fini(); return (rv); } - nni_aio_expire_run = 1; - nni_thr_run(thr); + nni_thr_run(thr1); + nni_thr_run(thr2); return (0); } diff --git a/src/core/aio.h b/src/core/aio.h index 80f48341..dbe7fbb9 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -28,6 +28,10 @@ extern void nni_aio_init(nni_aio *, nni_cb, void *arg); // It waits for the callback to complete. extern void nni_aio_fini(nni_aio *); +// nni_aio_reap is used to asynchronously reap the aio. It can +// be called even from the callback of the aio itself. +extern void nni_aio_reap(nni_aio *); + // nni_aio_alloc allocates an aio object and initializes it. The callback // is called with the supplied argument when the operation is complete. // If NULL is supplied for the callback, then nni_aio_wake is used in its @@ -195,8 +199,8 @@ struct nng_aio { nni_list_node a_prov_node; // Linkage on provider list. void * a_prov_extra[2]; // Extra data used by provider - // Expire node. - nni_list_node a_expire_node; + nni_list_node a_expire_node; // Expiration node + struct nng_aio *a_reap_next; }; #endif // CORE_AIO_H diff --git a/src/core/aio_test.c b/src/core/aio_test.c index 3dab4b04..acf3c129 100644 --- a/src/core/aio_test.c +++ b/src/core/aio_test.c @@ -35,7 +35,7 @@ void test_sleep(void) { nng_time start; - nng_time end = 0; + nng_time end = 0; nng_aio *aio; NUTS_PASS(nng_aio_alloc(&aio, sleep_done, &end)); @@ -55,7 +55,7 @@ void test_sleep_timeout(void) { nng_time start; - nng_time end = 0; + nng_time end = 0; nng_aio *aio; NUTS_TRUE(nng_aio_alloc(&aio, sleep_done, &end) == 0); @@ -226,6 +226,22 @@ test_zero_timeout(void) NUTS_PASS(nng_close(s)); } +static void +aio_sleep_cb(void *arg) +{ + nng_aio *aio = *(nng_aio **) arg; + nng_aio_reap(aio); +} + +void +test_aio_reap(void) +{ + nng_aio *a; + NUTS_PASS(nng_aio_alloc(&a, aio_sleep_cb, &a)); + nng_sleep_aio(10, a); + nng_msleep(20); +} + NUTS_TESTS = { { "sleep", test_sleep }, { "sleep timeout", test_sleep_timeout }, @@ -236,5 +252,6 @@ NUTS_TESTS = { { "explicit timeout", test_explicit_timeout }, { "inherited timeout", test_inherited_timeout }, { "zero timeout", test_zero_timeout }, + { "aio reap", test_aio_reap }, { NULL, NULL }, }; \ No newline at end of file -- cgit v1.2.3-70-g09d2