aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c56
-rw-r--r--src/core/aio.h1
-rw-r--r--src/core/aio_test.c42
-rw-r--r--src/core/init.c6
4 files changed, 91 insertions, 14 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 91b5c85a..5807869b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "core/taskq.h"
#include <string.h>
struct nni_aio_expire_q {
@@ -18,6 +19,7 @@ struct nni_aio_expire_q {
nni_thr eq_thr;
nni_time eq_next; // next expiration
bool eq_exit;
+ bool eq_stop;
};
static nni_aio_expire_q **nni_aio_expire_q_list;
@@ -343,7 +345,7 @@ nni_aio_begin(nni_aio *aio)
aio->a_cancel_fn = NULL;
// We should not reschedule anything at this point.
- if (aio->a_stop || eq->eq_exit) {
+ if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
@@ -380,7 +382,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
nni_mtx_lock(&eq->eq_mtx);
- if (aio->a_stop || eq->eq_exit) {
+ if (aio->a_stop || eq->eq_stop) {
nni_task_abort(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECLOSED);
@@ -595,16 +597,15 @@ nni_aio_expire_loop(void *arg)
nni_mtx_unlock(mtx);
return;
}
- if (now < next) {
- // Early wake up (just to reschedule), no need to
- // rescan the list. This is an optimization.
+ if (now < next && !(q->eq_stop && aio != NULL)) {
+ // nothing to do!
nni_cv_until(cv, next);
continue;
}
q->eq_next = NNI_TIME_NEVER;
exp_idx = 0;
while (aio != NULL) {
- if ((aio->a_expire < now) &&
+ if ((q->eq_stop || aio->a_expire < now) &&
(exp_idx < NNI_EXPIRE_BATCH)) {
nni_aio *nxt;
@@ -627,7 +628,14 @@ nni_aio_expire_loop(void *arg)
for (uint32_t i = 0; i < exp_idx; i++) {
aio = expires[i];
- rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ if (q->eq_stop) {
+ rv = NNG_ECANCELED;
+ } else if (aio->a_expire_ok) {
+ aio->a_expire_ok = false;
+ rv = 0;
+ } else {
+ rv = NNG_ETIMEDOUT;
+ }
nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
void *cancel_arg = aio->a_cancel_arg;
@@ -639,7 +647,15 @@ nni_aio_expire_loop(void *arg)
// If there is no cancellation function, then we cannot
// terminate the aio - we've tried, but it has to run
// to its natural conclusion.
- if (cancel_fn != NULL) {
+ //
+ // For the special case of sleeping, we don't need to
+ // drop the lock and call the cancel function, we are
+ // already doing it right here!
+ if (aio->a_sleep) {
+ aio->a_result = rv;
+ aio->a_sleep = false;
+ nni_task_dispatch(&aio->a_task);
+ } else if (cancel_fn != NULL) {
nni_mtx_unlock(mtx);
cancel_fn(aio, cancel_arg, rv);
nni_mtx_lock(mtx);
@@ -647,10 +663,6 @@ nni_aio_expire_loop(void *arg)
aio->a_expiring = false;
}
nni_cv_wake(cv);
-
- if (now < q->eq_next) {
- nni_cv_until(cv, q->eq_next);
- }
}
}
@@ -769,11 +781,23 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
}
static void
+nni_aio_expire_q_stop(nni_aio_expire_q *eq)
+{
+ if (eq != NULL && !eq->eq_stop) {
+ nni_mtx_lock(&eq->eq_mtx);
+ eq->eq_stop = true;
+ nni_cv_wake(&eq->eq_cv);
+ nni_mtx_unlock(&eq->eq_mtx);
+ }
+}
+
+static void
nni_aio_expire_q_free(nni_aio_expire_q *eq)
{
if (eq == NULL) {
return;
}
+ NNI_ASSERT(eq->eq_stop);
if (!eq->eq_exit) {
nni_mtx_lock(&eq->eq_mtx);
eq->eq_exit = true;
@@ -811,6 +835,14 @@ nni_aio_expire_q_alloc(void)
}
void
+nni_aio_sys_stop(void)
+{
+ for (int i = 0; i < nni_aio_expire_q_cnt; i++) {
+ nni_aio_expire_q_stop(nni_aio_expire_q_list[i]);
+ }
+}
+
+void
nni_aio_sys_fini(void)
{
for (int i = 0; i < nni_aio_expire_q_cnt; i++) {
diff --git a/src/core/aio.h b/src/core/aio.h
index 50cb266b..f8c6730f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -193,6 +193,7 @@ extern void nni_aio_completions_add(
nni_aio_completions *, nni_aio *, int, size_t);
extern int nni_aio_sys_init(nng_init_params *);
+extern void nni_aio_sys_stop(void);
extern void nni_aio_sys_fini(void);
typedef struct nni_aio_expire_q nni_aio_expire_q;
diff --git a/src/core/aio_test.c b/src/core/aio_test.c
index 3e040fe1..f18eb5ed 100644
--- a/src/core/aio_test.c
+++ b/src/core/aio_test.c
@@ -72,6 +72,46 @@ test_sleep_timeout(void)
nng_aio_free(aio);
}
+static void
+sleep_reap(void *arg)
+{
+ nng_aio *aio = *(nng_aio **) arg;
+ if (nng_aio_result(aio) != NNG_ECANCELED) {
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ }
+ nng_aio_reap(aio);
+}
+
+static void
+test_sleep_fini(void)
+{
+ static nng_aio *aio;
+ NUTS_TRUE(nng_aio_alloc(&aio, sleep_reap, &aio) == 0);
+ nng_sleep_aio(20000, aio);
+ nng_msleep(1);
+ // intentionally we do not free the aio here. reap should clean it.
+ nng_fini();
+ nng_init(NULL); // so that TEST_FINI will reap
+}
+
+static void
+test_sleep_fini_many(void)
+{
+#define NIOS 2000
+ static nng_aio *aios[NIOS];
+ for (int i = 0; i < NIOS; i++) {
+ int rv = nng_aio_alloc(&(aios[i]), sleep_reap, &(aios[i]));
+ if (rv != 0) {
+ NUTS_ASSERT(rv == 0);
+ }
+ }
+ for (int i = 0; i < NIOS; i++) {
+ nng_sleep_aio(20000, aios[i]);
+ }
+ nng_fini();
+ nng_init(NULL);
+}
+
void
test_insane_nio(void)
{
@@ -400,6 +440,8 @@ test_aio_busy(void)
NUTS_TESTS = {
{ "sleep", test_sleep },
{ "sleep timeout", test_sleep_timeout },
+ { "sleep fini", test_sleep_fini },
+ { "sleep fini many", test_sleep_fini_many },
{ "insane nio", test_insane_nio },
{ "provider cancel", test_provider_cancel },
{ "consumer cancel", test_consumer_cancel },
diff --git a/src/core/init.c b/src/core/init.c
index 6672beb1..fa07919e 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -133,14 +133,16 @@ nng_fini(void)
nni_atomic_flag_reset(&init_busy);
return;
}
+ nni_aio_sys_stop(); // no more scheduling allowed!
nni_sock_closeall();
nni_sp_tran_sys_fini();
nni_tls_sys_fini();
nni_reap_drain();
- nni_aio_sys_fini();
nni_taskq_sys_fini();
- nni_reap_sys_fini(); // must be before timer and aio (expire)
+ nni_reap_drain();
+ nni_aio_sys_fini();
nni_id_map_sys_fini();
+ nni_reap_sys_fini(); // must be near the end
nni_plat_fini();
nni_atomic_flag_reset(&init_busy);
}