aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c66
1 files changed, 28 insertions, 38 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e32983e9..8d42701e 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -20,7 +20,8 @@ struct nni_taskq_thr {
struct nni_taskq {
nni_list tq_tasks;
nni_mtx tq_mtx;
- nni_cv tq_cv;
+ nni_cv tq_sched_cv;
+ nni_cv tq_wait_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
int tq_run;
@@ -45,13 +46,10 @@ nni_taskq_thread(void *self)
task->task_cb(task->task_arg);
nni_mtx_lock(&tq->tq_mtx);
thr->tqt_running = NULL;
- if (thr->tqt_wait) {
- thr->tqt_wait = 0;
- nni_cv_wake(&tq->tq_cv);
- }
- if (tq->tq_waiting) {
+ if (thr->tqt_wait || tq->tq_waiting) {
+ thr->tqt_wait = 0;
tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_wait_cv);
}
continue;
@@ -59,12 +57,12 @@ nni_taskq_thread(void *self)
if (tq->tq_waiting) {
tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_wait_cv);
}
if (!tq->tq_run) {
break;
}
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_sched_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -79,46 +77,37 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) {
- NNI_FREE_STRUCT(tq);
- return (rv);
- }
- if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) {
- nni_mtx_fini(&tq->tq_mtx);
+ if ((tq->tq_threads = NNI_ALLOC_STRUCTS(tq->tq_threads, nthr)) ==
+ NULL) {
NNI_FREE_STRUCT(tq);
- return (rv);
+ return (NNG_ENOMEM);
}
+ tq->tq_nthreads = nthr;
NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
- tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr);
- if (tq->tq_threads == NULL) {
- nni_cv_fini(&tq->tq_cv);
- nni_mtx_fini(&tq->tq_mtx);
- NNI_FREE_STRUCT(tq);
- return (NNG_ENOMEM);
+ if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) ||
+ ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) ||
+ ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) {
+ nni_taskq_fini(tq);
+ return (rv);
}
- tq->tq_nthreads = nthr;
+
for (i = 0; i < nthr; i++) {
tq->tq_threads[i].tqt_tq = tq;
tq->tq_threads[i].tqt_running = NULL;
rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
- goto fail;
+ nni_taskq_fini(tq);
+ return (rv);
}
}
- tq->tq_nthreads = nthr;
- tq->tq_run = 1;
+ tq->tq_run = 1;
for (i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
*tqp = tq;
return (0);
-
-fail:
-
- nni_taskq_fini(tq);
- return (rv);
}
static void
@@ -142,7 +131,7 @@ nni_taskq_drain_locked(nni_taskq *tq)
break;
}
tq->tq_waiting++;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
}
@@ -166,15 +155,16 @@ nni_taskq_fini(nni_taskq *tq)
nni_taskq_drain_locked(tq);
tq->tq_run = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_sched_cv);
nni_mtx_unlock(&tq->tq_mtx);
}
for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_fini(&tq->tq_threads[i].tqt_thread);
}
- nni_free(tq->tq_threads, tq->tq_nthreads * sizeof(nni_taskq_thr));
- nni_cv_fini(&tq->tq_cv);
+ nni_cv_fini(&tq->tq_wait_cv);
+ nni_cv_fini(&tq->tq_sched_cv);
nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCTS(tq->tq_threads, tq->tq_nthreads);
NNI_FREE_STRUCT(tq);
}
@@ -193,7 +183,7 @@ nni_task_dispatch(nni_task *task)
if (!nni_list_active(&tq->tq_tasks, task)) {
nni_list_append(&tq->tq_tasks, task);
}
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -221,7 +211,7 @@ nni_task_wait(nni_task *task)
}
tq->tq_waiting = 1;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -248,7 +238,7 @@ nni_task_cancel(nni_task *task)
}
// tq->tq_threads[i].tqt_wait = 1;
tq->tq_waiting++;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
if (nni_list_active(&tq->tq_tasks, task)) {