From 88fb04f61918b06e6e269c1960058c3df5e0a0ef Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 19 Jul 2017 18:34:59 -0700 Subject: Always run the AIO completion logic. We have seen yet another weird situation where we had an orphaned pipe, which was caused by not completing the callback. If we are going to run nni_aio_fini, we should still run the callback (albeit with a return value of NNG_ECANCELED or somesuch) to be sure that we can't orphan stuff. --- src/core/aio.c | 30 +++++++++++++++++++++--------- src/core/taskq.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- src/core/taskq.h | 1 + 3 files changed, 74 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/core/aio.c b/src/core/aio.c index 238522b0..a9fbc50d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -12,9 +12,10 @@ #include enum nni_aio_flags { - NNI_AIO_WAKE = 0x1, - NNI_AIO_DONE = 0x2, - NNI_AIO_FINI = 0x4, + NNI_AIO_WAKE = 0x1, + NNI_AIO_DONE = 0x2, + NNI_AIO_FINI = 0x4, + NNI_AIO_START = 0x8, }; // These are used for expiration. @@ -48,7 +49,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) aio->a_cb = cb; aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; - aio->a_flags = 0; + aio->a_flags = NNI_AIO_START; nni_taskq_ent_init(&aio->a_tqe, cb, arg); return (0); @@ -65,6 +66,10 @@ nni_aio_fini(nni_aio *aio) aio->a_flags |= NNI_AIO_DONE; aio->a_result = NNG_ECANCELED; cancelfn = aio->a_prov_cancel; + if (aio->a_flags & NNI_AIO_START) { + aio->a_flags &= ~NNI_AIO_START; + nni_taskq_dispatch(NULL, &aio->a_tqe); + } } else { cancelfn = NULL; @@ -76,7 +81,8 @@ nni_aio_fini(nni_aio *aio) } nni_mtx_unlock(&aio->a_lk); - // just a list operation at this point. + // Stop any timeouts. If one was in flight, we wait until it + // completes (it could fire the completion callback.) nni_aio_expire_remove(aio); // Cancel the AIO if it was scheduled. 
@@ -84,10 +90,9 @@ nni_aio_fini(nni_aio *aio) cancelfn(aio); } - // if the task is already dispatched, cancel it (or wait for it to - // complete). No further dispatches will happen because of the - // above logic to set NNI_AIO_FINI. - nni_taskq_cancel(NULL, &aio->a_tqe); + // Wait for any outstanding task to complete. We won't schedule + // new stuff because nni_aio_start will fail (due to AIO_FINI). + nni_taskq_wait(NULL, &aio->a_tqe); // At this point the AIO is done. nni_cv_fini(&aio->a_cv); @@ -168,6 +173,7 @@ nni_aio_cancel(nni_aio *aio, int rv) return; } aio->a_flags |= NNI_AIO_DONE; + aio->a_flags &= ~NNI_AIO_START; aio->a_result = rv; cancelfn = aio->a_prov_cancel; aio->a_prov_cancel = NULL; @@ -213,6 +219,8 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) return (NNG_ESTATE); } aio->a_flags |= NNI_AIO_DONE; + aio->a_flags &= ~NNI_AIO_START; + aio->a_result = result; aio->a_count = count; aio->a_prov_cancel = NULL; @@ -238,6 +246,8 @@ nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe) return (NNG_ESTATE); } aio->a_flags |= NNI_AIO_DONE; + aio->a_flags &= ~NNI_AIO_START; + aio->a_result = result; aio->a_count = 0; aio->a_prov_cancel = NULL; @@ -382,6 +392,8 @@ nni_aio_expire_loop(void *arg) } aio->a_flags |= NNI_AIO_DONE; + aio->a_flags &= ~NNI_AIO_START; + aio->a_result = NNG_ETIMEDOUT; cancelfn = aio->a_prov_cancel; aio->a_prov_cancel = NULL; diff --git a/src/core/taskq.c b/src/core/taskq.c index e45819b5..36129bfd 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -23,6 +23,7 @@ struct nni_taskq { nni_taskq_thr *tq_threads; int tq_nthreads; int tq_close; + int tq_waiting; }; static nni_taskq *nni_taskq_systq = NULL; @@ -48,12 +49,21 @@ nni_taskq_thread(void *self) thr->tqt_wait = 0; nni_cv_wake(&tq->tq_cv); } + if (tq->tq_waiting) { + tq->tq_waiting = 0; + nni_cv_wake(&tq->tq_cv); + } + continue; } if (tq->tq_close) { break; } + if (tq->tq_waiting) { + tq->tq_waiting = 0; + nni_cv_wake(&tq->tq_cv); + } nni_cv_wait(&tq->tq_cv); } @@ 
-152,6 +162,39 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) return (0); } +void +nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent) +{ + int i; + int running; + + if (tq == NULL) { + tq = nni_taskq_systq; + } + + nni_mtx_lock(&tq->tq_mtx); + for (;;) { + running = 0; + if (nni_list_active(&tq->tq_ents, ent)) { + running = 1; + } else { + for (i = 0; i < tq->tq_nthreads; i++) { + if (tq->tq_threads[i].tqt_running == ent) { + running = 1; + break; + } + } + } + if (!running) { + break; + } + + tq->tq_waiting = 1; + nni_cv_wait(&tq->tq_cv); + } + nni_mtx_unlock(&tq->tq_mtx); +} + int nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) { @@ -164,17 +207,22 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) nni_mtx_lock(&tq->tq_mtx); running = 1; - do { + for (;;) { running = 0; for (i = 0; i < tq->tq_nthreads; i++) { if (tq->tq_threads[i].tqt_running == ent) { - tq->tq_threads[i].tqt_wait = 1; - nni_cv_wait(&tq->tq_cv); running = 1; break; } } - } while (running != 0); + + if (!running) { + break; + } + // tq->tq_threads[i].tqt_wait = 1; + tq->tq_waiting++; + nni_cv_wait(&tq->tq_cv); + } if (ent->tqe_tq != tq) { nni_mtx_unlock(&tq->tq_mtx); diff --git a/src/core/taskq.h b/src/core/taskq.h index f1e29a34..e6399706 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -28,6 +28,7 @@ extern void nni_taskq_fini(nni_taskq *); extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *); extern int nni_taskq_cancel(nni_taskq *, nni_taskq_ent *); +extern void nni_taskq_wait(nni_taskq *, nni_taskq_ent *); extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *); extern int nni_taskq_sys_init(void); -- cgit v1.2.3-70-g09d2