summaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-19 18:34:59 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-19 18:34:59 -0700
commit88fb04f61918b06e6e269c1960058c3df5e0a0ef (patch)
tree447c44bae8e2fd93d95b04fe8ab83bf02314eaef /src/core/taskq.c
parentfa24bad0f7f82b4718cc2f13f60fcdd9b0cf86fe (diff)
downloadnng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.tar.gz
nng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.tar.bz2
nng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.zip
Always run the AIO completion logic.
We have seen some yet another weird situation where we had an orphaned pipe, which was caused by not completing the callback. If we are going to run nni_aio_fini, we should still run the callback (albeit with a return value of NNG_ECANCELED or somesuch) to be sure that we can't orphan stuff.
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c56
1 files changed, 52 insertions, 4 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e45819b5..36129bfd 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -23,6 +23,7 @@ struct nni_taskq {
nni_taskq_thr *tq_threads;
int tq_nthreads;
int tq_close;
+ int tq_waiting;
};
static nni_taskq *nni_taskq_systq = NULL;
@@ -48,12 +49,21 @@ nni_taskq_thread(void *self)
thr->tqt_wait = 0;
nni_cv_wake(&tq->tq_cv);
}
+ if (tq->tq_waiting) {
+ tq->tq_waiting = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
+
continue;
}
if (tq->tq_close) {
break;
}
+ if (tq->tq_waiting) {
+ tq->tq_waiting = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
nni_cv_wait(&tq->tq_cv);
}
@@ -152,6 +162,39 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
return (0);
}
+void
+nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent)
+{
+ int i;
+ int running;
+
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
+ }
+
+ nni_mtx_lock(&tq->tq_mtx);
+ for (;;) {
+ running = 0;
+ if (nni_list_active(&tq->tq_ents, ent)) {
+ running = 1;
+ } else {
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ if (tq->tq_threads[i].tqt_running == ent) {
+ running = 1;
+ break;
+ }
+ }
+ }
+ if (!running) {
+ break;
+ }
+
+ tq->tq_waiting = 1;
+ nni_cv_wait(&tq->tq_cv);
+ }
+ nni_mtx_unlock(&tq->tq_mtx);
+}
+
int
nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
{
@@ -164,17 +207,22 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
nni_mtx_lock(&tq->tq_mtx);
running = 1;
- do {
+ for (;;) {
running = 0;
for (i = 0; i < tq->tq_nthreads; i++) {
if (tq->tq_threads[i].tqt_running == ent) {
- tq->tq_threads[i].tqt_wait = 1;
- nni_cv_wait(&tq->tq_cv);
running = 1;
break;
}
}
- } while (running != 0);
+
+ if (!running) {
+ break;
+ }
+ // tq->tq_threads[i].tqt_wait = 1;
+ tq->tq_waiting++;
+ nni_cv_wait(&tq->tq_cv);
+ }
if (ent->tqe_tq != tq) {
nni_mtx_unlock(&tq->tq_mtx);