diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-26 12:13:49 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-26 12:53:46 -0800 |
| commit | b5826dac78e75520c26f2d84a952fe04d69fa2d3 (patch) | |
| tree | 707d73b1e52c638c9a4843e3480b3190131b111c /src/core | |
| parent | 9cece0bdd4c044f029997b85b73d0b49f80ad1e6 (diff) | |
| download | nng-b5826dac78e75520c26f2d84a952fe04d69fa2d3.tar.gz nng-b5826dac78e75520c26f2d84a952fe04d69fa2d3.tar.bz2 nng-b5826dac78e75520c26f2d84a952fe04d69fa2d3.zip | |
aio: introduce nni_aio_reset to reset the aio before submitting more work
This allows some use cases to reset things like the counts and outputs, before
submitting more jobs. Providers should call this near the top of their
functions; this is done without any locks so it should be very fast.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 53 | ||||
| -rw-r--r-- | src/core/aio.h | 6 | ||||
| -rw-r--r-- | src/core/stream.c | 2 |
3 files changed, 49 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index f039cdc8..87c79d0d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -356,6 +356,7 @@ nni_aio_begin(nni_aio *aio) aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; aio->a_stop = true; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ESTOPPED); @@ -365,6 +366,20 @@ nni_aio_begin(nni_aio *aio) return (0); } +void +nni_aio_reset(nni_aio *aio) +{ + aio->a_result = 0; + aio->a_count = 0; + aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_sleep = false; + + for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { + aio->a_outputs[i] = NULL; + } +} + int nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) { @@ -387,7 +402,8 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_mtx_lock(&eq->eq_mtx); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; + aio->a_stop = true; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ESTOPPED); } @@ -437,9 +453,10 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_mtx_lock(&eq->eq_mtx); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; - aio->a_sleep = false; - aio->a_result = NNG_ESTOPPED; + aio->a_stop = true; + aio->a_sleep = false; + aio->a_result = NNG_ESTOPPED; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -496,6 +513,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { timeout = true; } + if (!aio->a_sleep) { + aio->a_expire_ok = false; + } + aio->a_result = 0; // Do this outside the lock. Note that we don't strictly need to have // done this for the failure cases below (the task framework does the @@ -504,26 +525,33 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) nni_task_prep(&aio->a_task); nni_mtx_lock(&eq->eq_mtx); + NNI_ASSERT(!aio->a_stopped); if (aio->a_stop || eq->eq_stop) { - aio->a_stop = true; - aio->a_sleep = false; - aio->a_result = NNG_ESTOPPED; + aio->a_stop = true; + aio->a_sleep = false; + aio->a_expire_ok = false; + aio->a_count = 0; + aio->a_result = NNG_ESTOPPED; + aio->a_stopped = true; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (aio->a_abort) { - aio->a_sleep = false; - aio->a_abort = false; + aio->a_sleep = false; + aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_count = 0; NNI_ASSERT(aio->a_result != 0); nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); } if (timeout) { - aio->a_sleep = false; - aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; - aio->a_abort = false; + aio->a_sleep = false; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + aio->a_expire_ok = false; + aio->a_count = 0; nni_mtx_unlock(&eq->eq_mtx); nni_task_dispatch(&aio->a_task); return (false); @@ -901,6 +929,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { + nni_aio_reset(aio); aio->a_expire_ok = true; aio->a_sleep = true; switch (aio->a_timeout) { diff --git a/src/core/aio.h b/src/core/aio.h index b09b6e49..d909853f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -175,6 +175,11 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *); // or was canceled before this call (but after nni_aio_begin). extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *); +// nni_aio_reset is called by providers before doing any work -- it resets +// counts other fields to their initial state. It will not reset the closed +// state if the aio has been stopped or closed. +extern void nni_aio_reset(nni_aio *); + // nni_aio_start should be called before any asynchronous operation // is filed. It need not be called for completions that are synchronous // at job submission. @@ -230,6 +235,7 @@ struct nng_aio { bool a_use_expire; // Use expire instead of timeout bool a_abort; // Task was aborted bool a_init; // Initialized this + bool a_stopped; // Debug - set when we finish stopped nni_task a_task; // Read/write operations. diff --git a/src/core/stream.c b/src/core/stream.c index 16e11eca..2c7a9ff3 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -140,12 +140,14 @@ nng_stream_free(nng_stream *s) void nng_stream_send(nng_stream *s, nng_aio *aio) { + nni_aio_reset(aio); s->s_send(s, aio); } void nng_stream_recv(nng_stream *s, nng_aio *aio) { + nni_aio_reset(aio); s->s_recv(s, aio); } |
