aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c56
1 files changed, 44 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 91b5c85a..5807869b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "core/taskq.h"
#include <string.h>
struct nni_aio_expire_q {
@@ -18,6 +19,7 @@ struct nni_aio_expire_q {
nni_thr eq_thr;
nni_time eq_next; // next expiration
bool eq_exit;
+ bool eq_stop;
};
static nni_aio_expire_q **nni_aio_expire_q_list;
@@ -343,7 +345,7 @@ nni_aio_begin(nni_aio *aio)
aio->a_cancel_fn = NULL;
// We should not reschedule anything at this point.
- if (aio->a_stop || eq->eq_exit) {
+ if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
@@ -380,7 +382,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
nni_mtx_lock(&eq->eq_mtx);
- if (aio->a_stop || eq->eq_exit) {
+ if (aio->a_stop || eq->eq_stop) {
nni_task_abort(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ECLOSED);
@@ -595,16 +597,15 @@ nni_aio_expire_loop(void *arg)
nni_mtx_unlock(mtx);
return;
}
- if (now < next) {
- // Early wake up (just to reschedule), no need to
- // rescan the list. This is an optimization.
+ if (now < next && !(q->eq_stop && aio != NULL)) {
+ // nothing to do!
nni_cv_until(cv, next);
continue;
}
q->eq_next = NNI_TIME_NEVER;
exp_idx = 0;
while (aio != NULL) {
- if ((aio->a_expire < now) &&
+ if ((q->eq_stop || aio->a_expire < now) &&
(exp_idx < NNI_EXPIRE_BATCH)) {
nni_aio *nxt;
@@ -627,7 +628,14 @@ nni_aio_expire_loop(void *arg)
for (uint32_t i = 0; i < exp_idx; i++) {
aio = expires[i];
- rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ if (q->eq_stop) {
+ rv = NNG_ECANCELED;
+ } else if (aio->a_expire_ok) {
+ aio->a_expire_ok = false;
+ rv = 0;
+ } else {
+ rv = NNG_ETIMEDOUT;
+ }
nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn;
void *cancel_arg = aio->a_cancel_arg;
@@ -639,7 +647,15 @@ nni_aio_expire_loop(void *arg)
// If there is no cancellation function, then we cannot
// terminate the aio - we've tried, but it has to run
// to its natural conclusion.
- if (cancel_fn != NULL) {
+ //
+ // For the special case of sleeping, we don't need to
+ // drop the lock and call the cancel function, we are
+ // already doing it right here!
+ if (aio->a_sleep) {
+ aio->a_result = rv;
+ aio->a_sleep = false;
+ nni_task_dispatch(&aio->a_task);
+ } else if (cancel_fn != NULL) {
nni_mtx_unlock(mtx);
cancel_fn(aio, cancel_arg, rv);
nni_mtx_lock(mtx);
@@ -647,10 +663,6 @@ nni_aio_expire_loop(void *arg)
aio->a_expiring = false;
}
nni_cv_wake(cv);
-
- if (now < q->eq_next) {
- nni_cv_until(cv, q->eq_next);
- }
}
}
@@ -769,11 +781,23 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
}
static void
+nni_aio_expire_q_stop(nni_aio_expire_q *eq)
+{
+ if (eq != NULL && !eq->eq_stop) {
+ nni_mtx_lock(&eq->eq_mtx);
+ eq->eq_stop = true;
+ nni_cv_wake(&eq->eq_cv);
+ nni_mtx_unlock(&eq->eq_mtx);
+ }
+}
+
+static void
nni_aio_expire_q_free(nni_aio_expire_q *eq)
{
if (eq == NULL) {
return;
}
+ NNI_ASSERT(eq->eq_stop);
if (!eq->eq_exit) {
nni_mtx_lock(&eq->eq_mtx);
eq->eq_exit = true;
@@ -811,6 +835,14 @@ nni_aio_expire_q_alloc(void)
}
void
+nni_aio_sys_stop(void)
+{
+ for (int i = 0; i < nni_aio_expire_q_cnt; i++) {
+ nni_aio_expire_q_stop(nni_aio_expire_q_list[i]);
+ }
+}
+
+void
nni_aio_sys_fini(void)
{
for (int i = 0; i < nni_aio_expire_q_cnt; i++) {