diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 30 | ||||
| -rw-r--r-- | src/core/taskq.c | 56 | ||||
| -rw-r--r-- | src/core/taskq.h | 1 |
3 files changed, 74 insertions, 13 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 238522b0..a9fbc50d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -12,9 +12,10 @@ #include <string.h> enum nni_aio_flags { - NNI_AIO_WAKE = 0x1, - NNI_AIO_DONE = 0x2, - NNI_AIO_FINI = 0x4, + NNI_AIO_WAKE = 0x1, + NNI_AIO_DONE = 0x2, + NNI_AIO_FINI = 0x4, + NNI_AIO_START = 0x8, }; // These are used for expiration. @@ -48,7 +49,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) aio->a_cb = cb; aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; - aio->a_flags = 0; + aio->a_flags = NNI_AIO_START; nni_taskq_ent_init(&aio->a_tqe, cb, arg); return (0); @@ -65,6 +66,10 @@ nni_aio_fini(nni_aio *aio) aio->a_flags |= NNI_AIO_DONE; aio->a_result = NNG_ECANCELED; cancelfn = aio->a_prov_cancel; + if (aio->a_flags & NNI_AIO_START) { + aio->a_flags &= ~NNI_AIO_START; + nni_taskq_dispatch(NULL, &aio->a_tqe); + } } else { cancelfn = NULL; @@ -76,7 +81,8 @@ nni_aio_fini(nni_aio *aio) } nni_mtx_unlock(&aio->a_lk); - // just a list operation at this point. + // Stop any timeouts. If one was in flight, we wait until it + // completes (it could fire the completion callback.) nni_aio_expire_remove(aio); // Cancel the AIO if it was scheduled. @@ -84,10 +90,9 @@ nni_aio_fini(nni_aio *aio) cancelfn(aio); } - // if the task is already dispatched, cancel it (or wait for it to - // complete). No further dispatches will happen because of the - // above logic to set NNI_AIO_FINI. - nni_taskq_cancel(NULL, &aio->a_tqe); + // Wait for any outstanding task to complete. We won't schedule + // new stuff because nni_aio_start will fail (due to AIO_FINI). + nni_taskq_wait(NULL, &aio->a_tqe); // At this point the AIO is done. nni_cv_fini(&aio->a_cv); @@ -168,6 +173,7 @@ nni_aio_cancel(nni_aio *aio, int rv) return; } aio->a_flags |= NNI_AIO_DONE; + aio->a_flags &= ~NNI_AIO_START; aio->a_result = rv; cancelfn = aio->a_prov_cancel; aio->a_prov_cancel = NULL; @@ -213,6 +219,8 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) return (NNG_ESTATE); } aio->a_flags |= NNI_AIO_DONE; + aio->a_flags &= ~NNI_AIO_START; + aio->a_result = result; aio->a_count = count; aio->a_prov_cancel = NULL; @@ -238,6 +246,8 @@ nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe) return (NNG_ESTATE); } aio->a_flags |= NNI_AIO_DONE; + aio->a_flags &= ~NNI_AIO_START; + aio->a_result = result; aio->a_count = 0; aio->a_prov_cancel = NULL; @@ -382,6 +392,8 @@ nni_aio_expire_loop(void *arg) } aio->a_flags |= NNI_AIO_DONE; + aio->a_flags &= ~NNI_AIO_START; + aio->a_result = NNG_ETIMEDOUT; cancelfn = aio->a_prov_cancel; aio->a_prov_cancel = NULL; diff --git a/src/core/taskq.c b/src/core/taskq.c index e45819b5..36129bfd 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -23,6 +23,7 @@ struct nni_taskq { nni_taskq_thr *tq_threads; int tq_nthreads; int tq_close; + int tq_waiting; }; static nni_taskq *nni_taskq_systq = NULL; @@ -48,12 +49,21 @@ nni_taskq_thread(void *self) thr->tqt_wait = 0; nni_cv_wake(&tq->tq_cv); } + if (tq->tq_waiting) { + tq->tq_waiting = 0; + nni_cv_wake(&tq->tq_cv); + } + continue; } if (tq->tq_close) { break; } + if (tq->tq_waiting) { + tq->tq_waiting = 0; + nni_cv_wake(&tq->tq_cv); + } nni_cv_wait(&tq->tq_cv); } @@ -152,6 +162,39 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) return (0); } +void +nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent) +{ + int i; + int running; + + if (tq == NULL) { + tq = nni_taskq_systq; + } + + nni_mtx_lock(&tq->tq_mtx); + for (;;) { + running = 0; + if (nni_list_active(&tq->tq_ents, ent)) { + running = 1; + } else { + for (i = 0; i < tq->tq_nthreads; i++) { + if (tq->tq_threads[i].tqt_running == ent) { + running = 1; + break; + } + } + } + if (!running) { + break; + } + + tq->tq_waiting = 1; + nni_cv_wait(&tq->tq_cv); + } + nni_mtx_unlock(&tq->tq_mtx); +} + int nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) { @@ -164,17 +207,22 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) nni_mtx_lock(&tq->tq_mtx); running = 1; - do { + for (;;) { running = 0; for (i = 0; i < tq->tq_nthreads; i++) { if (tq->tq_threads[i].tqt_running == ent) { - tq->tq_threads[i].tqt_wait = 1; - nni_cv_wait(&tq->tq_cv); running = 1; break; } } - } while (running != 0); + + if (!running) { + break; + } + // tq->tq_threads[i].tqt_wait = 1; + tq->tq_waiting++; + nni_cv_wait(&tq->tq_cv); + } if (ent->tqe_tq != tq) { nni_mtx_unlock(&tq->tq_mtx); diff --git a/src/core/taskq.h b/src/core/taskq.h index f1e29a34..e6399706 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -28,6 +28,7 @@ extern void nni_taskq_fini(nni_taskq *); extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *); extern int nni_taskq_cancel(nni_taskq *, nni_taskq_ent *); +extern void nni_taskq_wait(nni_taskq *, nni_taskq_ent *); extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *); extern int nni_taskq_sys_init(void); |
