diff options
Diffstat (limited to 'src/core/taskq.c')
| -rw-r--r-- | src/core/taskq.c | 263 |
1 files changed, 149 insertions, 114 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c index b0fe160b..526fa0b4 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -11,11 +11,22 @@ #include "core/nng_impl.h" typedef struct nni_taskq_thr nni_taskq_thr; +struct nni_task { + nni_list_node task_node; + void * task_arg; + nni_cb task_cb; + nni_taskq * task_tq; + bool task_sched; + bool task_run; + bool task_done; + bool task_exec; + bool task_fini; + nni_mtx task_mtx; + nni_cv task_cv; +}; struct nni_taskq_thr { nni_taskq *tqt_tq; nni_thr tqt_thread; - nni_task * tqt_running; - int tqt_wait; }; struct nni_taskq { nni_list tq_tasks; @@ -24,8 +35,7 @@ struct nni_taskq { nni_cv tq_wait_cv; nni_taskq_thr *tq_threads; int tq_nthreads; - int tq_run; - int tq_waiting; + bool tq_run; }; static nni_taskq *nni_taskq_systq = NULL; @@ -40,25 +50,37 @@ nni_taskq_thread(void *self) nni_mtx_lock(&tq->tq_mtx); for (;;) { if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { + nni_mtx_lock(&task->task_mtx); + task->task_run = true; + task->task_sched = false; + nni_mtx_unlock(&task->task_mtx); nni_list_remove(&tq->tq_tasks, task); - thr->tqt_running = task; nni_mtx_unlock(&tq->tq_mtx); + task->task_cb(task->task_arg); - nni_mtx_lock(&tq->tq_mtx); - thr->tqt_running = NULL; - if (thr->tqt_wait || tq->tq_waiting) { - thr->tqt_wait = 0; - tq->tq_waiting = 0; - nni_cv_wake(&tq->tq_wait_cv); - } + nni_mtx_lock(&task->task_mtx); + if (task->task_sched || task->task_exec) { + // task resubmitted itself most likely. + // We cannot touch the rest of the flags, + // since the called function has taken control. + nni_mtx_unlock(&task->task_mtx); + } else { + task->task_done = true; + nni_cv_wake(&task->task_cv); + + if (task->task_fini) { + task->task_fini = false; + nni_mtx_unlock(&task->task_mtx); + nni_task_fini(task); + } else { + nni_mtx_unlock(&task->task_mtx); + } + } + nni_mtx_lock(&tq->tq_mtx); continue; } - if (tq->tq_waiting) { - tq->tq_waiting = 0; - nni_cv_wake(&tq->tq_wait_cv); - } if (!tq->tq_run) { break; } @@ -89,8 +111,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr) for (int i = 0; i < nthr; i++) { int rv; - tq->tq_threads[i].tqt_tq = tq; - tq->tq_threads[i].tqt_running = NULL; + tq->tq_threads[i].tqt_tq = tq; rv = nni_thr_init(&tq->tq_threads[i].tqt_thread, nni_taskq_thread, &tq->tq_threads[i]); if (rv != 0) { @@ -98,7 +119,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr) return (rv); } } - tq->tq_run = 1; + tq->tq_run = true; for (int i = 0; i < tq->tq_nthreads; i++) { nni_thr_run(&tq->tq_threads[i].tqt_thread); } @@ -106,53 +127,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr) return (0); } -static void -nni_taskq_drain_locked(nni_taskq *tq) -{ - // We need to first let the taskq completely drain. - for (;;) { - int busy = 0; - if (!nni_list_empty(&tq->tq_tasks)) { - busy = 1; - } else { - int i; - for (i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running != 0) { - busy = 1; - break; - } - } - } - if (!busy) { - break; - } - tq->tq_waiting++; - nni_cv_wait(&tq->tq_wait_cv); - } -} - -void -nni_taskq_drain(nni_taskq *tq) -{ - nni_mtx_lock(&tq->tq_mtx); - nni_taskq_drain_locked(tq); - nni_mtx_unlock(&tq->tq_mtx); -} - void nni_taskq_fini(nni_taskq *tq) { - // First drain the taskq completely. This is necessary since some - // tasks that are presently running may need to schedule additional - // tasks, and we don't want those to block. if (tq == NULL) { return; } if (tq->tq_run) { nni_mtx_lock(&tq->tq_mtx); - nni_taskq_drain_locked(tq); - - tq->tq_run = 0; + tq->tq_run = false; nni_cv_wake(&tq->tq_sched_cv); nni_mtx_unlock(&tq->tq_mtx); } @@ -174,90 +157,142 @@ nni_task_dispatch(nni_task *task) // If there is no callback to perform, then do nothing! // The user will be none the wiser. if (task->task_cb == NULL) { + nni_mtx_lock(&task->task_mtx); + task->task_sched = false; + task->task_run = false; + task->task_exec = false; + task->task_done = true; + nni_cv_wake(&task->task_cv); + nni_mtx_unlock(&task->task_mtx); return; } nni_mtx_lock(&tq->tq_mtx); - // It might already be scheduled... if so don't redo it. - if (!nni_list_active(&tq->tq_tasks, task)) { - nni_list_append(&tq->tq_tasks, task); - } + nni_mtx_lock(&task->task_mtx); + task->task_sched = true; + task->task_run = false; + task->task_done = false; + nni_mtx_unlock(&task->task_mtx); + + nni_list_append(&tq->tq_tasks, task); nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate nni_mtx_unlock(&tq->tq_mtx); } void -nni_task_wait(nni_task *task) +nni_task_exec(nni_task *task) { - nni_taskq *tq = task->task_tq; - if (task->task_cb == NULL) { + nni_mtx_lock(&task->task_mtx); + task->task_sched = false; + task->task_run = false; + task->task_exec = false; + task->task_done = true; + nni_cv_wake(&task->task_cv); + nni_mtx_unlock(&task->task_mtx); return; } - nni_mtx_lock(&tq->tq_mtx); - for (;;) { - bool running = false; - if (nni_list_active(&tq->tq_tasks, task)) { - running = true; - } else { - for (int i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running == task) { - running = true; - break; - } - } - } - if (!running) { - break; - } + nni_mtx_lock(&task->task_mtx); + if (task->task_exec) { + // recursive taskq_exec, run it asynchronously + nni_mtx_unlock(&task->task_mtx); + nni_task_dispatch(task); + return; + } + task->task_exec = true; + task->task_sched = false; + task->task_done = false; + nni_mtx_unlock(&task->task_mtx); - tq->tq_waiting = 1; - nni_cv_wait(&tq->tq_wait_cv); + task->task_cb(task->task_arg); + + nni_mtx_lock(&task->task_mtx); + task->task_exec = false; + if (task->task_sched || task->task_run) { + // cb scheduled a task + nni_mtx_unlock(&task->task_mtx); + return; + } + task->task_done = true; + nni_cv_wake(&task->task_cv); + if (task->task_fini) { + task->task_fini = false; + nni_mtx_unlock(&task->task_mtx); + nni_task_fini(task); + } else { + nni_mtx_unlock(&task->task_mtx); } - nni_mtx_unlock(&tq->tq_mtx); } -int -nni_task_cancel(nni_task *task) +void +nni_task_prep(nni_task *task) { - nni_taskq *tq = task->task_tq; - bool running; + nni_mtx_lock(&task->task_mtx); + task->task_sched = true; + task->task_done = false; + task->task_run = false; + nni_mtx_unlock(&task->task_mtx); +} - nni_mtx_lock(&tq->tq_mtx); - running = true; - for (;;) { - running = false; - for (int i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running == task) { - running = true; - break; - } - } +void +nni_task_unprep(nni_task *task) +{ + nni_mtx_lock(&task->task_mtx); + task->task_sched = false; + task->task_done = false; + task->task_run = false; + nni_cv_wake(&task->task_cv); + nni_mtx_unlock(&task->task_mtx); +} - if (!running) { - break; - } - // tq->tq_threads[i].tqt_wait = 1; - tq->tq_waiting++; - nni_cv_wait(&tq->tq_wait_cv); +void +nni_task_wait(nni_task *task) +{ + nni_mtx_lock(&task->task_mtx); + while ((task->task_sched || task->task_run || task->task_exec) && + (!task->task_done)) { + nni_cv_wait(&task->task_cv); } + nni_mtx_unlock(&task->task_mtx); +} - if (nni_list_active(&tq->tq_tasks, task)) { - nni_list_remove(&tq->tq_tasks, task); +int +nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) +{ + nni_task *task; + + if ((task = NNI_ALLOC_STRUCT(task)) == NULL) { + return (NNG_ENOMEM); } - nni_mtx_unlock(&tq->tq_mtx); + NNI_LIST_NODE_INIT(&task->task_node); + nni_mtx_init(&task->task_mtx); + nni_cv_init(&task->task_cv, &task->task_mtx); + task->task_sched = false; + task->task_done = false; + task->task_run = false; + task->task_sched = false; + task->task_exec = false; + task->task_cb = cb; + task->task_arg = arg; + task->task_tq = tq != NULL ? tq : nni_taskq_systq; + *taskp = task; return (0); } void -nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg) +nni_task_fini(nni_task *task) { - if (tq == NULL) { - tq = nni_taskq_systq; + NNI_ASSERT(!nni_list_node_active(&task->task_node)); + nni_mtx_lock(&task->task_mtx); + if (task->task_run || task->task_exec) { + // destroy later. + task->task_fini = true; + nni_mtx_unlock(&task->task_mtx); + return; } - NNI_LIST_NODE_INIT(&task->task_node); - task->task_cb = cb; - task->task_arg = arg; - task->task_tq = tq; + nni_mtx_unlock(&task->task_mtx); + nni_cv_fini(&task->task_cv); + nni_mtx_fini(&task->task_mtx); + NNI_FREE_STRUCT(task); } int |
