aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-26 12:13:49 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-26 12:53:46 -0800
commitb5826dac78e75520c26f2d84a952fe04d69fa2d3 (patch)
tree707d73b1e52c638c9a4843e3480b3190131b111c /src/core/aio.c
parent9cece0bdd4c044f029997b85b73d0b49f80ad1e6 (diff)
downloadnng-b5826dac78e75520c26f2d84a952fe04d69fa2d3.tar.gz
nng-b5826dac78e75520c26f2d84a952fe04d69fa2d3.tar.bz2
nng-b5826dac78e75520c26f2d84a952fe04d69fa2d3.zip
aio: introduce nni_aio_reset to reset the aio before submitting more work
This allows some use cases to reset things like the counts and outputs, before submitting more jobs. Providers should call this near the top of their functions; this is done without any locks so it should be very fast.
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c53
1 files changed, 41 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index f039cdc8..87c79d0d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -356,6 +356,7 @@ nni_aio_begin(nni_aio *aio)
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
aio->a_stop = true;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ESTOPPED);
@@ -365,6 +366,20 @@ nni_aio_begin(nni_aio *aio)
return (0);
}
+void
+nni_aio_reset(nni_aio *aio)
+{
+ aio->a_result = 0;
+ aio->a_count = 0;
+ aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_sleep = false;
+
+ for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
+ aio->a_outputs[i] = NULL;
+ }
+}
+
int
nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
@@ -387,7 +402,8 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
+ aio->a_stop = true;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ESTOPPED);
}
@@ -437,9 +453,10 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
- aio->a_sleep = false;
- aio->a_result = NNG_ESTOPPED;
+ aio->a_stop = true;
+ aio->a_sleep = false;
+ aio->a_result = NNG_ESTOPPED;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -496,6 +513,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
} else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
timeout = true;
}
+ if (!aio->a_sleep) {
+ aio->a_expire_ok = false;
+ }
+ aio->a_result = 0;
// Do this outside the lock. Note that we don't strictly need to have
// done this for the failure cases below (the task framework does the
@@ -504,26 +525,33 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_task_prep(&aio->a_task);
nni_mtx_lock(&eq->eq_mtx);
+ NNI_ASSERT(!aio->a_stopped);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
- aio->a_sleep = false;
- aio->a_result = NNG_ESTOPPED;
+ aio->a_stop = true;
+ aio->a_sleep = false;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
+ aio->a_result = NNG_ESTOPPED;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (aio->a_abort) {
- aio->a_sleep = false;
- aio->a_abort = false;
+ aio->a_sleep = false;
+ aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
NNI_ASSERT(aio->a_result != 0);
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (timeout) {
- aio->a_sleep = false;
- aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- aio->a_abort = false;
+ aio->a_sleep = false;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -901,6 +929,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ nni_aio_reset(aio);
aio->a_expire_ok = true;
aio->a_sleep = true;
switch (aio->a_timeout) {