summary refs log tree commit diff
path: root/src/core
diff options
context:
space:
mode:
author Garrett D'Amore <garrett@damore.org> 2018-04-04 09:41:12 -0700
committer Garrett D'Amore <garrett@damore.org> 2018-04-04 09:41:12 -0700
commit 0aa1de1316b46bb4af23fdf26759bca08008eaf5 (patch)
tree aad04dd247016e6ef31469d2f92b64e633565da9 /src/core
parent 4e6a71bf2f0fdfdfe15a774a5d541aa22b8b9195 (diff)
download nng-0aa1de1316b46bb4af23fdf26759bca08008eaf5.tar.gz
nng-0aa1de1316b46bb4af23fdf26759bca08008eaf5.tar.bz2
nng-0aa1de1316b46bb4af23fdf26759bca08008eaf5.zip
fixes #324 nni_aio_set_synch leads to race condition
fixes #325 synchronous aio completion crash
fixes #327 move nni_clock() operations to outside the nni_aio_lk

This work was done for the context tree, and is necessary to properly enable that branch.
Diffstat (limited to 'src/core')
-rw-r--r-- src/core/aio.c 137
-rw-r--r-- src/core/aio.h 18
2 files changed, 79 insertions, 76 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 249d263a..9cf04217 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -63,14 +63,14 @@ struct nng_aio {
// These fields are private to the aio framework.
nni_cv a_cv;
- unsigned a_fini : 1; // shutting down (no new operations)
- unsigned a_done : 1; // operation has completed
- unsigned a_pend : 1; // completion routine pending
- unsigned a_active : 1; // aio was started
- unsigned a_expiring : 1; // expiration callback in progress
- unsigned a_waiting : 1; // a thread is waiting for this to finish
- unsigned a_synch : 1; // run completion synchronously
- unsigned a_sleep : 1; // sleeping with no action
+ bool a_fini : 1; // shutting down (no new operations)
+ bool a_done : 1; // operation has completed
+ bool a_pend : 1; // completion routine pending
+ bool a_active : 1; // aio was started
+ bool a_expiring : 1; // expiration callback in progress
+ bool a_waiting : 1; // a thread is waiting for this to finish
+ bool a_synch : 1; // run completion synchronously
+ bool a_sleep : 1; // sleeping with no action
nni_task a_task;
// Read/write operations.
@@ -193,7 +193,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_mtx_lock(&nni_aio_lk);
- aio->a_fini = 1;
+ aio->a_fini = true;
nni_mtx_unlock(&nni_aio_lk);
nni_aio_abort(aio, NNG_ECANCELED);
@@ -284,19 +284,13 @@ nni_aio_count(nni_aio *aio)
}
void
-nni_aio_set_synch(nni_aio *aio)
-{
- aio->a_synch = 1;
-}
-
-void
nni_aio_wait(nni_aio *aio)
{
nni_mtx_lock(&nni_aio_lk);
// Wait until we're done, and the synchronous completion flag
// is cleared (meaning any synch completion is finished).
while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) {
- aio->a_waiting = 1;
+ aio->a_waiting = true;
nni_cv_wait(&aio->a_cv);
}
nni_mtx_unlock(&nni_aio_lk);
@@ -306,45 +300,48 @@ nni_aio_wait(nni_aio *aio)
int
nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
+ nni_time now = nni_clock();
+
nni_mtx_lock(&nni_aio_lk);
+
if (aio->a_fini) {
// We should not reschedule anything at this point.
- aio->a_active = 0;
+ aio->a_active = false;
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
- aio->a_done = 0;
- aio->a_pend = 0;
+ aio->a_done = false;
+ aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
- aio->a_active = 1;
+ aio->a_active = true;
+
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
- // Convert the relative timeout to an absolute timeout.
- if (aio->a_sleep) {
- // expire node already set.
- nni_aio_expire_add(aio);
- } else {
+ if (!aio->a_sleep) {
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
aio->a_expire = NNI_TIME_ZERO;
- nni_aio_expire_add(aio);
break;
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
break;
default:
- aio->a_expire = nni_clock() + aio->a_timeout;
- nni_aio_expire_add(aio);
+ aio->a_expire = now + aio->a_timeout;
break;
}
}
+
+ // Convert the relative timeout to an absolute timeout.
+ if (aio->a_expire != NNI_TIME_NEVER) {
+ nni_aio_expire_add(aio);
+ }
nni_mtx_unlock(&nni_aio_lk);
return (0);
}
@@ -369,15 +366,16 @@ nni_aio_abort(nni_aio *aio, int rv)
// I/O provider related functions.
static void
-nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg)
+nni_aio_finish_impl(
+ nni_aio *aio, int rv, size_t count, nni_msg *msg, bool synch)
{
- nni_mtx_lock(&nni_aio_lk);
+ NNI_ASSERT(!aio->a_pend); // provider only calls us *once*
- NNI_ASSERT(aio->a_pend == 0); // provider only calls us *once*
+ nni_mtx_lock(&nni_aio_lk);
nni_list_node_remove(&aio->a_expire_node);
- aio->a_pend = 1;
+ aio->a_pend = true;
aio->a_result = rv;
aio->a_count = count;
aio->a_prov_cancel = NULL;
@@ -386,47 +384,64 @@ nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg)
}
aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = 0;
+ aio->a_sleep = false;
// If we are expiring, then we rely on the expiration thread to
// complete this; we must not because the expiration thread is
// still holding the reference.
- if (!aio->a_expiring) {
- aio->a_done = 1;
- if (aio->a_waiting) {
- aio->a_waiting = 0;
- nni_cv_wake(&aio->a_cv);
- }
- if (!aio->a_synch) {
- nni_task_dispatch(&aio->a_task);
- } else {
+ if (aio->a_expiring) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
+ aio->a_done = true;
+ aio->a_synch = synch;
+
+ if (synch) {
+ if (aio->a_task.task_cb != NULL) {
nni_mtx_unlock(&nni_aio_lk);
aio->a_task.task_cb(aio->a_task.task_arg);
nni_mtx_lock(&nni_aio_lk);
- aio->a_synch = 0;
}
+ } else {
+ nni_task_dispatch(&aio->a_task);
+ }
+ aio->a_synch = false;
+
+ if (aio->a_waiting) {
+ aio->a_waiting = false;
+ nni_cv_wake(&aio->a_cv);
}
+
+ // This has to be done with the lock still held, in order
+ // to prevent taskq wait from returning prematurely.
nni_mtx_unlock(&nni_aio_lk);
}
void
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
- nni_aio_finish_impl(aio, result, count, NULL);
+ nni_aio_finish_impl(aio, result, count, NULL, false);
+}
+
+void
+nni_aio_finish_synch(nni_aio *aio, int result, size_t count)
+{
+ nni_aio_finish_impl(aio, result, count, NULL, true);
}
void
nni_aio_finish_error(nni_aio *aio, int result)
{
- nni_aio_finish_impl(aio, result, 0, NULL);
+ nni_aio_finish_impl(aio, result, 0, NULL, false);
}
void
nni_aio_finish_msg(nni_aio *aio, nni_msg *msg)
{
NNI_ASSERT(msg != NULL);
- nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg);
+ nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg, false);
}
void
@@ -496,6 +511,8 @@ nni_aio_expire_loop(void *arg)
NNI_ARG_UNUSED(arg);
for (;;) {
+ now = nni_clock();
+
nni_mtx_lock(&nni_aio_lk);
if (nni_aio_expire_run == 0) {
@@ -509,7 +526,6 @@ nni_aio_expire_loop(void *arg)
continue;
}
- now = nni_clock();
if (now < aio->a_expire) {
// Unexpired; the list is ordered, so we just wait.
nni_cv_until(&nni_aio_expire_cv, aio->a_expire);
@@ -525,10 +541,10 @@ nni_aio_expire_loop(void *arg)
// the aio, similar to the consumers. The actual taskq
// dispatch on completion won't occur until this is cleared,
// and the done flag won't be set either.
- aio->a_expiring = 1;
+ aio->a_expiring = true;
cancelfn = aio->a_prov_cancel;
rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT;
- aio->a_sleep = 0;
+ aio->a_sleep = false;
// Cancel any outstanding activity. This is always non-NULL
// for a valid aio, and becomes NULL only when an AIO is
@@ -538,24 +554,19 @@ nni_aio_expire_loop(void *arg)
cancelfn(aio, rv);
nni_mtx_lock(&nni_aio_lk);
} else {
- aio->a_pend = 1;
+ aio->a_pend = true;
aio->a_result = rv;
}
NNI_ASSERT(aio->a_pend); // nni_aio_finish was run
NNI_ASSERT(aio->a_prov_cancel == NULL);
- aio->a_expiring = 0;
- aio->a_done = 1;
- if (!aio->a_synch) {
- nni_task_dispatch(&aio->a_task);
- } else {
- nni_mtx_unlock(&nni_aio_lk);
- aio->a_task.task_cb(aio->a_task.task_arg);
- nni_mtx_lock(&nni_aio_lk);
- aio->a_synch = 0;
- }
+ aio->a_expiring = false;
+ aio->a_done = true;
+
+ nni_task_dispatch(&aio->a_task);
+
if (aio->a_waiting) {
- aio->a_waiting = 0;
+ aio->a_waiting = false;
nni_cv_wake(&aio->a_cv);
}
nni_mtx_unlock(&nni_aio_lk);
@@ -649,12 +660,12 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// If the timeout on the aio is shorter than our sleep time,
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
- aio->a_sleep = 0;
+ aio->a_sleep = false;
(void) nni_aio_start(aio, NULL, NULL);
return;
}
}
- aio->a_sleep = 1;
+ aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
(void) nni_aio_start(aio, NULL, NULL);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index 49ef5298..a0f4934f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -79,19 +79,6 @@ extern void *nni_aio_get_output(nni_aio *, unsigned);
extern void nni_aio_set_msg(nni_aio *, nni_msg *);
extern nni_msg *nni_aio_get_msg(nni_aio *);
-// nni_aio_set_synch sets a synchronous completion flag on the AIO.
-// When this is set, the next time the AIO is completed, the callback
-// be run synchronously, from the thread calling the finish routine.
-// It is important that this only be set when the provider knows that
-// it is not holding any locks or resources when completing the operation,
-// or when the consumer knows that the callback routine does not acquire
-// any locks. Use with caution to avoid deadlocks. The flag is cleared
-// automatically when the completion callback is executed. Some care has
-// been taken so that other aio operations like aio_wait will work,
-// although it is still an error to try waiting for an aio from that aio's
-// completion callback.
-void nni_aio_set_synch(nni_aio *);
-
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
// complete (such as when the callback is executed or after nni_aio_wait
@@ -123,6 +110,11 @@ extern int nni_aio_list_active(nni_aio *);
// nni_aio_finish is called by the provider when an operation is complete.
extern void nni_aio_finish(nni_aio *, int, size_t);
+// nni_aio_finish_synch is to be called when a synchronous completion is
+// desired. It is very important that the caller not hold any locks when
+// calling this, but it is useful for chaining completions to minimize
+// context switch overhead during completions.
+extern void nni_aio_finish_synch(nni_aio *, int, size_t);
extern void nni_aio_finish_error(nni_aio *, int);
extern void nni_aio_finish_msg(nni_aio *, nni_msg *);