aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c28
1 files changed, 13 insertions, 15 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 14b04085..e32983e9 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -23,7 +23,7 @@ struct nni_taskq {
nni_cv tq_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
- int tq_close;
+ int tq_run;
int tq_waiting;
};
@@ -61,7 +61,7 @@ nni_taskq_thread(void *self)
tq->tq_waiting = 0;
nni_cv_wake(&tq->tq_cv);
}
- if (tq->tq_close) {
+ if (!tq->tq_run) {
break;
}
nni_cv_wait(&tq->tq_cv);
@@ -88,7 +88,6 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
NNI_FREE_STRUCT(tq);
return (rv);
}
- tq->tq_close = 0;
NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr);
@@ -109,6 +108,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
}
}
tq->tq_nthreads = nthr;
+ tq->tq_run = 1;
for (i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
@@ -157,19 +157,19 @@ nni_taskq_drain(nni_taskq *tq)
void
nni_taskq_fini(nni_taskq *tq)
{
- int i;
-
// First drain the taskq completely. This is necessary since some
// tasks that are presently running may need to schedule additional
// tasks, and we don't want those to block.
- nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
+ if (tq->tq_run) {
+ nni_mtx_lock(&tq->tq_mtx);
+ nni_taskq_drain_locked(tq);
- tq->tq_close = 1;
- nni_cv_wake(&tq->tq_cv);
- nni_mtx_unlock(&tq->tq_mtx);
- for (i = 0; i < tq->tq_nthreads; i++) {
+ tq->tq_run = 0;
+ nni_cv_wake(&tq->tq_cv);
+ nni_mtx_unlock(&tq->tq_mtx);
+ }
+ for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_fini(&tq->tq_threads[i].tqt_thread);
}
nni_free(tq->tq_threads, tq->tq_nthreads * sizeof(nni_taskq_thr));
@@ -201,7 +201,6 @@ void
nni_task_wait(nni_task *task)
{
nni_taskq *tq = task->task_tq;
- int i;
int running;
nni_mtx_lock(&tq->tq_mtx);
@@ -210,7 +209,7 @@ nni_task_wait(nni_task *task)
if (nni_list_active(&tq->tq_tasks, task)) {
running = 1;
} else {
- for (i = 0; i < tq->tq_nthreads; i++) {
+ for (int i = 0; i < tq->tq_nthreads; i++) {
if (tq->tq_threads[i].tqt_running == task) {
running = 1;
break;
@@ -231,14 +230,13 @@ int
nni_task_cancel(nni_task *task)
{
nni_taskq *tq = task->task_tq;
- int i;
int running;
nni_mtx_lock(&tq->tq_mtx);
running = 1;
for (;;) {
running = 0;
- for (i = 0; i < tq->tq_nthreads; i++) {
+ for (int i = 0; i < tq->tq_nthreads; i++) {
if (tq->tq_threads[i].tqt_running == task) {
running = 1;
break;