aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/taskq.c47
-rw-r--r--src/core/taskq.h1
2 files changed, 44 insertions, 4 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 33116ec9..5fbcdb33 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -57,14 +57,13 @@ nni_taskq_thread(void *self)
continue;
}
- if (tq->tq_close) {
- break;
- }
if (tq->tq_waiting) {
tq->tq_waiting = 0;
nni_cv_wake(&tq->tq_cv);
}
-
+ if (tq->tq_close) {
+ break;
+ }
nni_cv_wait(&tq->tq_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
@@ -122,12 +121,52 @@ fail:
return (rv);
}
+static void
+nni_taskq_drain_locked(nni_taskq *tq)
+{
+ // We need to first let the taskq completely drain.
+ for (;;) {
+ int busy = 0;
+ if (!nni_list_empty(&tq->tq_tasks)) {
+ busy = 1;
+ } else {
+ int i;
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ if (tq->tq_threads[i].tqt_running != 0) {
+ busy = 1;
+ break;
+ }
+ }
+ }
+ if (!busy) {
+ break;
+ }
+ tq->tq_waiting++;
+ nni_cv_wait(&tq->tq_cv);
+ }
+}
+
+void
+nni_taskq_drain(nni_taskq *tq)
+{
+ nni_mtx_lock(&tq->tq_mtx);
+ nni_taskq_drain_locked(tq);
+ nni_mtx_unlock(&tq->tq_mtx);
+}
+
void
nni_taskq_fini(nni_taskq *tq)
{
int i;
+ int busy;
+
+ // First drain the taskq completely. This is necessary since some
+ // tasks that are presently running may need to schedule additional
+ // tasks, and we don't want those to block.
nni_mtx_lock(&tq->tq_mtx);
+ nni_taskq_drain_locked(tq);
+
tq->tq_close = 1;
nni_cv_wake(&tq->tq_cv);
nni_mtx_unlock(&tq->tq_mtx);
diff --git a/src/core/taskq.h b/src/core/taskq.h
index 89b7be4a..40b5dc00 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -28,6 +28,7 @@ struct nni_task {
extern int nni_taskq_init(nni_taskq **, int);
extern void nni_taskq_fini(nni_taskq *);
+extern void nni_taskq_drain(nni_taskq *);
// nni_task_dispatch sends the task to the queue. It is guaranteed to
// succeed. (If the queue is shutdown, then the behavior is undefined.)