aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c263
1 files changed, 149 insertions, 114 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index b0fe160b..526fa0b4 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -11,11 +11,22 @@
#include "core/nng_impl.h"
typedef struct nni_taskq_thr nni_taskq_thr;
+struct nni_task {
+ nni_list_node task_node;
+ void * task_arg;
+ nni_cb task_cb;
+ nni_taskq * task_tq;
+ bool task_sched;
+ bool task_run;
+ bool task_done;
+ bool task_exec;
+ bool task_fini;
+ nni_mtx task_mtx;
+ nni_cv task_cv;
+};
struct nni_taskq_thr {
nni_taskq *tqt_tq;
nni_thr tqt_thread;
- nni_task * tqt_running;
- int tqt_wait;
};
struct nni_taskq {
nni_list tq_tasks;
@@ -24,8 +35,7 @@ struct nni_taskq {
nni_cv tq_wait_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
- int tq_run;
- int tq_waiting;
+ bool tq_run;
};
static nni_taskq *nni_taskq_systq = NULL;
@@ -40,25 +50,37 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_run = true;
+ task->task_sched = false;
+ nni_mtx_unlock(&task->task_mtx);
nni_list_remove(&tq->tq_tasks, task);
- thr->tqt_running = task;
nni_mtx_unlock(&tq->tq_mtx);
+
task->task_cb(task->task_arg);
- nni_mtx_lock(&tq->tq_mtx);
- thr->tqt_running = NULL;
- if (thr->tqt_wait || tq->tq_waiting) {
- thr->tqt_wait = 0;
- tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_wait_cv);
- }
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_sched || task->task_exec) {
+ // task resubmitted itself most likely.
+ // We cannot touch the rest of the flags,
+ // since the called function has taken control.
+ nni_mtx_unlock(&task->task_mtx);
+ } else {
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ } else {
+ nni_mtx_unlock(&task->task_mtx);
+ }
+ }
+ nni_mtx_lock(&tq->tq_mtx);
continue;
}
- if (tq->tq_waiting) {
- tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_wait_cv);
- }
if (!tq->tq_run) {
break;
}
@@ -89,8 +111,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
for (int i = 0; i < nthr; i++) {
int rv;
- tq->tq_threads[i].tqt_tq = tq;
- tq->tq_threads[i].tqt_running = NULL;
+ tq->tq_threads[i].tqt_tq = tq;
rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
@@ -98,7 +119,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (rv);
}
}
- tq->tq_run = 1;
+ tq->tq_run = true;
for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
@@ -106,53 +127,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (0);
}
-static void
-nni_taskq_drain_locked(nni_taskq *tq)
-{
- // We need to first let the taskq completely drain.
- for (;;) {
- int busy = 0;
- if (!nni_list_empty(&tq->tq_tasks)) {
- busy = 1;
- } else {
- int i;
- for (i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running != 0) {
- busy = 1;
- break;
- }
- }
- }
- if (!busy) {
- break;
- }
- tq->tq_waiting++;
- nni_cv_wait(&tq->tq_wait_cv);
- }
-}
-
-void
-nni_taskq_drain(nni_taskq *tq)
-{
- nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
- nni_mtx_unlock(&tq->tq_mtx);
-}
-
void
nni_taskq_fini(nni_taskq *tq)
{
- // First drain the taskq completely. This is necessary since some
- // tasks that are presently running may need to schedule additional
- // tasks, and we don't want those to block.
if (tq == NULL) {
return;
}
if (tq->tq_run) {
nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
-
- tq->tq_run = 0;
+ tq->tq_run = false;
nni_cv_wake(&tq->tq_sched_cv);
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -174,90 +157,142 @@ nni_task_dispatch(nni_task *task)
// If there is no callback to perform, then do nothing!
// The user will be none the wiser.
if (task->task_cb == NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_run = false;
+ task->task_exec = false;
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
return;
}
nni_mtx_lock(&tq->tq_mtx);
- // It might already be scheduled... if so don't redo it.
- if (!nni_list_active(&tq->tq_tasks, task)) {
- nni_list_append(&tq->tq_tasks, task);
- }
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = true;
+ task->task_run = false;
+ task->task_done = false;
+ nni_mtx_unlock(&task->task_mtx);
+
+ nni_list_append(&tq->tq_tasks, task);
nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
nni_mtx_unlock(&tq->tq_mtx);
}
void
-nni_task_wait(nni_task *task)
+nni_task_exec(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
-
if (task->task_cb == NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_run = false;
+ task->task_exec = false;
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
return;
}
- nni_mtx_lock(&tq->tq_mtx);
- for (;;) {
- bool running = false;
- if (nni_list_active(&tq->tq_tasks, task)) {
- running = true;
- } else {
- for (int i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == task) {
- running = true;
- break;
- }
- }
- }
- if (!running) {
- break;
- }
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_exec) {
+ // recursive taskq_exec, run it asynchronously
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_dispatch(task);
+ return;
+ }
+ task->task_exec = true;
+ task->task_sched = false;
+ task->task_done = false;
+ nni_mtx_unlock(&task->task_mtx);
- tq->tq_waiting = 1;
- nni_cv_wait(&tq->tq_wait_cv);
+ task->task_cb(task->task_arg);
+
+ nni_mtx_lock(&task->task_mtx);
+ task->task_exec = false;
+ if (task->task_sched || task->task_run) {
+ // cb scheduled a task
+ nni_mtx_unlock(&task->task_mtx);
+ return;
+ }
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ } else {
+ nni_mtx_unlock(&task->task_mtx);
}
- nni_mtx_unlock(&tq->tq_mtx);
}
-int
-nni_task_cancel(nni_task *task)
+void
+nni_task_prep(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
- bool running;
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = true;
+ task->task_done = false;
+ task->task_run = false;
+ nni_mtx_unlock(&task->task_mtx);
+}
- nni_mtx_lock(&tq->tq_mtx);
- running = true;
- for (;;) {
- running = false;
- for (int i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == task) {
- running = true;
- break;
- }
- }
+void
+nni_task_unprep(nni_task *task)
+{
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_done = false;
+ task->task_run = false;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
+}
- if (!running) {
- break;
- }
- // tq->tq_threads[i].tqt_wait = 1;
- tq->tq_waiting++;
- nni_cv_wait(&tq->tq_wait_cv);
+void
+nni_task_wait(nni_task *task)
+{
+ nni_mtx_lock(&task->task_mtx);
+ while ((task->task_sched || task->task_run || task->task_exec) &&
+ (!task->task_done)) {
+ nni_cv_wait(&task->task_cv);
}
+ nni_mtx_unlock(&task->task_mtx);
+}
- if (nni_list_active(&tq->tq_tasks, task)) {
- nni_list_remove(&tq->tq_tasks, task);
+int
+nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
+{
+ nni_task *task;
+
+ if ((task = NNI_ALLOC_STRUCT(task)) == NULL) {
+ return (NNG_ENOMEM);
}
- nni_mtx_unlock(&tq->tq_mtx);
+ NNI_LIST_NODE_INIT(&task->task_node);
+ nni_mtx_init(&task->task_mtx);
+ nni_cv_init(&task->task_cv, &task->task_mtx);
+ task->task_sched = false;
+ task->task_done = false;
+ task->task_run = false;
+ task->task_sched = false;
+ task->task_exec = false;
+ task->task_cb = cb;
+ task->task_arg = arg;
+ task->task_tq = tq != NULL ? tq : nni_taskq_systq;
+ *taskp = task;
return (0);
}
void
-nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg)
+nni_task_fini(nni_task *task)
{
- if (tq == NULL) {
- tq = nni_taskq_systq;
+ NNI_ASSERT(!nni_list_node_active(&task->task_node));
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_run || task->task_exec) {
+ // destroy later.
+ task->task_fini = true;
+ nni_mtx_unlock(&task->task_mtx);
+ return;
}
- NNI_LIST_NODE_INIT(&task->task_node);
- task->task_cb = cb;
- task->task_arg = arg;
- task->task_tq = tq;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_cv_fini(&task->task_cv);
+ nni_mtx_fini(&task->task_mtx);
+ NNI_FREE_STRUCT(task);
}
int