diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 2 | ||||
| -rw-r--r-- | src/core/taskq.c | 64 | ||||
| -rw-r--r-- | src/core/taskq.h | 2 |
3 files changed, 50 insertions, 18 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 2f871f73..11aadcb7 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -41,7 +41,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) void nni_aio_fini(nni_aio *aio) { - nni_taskq_cancel(&aio->a_tqe); + nni_taskq_cancel(NULL, &aio->a_tqe); nni_cv_fini(&aio->a_cv); nni_mtx_fini(&aio->a_lk); } diff --git a/src/core/taskq.c b/src/core/taskq.c index 00b6d3cb..f33a68bf 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -9,11 +9,18 @@ #include "core/nng_impl.h" +typedef struct nni_taskq_thr nni_taskq_thr; +struct nni_taskq_thr { + nni_taskq * tqt_tq; + nni_thr tqt_thread; + nni_taskq_ent * tqt_running; + int tqt_wait; +}; struct nni_taskq { nni_list tq_ents; nni_mtx tq_mtx; nni_cv tq_cv; - nni_thr * tq_threads; + nni_taskq_thr * tq_threads; int tq_nthreads; int tq_close; }; @@ -23,7 +30,8 @@ static nni_taskq *nni_taskq_systq = NULL; static void nni_taskq_thread(void *self) { - nni_taskq *tq = self; + nni_taskq_thr *thr = self; + nni_taskq *tq = thr->tqt_tq; nni_taskq_ent *ent; nni_mtx_lock(&tq->tq_mtx); @@ -31,9 +39,15 @@ nni_taskq_thread(void *self) if ((ent = nni_list_first(&tq->tq_ents)) != NULL) { nni_list_remove(&tq->tq_ents, ent); ent->tqe_tq = NULL; + thr->tqt_running = ent; nni_mtx_unlock(&tq->tq_mtx); ent->tqe_cb(ent->tqe_arg); nni_mtx_lock(&tq->tq_mtx); + thr->tqt_running = NULL; + if (thr->tqt_wait) { + thr->tqt_wait = 0; + nni_cv_wake(&tq->tq_cv); + } continue; } @@ -69,19 +83,26 @@ nni_taskq_init(nni_taskq **tqp, int nthr) tq->tq_close = 0; NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node); - if ((tq->tq_threads = nni_alloc(sizeof (nni_thr) * nthr)) == NULL) { + tq->tq_threads = nni_alloc(sizeof (nni_taskq_thr) * nthr); + if (tq->tq_threads == NULL) { + nni_cv_fini(&tq->tq_cv); + nni_mtx_fini(&tq->tq_mtx); + NNI_FREE_STRUCT(tq); return (NNG_ENOMEM); } tq->tq_nthreads = nthr; for (i = 0; i < nthr; i++) { - rv = nni_thr_init(&tq->tq_threads[i], nni_taskq_thread, tq); + tq->tq_threads[i].tqt_tq = tq; + tq->tq_threads[i].tqt_running = NULL; + rv = nni_thr_init(&tq->tq_threads[i].tqt_thread, + nni_taskq_thread, &tq->tq_threads[i]); if (rv != 0) { goto fail; } } tq->tq_nthreads = nthr; for (i = 0; i < tq->tq_nthreads; i++) { - nni_thr_run(&tq->tq_threads[i]); + nni_thr_run(&tq->tq_threads[i].tqt_thread); } *tqp = tq; return (0); @@ -103,9 +124,9 @@ nni_taskq_fini(nni_taskq *tq) nni_cv_wake(&tq->tq_cv); nni_mtx_unlock(&tq->tq_mtx); for (i = 0; i < tq->tq_nthreads; i++) { - nni_thr_fini(&tq->tq_threads[i]); + nni_thr_fini(&tq->tq_threads[i].tqt_thread); } - nni_free(tq->tq_threads, tq->tq_nthreads * sizeof (nni_thr)); + nni_free(tq->tq_threads, tq->tq_nthreads * sizeof (nni_taskq_thr)); nni_cv_fini(&tq->tq_cv); nni_mtx_fini(&tq->tq_mtx); NNI_FREE_STRUCT(tq); @@ -136,22 +157,33 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) int -nni_taskq_cancel(nni_taskq_ent *ent) +nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) { - nni_taskq *tq; + int i; + int running; - if ((tq = ent->tqe_tq) == NULL) { - return (NNG_ENOENT); + if (tq == NULL) { + tq = nni_taskq_systq; } + nni_mtx_lock(&tq->tq_mtx); - if (ent->tqe_tq == NULL) { + running = 1; + do { + running = 0; + for (i = 0; i < tq->tq_nthreads; i++) { + if (tq->tq_threads[i].tqt_running == ent) { + tq->tq_threads[i].tqt_wait = 1; + nni_cv_wait(&tq->tq_cv); + running = 1; + break; + } + } + } while (running != 0); + + if (ent->tqe_tq != tq) { nni_mtx_unlock(&tq->tq_mtx); return (NNG_ENOENT); } - if ((ent->tqe_tq) != tq) { - nni_mtx_unlock(&tq->tq_mtx); - return (NNG_EBUSY); - } nni_list_remove(&tq->tq_ents, ent); nni_mtx_unlock(&tq->tq_mtx); return (0); diff --git a/src/core/taskq.h b/src/core/taskq.h index 931e6c77..7f717c3c 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -27,7 +27,7 @@ extern int nni_taskq_init(nni_taskq **, int); extern void nni_taskq_fini(nni_taskq *); extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *); -extern int nni_taskq_cancel(nni_taskq_ent *); +extern int nni_taskq_cancel(nni_taskq *, nni_taskq_ent *); extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *); extern int nni_taskq_sys_init(void); |
