aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-07 11:44:22 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-07 11:56:22 -0800
commit71e459eea31e9e47c0ce64a78e32b242d357f9a0 (patch)
treec4f6608066aaa6505a5625f399c94845abe6c162 /src/core
parent8fa3b2aa8e9191669f137be39ba61ad39243483a (diff)
downloadnng-71e459eea31e9e47c0ce64a78e32b242d357f9a0.tar.gz
nng-71e459eea31e9e47c0ce64a78e32b242d357f9a0.tar.bz2
nng-71e459eea31e9e47c0ce64a78e32b242d357f9a0.zip
fini: add drain mechanism for aio, reap, and task subsystems
Make sure *everything* is drained before proceeding all the way to deallocation.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c20
-rw-r--r--src/core/aio.h2
-rw-r--r--src/core/init.c11
-rw-r--r--src/core/reap.c11
-rw-r--r--src/core/reap.h7
-rw-r--r--src/core/taskq.c20
-rw-r--r--src/core/taskq.h1
7 files changed, 56 insertions, 16 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 5807869b..bb8347dd 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -780,15 +780,21 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
}
}
-static void
+static bool
nni_aio_expire_q_stop(nni_aio_expire_q *eq)
{
- if (eq != NULL && !eq->eq_stop) {
+ bool result = false;
+ if (eq != NULL) {
nni_mtx_lock(&eq->eq_mtx);
eq->eq_stop = true;
nni_cv_wake(&eq->eq_cv);
+ while (!nni_list_empty(&eq->eq_list)) {
+ result = true;
+ nni_cv_wait(&eq->eq_cv);
+ }
nni_mtx_unlock(&eq->eq_mtx);
}
+ return (result);
}
static void
@@ -834,12 +840,16 @@ nni_aio_expire_q_alloc(void)
return (eq);
}
-void
-nni_aio_sys_stop(void)
+bool
+nni_aio_sys_drain(void)
{
+ bool result = false;
for (int i = 0; i < nni_aio_expire_q_cnt; i++) {
- nni_aio_expire_q_stop(nni_aio_expire_q_list[i]);
+ if (nni_aio_expire_q_stop(nni_aio_expire_q_list[i])) {
+ result = true;
+ }
}
+ return (result);
}
void
diff --git a/src/core/aio.h b/src/core/aio.h
index f8c6730f..f56d2f58 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -193,7 +193,7 @@ extern void nni_aio_completions_add(
nni_aio_completions *, nni_aio *, int, size_t);
extern int nni_aio_sys_init(nng_init_params *);
-extern void nni_aio_sys_stop(void);
+extern bool nni_aio_sys_drain(void);
extern void nni_aio_sys_fini(void);
typedef struct nni_aio_expire_q nni_aio_expire_q;
diff --git a/src/core/init.c b/src/core/init.c
index fa07919e..4b31995a 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -133,13 +133,18 @@ nng_fini(void)
nni_atomic_flag_reset(&init_busy);
return;
}
- nni_aio_sys_stop(); // no more scheduling allowed!
nni_sock_closeall();
nni_sp_tran_sys_fini();
+
+ // Drain everything. This is important because some of
+ // these subsystems can dispatch things to other ones.
+ // So we need them *all* to be empty before proceeding.
+ while ((nni_aio_sys_drain() || nni_taskq_sys_drain() ||
+ nni_reap_sys_drain())) {
+ continue;
+ }
nni_tls_sys_fini();
- nni_reap_drain();
nni_taskq_sys_fini();
- nni_reap_drain();
nni_aio_sys_fini();
nni_id_map_sys_fini();
nni_reap_sys_fini(); // must be near the end
diff --git a/src/core/reap.c b/src/core/reap.c
index 3f182205..f996695d 100644
--- a/src/core/reap.c
+++ b/src/core/reap.c
@@ -18,9 +18,9 @@
static nni_reap_list *reap_list = NULL;
static nni_thr reap_thr;
static bool reap_exit = false;
-static nni_mtx reap_mtx = NNI_MTX_INITIALIZER;
+static nni_mtx reap_mtx = NNI_MTX_INITIALIZER;
static bool reap_empty;
-static nni_cv reap_work_cv = NNI_CV_INITIALIZER(&reap_mtx);
+static nni_cv reap_work_cv = NNI_CV_INITIALIZER(&reap_mtx);
static nni_cv reap_empty_cv = NNI_CV_INITIALIZER(&reap_mtx);
static void
@@ -90,14 +90,17 @@ nni_reap(nni_reap_list *rl, void *item)
nni_mtx_unlock(&reap_mtx);
}
-void
-nni_reap_drain(void)
+bool
+nni_reap_sys_drain(void)
{
+ bool result = false;
nni_mtx_lock(&reap_mtx);
while (!reap_empty) {
+ result = true;
nni_cv_wait(&reap_empty_cv);
}
nni_mtx_unlock(&reap_mtx);
+ return (result);
}
int
diff --git a/src/core/reap.h b/src/core/reap.h
index 5f631885..221fd17f 100644
--- a/src/core/reap.h
+++ b/src/core/reap.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -56,8 +56,9 @@ struct nni_reap_list {
extern void nni_reap(nni_reap_list *, void *);
-// nni_reap_drain waits for the reap queue to be drained.
-extern void nni_reap_drain(void);
+// nni_reap_sys_drain waits for the reap queue to be drained.
+// It returns true if it found anything to wait for.
+extern bool nni_reap_sys_drain(void);
extern int nni_reap_sys_init(void);
extern void nni_reap_sys_fini(void);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 496c2fab..1f0ae1b6 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -59,6 +59,7 @@ nni_taskq_thread(void *self)
continue;
}
+ nni_cv_wake(&tq->tq_wait_cv);
if (!tq->tq_run) {
break;
}
@@ -127,6 +128,19 @@ nni_taskq_fini(nni_taskq *tq)
NNI_FREE_STRUCT(tq);
}
+bool
+nni_taskq_drain(nni_taskq *tq)
+{
+ bool result = false;
+ nni_mtx_lock(&tq->tq_mtx);
+ while (!nni_list_empty(&tq->tq_tasks)) {
+ result = true;
+ nni_cv_wait(&tq->tq_wait_cv);
+ }
+ nni_mtx_unlock(&tq->tq_mtx);
+ return (result);
+}
+
void
nni_task_exec(nni_task *task)
{
@@ -263,6 +277,12 @@ nni_taskq_sys_init(nng_init_params *params)
return (nni_taskq_init(&nni_taskq_systq, (int) num_thr));
}
+bool
+nni_taskq_sys_drain(void)
+{
+ return (nni_taskq_drain(nni_taskq_systq));
+}
+
void
nni_taskq_sys_fini(void)
{
diff --git a/src/core/taskq.h b/src/core/taskq.h
index 498b4f37..d3299433 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -62,6 +62,7 @@ extern void nni_task_init(nni_task *, nni_taskq *, nni_cb, void *);
extern void nni_task_fini(nni_task *);
extern int nni_taskq_sys_init(nng_init_params *);
+extern bool nni_taskq_sys_drain(void);
extern void nni_taskq_sys_fini(void);
// nni_task implementation details are not to be used except by the