diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-04 09:41:12 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-04 09:41:12 -0700 |
| commit | 0aa1de1316b46bb4af23fdf26759bca08008eaf5 (patch) | |
| tree | aad04dd247016e6ef31469d2f92b64e633565da9 /src/core | |
| parent | 4e6a71bf2f0fdfdfe15a774a5d541aa22b8b9195 (diff) | |
| download | nng-0aa1de1316b46bb4af23fdf26759bca08008eaf5.tar.gz nng-0aa1de1316b46bb4af23fdf26759bca08008eaf5.tar.bz2 nng-0aa1de1316b46bb4af23fdf26759bca08008eaf5.zip | |
fixes #324 nni_aio_set_synch leads to race condition
fixes #325 synchronous aio completion crash
fixes #327 move nni_clock() operations outside of the nni_aio_lk.
This work was done for the context tree, and is necessary to properly
enable that branch.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 137 | ||||
| -rw-r--r-- | src/core/aio.h | 18 |
2 files changed, 79 insertions, 76 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 249d263a..9cf04217 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -63,14 +63,14 @@ struct nng_aio { // These fields are private to the aio framework. nni_cv a_cv; - unsigned a_fini : 1; // shutting down (no new operations) - unsigned a_done : 1; // operation has completed - unsigned a_pend : 1; // completion routine pending - unsigned a_active : 1; // aio was started - unsigned a_expiring : 1; // expiration callback in progress - unsigned a_waiting : 1; // a thread is waiting for this to finish - unsigned a_synch : 1; // run completion synchronously - unsigned a_sleep : 1; // sleeping with no action + bool a_fini : 1; // shutting down (no new operations) + bool a_done : 1; // operation has completed + bool a_pend : 1; // completion routine pending + bool a_active : 1; // aio was started + bool a_expiring : 1; // expiration callback in progress + bool a_waiting : 1; // a thread is waiting for this to finish + bool a_synch : 1; // run completion synchronously + bool a_sleep : 1; // sleeping with no action nni_task a_task; // Read/write operations. @@ -193,7 +193,7 @@ nni_aio_stop(nni_aio *aio) { if (aio != NULL) { nni_mtx_lock(&nni_aio_lk); - aio->a_fini = 1; + aio->a_fini = true; nni_mtx_unlock(&nni_aio_lk); nni_aio_abort(aio, NNG_ECANCELED); @@ -284,19 +284,13 @@ nni_aio_count(nni_aio *aio) } void -nni_aio_set_synch(nni_aio *aio) -{ - aio->a_synch = 1; -} - -void nni_aio_wait(nni_aio *aio) { nni_mtx_lock(&nni_aio_lk); // Wait until we're done, and the synchronous completion flag // is cleared (meaning any synch completion is finished). 
while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) { - aio->a_waiting = 1; + aio->a_waiting = true; nni_cv_wait(&aio->a_cv); } nni_mtx_unlock(&nni_aio_lk); @@ -306,45 +300,48 @@ nni_aio_wait(nni_aio *aio) int nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) { + nni_time now = nni_clock(); + nni_mtx_lock(&nni_aio_lk); + if (aio->a_fini) { // We should not reschedule anything at this point. - aio->a_active = 0; + aio->a_active = false; aio->a_result = NNG_ECANCELED; nni_mtx_unlock(&nni_aio_lk); return (NNG_ECANCELED); } - aio->a_done = 0; - aio->a_pend = 0; + aio->a_done = false; + aio->a_pend = false; aio->a_result = 0; aio->a_count = 0; aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; - aio->a_active = 1; + aio->a_active = true; + for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } - // Convert the relative timeout to an absolute timeout. - if (aio->a_sleep) { - // expire node already set. - nni_aio_expire_add(aio); - } else { + if (!aio->a_sleep) { switch (aio->a_timeout) { case NNG_DURATION_ZERO: aio->a_expire = NNI_TIME_ZERO; - nni_aio_expire_add(aio); break; case NNG_DURATION_INFINITE: case NNG_DURATION_DEFAULT: aio->a_expire = NNI_TIME_NEVER; break; default: - aio->a_expire = nni_clock() + aio->a_timeout; - nni_aio_expire_add(aio); + aio->a_expire = now + aio->a_timeout; break; } } + + // Convert the relative timeout to an absolute timeout. + if (aio->a_expire != NNI_TIME_NEVER) { + nni_aio_expire_add(aio); + } nni_mtx_unlock(&nni_aio_lk); return (0); } @@ -369,15 +366,16 @@ nni_aio_abort(nni_aio *aio, int rv) // I/O provider related functions. 
static void -nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg) +nni_aio_finish_impl( + nni_aio *aio, int rv, size_t count, nni_msg *msg, bool synch) { - nni_mtx_lock(&nni_aio_lk); + NNI_ASSERT(!aio->a_pend); // provider only calls us *once* - NNI_ASSERT(aio->a_pend == 0); // provider only calls us *once* + nni_mtx_lock(&nni_aio_lk); nni_list_node_remove(&aio->a_expire_node); - aio->a_pend = 1; + aio->a_pend = true; aio->a_result = rv; aio->a_count = count; aio->a_prov_cancel = NULL; @@ -386,47 +384,64 @@ nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg) } aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = 0; + aio->a_sleep = false; // If we are expiring, then we rely on the expiration thread to // complete this; we must not because the expiration thread is // still holding the reference. - if (!aio->a_expiring) { - aio->a_done = 1; - if (aio->a_waiting) { - aio->a_waiting = 0; - nni_cv_wake(&aio->a_cv); - } - if (!aio->a_synch) { - nni_task_dispatch(&aio->a_task); - } else { + if (aio->a_expiring) { + nni_mtx_unlock(&nni_aio_lk); + return; + } + + aio->a_done = true; + aio->a_synch = synch; + + if (synch) { + if (aio->a_task.task_cb != NULL) { nni_mtx_unlock(&nni_aio_lk); aio->a_task.task_cb(aio->a_task.task_arg); nni_mtx_lock(&nni_aio_lk); - aio->a_synch = 0; } + } else { + nni_task_dispatch(&aio->a_task); + } + aio->a_synch = false; + + if (aio->a_waiting) { + aio->a_waiting = false; + nni_cv_wake(&aio->a_cv); } + + // This has to be done with the lock still held, in order + // to prevent taskq wait from returning prematurely. 
nni_mtx_unlock(&nni_aio_lk); } void nni_aio_finish(nni_aio *aio, int result, size_t count) { - nni_aio_finish_impl(aio, result, count, NULL); + nni_aio_finish_impl(aio, result, count, NULL, false); +} + +void +nni_aio_finish_synch(nni_aio *aio, int result, size_t count) +{ + nni_aio_finish_impl(aio, result, count, NULL, true); } void nni_aio_finish_error(nni_aio *aio, int result) { - nni_aio_finish_impl(aio, result, 0, NULL); + nni_aio_finish_impl(aio, result, 0, NULL, false); } void nni_aio_finish_msg(nni_aio *aio, nni_msg *msg) { NNI_ASSERT(msg != NULL); - nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg); + nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg, false); } void @@ -496,6 +511,8 @@ nni_aio_expire_loop(void *arg) NNI_ARG_UNUSED(arg); for (;;) { + now = nni_clock(); + nni_mtx_lock(&nni_aio_lk); if (nni_aio_expire_run == 0) { @@ -509,7 +526,6 @@ nni_aio_expire_loop(void *arg) continue; } - now = nni_clock(); if (now < aio->a_expire) { // Unexpired; the list is ordered, so we just wait. nni_cv_until(&nni_aio_expire_cv, aio->a_expire); @@ -525,10 +541,10 @@ nni_aio_expire_loop(void *arg) // the aio, similar to the consumers. The actual taskq // dispatch on completion won't occur until this is cleared, // and the done flag won't be set either. - aio->a_expiring = 1; + aio->a_expiring = true; cancelfn = aio->a_prov_cancel; rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT; - aio->a_sleep = 0; + aio->a_sleep = false; // Cancel any outstanding activity. 
This is always non-NULL // for a valid aio, and becomes NULL only when an AIO is @@ -538,24 +554,19 @@ nni_aio_expire_loop(void *arg) cancelfn(aio, rv); nni_mtx_lock(&nni_aio_lk); } else { - aio->a_pend = 1; + aio->a_pend = true; aio->a_result = rv; } NNI_ASSERT(aio->a_pend); // nni_aio_finish was run NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_expiring = 0; - aio->a_done = 1; - if (!aio->a_synch) { - nni_task_dispatch(&aio->a_task); - } else { - nni_mtx_unlock(&nni_aio_lk); - aio->a_task.task_cb(aio->a_task.task_arg); - nni_mtx_lock(&nni_aio_lk); - aio->a_synch = 0; - } + aio->a_expiring = false; + aio->a_done = true; + + nni_task_dispatch(&aio->a_task); + if (aio->a_waiting) { - aio->a_waiting = 0; + aio->a_waiting = false; nni_cv_wake(&aio->a_cv); } nni_mtx_unlock(&nni_aio_lk); @@ -649,12 +660,12 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) // If the timeout on the aio is shorter than our sleep time, // then let it still wake up early, but with NNG_ETIMEDOUT. if (ms > aio->a_timeout) { - aio->a_sleep = 0; + aio->a_sleep = false; (void) nni_aio_start(aio, NULL, NULL); return; } } - aio->a_sleep = 1; + aio->a_sleep = true; aio->a_expire = nni_clock() + ms; (void) nni_aio_start(aio, NULL, NULL); } diff --git a/src/core/aio.h b/src/core/aio.h index 49ef5298..a0f4934f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -79,19 +79,6 @@ extern void *nni_aio_get_output(nni_aio *, unsigned); extern void nni_aio_set_msg(nni_aio *, nni_msg *); extern nni_msg *nni_aio_get_msg(nni_aio *); -// nni_aio_set_synch sets a synchronous completion flag on the AIO. -// When this is set, the next time the AIO is completed, the callback -// be run synchronously, from the thread calling the finish routine. -// It is important that this only be set when the provider knows that -// it is not holding any locks or resources when completing the operation, -// or when the consumer knows that the callback routine does not acquire -// any locks. Use with caution to avoid deadlocks. 
The flag is cleared -// automatically when the completion callback is executed. Some care has -// been taken so that other aio operations like aio_wait will work, -// although it is still an error to try waiting for an aio from that aio's -// completion callback. -void nni_aio_set_synch(nni_aio *); - // nni_aio_result returns the result code (0 on success, or an NNG errno) // for the operation. It is only valid to call this when the operation is // complete (such as when the callback is executed or after nni_aio_wait @@ -123,6 +110,11 @@ extern int nni_aio_list_active(nni_aio *); // nni_aio_finish is called by the provider when an operation is complete. extern void nni_aio_finish(nni_aio *, int, size_t); +// nni_aio_finish_synch is to be called when a synchronous completion is +// desired. It is very important that the caller not hold any locks when +// calling this, but it is useful for chaining completions to minimize +// context switch overhead during completions. +extern void nni_aio_finish_synch(nni_aio *, int, size_t); extern void nni_aio_finish_error(nni_aio *, int); extern void nni_aio_finish_msg(nni_aio *, nni_msg *); |
