diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 53 |
1 files changed, 41 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index f039cdc8..87c79d0d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -356,6 +356,7 @@ nni_aio_begin(nni_aio *aio) aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; aio->a_stop = true; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ESTOPPED); @@ -365,6 +366,20 @@ nni_aio_begin(nni_aio *aio) return (0); } +void +nni_aio_reset(nni_aio *aio) +{ + aio->a_result = 0; + aio->a_count = 0; + aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_sleep = false; + + for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { + aio->a_outputs[i] = NULL; + } +} + int nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) { @@ -387,7 +402,8 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_mtx_lock(&eq->eq_mtx); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; + aio->a_stop = true; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ESTOPPED); } @@ -437,9 +453,10 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_mtx_lock(&eq->eq_mtx); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; - aio->a_sleep = false; - aio->a_result = NNG_ESTOPPED; + aio->a_stop = true; + aio->a_sleep = false; + aio->a_result = NNG_ESTOPPED; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -496,6 +513,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { timeout = true; } + if (!aio->a_sleep) { + aio->a_expire_ok = false; + } + aio->a_result = 0; // Do this outside the lock. Note that we don't strictly need to have // done this for the failure cases below (the task framework does the @@ -504,26 +525,33 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_task_prep(&aio->a_task); nni_mtx_lock(&eq->eq_mtx); + NNI_ASSERT(!aio->a_stopped); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; - aio->a_sleep = false; - aio->a_result = NNG_ESTOPPED; + aio->a_stop = true; + aio->a_sleep = false; + aio->a_expire_ok = false; + aio->a_count = 0; + aio->a_result = NNG_ESTOPPED; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (aio->a_abort) { - aio->a_sleep = false; - aio->a_abort = false; + aio->a_sleep = false; + aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_count = 0; NNI_ASSERT(aio->a_result != 0); nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (timeout) { - aio->a_sleep = false; - aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; - aio->a_abort = false; + aio->a_sleep = false; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_expire_ok = false; + aio->a_count = 0; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -901,6 +929,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { + nni_aio_reset(aio); aio->a_expire_ok = true; aio->a_sleep = true; switch (aio->a_timeout) { |
