aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c53
1 files changed, 41 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index f039cdc8..87c79d0d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -356,6 +356,7 @@ nni_aio_begin(nni_aio *aio)
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
aio->a_stop = true;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ESTOPPED);
@@ -365,6 +366,20 @@ nni_aio_begin(nni_aio *aio)
return (0);
}
+void
+nni_aio_reset(nni_aio *aio)
+{
+ aio->a_result = 0;
+ aio->a_count = 0;
+ aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_sleep = false;
+
+ for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
+ aio->a_outputs[i] = NULL;
+ }
+}
+
int
nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
@@ -387,7 +402,8 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
+ aio->a_stop = true;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ESTOPPED);
}
@@ -437,9 +453,10 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
- aio->a_sleep = false;
- aio->a_result = NNG_ESTOPPED;
+ aio->a_stop = true;
+ aio->a_sleep = false;
+ aio->a_result = NNG_ESTOPPED;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -496,6 +513,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
} else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
timeout = true;
}
+ if (!aio->a_sleep) {
+ aio->a_expire_ok = false;
+ }
+ aio->a_result = 0;
// Do this outside the lock. Note that we don't strictly need to have
// done this for the failure cases below (the task framework does the
@@ -504,26 +525,33 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_task_prep(&aio->a_task);
nni_mtx_lock(&eq->eq_mtx);
+ NNI_ASSERT(!aio->a_stopped);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
- aio->a_sleep = false;
- aio->a_result = NNG_ESTOPPED;
+ aio->a_stop = true;
+ aio->a_sleep = false;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
+ aio->a_result = NNG_ESTOPPED;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (aio->a_abort) {
- aio->a_sleep = false;
- aio->a_abort = false;
+ aio->a_sleep = false;
+ aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
NNI_ASSERT(aio->a_result != 0);
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (timeout) {
- aio->a_sleep = false;
- aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- aio->a_abort = false;
+ aio->a_sleep = false;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -901,6 +929,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ nni_aio_reset(aio);
aio->a_expire_ok = true;
aio->a_sleep = true;
switch (aio->a_timeout) {