aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-07 10:03:30 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-07 11:20:14 -0800
commit8fa3b2aa8e9191669f137be39ba61ad39243483a (patch)
tree8c9e39ab5bbcce38a1bbe78a2badb1145e481aae /src/core/aio.c
parenta02b1c7040c77f2549bfee16af36688f6b20ae63 (diff)
downloadnng-8fa3b2aa8e9191669f137be39ba61ad39243483a.tar.gz
nng-8fa3b2aa8e9191669f137be39ba61ad39243483a.tar.bz2
nng-8fa3b2aa8e9191669f137be39ba61ad39243483a.zip
aio: separate stop / shutdown from fini (deallocate)
Probably other subsystems should get the same treatment. We need to basically start the process of shutting down so that subsystems know to cease operation before we rip memory out from underneath them. This ensures that no new operations can be started as well, once we have begun the process of teardown. We also enhanced the completion of sleep to avoid some extra locking contention, since the expiration *is* the completion. Includes a test for this case.
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c56
1 files changed, 44 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 91b5c85a..5807869b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "core/taskq.h"
#include <string.h>
struct nni_aio_expire_q {
@@ -18,6 +19,7 @@ struct nni_aio_expire_q {
nni_thr eq_thr;
nni_time eq_next; // next expiration
bool eq_exit;
+ bool eq_stop;
};
static nni_aio_expire_q **nni_aio_expire_q_list;
@@ -343,7 +345,7 @@ nni_aio_begin(nni_aio *aio)
aio->a_cancel_fn = NULL;
// We should not reschedule anything at this point.
- if (aio->a_stop || eq->eq_exit) {
+ if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
@@ -380,7 +382,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
nni_mtx_lock(&eq->eq_mtx);
- if (aio->a_stop || eq->eq_exit) {
+ if (aio->a_stop || eq->eq_stop) {
nni_task_abort(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECLOSED);
@@ -595,16 +597,15 @@ nni_aio_expire_loop(void *arg)
nni_mtx_unlock(mtx);
return;
}
- if (now < next) {
- // Early wake up (just to reschedule), no need to
- // rescan the list. This is an optimization.
+ if (now < next && !(q->eq_stop && aio != NULL)) {
+ // nothing to do!
nni_cv_until(cv, next);
continue;
}
q->eq_next = NNI_TIME_NEVER;
exp_idx = 0;
while (aio != NULL) {
- if ((aio->a_expire < now) &&
+ if ((q->eq_stop || aio->a_expire < now) &&
(exp_idx < NNI_EXPIRE_BATCH)) {
nni_aio *nxt;
@@ -627,7 +628,14 @@ nni_aio_expire_loop(void *arg)
for (uint32_t i = 0; i < exp_idx; i++) {
aio = expires[i];
- rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ if (q->eq_stop) {
+ rv = NNG_ECANCELED;
+ } else if (aio->a_expire_ok) {
+ aio->a_expire_ok = false;
+ rv = 0;
+ } else {
+ rv = NNG_ETIMEDOUT;
+ }
nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
void *cancel_arg = aio->a_cancel_arg;
@@ -639,7 +647,15 @@ nni_aio_expire_loop(void *arg)
// If there is no cancellation function, then we cannot
// terminate the aio - we've tried, but it has to run
// to its natural conclusion.
- if (cancel_fn != NULL) {
+ //
+ // For the special case of sleeping, we don't need to
+ // drop the lock and call the cancel function, we are
+ // already doing it right here!
+ if (aio->a_sleep) {
+ aio->a_result = rv;
+ aio->a_sleep = false;
+ nni_task_dispatch(&aio->a_task);
+ } else if (cancel_fn != NULL) {
nni_mtx_unlock(mtx);
cancel_fn(aio, cancel_arg, rv);
nni_mtx_lock(mtx);
@@ -647,10 +663,6 @@ nni_aio_expire_loop(void *arg)
aio->a_expiring = false;
}
nni_cv_wake(cv);
-
- if (now < q->eq_next) {
- nni_cv_until(cv, q->eq_next);
- }
}
}
@@ -769,11 +781,23 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
}
static void
+nni_aio_expire_q_stop(nni_aio_expire_q *eq)
+{
+ if (eq != NULL && !eq->eq_stop) {
+ nni_mtx_lock(&eq->eq_mtx);
+ eq->eq_stop = true;
+ nni_cv_wake(&eq->eq_cv);
+ nni_mtx_unlock(&eq->eq_mtx);
+ }
+}
+
+static void
nni_aio_expire_q_free(nni_aio_expire_q *eq)
{
if (eq == NULL) {
return;
}
+ NNI_ASSERT(eq->eq_stop);
if (!eq->eq_exit) {
nni_mtx_lock(&eq->eq_mtx);
eq->eq_exit = true;
@@ -811,6 +835,14 @@ nni_aio_expire_q_alloc(void)
}
void
+nni_aio_sys_stop(void)
+{
+ for (int i = 0; i < nni_aio_expire_q_cnt; i++) {
+ nni_aio_expire_q_stop(nni_aio_expire_q_list[i]);
+ }
+}
+
+void
nni_aio_sys_fini(void)
{
for (int i = 0; i < nni_aio_expire_q_cnt; i++) {