diff options
Diffstat (limited to 'src/core/taskq.c')
| -rw-r--r-- | src/core/taskq.c | 157 |
1 files changed, 55 insertions, 102 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c index d945e713..15351840 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -16,10 +16,8 @@ struct nni_task { void * task_arg; nni_cb task_cb; nni_taskq * task_tq; - bool task_sched; - bool task_run; - bool task_done; - bool task_exec; + unsigned task_busy; + bool task_prep; bool task_fini; nni_mtx task_mtx; nni_cv task_cv; @@ -50,33 +48,26 @@ nni_taskq_thread(void *self) nni_mtx_lock(&tq->tq_mtx); for (;;) { if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { - nni_mtx_lock(&task->task_mtx); - task->task_run = true; - task->task_sched = false; - nni_mtx_unlock(&task->task_mtx); nni_list_remove(&tq->tq_tasks, task); nni_mtx_unlock(&tq->tq_mtx); task->task_cb(task->task_arg); nni_mtx_lock(&task->task_mtx); - if (task->task_sched || task->task_exec) { - // task resubmitted itself most likely. - // We cannot touch the rest of the flags, - // since the called function has taken control. - nni_mtx_unlock(&task->task_mtx); - } else { - task->task_done = true; - nni_cv_wake(&task->task_cv); - + task->task_busy--; + if (task->task_busy == 0) { if (task->task_fini) { task->task_fini = false; nni_mtx_unlock(&task->task_mtx); nni_task_fini(task); - } else { - nni_mtx_unlock(&task->task_mtx); + + nni_mtx_lock(&tq->tq_mtx); + continue; } + nni_cv_wake(&task->task_cv); } + nni_mtx_unlock(&task->task_mtx); + nni_mtx_lock(&tq->tq_mtx); continue; } @@ -150,99 +141,65 @@ nni_taskq_fini(nni_taskq *tq) } void -nni_task_dispatch(nni_task *task) +nni_task_exec(nni_task *task) { - nni_taskq *tq = task->task_tq; + nni_mtx_lock(&task->task_mtx); + if (task->task_prep) { + task->task_prep = false; + } else { + task->task_busy++; + } + nni_mtx_unlock(&task->task_mtx); - // If there is no callback to perform, then do nothing! - // The user will be none the wiser. - if (task->task_cb == NULL) { - nni_mtx_lock(&task->task_mtx); - task->task_sched = false; - task->task_run = false; - task->task_exec = false; - task->task_done = true; - nni_cv_wake(&task->task_cv); - nni_mtx_unlock(&task->task_mtx); - return; + if (task->task_cb != NULL) { + task->task_cb(task->task_arg); } - nni_mtx_lock(&tq->tq_mtx); + nni_mtx_lock(&task->task_mtx); - task->task_sched = true; - task->task_run = false; - task->task_exec = false; - task->task_done = false; + task->task_busy--; + if (task->task_busy == 0) { + if (task->task_fini) { + task->task_fini = false; + nni_mtx_unlock(&task->task_mtx); + nni_task_fini(task); + return; + } + nni_cv_wake(&task->task_cv); + } nni_mtx_unlock(&task->task_mtx); - - nni_list_append(&tq->tq_tasks, task); - nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate - nni_mtx_unlock(&tq->tq_mtx); } void -nni_task_exec(nni_task *task) +nni_task_dispatch(nni_task *task) { + nni_taskq *tq = task->task_tq; + + // If there is no callback to perform, then do nothing! + // The user will be none the wiser. if (task->task_cb == NULL) { - nni_mtx_lock(&task->task_mtx); - task->task_sched = false; - task->task_run = false; - task->task_exec = false; - task->task_done = true; - nni_cv_wake(&task->task_cv); - nni_mtx_unlock(&task->task_mtx); + nni_task_exec(task); return; } nni_mtx_lock(&task->task_mtx); - if (task->task_exec) { - // recursive taskq_exec, run it asynchronously - nni_mtx_unlock(&task->task_mtx); - nni_task_dispatch(task); - return; + if (task->task_prep) { + task->task_prep = false; + } else { + task->task_busy++; } - task->task_exec = true; - task->task_sched = false; - task->task_done = false; - task->task_run = false; nni_mtx_unlock(&task->task_mtx); - task->task_cb(task->task_arg); - - nni_mtx_lock(&task->task_mtx); - task->task_exec = false; - if (task->task_sched || task->task_run) { - // cb scheduled a task - nni_mtx_unlock(&task->task_mtx); - return; - } - task->task_done = true; - nni_cv_wake(&task->task_cv); - if (task->task_fini) { - task->task_fini = false; - nni_mtx_unlock(&task->task_mtx); - nni_task_fini(task); - } else { - nni_mtx_unlock(&task->task_mtx); - } + nni_mtx_lock(&tq->tq_mtx); + nni_list_append(&tq->tq_tasks, task); + nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate + nni_mtx_unlock(&tq->tq_mtx); } void nni_task_prep(nni_task *task) { nni_mtx_lock(&task->task_mtx); - task->task_sched = true; - task->task_done = false; - task->task_run = false; - nni_mtx_unlock(&task->task_mtx); -} - -void -nni_task_unprep(nni_task *task) -{ - nni_mtx_lock(&task->task_mtx); - task->task_sched = false; - task->task_done = false; - task->task_run = false; - nni_cv_wake(&task->task_cv); + task->task_busy++; + task->task_prep = true; nni_mtx_unlock(&task->task_mtx); } @@ -250,8 +207,7 @@ void nni_task_wait(nni_task *task) { nni_mtx_lock(&task->task_mtx); - while ((task->task_sched || task->task_run || task->task_exec) && - (!task->task_done)) { + while (task->task_busy) { nni_cv_wait(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); @@ -268,15 +224,12 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) NNI_LIST_NODE_INIT(&task->task_node); nni_mtx_init(&task->task_mtx); nni_cv_init(&task->task_cv, &task->task_mtx); - task->task_sched = false; - task->task_done = false; - task->task_run = false; - task->task_sched = false; - task->task_exec = false; - task->task_cb = cb; - task->task_arg = arg; - task->task_tq = tq != NULL ? tq : nni_taskq_systq; - *taskp = task; + task->task_prep = false; + task->task_busy = 0; + task->task_cb = cb; + task->task_arg = arg; + task->task_tq = tq != NULL ? tq : nni_taskq_systq; + *taskp = task; return (0); } @@ -285,7 +238,7 @@ nni_task_fini(nni_task *task) { NNI_ASSERT(!nni_list_node_active(&task->task_node)); nni_mtx_lock(&task->task_mtx); - if ((task->task_run || task->task_exec) && (!task->task_done)) { + if (task->task_busy) { // destroy later. task->task_fini = true; nni_mtx_unlock(&task->task_mtx); |
