aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c56
1 files changed, 52 insertions, 4 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e45819b5..36129bfd 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -23,6 +23,7 @@ struct nni_taskq {
nni_taskq_thr *tq_threads;
int tq_nthreads;
int tq_close;
+ int tq_waiting;
};
static nni_taskq *nni_taskq_systq = NULL;
@@ -48,12 +49,21 @@ nni_taskq_thread(void *self)
thr->tqt_wait = 0;
nni_cv_wake(&tq->tq_cv);
}
+ if (tq->tq_waiting) {
+ tq->tq_waiting = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
+
continue;
}
if (tq->tq_close) {
break;
}
+ if (tq->tq_waiting) {
+ tq->tq_waiting = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
nni_cv_wait(&tq->tq_cv);
}
@@ -152,6 +162,39 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
return (0);
}
+void
+nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent)
+{
+ int i;
+ int running;
+
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
+ }
+
+ nni_mtx_lock(&tq->tq_mtx);
+ for (;;) {
+ running = 0;
+ if (nni_list_active(&tq->tq_ents, ent)) {
+ running = 1;
+ } else {
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ if (tq->tq_threads[i].tqt_running == ent) {
+ running = 1;
+ break;
+ }
+ }
+ }
+ if (!running) {
+ break;
+ }
+
+ tq->tq_waiting = 1;
+ nni_cv_wait(&tq->tq_cv);
+ }
+ nni_mtx_unlock(&tq->tq_mtx);
+}
+
int
nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
{
@@ -164,17 +207,22 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
nni_mtx_lock(&tq->tq_mtx);
running = 1;
- do {
+ for (;;) {
running = 0;
for (i = 0; i < tq->tq_nthreads; i++) {
if (tq->tq_threads[i].tqt_running == ent) {
- tq->tq_threads[i].tqt_wait = 1;
- nni_cv_wait(&tq->tq_cv);
running = 1;
break;
}
}
- } while (running != 0);
+
+ if (!running) {
+ break;
+ }
+ // tq->tq_threads[i].tqt_wait = 1;
+ tq->tq_waiting++;
+ nni_cv_wait(&tq->tq_cv);
+ }
if (ent->tqe_tq != tq) {
nni_mtx_unlock(&tq->tq_mtx);