aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-12-10 22:17:23 -0800
committerGarrett D'Amore <garrett@damore.org>2020-12-12 11:32:51 -0800
commit2033988343bce413763d3e9664e3e8372da48591 (patch)
treef39ce75c40dd94f95884d7d4c43a76df510a86bf /src/core/aio.c
parentb45f876d005371f62fc261a5584c4d7dafd7a0f7 (diff)
downloadnng-2033988343bce413763d3e9664e3e8372da48591.tar.gz
nng-2033988343bce413763d3e9664e3e8372da48591.tar.bz2
nng-2033988343bce413763d3e9664e3e8372da48591.zip
fixes #1313 support deferred nng_aio destruction
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c134
1 files changed, 104 insertions, 30 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 1a8739e4..b910a600 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -14,11 +14,18 @@
static nni_mtx nni_aio_lk;
// These are used for expiration.
static nni_cv nni_aio_expire_cv;
-static int nni_aio_expire_run;
+static bool nni_aio_expire_exit;
static nni_thr nni_aio_expire_thr;
static nni_list nni_aio_expire_list;
static nni_aio *nni_aio_expire_aio;
+// Reaping items.
+static nni_thr nni_aio_reap_thr;
+static nni_aio *nni_aio_reap_list;
+static nni_mtx nni_aio_reap_lk;
+static nni_cv nni_aio_reap_cv;
+static bool nni_aio_reap_exit;
+
// Design notes.
//
// AIOs are only ever "completed" by the provider, which must call
@@ -76,7 +83,7 @@ void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void * arg;
// TODO: This probably could just use nni_aio_stop.
@@ -134,6 +141,18 @@ nni_aio_free(nni_aio *aio)
}
}
+void
+nni_aio_reap(nni_aio *aio)
+{
+ if (aio != NULL) {
+ nni_mtx_lock(&nni_aio_reap_lk);
+ aio->a_reap_next = nni_aio_reap_list;
+ nni_aio_reap_list = aio;
+ nni_cv_wake1(&nni_aio_reap_cv);
+ nni_mtx_unlock(&nni_aio_reap_lk);
+ }
+}
+
int
nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov)
{
@@ -164,7 +183,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
@@ -187,7 +206,7 @@ nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
- void * arg;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
@@ -347,7 +366,7 @@ void
nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
- void * arg;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
@@ -471,13 +490,13 @@ nni_aio_expire_loop(void *unused)
NNI_ARG_UNUSED(unused);
- nni_thr_set_name(NULL, "nng:aio:expire");
+ nni_thr_set_name(NULL, "nng:aio:expire");
for (;;) {
nni_aio_cancel_fn fn;
- nni_time now;
- nni_aio * aio;
- int rv;
+ nni_time now;
+ nni_aio * aio;
+ int rv;
now = nni_clock();
@@ -485,7 +504,7 @@ nni_aio_expire_loop(void *unused)
if ((aio = nni_list_first(list)) == NULL) {
- if (nni_aio_expire_run == 0) {
+ if (nni_aio_expire_exit) {
nni_mtx_unlock(&nni_aio_lk);
return;
}
@@ -530,6 +549,41 @@ nni_aio_expire_loop(void *unused)
}
}
+static void
+nni_aio_reap_loop(void *unused)
+{
+ NNI_ARG_UNUSED(unused);
+
+ nni_thr_set_name(NULL, "nng:aio:reap");
+
+ nni_mtx_lock(&nni_aio_reap_lk);
+
+ for (;;) {
+ nni_aio *aio;
+
+ if ((aio = nni_aio_reap_list) == NULL) {
+ if (nni_aio_reap_exit) {
+ break;
+ }
+
+ nni_cv_wait(&nni_aio_reap_cv);
+ continue;
+ }
+ nni_aio_reap_list = NULL;
+ nni_mtx_unlock(&nni_aio_reap_lk);
+
+ while (aio != NULL) {
+ nni_aio *old = aio;
+ aio = aio->a_reap_next;
+ nni_aio_free(old);
+ }
+
+ nni_mtx_lock(&nni_aio_reap_lk);
+ }
+
+ nni_mtx_unlock(&nni_aio_reap_lk);
+}
+
void *
nni_aio_get_prov_extra(nni_aio *aio, unsigned index)
{
@@ -645,40 +699,60 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
void
nni_aio_sys_fini(void)
{
- nni_mtx *mtx = &nni_aio_lk;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_thr *thr = &nni_aio_expire_thr;
+ nni_mtx *mtx1 = &nni_aio_lk;
+ nni_cv * cv1 = &nni_aio_expire_cv;
+ nni_thr *thr1 = &nni_aio_expire_thr;
+ nni_mtx *mtx2 = &nni_aio_reap_lk;
+ nni_cv * cv2 = &nni_aio_reap_cv;
+ nni_thr *thr2 = &nni_aio_reap_thr;
- if (nni_aio_expire_run) {
- nni_mtx_lock(mtx);
- nni_aio_expire_run = 0;
- nni_cv_wake(cv);
- nni_mtx_unlock(mtx);
+ if (!nni_aio_expire_exit) {
+ nni_mtx_lock(mtx1);
+ nni_aio_expire_exit = true;
+ nni_cv_wake(cv1);
+ nni_mtx_unlock(mtx1);
}
- nni_thr_fini(thr);
- nni_cv_fini(cv);
- nni_mtx_fini(mtx);
+ if (!nni_aio_reap_exit) {
+ nni_mtx_lock(mtx2);
+ nni_aio_reap_exit = true;
+ nni_cv_wake(cv2);
+ nni_mtx_unlock(mtx2);
+ }
+
+ nni_thr_fini(thr1);
+ nni_cv_fini(cv1);
+ nni_mtx_fini(mtx1);
+
+ nni_thr_fini(thr2);
+ nni_cv_fini(cv2);
+ nni_mtx_fini(mtx2);
}
int
nni_aio_sys_init(void)
{
- int rv;
- nni_mtx *mtx = &nni_aio_lk;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_thr *thr = &nni_aio_expire_thr;
+ int rv, rv1, rv2;
+ nni_thr *thr1 = &nni_aio_expire_thr;
+ nni_thr *thr2 = &nni_aio_reap_thr;
NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node);
- nni_mtx_init(mtx);
- nni_cv_init(cv, mtx);
+ nni_mtx_init(&nni_aio_lk);
+ nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk);
+ nni_mtx_init(&nni_aio_reap_lk);
+ nni_cv_init(&nni_aio_reap_cv, &nni_aio_reap_lk);
+
+ nni_aio_expire_exit = false;
+ nni_aio_reap_exit = false;
- if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) {
+ rv1 = nni_thr_init(thr1, nni_aio_expire_loop, NULL);
+ rv2 = nni_thr_init(thr2, nni_aio_reap_loop, NULL);
+ if (((rv = rv1) != 0) || ((rv = rv2) != 0)) {
nni_aio_sys_fini();
return (rv);
}
- nni_aio_expire_run = 1;
- nni_thr_run(thr);
+ nni_thr_run(thr1);
+ nni_thr_run(thr2);
return (0);
}