diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 70 |
1 files changed, 64 insertions, 6 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index eaff5c80..0ac70a0d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -8,6 +8,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "core/aio.h" #include "core/nng_impl.h" #include "core/taskq.h" #include <string.h> @@ -347,14 +348,14 @@ nni_aio_begin(nni_aio *aio) aio->a_count = 0; aio->a_cancel_fn = NULL; aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_sleep = false; // We should not reschedule anything at this point. if (aio->a_stop || eq->eq_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; - aio->a_expire_ok = false; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECANCELED); @@ -408,6 +409,65 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) return (0); } +bool +nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) +{ + nni_aio_expire_q *eq = aio->a_expire_q; + bool timeout = false; + + if (!aio->a_sleep && !aio->a_use_expire) { + // Convert the relative timeout to an absolute timeout. + switch (aio->a_timeout) { + case NNG_DURATION_ZERO: + timeout = true; + break; + case NNG_DURATION_INFINITE: + case NNG_DURATION_DEFAULT: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_timeout; + break; + } + } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { + timeout = true; + } + + nni_mtx_lock(&eq->eq_mtx); + if (timeout) { + aio->a_sleep = false; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_abort) { + aio->a_sleep = false; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_stop || eq->eq_stop) { + aio->a_sleep = false; + aio->a_result = NNG_ECLOSED; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancel; + aio->a_cancel_arg = data; + + // We only schedule expiration if we have a way for the expiration + // handler to actively cancel it. + if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) { + nni_aio_expire_add(aio); + } + nni_mtx_unlock(&eq->eq_mtx); + return (true); +} + // nni_aio_abort is called by a consumer which guarantees that the aio // is still valid. void @@ -766,7 +826,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { - int rv; if (nni_aio_begin(aio) != 0) { return; } @@ -788,9 +847,8 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) aio->a_expire = ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms; - if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { - nni_aio_finish_error(aio, rv); - } + // we don't do anything else here, so we can ignore the return + (void) nni_aio_defer(aio, nni_sleep_cancel, NULL); } static bool |
