diff options
Diffstat (limited to 'src/core/taskq.c')
| -rw-r--r-- | src/core/taskq.c | 66 |
1 files changed, 28 insertions, 38 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c index e32983e9..8d42701e 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -20,7 +20,8 @@ struct nni_taskq_thr { struct nni_taskq { nni_list tq_tasks; nni_mtx tq_mtx; - nni_cv tq_cv; + nni_cv tq_sched_cv; + nni_cv tq_wait_cv; nni_taskq_thr *tq_threads; int tq_nthreads; int tq_run; @@ -45,13 +46,10 @@ nni_taskq_thread(void *self) task->task_cb(task->task_arg); nni_mtx_lock(&tq->tq_mtx); thr->tqt_running = NULL; - if (thr->tqt_wait) { - thr->tqt_wait = 0; - nni_cv_wake(&tq->tq_cv); - } - if (tq->tq_waiting) { + if (thr->tqt_wait || tq->tq_waiting) { + thr->tqt_wait = 0; tq->tq_waiting = 0; - nni_cv_wake(&tq->tq_cv); + nni_cv_wake(&tq->tq_wait_cv); } continue; @@ -59,12 +57,12 @@ nni_taskq_thread(void *self) if (tq->tq_waiting) { tq->tq_waiting = 0; - nni_cv_wake(&tq->tq_cv); + nni_cv_wake(&tq->tq_wait_cv); } if (!tq->tq_run) { break; } - nni_cv_wait(&tq->tq_cv); + nni_cv_wait(&tq->tq_sched_cv); } nni_mtx_unlock(&tq->tq_mtx); } @@ -79,46 +77,37 @@ nni_taskq_init(nni_taskq **tqp, int nthr) if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) { - NNI_FREE_STRUCT(tq); - return (rv); - } - if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) { - nni_mtx_fini(&tq->tq_mtx); + if ((tq->tq_threads = NNI_ALLOC_STRUCTS(tq->tq_threads, nthr)) == + NULL) { NNI_FREE_STRUCT(tq); - return (rv); + return (NNG_ENOMEM); } + tq->tq_nthreads = nthr; NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node); - tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr); - if (tq->tq_threads == NULL) { - nni_cv_fini(&tq->tq_cv); - nni_mtx_fini(&tq->tq_mtx); - NNI_FREE_STRUCT(tq); - return (NNG_ENOMEM); + if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) || + ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) || + ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) { + nni_taskq_fini(tq); + return (rv); } - tq->tq_nthreads = nthr; + for (i = 0; i < nthr; i++) { tq->tq_threads[i].tqt_tq = tq; tq->tq_threads[i].tqt_running = NULL; rv = nni_thr_init(&tq->tq_threads[i].tqt_thread, nni_taskq_thread, &tq->tq_threads[i]); if (rv != 0) { - goto fail; + nni_taskq_fini(tq); + return (rv); } } - tq->tq_nthreads = nthr; - tq->tq_run = 1; + tq->tq_run = 1; for (i = 0; i < tq->tq_nthreads; i++) { nni_thr_run(&tq->tq_threads[i].tqt_thread); } *tqp = tq; return (0); - -fail: - - nni_taskq_fini(tq); - return (rv); } static void @@ -142,7 +131,7 @@ nni_taskq_drain_locked(nni_taskq *tq) break; } tq->tq_waiting++; - nni_cv_wait(&tq->tq_cv); + nni_cv_wait(&tq->tq_wait_cv); } } @@ -166,15 +155,16 @@ nni_taskq_fini(nni_taskq *tq) nni_taskq_drain_locked(tq); tq->tq_run = 0; - nni_cv_wake(&tq->tq_cv); + nni_cv_wake(&tq->tq_sched_cv); nni_mtx_unlock(&tq->tq_mtx); } for (int i = 0; i < tq->tq_nthreads; i++) { nni_thr_fini(&tq->tq_threads[i].tqt_thread); } - nni_free(tq->tq_threads, tq->tq_nthreads * sizeof(nni_taskq_thr)); - nni_cv_fini(&tq->tq_cv); + nni_cv_fini(&tq->tq_wait_cv); + nni_cv_fini(&tq->tq_sched_cv); nni_mtx_fini(&tq->tq_mtx); + NNI_FREE_STRUCTS(tq->tq_threads, tq->tq_nthreads); NNI_FREE_STRUCT(tq); } @@ -193,7 +183,7 @@ nni_task_dispatch(nni_task *task) if (!nni_list_active(&tq->tq_tasks, task)) { nni_list_append(&tq->tq_tasks, task); } - nni_cv_wake(&tq->tq_cv); + nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate nni_mtx_unlock(&tq->tq_mtx); } @@ -221,7 +211,7 @@ nni_task_wait(nni_task *task) } tq->tq_waiting = 1; - nni_cv_wait(&tq->tq_cv); + nni_cv_wait(&tq->tq_wait_cv); } nni_mtx_unlock(&tq->tq_mtx); } @@ -248,7 +238,7 @@ nni_task_cancel(nni_task *task) } // tq->tq_threads[i].tqt_wait = 1; tq->tq_waiting++; - nni_cv_wait(&tq->tq_cv); + nni_cv_wait(&tq->tq_wait_cv); } if (nni_list_active(&tq->tq_tasks, task)) { |
