From c8ce57d668d73d92a071fa86f81e07ca403d8672 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 9 Dec 2024 13:33:11 -0800 Subject: aio: introduce nni_aio_defer This will replace nni_aio_schedule, and it includes finishing the task if needed. It does so without dropping the lock and so is more efficient and race free. This includes some conversion of some subsystems to it. --- src/core/aio.c | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 6 deletions(-) (limited to 'src/core/aio.c') diff --git a/src/core/aio.c b/src/core/aio.c index eaff5c80..0ac70a0d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -8,6 +8,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "core/aio.h" #include "core/nng_impl.h" #include "core/taskq.h" #include @@ -347,14 +348,14 @@ nni_aio_begin(nni_aio *aio) aio->a_count = 0; aio->a_cancel_fn = NULL; aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_sleep = false; // We should not reschedule anything at this point. if (aio->a_stop || eq->eq_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; - aio->a_expire_ok = false; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECANCELED); @@ -408,6 +409,65 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) return (0); } +bool +nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) +{ + nni_aio_expire_q *eq = aio->a_expire_q; + bool timeout = false; + + if (!aio->a_sleep && !aio->a_use_expire) { + // Convert the relative timeout to an absolute timeout. + switch (aio->a_timeout) { + case NNG_DURATION_ZERO: + timeout = true; + break; + case NNG_DURATION_INFINITE: + case NNG_DURATION_DEFAULT: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_timeout; + break; + } + } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { + timeout = true; + } + + nni_mtx_lock(&eq->eq_mtx); + if (timeout) { + aio->a_sleep = false; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_abort) { + aio->a_sleep = false; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_stop || eq->eq_stop) { + aio->a_sleep = false; + aio->a_result = NNG_ECLOSED; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancel; + aio->a_cancel_arg = data; + + // We only schedule expiration if we have a way for the expiration + // handler to actively cancel it. + if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) { + nni_aio_expire_add(aio); + } + nni_mtx_unlock(&eq->eq_mtx); + return (true); +} + // nni_aio_abort is called by a consumer which guarantees that the aio // is still valid. void @@ -766,7 +826,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { - int rv; if (nni_aio_begin(aio) != 0) { return; } @@ -788,9 +847,8 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) aio->a_expire = ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms; - if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { - nni_aio_finish_error(aio, rv); - } + // we don't do anything else here, so we can ignore the return + (void) nni_aio_defer(aio, nni_sleep_cancel, NULL); } static bool -- cgit v1.2.3-70-g09d2