From 06ebeefb102b223ff77dd47e16b64bd9575f7c34 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 26 Dec 2024 10:20:33 -0800 Subject: aio: introduce NNG_ESTOPPED This error code results when an AIO is stopped permanently, as a result of nni_aio_close or nni_aio_stop. The associated AIO object cannot be used again. This discrimantes against a file being closed, or a temporary cancellation which might allow the aio to be reused. Consumers must check for this error status in their callbacks, and not resubmit an operation that failed with this error. Doing so, will result in an infinite loop of submit / errors. --- src/core/aio.c | 56 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 23 deletions(-) (limited to 'src/core/aio.c') diff --git a/src/core/aio.c b/src/core/aio.c index 54ba7000..f039cdc8 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -60,11 +60,10 @@ static int nni_aio_expire_q_cnt; // when we want to know if the operation itself is complete. // // In order to guard against aio reuse during teardown, we set the a_stop -// flag. Any attempt to initialize for a new operation after that point -// will fail and the caller will get NNG_ECANCELED indicating this. The -// provider that calls nni_aio_begin() MUST check the return value, and -// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the -// request and return. +// flag. Any attempt to submit new operation after that point will fail with +// the status NNG_ESTOPPED indicating this. The provider that calls +// nni_aio_begin() MUST check the return value, and if it comes back +// NNG_ESTOPPED then it must simply discard the request and // return. // // Calling nni_aio_wait waits for the current outstanding operation to // complete, but does not block another one from being started on the @@ -118,7 +117,7 @@ nni_aio_fini(nni_aio *aio) nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { - fn(aio, arg, NNG_ECLOSED); + fn(aio, arg, NNG_ESTOPPED); } nni_task_fini(&aio->a_task); @@ -201,7 +200,7 @@ nni_aio_stop(nni_aio *aio) nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { - fn(aio, arg, NNG_ECANCELED); + fn(aio, arg, NNG_ESTOPPED); } nni_aio_wait(aio); @@ -226,7 +225,7 @@ nni_aio_close(nni_aio *aio) nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { - fn(aio, arg, NNG_ECLOSED); + fn(aio, arg, NNG_ESTOPPED); } } } @@ -353,12 +352,13 @@ nni_aio_begin(nni_aio *aio) // We should not reschedule anything at this point. if (aio->a_stop || eq->eq_stop) { - aio->a_result = NNG_ECANCELED; + aio->a_result = NNG_ESTOPPED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; + aio->a_stop = true; nni_mtx_unlock(&eq->eq_mtx); - return (NNG_ECANCELED); + return (NNG_ESTOPPED); } nni_task_prep(&aio->a_task); nni_mtx_unlock(&eq->eq_mtx); @@ -386,15 +386,17 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } nni_mtx_lock(&eq->eq_mtx); + if (aio->a_stop || eq->eq_stop) { + aio->a_stop = true; + nni_mtx_unlock(&eq->eq_mtx); + return (NNG_ESTOPPED); + } if (aio->a_abort) { int rv = aio->a_result; nni_mtx_unlock(&eq->eq_mtx); + NNI_ASSERT(rv != 0); return (rv); } - if (aio->a_stop || eq->eq_stop) { - nni_mtx_unlock(&eq->eq_mtx); - return (NNG_ECLOSED); - } NNI_ASSERT(aio->a_cancel_fn == NULL); aio->a_cancel_fn = cancel; @@ -434,22 +436,25 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } nni_mtx_lock(&eq->eq_mtx); - if (timeout) { + if (aio->a_stop || eq->eq_stop) { + aio->a_stop = true; aio->a_sleep = false; - aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_result = NNG_ESTOPPED; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (aio->a_abort) { aio->a_sleep = false; + aio->a_abort = false; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } - if (aio->a_stop || eq->eq_stop) { + if (timeout) { aio->a_sleep = false; - aio->a_result = NNG_ECLOSED; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_abort = false; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -499,22 +504,26 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_task_prep(&aio->a_task); nni_mtx_lock(&eq->eq_mtx); - if (timeout) { + if (aio->a_stop || eq->eq_stop) { + aio->a_stop = true; aio->a_sleep = false; - aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_result = NNG_ESTOPPED; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (aio->a_abort) { aio->a_sleep = false; + aio->a_abort = false; + NNI_ASSERT(aio->a_result != 0); nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } - if (aio->a_stop || eq->eq_stop) { + if (timeout) { aio->a_sleep = false; - aio->a_result = NNG_ECLOSED; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_abort = false; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -767,7 +776,8 @@ nni_aio_expire_loop(void *arg) for (uint32_t i = 0; i < exp_idx; i++) { aio = expires[i]; if (q->eq_stop) { - rv = NNG_ECANCELED; + rv = NNG_ESTOPPED; + aio->a_stop = true; } else if (aio->a_expire_ok) { aio->a_expire_ok = false; rv = 0; -- cgit v1.2.3-70-g09d2