From 0aa1de1316b46bb4af23fdf26759bca08008eaf5 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 4 Apr 2018 09:41:12 -0700 Subject: fixes #324 nni_aio_set_synch leads to race condition fixes #325 synchronous aio completion crash fixes #327 move nni_clock() operations to outside the nni_aio_lk. This work was done for the context tree, and is necessary to properly enable that branch. --- src/core/aio.c | 137 ++++++++++++++++++--------------- src/core/aio.h | 18 ++--- src/supplemental/websocket/websocket.c | 6 +- src/transport/ipc/ipc.c | 6 +- src/transport/tcp/tcp.c | 6 +- src/transport/tls/tls.c | 6 +- src/transport/zerotier/zerotier.c | 1 - 7 files changed, 87 insertions(+), 93 deletions(-) diff --git a/src/core/aio.c b/src/core/aio.c index 249d263a..9cf04217 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -63,14 +63,14 @@ struct nng_aio { // These fields are private to the aio framework. nni_cv a_cv; - unsigned a_fini : 1; // shutting down (no new operations) - unsigned a_done : 1; // operation has completed - unsigned a_pend : 1; // completion routine pending - unsigned a_active : 1; // aio was started - unsigned a_expiring : 1; // expiration callback in progress - unsigned a_waiting : 1; // a thread is waiting for this to finish - unsigned a_synch : 1; // run completion synchronously - unsigned a_sleep : 1; // sleeping with no action + bool a_fini : 1; // shutting down (no new operations) + bool a_done : 1; // operation has completed + bool a_pend : 1; // completion routine pending + bool a_active : 1; // aio was started + bool a_expiring : 1; // expiration callback in progress + bool a_waiting : 1; // a thread is waiting for this to finish + bool a_synch : 1; // run completion synchronously + bool a_sleep : 1; // sleeping with no action nni_task a_task; // Read/write operations. @@ -193,7 +193,7 @@ nni_aio_stop(nni_aio *aio) { if (aio != NULL) { nni_mtx_lock(&nni_aio_lk); - aio->a_fini = 1; + aio->a_fini = true; nni_mtx_unlock(&nni_aio_lk); nni_aio_abort(aio, NNG_ECANCELED); @@ -283,12 +283,6 @@ nni_aio_count(nni_aio *aio) return (aio->a_count); } -void -nni_aio_set_synch(nni_aio *aio) -{ - aio->a_synch = 1; -} - void nni_aio_wait(nni_aio *aio) { @@ -296,7 +290,7 @@ nni_aio_wait(nni_aio *aio) // Wait until we're done, and the synchronous completion flag // is cleared (meaning any synch completion is finished). while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) { - aio->a_waiting = 1; + aio->a_waiting = true; nni_cv_wait(&aio->a_cv); } nni_mtx_unlock(&nni_aio_lk); @@ -306,45 +300,48 @@ nni_aio_wait(nni_aio *aio) int nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) { + nni_time now = nni_clock(); + nni_mtx_lock(&nni_aio_lk); + if (aio->a_fini) { // We should not reschedule anything at this point. - aio->a_active = 0; + aio->a_active = false; aio->a_result = NNG_ECANCELED; nni_mtx_unlock(&nni_aio_lk); return (NNG_ECANCELED); } - aio->a_done = 0; - aio->a_pend = 0; + aio->a_done = false; + aio->a_pend = false; aio->a_result = 0; aio->a_count = 0; aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; - aio->a_active = 1; + aio->a_active = true; + for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } - // Convert the relative timeout to an absolute timeout. - if (aio->a_sleep) { - // expire node already set. - nni_aio_expire_add(aio); - } else { + if (!aio->a_sleep) { switch (aio->a_timeout) { case NNG_DURATION_ZERO: aio->a_expire = NNI_TIME_ZERO; - nni_aio_expire_add(aio); break; case NNG_DURATION_INFINITE: case NNG_DURATION_DEFAULT: aio->a_expire = NNI_TIME_NEVER; break; default: - aio->a_expire = nni_clock() + aio->a_timeout; - nni_aio_expire_add(aio); + aio->a_expire = now + aio->a_timeout; break; } } + + // Convert the relative timeout to an absolute timeout. + if (aio->a_expire != NNI_TIME_NEVER) { + nni_aio_expire_add(aio); + } nni_mtx_unlock(&nni_aio_lk); return (0); } @@ -369,15 +366,16 @@ nni_aio_abort(nni_aio *aio, int rv) // I/O provider related functions. static void -nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg) +nni_aio_finish_impl( + nni_aio *aio, int rv, size_t count, nni_msg *msg, bool synch) { - nni_mtx_lock(&nni_aio_lk); + NNI_ASSERT(!aio->a_pend); // provider only calls us *once* - NNI_ASSERT(aio->a_pend == 0); // provider only calls us *once* + nni_mtx_lock(&nni_aio_lk); nni_list_node_remove(&aio->a_expire_node); - aio->a_pend = 1; + aio->a_pend = true; aio->a_result = rv; aio->a_count = count; aio->a_prov_cancel = NULL; @@ -386,47 +384,64 @@ nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg) } aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = 0; + aio->a_sleep = false; // If we are expiring, then we rely on the expiration thread to // complete this; we must not because the expiration thread is // still holding the reference. - if (!aio->a_expiring) { - aio->a_done = 1; - if (aio->a_waiting) { - aio->a_waiting = 0; - nni_cv_wake(&aio->a_cv); - } - if (!aio->a_synch) { - nni_task_dispatch(&aio->a_task); - } else { + if (aio->a_expiring) { + nni_mtx_unlock(&nni_aio_lk); + return; + } + + aio->a_done = true; + aio->a_synch = synch; + + if (synch) { + if (aio->a_task.task_cb != NULL) { nni_mtx_unlock(&nni_aio_lk); aio->a_task.task_cb(aio->a_task.task_arg); nni_mtx_lock(&nni_aio_lk); - aio->a_synch = 0; } + } else { + nni_task_dispatch(&aio->a_task); + } + aio->a_synch = false; + + if (aio->a_waiting) { + aio->a_waiting = false; + nni_cv_wake(&aio->a_cv); } + + // This has to be done with the lock still held, in order + // to prevent taskq wait from returning prematurely. nni_mtx_unlock(&nni_aio_lk); } void nni_aio_finish(nni_aio *aio, int result, size_t count) { - nni_aio_finish_impl(aio, result, count, NULL); + nni_aio_finish_impl(aio, result, count, NULL, false); +} + +void +nni_aio_finish_synch(nni_aio *aio, int result, size_t count) +{ + nni_aio_finish_impl(aio, result, count, NULL, true); } void nni_aio_finish_error(nni_aio *aio, int result) { - nni_aio_finish_impl(aio, result, 0, NULL); + nni_aio_finish_impl(aio, result, 0, NULL, false); } void nni_aio_finish_msg(nni_aio *aio, nni_msg *msg) { NNI_ASSERT(msg != NULL); - nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg); + nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg, false); } void @@ -496,6 +511,8 @@ nni_aio_expire_loop(void *arg) NNI_ARG_UNUSED(arg); for (;;) { + now = nni_clock(); + nni_mtx_lock(&nni_aio_lk); if (nni_aio_expire_run == 0) { @@ -509,7 +526,6 @@ nni_aio_expire_loop(void *arg) continue; } - now = nni_clock(); if (now < aio->a_expire) { // Unexpired; the list is ordered, so we just wait. nni_cv_until(&nni_aio_expire_cv, aio->a_expire); @@ -525,10 +541,10 @@ nni_aio_expire_loop(void *arg) // the aio, similar to the consumers. The actual taskq // dispatch on completion won't occur until this is cleared, // and the done flag won't be set either. - aio->a_expiring = 1; + aio->a_expiring = true; cancelfn = aio->a_prov_cancel; rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT; - aio->a_sleep = 0; + aio->a_sleep = false; // Cancel any outstanding activity. This is always non-NULL // for a valid aio, and becomes NULL only when an AIO is @@ -538,24 +554,19 @@ nni_aio_expire_loop(void *arg) cancelfn(aio, rv); nni_mtx_lock(&nni_aio_lk); } else { - aio->a_pend = 1; + aio->a_pend = true; aio->a_result = rv; } NNI_ASSERT(aio->a_pend); // nni_aio_finish was run NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_expiring = 0; - aio->a_done = 1; - if (!aio->a_synch) { - nni_task_dispatch(&aio->a_task); - } else { - nni_mtx_unlock(&nni_aio_lk); - aio->a_task.task_cb(aio->a_task.task_arg); - nni_mtx_lock(&nni_aio_lk); - aio->a_synch = 0; - } + aio->a_expiring = false; + aio->a_done = true; + + nni_task_dispatch(&aio->a_task); + if (aio->a_waiting) { - aio->a_waiting = 0; + aio->a_waiting = false; nni_cv_wake(&aio->a_cv); } nni_mtx_unlock(&nni_aio_lk); @@ -649,12 +660,12 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) // If the timeout on the aio is shorter than our sleep time, // then let it still wake up early, but with NNG_ETIMEDOUT. if (ms > aio->a_timeout) { - aio->a_sleep = 0; + aio->a_sleep = false; (void) nni_aio_start(aio, NULL, NULL); return; } } - aio->a_sleep = 1; + aio->a_sleep = true; aio->a_expire = nni_clock() + ms; (void) nni_aio_start(aio, NULL, NULL); } diff --git a/src/core/aio.h b/src/core/aio.h index 49ef5298..a0f4934f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -79,19 +79,6 @@ extern void *nni_aio_get_output(nni_aio *, unsigned); extern void nni_aio_set_msg(nni_aio *, nni_msg *); extern nni_msg *nni_aio_get_msg(nni_aio *); -// nni_aio_set_synch sets a synchronous completion flag on the AIO. -// When this is set, the next time the AIO is completed, the callback -// be run synchronously, from the thread calling the finish routine. -// It is important that this only be set when the provider knows that -// it is not holding any locks or resources when completing the operation, -// or when the consumer knows that the callback routine does not acquire -// any locks. Use with caution to avoid deadlocks. The flag is cleared -// automatically when the completion callback is executed. Some care has -// been taken so that other aio operations like aio_wait will work, -// although it is still an error to try waiting for an aio from that aio's -// completion callback. -void nni_aio_set_synch(nni_aio *); - // nni_aio_result returns the result code (0 on success, or an NNG errno) // for the operation. It is only valid to call this when the operation is // complete (such as when the callback is executed or after nni_aio_wait @@ -123,6 +110,11 @@ extern int nni_aio_list_active(nni_aio *); // nni_aio_finish is called by the provider when an operation is complete. extern void nni_aio_finish(nni_aio *, int, size_t); +// nni_aio_finish_synch is to be called when a synchronous completion is +// desired. It is very important that the caller not hold any locks when +// calling this, but it is useful for chaining completions to minimize +// context switch overhead during completions. +extern void nni_aio_finish_synch(nni_aio *, int, size_t); extern void nni_aio_finish_error(nni_aio *, int); extern void nni_aio_finish_msg(nni_aio *, nni_msg *); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 6932ce70..7bd698bd 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -604,8 +604,7 @@ ws_write_cb(void *arg) if (aio != NULL) { nng_msg *msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); - nni_aio_set_synch(aio); - nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); nni_msg_free(msg); } } @@ -1022,8 +1021,7 @@ ws_read_cb(void *arg) body += frame->len; } nni_aio_set_msg(wm->aio, msg); - nni_aio_set_synch(aio); - nni_aio_finish(wm->aio, 0, nni_msg_len(msg)); + nni_aio_finish_synch(wm->aio, 0, nni_msg_len(msg)); ws_msg_fini(wm); } } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index e2bb5f99..98d3f177 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -252,8 +252,7 @@ nni_ipc_pipe_send_cb(void *arg) n = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); nni_msg_free(msg); - nni_aio_set_synch(aio); - nni_aio_finish(aio, 0, n); + nni_aio_finish_synch(aio, 0, n); } static void @@ -341,8 +340,7 @@ nni_ipc_pipe_recv_cb(void *arg) nni_mtx_unlock(&pipe->mtx); nni_aio_set_msg(aio, msg); - nni_aio_set_synch(aio); - nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); return; recv_error: diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 7f819d4f..9db5b016 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -250,8 +250,7 @@ nni_tcp_pipe_send_cb(void *arg) n = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); nni_msg_free(msg); - nni_aio_set_synch(aio); - nni_aio_finish(aio, 0, n); + nni_aio_finish_synch(aio, 0, n); } static void @@ -321,9 +320,8 @@ nni_tcp_pipe_recv_cb(void *arg) } nni_mtx_unlock(&p->mtx); - nni_aio_set_synch(aio); nni_aio_set_msg(aio, msg); - nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); return; recv_error: diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 610a7f7c..a78e8085 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -257,8 +257,7 @@ nni_tls_pipe_send_cb(void *arg) n = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); nni_msg_free(msg); - nni_aio_set_synch(aio); - nni_aio_finish(aio, 0, n); + nni_aio_finish_synch(aio, 0, n); } static void @@ -329,9 +328,8 @@ nni_tls_pipe_recv_cb(void *arg) } nni_mtx_unlock(&p->mtx); - nni_aio_set_synch(aio); nni_aio_set_msg(aio, msg); - nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); return; recv_error: diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index da22dc54..9fc0bc62 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1368,7 +1368,6 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, // This should be non-blocking/best-effort, so while // not great that we're holding the lock, also not tragic. - nni_aio_set_synch(aio); nni_plat_udp_send(udp, aio); return (0); -- cgit v1.2.3-70-g09d2