aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c47
1 files changed, 43 insertions, 4 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 33116ec9..5fbcdb33 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -57,14 +57,13 @@ nni_taskq_thread(void *self)
continue;
}
- if (tq->tq_close) {
- break;
- }
if (tq->tq_waiting) {
tq->tq_waiting = 0;
nni_cv_wake(&tq->tq_cv);
}
-
+ if (tq->tq_close) {
+ break;
+ }
nni_cv_wait(&tq->tq_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
@@ -122,12 +121,52 @@ fail:
return (rv);
}
+static void
+nni_taskq_drain_locked(nni_taskq *tq)
+{
+ // We need to first let the taskq completely drain.
+ for (;;) {
+ int busy = 0;
+ if (!nni_list_empty(&tq->tq_tasks)) {
+ busy = 1;
+ } else {
+ int i;
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ if (tq->tq_threads[i].tqt_running != 0) {
+ busy = 1;
+ break;
+ }
+ }
+ }
+ if (!busy) {
+ break;
+ }
+ tq->tq_waiting++;
+ nni_cv_wait(&tq->tq_cv);
+ }
+}
+
+void
+nni_taskq_drain(nni_taskq *tq)
+{
+ nni_mtx_lock(&tq->tq_mtx);
+ nni_taskq_drain_locked(tq);
+ nni_mtx_unlock(&tq->tq_mtx);
+}
+
void
nni_taskq_fini(nni_taskq *tq)
{
int i;
+ int busy;
+
+ // First drain the taskq completely. This is necessary since some
+ // tasks that are presently running may need to schedule additional
+ // tasks, and we don't want those to block.
nni_mtx_lock(&tq->tq_mtx);
+ nni_taskq_drain_locked(tq);
+
tq->tq_close = 1;
nni_cv_wake(&tq->tq_cv);
nni_mtx_unlock(&tq->tq_mtx);