aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c1
-rw-r--r--src/core/taskq.c157
-rw-r--r--src/core/taskq.h7
3 files changed, 58 insertions, 107 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 6276589b..99da356a 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -315,7 +315,6 @@ nni_aio_begin(nni_aio *aio)
nni_mtx_lock(&nni_aio_lk);
// We should not reschedule anything at this point.
if (aio->a_stop) {
- nni_task_unprep(aio->a_task);
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index d945e713..15351840 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -16,10 +16,8 @@ struct nni_task {
void * task_arg;
nni_cb task_cb;
nni_taskq * task_tq;
- bool task_sched;
- bool task_run;
- bool task_done;
- bool task_exec;
+ unsigned task_busy;
+ bool task_prep;
bool task_fini;
nni_mtx task_mtx;
nni_cv task_cv;
@@ -50,33 +48,26 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
- nni_mtx_lock(&task->task_mtx);
- task->task_run = true;
- task->task_sched = false;
- nni_mtx_unlock(&task->task_mtx);
nni_list_remove(&tq->tq_tasks, task);
nni_mtx_unlock(&tq->tq_mtx);
task->task_cb(task->task_arg);
nni_mtx_lock(&task->task_mtx);
- if (task->task_sched || task->task_exec) {
- // task resubmitted itself most likely.
- // We cannot touch the rest of the flags,
- // since the called function has taken control.
- nni_mtx_unlock(&task->task_mtx);
- } else {
- task->task_done = true;
- nni_cv_wake(&task->task_cv);
-
+ task->task_busy--;
+ if (task->task_busy == 0) {
if (task->task_fini) {
task->task_fini = false;
nni_mtx_unlock(&task->task_mtx);
nni_task_fini(task);
- } else {
- nni_mtx_unlock(&task->task_mtx);
+
+ nni_mtx_lock(&tq->tq_mtx);
+ continue;
}
+ nni_cv_wake(&task->task_cv);
}
+ nni_mtx_unlock(&task->task_mtx);
+
nni_mtx_lock(&tq->tq_mtx);
continue;
}
@@ -150,99 +141,65 @@ nni_taskq_fini(nni_taskq *tq)
}
void
-nni_task_dispatch(nni_task *task)
+nni_task_exec(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_prep) {
+ task->task_prep = false;
+ } else {
+ task->task_busy++;
+ }
+ nni_mtx_unlock(&task->task_mtx);
- // If there is no callback to perform, then do nothing!
- // The user will be none the wiser.
- if (task->task_cb == NULL) {
- nni_mtx_lock(&task->task_mtx);
- task->task_sched = false;
- task->task_run = false;
- task->task_exec = false;
- task->task_done = true;
- nni_cv_wake(&task->task_cv);
- nni_mtx_unlock(&task->task_mtx);
- return;
+ if (task->task_cb != NULL) {
+ task->task_cb(task->task_arg);
}
- nni_mtx_lock(&tq->tq_mtx);
+
nni_mtx_lock(&task->task_mtx);
- task->task_sched = true;
- task->task_run = false;
- task->task_exec = false;
- task->task_done = false;
+ task->task_busy--;
+ if (task->task_busy == 0) {
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ return;
+ }
+ nni_cv_wake(&task->task_cv);
+ }
nni_mtx_unlock(&task->task_mtx);
-
- nni_list_append(&tq->tq_tasks, task);
- nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
- nni_mtx_unlock(&tq->tq_mtx);
}
void
-nni_task_exec(nni_task *task)
+nni_task_dispatch(nni_task *task)
{
+ nni_taskq *tq = task->task_tq;
+
+ // If there is no callback to perform, then do nothing!
+ // The user will be none the wiser.
if (task->task_cb == NULL) {
- nni_mtx_lock(&task->task_mtx);
- task->task_sched = false;
- task->task_run = false;
- task->task_exec = false;
- task->task_done = true;
- nni_cv_wake(&task->task_cv);
- nni_mtx_unlock(&task->task_mtx);
+ nni_task_exec(task);
return;
}
nni_mtx_lock(&task->task_mtx);
- if (task->task_exec) {
- // recursive taskq_exec, run it asynchronously
- nni_mtx_unlock(&task->task_mtx);
- nni_task_dispatch(task);
- return;
+ if (task->task_prep) {
+ task->task_prep = false;
+ } else {
+ task->task_busy++;
}
- task->task_exec = true;
- task->task_sched = false;
- task->task_done = false;
- task->task_run = false;
nni_mtx_unlock(&task->task_mtx);
- task->task_cb(task->task_arg);
-
- nni_mtx_lock(&task->task_mtx);
- task->task_exec = false;
- if (task->task_sched || task->task_run) {
- // cb scheduled a task
- nni_mtx_unlock(&task->task_mtx);
- return;
- }
- task->task_done = true;
- nni_cv_wake(&task->task_cv);
- if (task->task_fini) {
- task->task_fini = false;
- nni_mtx_unlock(&task->task_mtx);
- nni_task_fini(task);
- } else {
- nni_mtx_unlock(&task->task_mtx);
- }
+ nni_mtx_lock(&tq->tq_mtx);
+ nni_list_append(&tq->tq_tasks, task);
+ nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
+ nni_mtx_unlock(&tq->tq_mtx);
}
void
nni_task_prep(nni_task *task)
{
nni_mtx_lock(&task->task_mtx);
- task->task_sched = true;
- task->task_done = false;
- task->task_run = false;
- nni_mtx_unlock(&task->task_mtx);
-}
-
-void
-nni_task_unprep(nni_task *task)
-{
- nni_mtx_lock(&task->task_mtx);
- task->task_sched = false;
- task->task_done = false;
- task->task_run = false;
- nni_cv_wake(&task->task_cv);
+ task->task_busy++;
+ task->task_prep = true;
nni_mtx_unlock(&task->task_mtx);
}
@@ -250,8 +207,7 @@ void
nni_task_wait(nni_task *task)
{
nni_mtx_lock(&task->task_mtx);
- while ((task->task_sched || task->task_run || task->task_exec) &&
- (!task->task_done)) {
+ while (task->task_busy) {
nni_cv_wait(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
@@ -268,15 +224,12 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
NNI_LIST_NODE_INIT(&task->task_node);
nni_mtx_init(&task->task_mtx);
nni_cv_init(&task->task_cv, &task->task_mtx);
- task->task_sched = false;
- task->task_done = false;
- task->task_run = false;
- task->task_sched = false;
- task->task_exec = false;
- task->task_cb = cb;
- task->task_arg = arg;
- task->task_tq = tq != NULL ? tq : nni_taskq_systq;
- *taskp = task;
+ task->task_prep = false;
+ task->task_busy = 0;
+ task->task_cb = cb;
+ task->task_arg = arg;
+ task->task_tq = tq != NULL ? tq : nni_taskq_systq;
+ *taskp = task;
return (0);
}
@@ -285,7 +238,7 @@ nni_task_fini(nni_task *task)
{
NNI_ASSERT(!nni_list_node_active(&task->task_node));
nni_mtx_lock(&task->task_mtx);
- if ((task->task_run || task->task_exec) && (!task->task_done)) {
+ if (task->task_busy) {
// destroy later.
task->task_fini = true;
nni_mtx_unlock(&task->task_mtx);
diff --git a/src/core/taskq.h b/src/core/taskq.h
index 918c6705..8328e4eb 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -31,13 +31,12 @@ extern void nni_task_dispatch(nni_task *);
// doubt, use nni_task_dispatch instead.)
extern void nni_task_exec(nni_task *);
-// nni_task_prep and nni_task_unprep are used by and exclusively for the aio
-// framework. nni_task_prep marks the task as "scheduled" without actually
+// nni_task_prep is used by and exclusively for the aio framework.
+// nni_task_prep marks the task as "scheduled" without actually
// dispatching anything to it yet; nni_task_wait will block waiting for the
// task to complete normally (after a call to nni_task_dispatch or
-// nni_task_exec), or for nni_task_unprep to be called.
+// nni_task_exec).
extern void nni_task_prep(nni_task *);
-extern void nni_task_unprep(nni_task *);
extern void nni_task_wait(nni_task *);
extern int nni_task_init(nni_task **, nni_taskq *, nni_cb, void *);