diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-12-10 22:17:23 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-12-12 11:32:51 -0800 |
| commit | 2033988343bce413763d3e9664e3e8372da48591 (patch) | |
| tree | f39ce75c40dd94f95884d7d4c43a76df510a86bf /src | |
| parent | b45f876d005371f62fc261a5584c4d7dafd7a0f7 (diff) | |
| download | nng-2033988343bce413763d3e9664e3e8372da48591.tar.gz nng-2033988343bce413763d3e9664e3e8372da48591.tar.bz2 nng-2033988343bce413763d3e9664e3e8372da48591.zip | |
fixes #1313 support deferred nng_aio destruction
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 134 | ||||
| -rw-r--r-- | src/core/aio.h | 8 | ||||
| -rw-r--r-- | src/core/aio_test.c | 21 | ||||
| -rw-r--r-- | src/nng.c | 6 | ||||
| -rw-r--r-- | src/supplemental/http/http_client.c | 21 |
5 files changed, 145 insertions, 45 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 1a8739e4..b910a600 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -14,11 +14,18 @@ static nni_mtx nni_aio_lk; // These are used for expiration. static nni_cv nni_aio_expire_cv; -static int nni_aio_expire_run; +static bool nni_aio_expire_exit; static nni_thr nni_aio_expire_thr; static nni_list nni_aio_expire_list; static nni_aio *nni_aio_expire_aio; +// Reaping items. +static nni_thr nni_aio_reap_thr; +static nni_aio *nni_aio_reap_list; +static nni_mtx nni_aio_reap_lk; +static nni_cv nni_aio_reap_cv; +static bool nni_aio_reap_exit; + // Design notes. // // AIOs are only ever "completed" by the provider, which must call @@ -76,7 +83,7 @@ void nni_aio_fini(nni_aio *aio) { nni_aio_cancel_fn fn; - void * arg; + void * arg; // TODO: This probably could just use nni_aio_stop. @@ -134,6 +141,18 @@ nni_aio_free(nni_aio *aio) } } +void +nni_aio_reap(nni_aio *aio) +{ + if (aio != NULL) { + nni_mtx_lock(&nni_aio_reap_lk); + aio->a_reap_next = nni_aio_reap_list; + nni_aio_reap_list = aio; + nni_cv_wake1(&nni_aio_reap_cv); + nni_mtx_unlock(&nni_aio_reap_lk); + } +} + int nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov) { @@ -164,7 +183,7 @@ nni_aio_stop(nni_aio *aio) { if (aio != NULL) { nni_aio_cancel_fn fn; - void * arg; + void * arg; nni_mtx_lock(&nni_aio_lk); fn = aio->a_cancel_fn; @@ -187,7 +206,7 @@ nni_aio_close(nni_aio *aio) { if (aio != NULL) { nni_aio_cancel_fn fn; - void * arg; + void * arg; nni_mtx_lock(&nni_aio_lk); fn = aio->a_cancel_fn; @@ -347,7 +366,7 @@ void nni_aio_abort(nni_aio *aio, int rv) { nni_aio_cancel_fn fn; - void * arg; + void * arg; nni_mtx_lock(&nni_aio_lk); fn = aio->a_cancel_fn; @@ -471,13 +490,13 @@ nni_aio_expire_loop(void *unused) NNI_ARG_UNUSED(unused); - nni_thr_set_name(NULL, "nng:aio:expire"); + nni_thr_set_name(NULL, "nng:aio:expire"); for (;;) { nni_aio_cancel_fn fn; - nni_time now; - nni_aio * aio; - int rv; + nni_time now; + nni_aio * aio; + int rv; now = nni_clock(); @@ -485,7 +504,7 @@ nni_aio_expire_loop(void *unused) if ((aio = nni_list_first(list)) == NULL) { - if (nni_aio_expire_run == 0) { + if (nni_aio_expire_exit) { nni_mtx_unlock(&nni_aio_lk); return; } @@ -530,6 +549,41 @@ nni_aio_expire_loop(void *unused) } } +static void +nni_aio_reap_loop(void *unused) +{ + NNI_ARG_UNUSED(unused); + + nni_thr_set_name(NULL, "nng:aio:reap"); + + nni_mtx_lock(&nni_aio_reap_lk); + + for (;;) { + nni_aio *aio; + + if ((aio = nni_aio_reap_list) == NULL) { + if (nni_aio_reap_exit) { + break; + } + + nni_cv_wait(&nni_aio_reap_cv); + continue; + } + nni_aio_reap_list = NULL; + nni_mtx_unlock(&nni_aio_reap_lk); + + while (aio != NULL) { + nni_aio *old = aio; + aio = aio->a_reap_next; + nni_aio_free(old); + } + + nni_mtx_lock(&nni_aio_reap_lk); + } + + nni_mtx_unlock(&nni_aio_reap_lk); +} + void * nni_aio_get_prov_extra(nni_aio *aio, unsigned index) { @@ -645,40 +699,60 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) void nni_aio_sys_fini(void) { - nni_mtx *mtx = &nni_aio_lk; - nni_cv * cv = &nni_aio_expire_cv; - nni_thr *thr = &nni_aio_expire_thr; + nni_mtx *mtx1 = &nni_aio_lk; + nni_cv * cv1 = &nni_aio_expire_cv; + nni_thr *thr1 = &nni_aio_expire_thr; + nni_mtx *mtx2 = &nni_aio_reap_lk; + nni_cv * cv2 = &nni_aio_reap_cv; + nni_thr *thr2 = &nni_aio_reap_thr; - if (nni_aio_expire_run) { - nni_mtx_lock(mtx); - nni_aio_expire_run = 0; - nni_cv_wake(cv); - nni_mtx_unlock(mtx); + if (!nni_aio_expire_exit) { + nni_mtx_lock(mtx1); + nni_aio_expire_exit = true; + nni_cv_wake(cv1); + nni_mtx_unlock(mtx1); } - nni_thr_fini(thr); - nni_cv_fini(cv); - nni_mtx_fini(mtx); + if (!nni_aio_reap_exit) { + nni_mtx_lock(mtx2); + nni_aio_reap_exit = true; + nni_cv_wake(cv2); + nni_mtx_unlock(mtx2); + } + + nni_thr_fini(thr1); + nni_cv_fini(cv1); + nni_mtx_fini(mtx1); + + nni_thr_fini(thr2); + nni_cv_fini(cv2); + nni_mtx_fini(mtx2); } int nni_aio_sys_init(void) { - int rv; - nni_mtx *mtx = &nni_aio_lk; - nni_cv * cv = &nni_aio_expire_cv; - nni_thr *thr = &nni_aio_expire_thr; + int rv, rv1, rv2; + nni_thr *thr1 = &nni_aio_expire_thr; + nni_thr *thr2 = &nni_aio_reap_thr; NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node); - nni_mtx_init(mtx); - nni_cv_init(cv, mtx); + nni_mtx_init(&nni_aio_lk); + nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk); + nni_mtx_init(&nni_aio_reap_lk); + nni_cv_init(&nni_aio_reap_cv, &nni_aio_reap_lk); + + nni_aio_expire_exit = false; + nni_aio_reap_exit = false; - if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) { + rv1 = nni_thr_init(thr1, nni_aio_expire_loop, NULL); + rv2 = nni_thr_init(thr2, nni_aio_reap_loop, NULL); + if (((rv = rv1) != 0) || ((rv = rv2) != 0)) { nni_aio_sys_fini(); return (rv); } - nni_aio_expire_run = 1; - nni_thr_run(thr); + nni_thr_run(thr1); + nni_thr_run(thr2); return (0); } diff --git a/src/core/aio.h b/src/core/aio.h index 80f48341..dbe7fbb9 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -28,6 +28,10 @@ extern void nni_aio_init(nni_aio *, nni_cb, void *arg); // It waits for the callback to complete. extern void nni_aio_fini(nni_aio *); +// nni_aio_reap is used to asynchronously reap the aio. It can +// be called even from the callback of the aio itself. +extern void nni_aio_reap(nni_aio *); + // nni_aio_alloc allocates an aio object and initializes it. The callback // is called with the supplied argument when the operation is complete. // If NULL is supplied for the callback, then nni_aio_wake is used in its @@ -195,8 +199,8 @@ struct nng_aio { nni_list_node a_prov_node; // Linkage on provider list. void * a_prov_extra[2]; // Extra data used by provider - // Expire node. - nni_list_node a_expire_node; + nni_list_node a_expire_node; // Expiration node + struct nng_aio *a_reap_next; }; #endif // CORE_AIO_H diff --git a/src/core/aio_test.c b/src/core/aio_test.c index 3dab4b04..acf3c129 100644 --- a/src/core/aio_test.c +++ b/src/core/aio_test.c @@ -35,7 +35,7 @@ void test_sleep(void) { nng_time start; - nng_time end = 0; + nng_time end = 0; nng_aio *aio; NUTS_PASS(nng_aio_alloc(&aio, sleep_done, &end)); @@ -55,7 +55,7 @@ void test_sleep_timeout(void) { nng_time start; - nng_time end = 0; + nng_time end = 0; nng_aio *aio; NUTS_TRUE(nng_aio_alloc(&aio, sleep_done, &end) == 0); @@ -226,6 +226,22 @@ test_zero_timeout(void) NUTS_PASS(nng_close(s)); } +static void +aio_sleep_cb(void *arg) +{ + nng_aio *aio = *(nng_aio **) arg; + nng_aio_reap(aio); +} + +void +test_aio_reap(void) +{ + nng_aio *a; + NUTS_PASS(nng_aio_alloc(&a, aio_sleep_cb, &a)); + nng_sleep_aio(10, a); + nng_msleep(20); +} + NUTS_TESTS = { { "sleep", test_sleep }, { "sleep timeout", test_sleep_timeout }, @@ -236,5 +252,6 @@ NUTS_TESTS = { { "explicit timeout", test_explicit_timeout }, { "inherited timeout", test_inherited_timeout }, { "zero timeout", test_zero_timeout }, + { "aio reap", test_aio_reap }, { NULL, NULL }, };
\ No newline at end of file @@ -1737,6 +1737,12 @@ nng_aio_free(nng_aio *aio) } void +nng_aio_reap(nng_aio *aio) +{ + nni_aio_reap(aio); +} + +void nng_sleep_aio(nng_duration ms, nng_aio *aio) { nni_sleep_aio(ms, aio); diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index c6f5167a..3737cb7b 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -224,11 +224,10 @@ typedef struct http_txn { nni_http_res * res; nni_http_chunks *chunks; http_txn_state state; - nni_reap_item reap; } http_txn; static void -http_txn_reap(void *arg) +http_txn_fini(void *arg) { http_txn *txn = arg; if (txn->client != NULL) { @@ -239,7 +238,7 @@ http_txn_reap(void *arg) } } nni_http_chunks_free(txn->chunks); - nni_aio_free(txn->aio); + nni_aio_reap(txn->aio); NNI_FREE_STRUCT(txn); } @@ -270,7 +269,7 @@ http_txn_cb(void *arg) if ((rv = nni_aio_result(txn->aio)) != 0) { http_txn_finish_aios(txn, rv); nni_mtx_unlock(&http_txn_lk); - nni_reap(&txn->reap, http_txn_reap, txn); + http_txn_fini(txn); return; } switch (txn->state) { @@ -314,7 +313,7 @@ http_txn_cb(void *arg) // never transfers data), then we are done. http_txn_finish_aios(txn, 0); nni_mtx_unlock(&http_txn_lk); - nni_reap(&txn->reap, http_txn_reap, txn); + http_txn_fini(txn); return; } @@ -333,7 +332,7 @@ http_txn_cb(void *arg) // All done! http_txn_finish_aios(txn, 0); nni_mtx_unlock(&http_txn_lk); - nni_reap(&txn->reap, http_txn_reap, txn); + http_txn_fini(txn); return; case HTTP_RECVING_CHUNKS: @@ -352,7 +351,7 @@ http_txn_cb(void *arg) } http_txn_finish_aios(txn, 0); nni_mtx_unlock(&http_txn_lk); - nni_reap(&txn->reap, http_txn_reap, txn); + http_txn_fini(txn); return; } @@ -360,7 +359,7 @@ error: http_txn_finish_aios(txn, rv); nni_http_conn_close(txn->conn); nni_mtx_unlock(&http_txn_lk); - nni_reap(&txn->reap, http_txn_reap, txn); + http_txn_fini(txn); } static void @@ -411,7 +410,7 @@ nni_http_transact_conn( if ((rv = nni_aio_schedule(aio, http_txn_cancel, txn)) != 0) { nni_mtx_unlock(&http_txn_lk); nni_aio_finish_error(aio, rv); - nni_reap(&txn->reap, http_txn_reap, txn); + http_txn_fini(txn); return; } nni_http_res_reset(txn->res); @@ -448,7 +447,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req, if ((rv = nni_http_req_set_header(req, "Connection", "close")) != 0) { nni_aio_finish_error(aio, rv); - nni_reap(&txn->reap, http_txn_reap, txn); + http_txn_fini(txn); return; } @@ -463,7 +462,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req, if ((rv = nni_aio_schedule(aio, http_txn_cancel, txn)) != 0) { nni_mtx_unlock(&http_txn_lk); nni_aio_finish_error(aio, rv); - nni_reap(&txn->reap, http_txn_reap, txn); + http_txn_fini(txn); return; } nni_http_res_reset(txn->res); |
