diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 70 |
1 files changed, 66 insertions, 4 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 0ac70a0d..54ba7000 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -468,6 +468,71 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) return (true); } +bool +nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) +{ + nni_aio_expire_q *eq = aio->a_expire_q; + bool timeout = false; + + if (!aio->a_sleep && !aio->a_use_expire) { + // Convert the relative timeout to an absolute timeout. + switch (aio->a_timeout) { + case NNG_DURATION_ZERO: + timeout = true; + break; + case NNG_DURATION_INFINITE: + case NNG_DURATION_DEFAULT: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_timeout; + break; + } + } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { + timeout = true; + } + + // Do this outside the lock. Note that we don't strictly need to have + // done this for the failure cases below (the task framework does the + // right thing if the task isn't prepped), but those should be uncommon + // cases and doing this here avoids nesting the locks. + nni_task_prep(&aio->a_task); + + nni_mtx_lock(&eq->eq_mtx); + if (timeout) { + aio->a_sleep = false; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_abort) { + aio->a_sleep = false; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_stop || eq->eq_stop) { + aio->a_sleep = false; + aio->a_result = NNG_ECLOSED; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancel; + aio->a_cancel_arg = data; + + // We only schedule expiration if we have a way for the expiration + // handler to actively cancel it. + if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) { + nni_aio_expire_add(aio); + } + nni_mtx_unlock(&eq->eq_mtx); + return (true); +} + // nni_aio_abort is called by a consumer which guarantees that the aio // is still valid. void @@ -826,9 +891,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { - if (nni_aio_begin(aio) != 0) { - return; - } aio->a_expire_ok = true; aio->a_sleep = true; switch (aio->a_timeout) { @@ -848,7 +910,7 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms; // we don't do anything else here, so we can ignore the return - (void) nni_aio_defer(aio, nni_sleep_cancel, NULL); + (void) nni_aio_start(aio, nni_sleep_cancel, NULL); } static bool |
