aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c70
-rw-r--r--src/core/aio.h5
2 files changed, 71 insertions, 4 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 0ac70a0d..54ba7000 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -468,6 +468,71 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
return (true);
}
+bool
+nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
+{
+ nni_aio_expire_q *eq = aio->a_expire_q;
+ bool timeout = false;
+
+ if (!aio->a_sleep && !aio->a_use_expire) {
+ // Convert the relative timeout to an absolute timeout.
+ switch (aio->a_timeout) {
+ case NNG_DURATION_ZERO:
+ timeout = true;
+ break;
+ case NNG_DURATION_INFINITE:
+ case NNG_DURATION_DEFAULT:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_timeout;
+ break;
+ }
+ } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
+ timeout = true;
+ }
+
+ // Do this outside the lock. Note that we don't strictly need to have
+ // done this for the failure cases below (the task framework does the
+ // right thing if the task isn't prepped), but those should be uncommon
+ // cases and doing this here avoids nesting the locks.
+ nni_task_prep(&aio->a_task);
+
+ nni_mtx_lock(&eq->eq_mtx);
+ if (timeout) {
+ aio->a_sleep = false;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+ if (aio->a_abort) {
+ aio->a_sleep = false;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_sleep = false;
+ aio->a_result = NNG_ECLOSED;
+ nni_mtx_unlock(&eq->eq_mtx);
+ nni_task_dispatch(&aio->a_task);
+ return (false);
+ }
+
+ NNI_ASSERT(aio->a_cancel_fn == NULL);
+ aio->a_cancel_fn = cancel;
+ aio->a_cancel_arg = data;
+
+ // We only schedule expiration if we have a way for the expiration
+ // handler to actively cancel it.
+ if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
+ nni_aio_expire_add(aio);
+ }
+ nni_mtx_unlock(&eq->eq_mtx);
+ return (true);
+}
+
// nni_aio_abort is called by a consumer which guarantees that the aio
// is still valid.
void
@@ -826,9 +891,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
- if (nni_aio_begin(aio) != 0) {
- return;
- }
aio->a_expire_ok = true;
aio->a_sleep = true;
switch (aio->a_timeout) {
@@ -848,7 +910,7 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms;
// we don't do anything else here, so we can ignore the return
- (void) nni_aio_defer(aio, nni_sleep_cancel, NULL);
+ (void) nni_aio_start(aio, nni_sleep_cancel, NULL);
}
static bool
diff --git a/src/core/aio.h b/src/core/aio.h
index 8628d8ef..b09b6e49 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -175,6 +175,11 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
// or was canceled before this call (but after nni_aio_begin).
extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *);
+// nni_aio_start should be called before any asynchronous operation
+// is filed. It need not be called for completions that are synchronous
+// at job submission.
+extern bool nni_aio_start(nni_aio *, nni_aio_cancel_fn, void *);
+
extern void nni_sleep_aio(nni_duration, nni_aio *);
// nni_aio_completion_list is used after removing the aio from an