diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-16 14:12:29 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-16 21:08:55 -0700 |
| commit | eab6a4d2d96d11d3926e927c135362fc166895f0 (patch) | |
| tree | b4f8097c339856a1bd12db6d1975fe1b7b951b4b | |
| parent | ec84c5a8406bb203d5f8830e280f93cf1f63cd6a (diff) | |
| download | nng-eab6a4d2d96d11d3926e927c135362fc166895f0.tar.gz nng-eab6a4d2d96d11d3926e927c135362fc166895f0.tar.bz2 nng-eab6a4d2d96d11d3926e927c135362fc166895f0.zip | |
fixes #445 crash in taskq_thread
This changes the array of flags, which was confusing, brittle, and
racy, into a much simpler reference (busy) count on the task structures.
This allows us to support certain kinds of "reentrant" dispatching,
where either a synchronous or asynchronous task can reschedule / dispatch
itself. The new code also helps reduce certain lock pressure, as a bonus.
| -rw-r--r-- | src/core/aio.c | 1 | ||||
| -rw-r--r-- | src/core/taskq.c | 157 | ||||
| -rw-r--r-- | src/core/taskq.h | 7 |
3 files changed, 58 insertions, 107 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 6276589b..99da356a 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -315,7 +315,6 @@ nni_aio_begin(nni_aio *aio) nni_mtx_lock(&nni_aio_lk); // We should not reschedule anything at this point. if (aio->a_stop) { - nni_task_unprep(aio->a_task); aio->a_result = NNG_ECANCELED; nni_mtx_unlock(&nni_aio_lk); return (NNG_ECANCELED); diff --git a/src/core/taskq.c b/src/core/taskq.c index d945e713..15351840 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -16,10 +16,8 @@ struct nni_task { void * task_arg; nni_cb task_cb; nni_taskq * task_tq; - bool task_sched; - bool task_run; - bool task_done; - bool task_exec; + unsigned task_busy; + bool task_prep; bool task_fini; nni_mtx task_mtx; nni_cv task_cv; @@ -50,33 +48,26 @@ nni_taskq_thread(void *self) nni_mtx_lock(&tq->tq_mtx); for (;;) { if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { - nni_mtx_lock(&task->task_mtx); - task->task_run = true; - task->task_sched = false; - nni_mtx_unlock(&task->task_mtx); nni_list_remove(&tq->tq_tasks, task); nni_mtx_unlock(&tq->tq_mtx); task->task_cb(task->task_arg); nni_mtx_lock(&task->task_mtx); - if (task->task_sched || task->task_exec) { - // task resubmitted itself most likely. - // We cannot touch the rest of the flags, - // since the called function has taken control. - nni_mtx_unlock(&task->task_mtx); - } else { - task->task_done = true; - nni_cv_wake(&task->task_cv); - + task->task_busy--; + if (task->task_busy == 0) { if (task->task_fini) { task->task_fini = false; nni_mtx_unlock(&task->task_mtx); nni_task_fini(task); - } else { - nni_mtx_unlock(&task->task_mtx); + + nni_mtx_lock(&tq->tq_mtx); + continue; } + nni_cv_wake(&task->task_cv); } + nni_mtx_unlock(&task->task_mtx); + nni_mtx_lock(&tq->tq_mtx); continue; } @@ -150,99 +141,65 @@ nni_taskq_fini(nni_taskq *tq) } void -nni_task_dispatch(nni_task *task) +nni_task_exec(nni_task *task) { - nni_taskq *tq = task->task_tq; + nni_mtx_lock(&task->task_mtx); + if (task->task_prep) { + task->task_prep = false; + } else { + task->task_busy++; + } + nni_mtx_unlock(&task->task_mtx); - // If there is no callback to perform, then do nothing! - // The user will be none the wiser. - if (task->task_cb == NULL) { - nni_mtx_lock(&task->task_mtx); - task->task_sched = false; - task->task_run = false; - task->task_exec = false; - task->task_done = true; - nni_cv_wake(&task->task_cv); - nni_mtx_unlock(&task->task_mtx); - return; + if (task->task_cb != NULL) { + task->task_cb(task->task_arg); } - nni_mtx_lock(&tq->tq_mtx); + nni_mtx_lock(&task->task_mtx); - task->task_sched = true; - task->task_run = false; - task->task_exec = false; - task->task_done = false; + task->task_busy--; + if (task->task_busy == 0) { + if (task->task_fini) { + task->task_fini = false; + nni_mtx_unlock(&task->task_mtx); + nni_task_fini(task); + return; + } + nni_cv_wake(&task->task_cv); + } nni_mtx_unlock(&task->task_mtx); - - nni_list_append(&tq->tq_tasks, task); - nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate - nni_mtx_unlock(&tq->tq_mtx); } void -nni_task_exec(nni_task *task) +nni_task_dispatch(nni_task *task) { + nni_taskq *tq = task->task_tq; + + // If there is no callback to perform, then do nothing! + // The user will be none the wiser. if (task->task_cb == NULL) { - nni_mtx_lock(&task->task_mtx); - task->task_sched = false; - task->task_run = false; - task->task_exec = false; - task->task_done = true; - nni_cv_wake(&task->task_cv); - nni_mtx_unlock(&task->task_mtx); + nni_task_exec(task); return; } nni_mtx_lock(&task->task_mtx); - if (task->task_exec) { - // recursive taskq_exec, run it asynchronously - nni_mtx_unlock(&task->task_mtx); - nni_task_dispatch(task); - return; + if (task->task_prep) { + task->task_prep = false; + } else { + task->task_busy++; } - task->task_exec = true; - task->task_sched = false; - task->task_done = false; - task->task_run = false; nni_mtx_unlock(&task->task_mtx); - task->task_cb(task->task_arg); - - nni_mtx_lock(&task->task_mtx); - task->task_exec = false; - if (task->task_sched || task->task_run) { - // cb scheduled a task - nni_mtx_unlock(&task->task_mtx); - return; - } - task->task_done = true; - nni_cv_wake(&task->task_cv); - if (task->task_fini) { - task->task_fini = false; - nni_mtx_unlock(&task->task_mtx); - nni_task_fini(task); - } else { - nni_mtx_unlock(&task->task_mtx); - } + nni_mtx_lock(&tq->tq_mtx); + nni_list_append(&tq->tq_tasks, task); + nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate + nni_mtx_unlock(&tq->tq_mtx); } void nni_task_prep(nni_task *task) { nni_mtx_lock(&task->task_mtx); - task->task_sched = true; - task->task_done = false; - task->task_run = false; - nni_mtx_unlock(&task->task_mtx); -} - -void -nni_task_unprep(nni_task *task) -{ - nni_mtx_lock(&task->task_mtx); - task->task_sched = false; - task->task_done = false; - task->task_run = false; - nni_cv_wake(&task->task_cv); + task->task_busy++; + task->task_prep = true; nni_mtx_unlock(&task->task_mtx); } @@ -250,8 +207,7 @@ void nni_task_wait(nni_task *task) { nni_mtx_lock(&task->task_mtx); - while ((task->task_sched || task->task_run || task->task_exec) && - (!task->task_done)) { + while (task->task_busy) { nni_cv_wait(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); @@ -268,15 +224,12 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) NNI_LIST_NODE_INIT(&task->task_node); nni_mtx_init(&task->task_mtx); nni_cv_init(&task->task_cv, &task->task_mtx); - task->task_sched = false; - task->task_done = false; - task->task_run = false; - task->task_sched = false; - task->task_exec = false; - task->task_cb = cb; - task->task_arg = arg; - task->task_tq = tq != NULL ? tq : nni_taskq_systq; - *taskp = task; + task->task_prep = false; + task->task_busy = 0; + task->task_cb = cb; + task->task_arg = arg; + task->task_tq = tq != NULL ? tq : nni_taskq_systq; + *taskp = task; return (0); } @@ -285,7 +238,7 @@ nni_task_fini(nni_task *task) { NNI_ASSERT(!nni_list_node_active(&task->task_node)); nni_mtx_lock(&task->task_mtx); - if ((task->task_run || task->task_exec) && (!task->task_done)) { + if (task->task_busy) { // destroy later. task->task_fini = true; nni_mtx_unlock(&task->task_mtx); diff --git a/src/core/taskq.h b/src/core/taskq.h index 918c6705..8328e4eb 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -31,13 +31,12 @@ extern void nni_task_dispatch(nni_task *); // doubt, use nni_task_dispatch instead.) extern void nni_task_exec(nni_task *); -// nni_task_prep and nni_task_unprep are used by and exclusively for the aio -// framework. nni_task_prep marks the task as "scheduled" without actually +// nni_task_prep is used by and exclusively for the aio framework. +// nni_task_prep marks the task as "scheduled" without actually // dispatching anything to it yet; nni_task_wait will block waiting for the // task to complete normally (after a call to nni_task_dispatch or -// nni_task_exec), or for nni_task_unprep to be called. +// nni_task_exec). extern void nni_task_prep(nni_task *); -extern void nni_task_unprep(nni_task *); extern void nni_task_wait(nni_task *); extern int nni_task_init(nni_task **, nni_taskq *, nni_cb, void *); |
