diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-07 16:12:41 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-07 16:12:41 -0700 |
| commit | 22d991fda77578dd03c8d477f8427631e6383cee (patch) | |
| tree | c81eb35a13fbeb1dab8f8e6ca62dc8ccf42636e0 /src/core/taskq.c | |
| parent | c5354ea49184a359df8d477e844b1c52aeb234d5 (diff) | |
| download | nng-22d991fda77578dd03c8d477f8427631e6383cee.tar.gz nng-22d991fda77578dd03c8d477f8427631e6383cee.tar.bz2 nng-22d991fda77578dd03c8d477f8427631e6383cee.zip | |
Subsystem initialize is idempotent; simplify cleanup.
Diffstat (limited to 'src/core/taskq.c')
| -rw-r--r-- | src/core/taskq.c | 28 |
1 files changed, 13 insertions, 15 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c index 14b04085..e32983e9 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -23,7 +23,7 @@ struct nni_taskq { nni_cv tq_cv; nni_taskq_thr *tq_threads; int tq_nthreads; - int tq_close; + int tq_run; int tq_waiting; }; @@ -61,7 +61,7 @@ nni_taskq_thread(void *self) tq->tq_waiting = 0; nni_cv_wake(&tq->tq_cv); } - if (tq->tq_close) { + if (!tq->tq_run) { break; } nni_cv_wait(&tq->tq_cv); @@ -88,7 +88,6 @@ nni_taskq_init(nni_taskq **tqp, int nthr) NNI_FREE_STRUCT(tq); return (rv); } - tq->tq_close = 0; NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node); tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr); @@ -109,6 +108,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr) } } tq->tq_nthreads = nthr; + tq->tq_run = 1; for (i = 0; i < tq->tq_nthreads; i++) { nni_thr_run(&tq->tq_threads[i].tqt_thread); } @@ -157,19 +157,19 @@ nni_taskq_drain(nni_taskq *tq) void nni_taskq_fini(nni_taskq *tq) { - int i; - // First drain the taskq completely. This is necessary since some // tasks that are presently running may need to schedule additional // tasks, and we don't want those to block. - nni_mtx_lock(&tq->tq_mtx); - nni_taskq_drain_locked(tq); + if (tq->tq_run) { + nni_mtx_lock(&tq->tq_mtx); + nni_taskq_drain_locked(tq); - tq->tq_close = 1; - nni_cv_wake(&tq->tq_cv); - nni_mtx_unlock(&tq->tq_mtx); - for (i = 0; i < tq->tq_nthreads; i++) { + tq->tq_run = 0; + nni_cv_wake(&tq->tq_cv); + nni_mtx_unlock(&tq->tq_mtx); + } + for (int i = 0; i < tq->tq_nthreads; i++) { nni_thr_fini(&tq->tq_threads[i].tqt_thread); } nni_free(tq->tq_threads, tq->tq_nthreads * sizeof(nni_taskq_thr)); @@ -201,7 +201,6 @@ void nni_task_wait(nni_task *task) { nni_taskq *tq = task->task_tq; - int i; int running; nni_mtx_lock(&tq->tq_mtx); @@ -210,7 +209,7 @@ nni_task_wait(nni_task *task) if (nni_list_active(&tq->tq_tasks, task)) { running = 1; } else { - for (i = 0; i < tq->tq_nthreads; i++) { + for (int i = 0; i < tq->tq_nthreads; i++) { if (tq->tq_threads[i].tqt_running == task) { running = 1; break; @@ -231,14 +230,13 @@ int nni_task_cancel(nni_task *task) { nni_taskq *tq = task->task_tq; - int i; int running; nni_mtx_lock(&tq->tq_mtx); running = 1; for (;;) { running = 0; - for (i = 0; i < tq->tq_nthreads; i++) { + for (int i = 0; i < tq->tq_nthreads; i++) { if (tq->tq_threads[i].tqt_running == task) { running = 1; break; |
