diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-19 18:34:59 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-19 18:34:59 -0700 |
| commit | 88fb04f61918b06e6e269c1960058c3df5e0a0ef (patch) | |
| tree | 447c44bae8e2fd93d95b04fe8ab83bf02314eaef /src/core/taskq.c | |
| parent | fa24bad0f7f82b4718cc2f13f60fcdd9b0cf86fe (diff) | |
| download | nng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.tar.gz nng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.tar.bz2 nng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.zip | |
Always run the AIO completion logic.
We have seen some yet another weird situation where we had an orphaned
pipe, which was caused by not completing the callback. If we are going
to run nni_aio_fini, we should still run the callback (albeit with a
return value of NNG_ECANCELED or somesuch) to be sure that we can't
orphan stuff.
Diffstat (limited to 'src/core/taskq.c')
| -rw-r--r-- | src/core/taskq.c | 56 |
1 files changed, 52 insertions, 4 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c index e45819b5..36129bfd 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -23,6 +23,7 @@ struct nni_taskq { nni_taskq_thr *tq_threads; int tq_nthreads; int tq_close; + int tq_waiting; }; static nni_taskq *nni_taskq_systq = NULL; @@ -48,12 +49,21 @@ nni_taskq_thread(void *self) thr->tqt_wait = 0; nni_cv_wake(&tq->tq_cv); } + if (tq->tq_waiting) { + tq->tq_waiting = 0; + nni_cv_wake(&tq->tq_cv); + } + continue; } if (tq->tq_close) { break; } + if (tq->tq_waiting) { + tq->tq_waiting = 0; + nni_cv_wake(&tq->tq_cv); + } nni_cv_wait(&tq->tq_cv); } @@ -152,6 +162,39 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) return (0); } +void +nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent) +{ + int i; + int running; + + if (tq == NULL) { + tq = nni_taskq_systq; + } + + nni_mtx_lock(&tq->tq_mtx); + for (;;) { + running = 0; + if (nni_list_active(&tq->tq_ents, ent)) { + running = 1; + } else { + for (i = 0; i < tq->tq_nthreads; i++) { + if (tq->tq_threads[i].tqt_running == ent) { + running = 1; + break; + } + } + } + if (!running) { + break; + } + + tq->tq_waiting = 1; + nni_cv_wait(&tq->tq_cv); + } + nni_mtx_unlock(&tq->tq_mtx); +} + int nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) { @@ -164,17 +207,22 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) nni_mtx_lock(&tq->tq_mtx); running = 1; - do { + for (;;) { running = 0; for (i = 0; i < tq->tq_nthreads; i++) { if (tq->tq_threads[i].tqt_running == ent) { - tq->tq_threads[i].tqt_wait = 1; - nni_cv_wait(&tq->tq_cv); running = 1; break; } } - } while (running != 0); + + if (!running) { + break; + } + // tq->tq_threads[i].tqt_wait = 1; + tq->tq_waiting++; + nni_cv_wait(&tq->tq_cv); + } if (ent->tqe_tq != tq) { nni_mtx_unlock(&tq->tq_mtx); |
