diff options
| -rw-r--r-- | src/core/aio.c | 53 | ||||
| -rw-r--r-- | src/core/aio.h | 6 | ||||
| -rw-r--r-- | src/core/stream.c | 2 |
3 files changed, 49 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index f039cdc8..87c79d0d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -356,6 +356,7 @@ nni_aio_begin(nni_aio *aio) aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; aio->a_stop = true; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ESTOPPED); @@ -365,6 +366,20 @@ nni_aio_begin(nni_aio *aio) return (0); } +void +nni_aio_reset(nni_aio *aio) +{ + aio->a_result = 0; + aio->a_count = 0; + aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_sleep = false; + + for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { + aio->a_outputs[i] = NULL; + } +} + int nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) { @@ -387,7 +402,8 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_mtx_lock(&eq->eq_mtx); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; + aio->a_stop = true; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ESTOPPED); } @@ -437,9 +453,10 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_mtx_lock(&eq->eq_mtx); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; - aio->a_sleep = false; - aio->a_result = NNG_ESTOPPED; + aio->a_stop = true; + aio->a_sleep = false; + aio->a_result = NNG_ESTOPPED; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -496,6 +513,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { timeout = true; } + if (!aio->a_sleep) { + aio->a_expire_ok = false; + } + aio->a_result = 0; // Do this outside the lock. Note that we don't strictly need to have // done this for the failure cases below (the task framework does the @@ -504,26 +525,33 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_task_prep(&aio->a_task); nni_mtx_lock(&eq->eq_mtx); + NNI_ASSERT(!aio->a_stopped); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; - aio->a_sleep = false; - aio->a_result = NNG_ESTOPPED; + aio->a_stop = true; + aio->a_sleep = false; + aio->a_expire_ok = false; + aio->a_count = 0; + aio->a_result = NNG_ESTOPPED; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (aio->a_abort) { - aio->a_sleep = false; - aio->a_abort = false; + aio->a_sleep = false; + aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_count = 0; NNI_ASSERT(aio->a_result != 0); nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (timeout) { - aio->a_sleep = false; - aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; - aio->a_abort = false; + aio->a_sleep = false; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_expire_ok = false; + aio->a_count = 0; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -901,6 +929,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { + nni_aio_reset(aio); aio->a_expire_ok = true; aio->a_sleep = true; switch (aio->a_timeout) { diff --git a/src/core/aio.h b/src/core/aio.h index b09b6e49..d909853f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -175,6 +175,11 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *); // or was canceled before this call (but after nni_aio_begin). extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *); +// nni_aio_reset is called by providers before doing any work -- it resets +// counts other fields to their initial state. It will not reset the closed +// state if the aio has been stopped or closed. +extern void nni_aio_reset(nni_aio *); + // nni_aio_start should be called before any asynchronous operation // is filed. It need not be called for completions that are synchronous // at job submission. @@ -230,6 +235,7 @@ struct nng_aio { bool a_use_expire; // Use expire instead of timeout bool a_abort; // Task was aborted bool a_init; // Initialized this + bool a_stopped; // Debug - set when we finish stopped nni_task a_task; // Read/write operations. diff --git a/src/core/stream.c b/src/core/stream.c index 16e11eca..2c7a9ff3 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -140,12 +140,14 @@ nng_stream_free(nng_stream *s) void nng_stream_send(nng_stream *s, nng_aio *aio) { + nni_aio_reset(aio); s->s_send(s, aio); } void nng_stream_recv(nng_stream *s, nng_aio *aio) { + nni_aio_reset(aio); s->s_recv(s, aio); } |
