diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-21 02:46:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-21 02:46:01 -0700 |
| commit | 537e2eda8d9fda2001295c835a4720def6a237f1 (patch) | |
| tree | 615d05096e0fe4c8bfbccf31f37c7256381ccd8e /src/core/taskq.c | |
| parent | 07078e1cf761cb6e56e46bde3ade6f792368d7dd (diff) | |
| download | nng-537e2eda8d9fda2001295c835a4720def6a237f1.tar.gz nng-537e2eda8d9fda2001295c835a4720def6a237f1.tar.bz2 nng-537e2eda8d9fda2001295c835a4720def6a237f1.zip | |
Simpler taskq API.
The queue is bound at initialization time of the task, and we call
entries just tasks, so we don't have to pass around a taskq pointer
across all the calls. Further, nni_task_dispatch is now guaranteed
to succeed.
Diffstat (limited to 'src/core/taskq.c')
| -rw-r--r-- | src/core/taskq.c | 90 |
1 files changed, 37 insertions, 53 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c index 64179790..cf722596 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -12,13 +12,13 @@ typedef struct nni_taskq_thr nni_taskq_thr; struct nni_taskq_thr { - nni_taskq * tqt_tq; - nni_thr tqt_thread; - nni_taskq_ent *tqt_running; - int tqt_wait; + nni_taskq *tqt_tq; + nni_thr tqt_thread; + nni_task * tqt_running; + int tqt_wait; }; struct nni_taskq { - nni_list tq_ents; + nni_list tq_tasks; nni_mtx tq_mtx; nni_cv tq_cv; nni_taskq_thr *tq_threads; @@ -34,16 +34,15 @@ nni_taskq_thread(void *self) { nni_taskq_thr *thr = self; nni_taskq * tq = thr->tqt_tq; - nni_taskq_ent *ent; + nni_task * task; nni_mtx_lock(&tq->tq_mtx); for (;;) { - if ((ent = nni_list_first(&tq->tq_ents)) != NULL) { - nni_list_remove(&tq->tq_ents, ent); - ent->tqe_tq = NULL; - thr->tqt_running = ent; + if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { + nni_list_remove(&tq->tq_tasks, task); + thr->tqt_running = task; nni_mtx_unlock(&tq->tq_mtx); - ent->tqe_cb(ent->tqe_arg); + task->task_cb(task->task_arg); nni_mtx_lock(&tq->tq_mtx); thr->tqt_running = NULL; if (thr->tqt_wait) { @@ -91,7 +90,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr) return (rv); } tq->tq_close = 0; - NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node); + NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node); tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr); if (tq->tq_threads == NULL) { @@ -141,46 +140,35 @@ nni_taskq_fini(nni_taskq *tq) NNI_FREE_STRUCT(tq); } -int -nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent) +void +nni_task_dispatch(nni_task *task) { - if (tq == NULL) { - tq = nni_taskq_systq; - } + nni_taskq *tq = task->task_tq; nni_mtx_lock(&tq->tq_mtx); - if (tq->tq_close) { - nni_mtx_unlock(&tq->tq_mtx); - return (NNG_ECLOSED); - } // It might already be scheduled... if so don't redo it. - if (!nni_list_active(&tq->tq_ents, ent)) { - ent->tqe_tq = tq; - nni_list_append(&tq->tq_ents, ent); + if (!nni_list_active(&tq->tq_tasks, task)) { + nni_list_append(&tq->tq_tasks, task); } nni_cv_wake(&tq->tq_cv); nni_mtx_unlock(&tq->tq_mtx); - return (0); } void -nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent) +nni_task_wait(nni_task *task) { - int i; - int running; - - if (tq == NULL) { - tq = nni_taskq_systq; - } + nni_taskq *tq = task->task_tq; + int i; + int running; nni_mtx_lock(&tq->tq_mtx); for (;;) { running = 0; - if (nni_list_active(&tq->tq_ents, ent)) { + if (nni_list_active(&tq->tq_tasks, task)) { running = 1; } else { for (i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running == ent) { + if (tq->tq_threads[i].tqt_running == task) { running = 1; break; } @@ -197,21 +185,18 @@ nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent) } int -nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) +nni_task_cancel(nni_task *task) { - int i; - int running; - - if (tq == NULL) { - tq = nni_taskq_systq; - } + nni_taskq *tq = task->task_tq; + int i; + int running; nni_mtx_lock(&tq->tq_mtx); running = 1; for (;;) { running = 0; for (i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running == ent) { + if (tq->tq_threads[i].tqt_running == task) { running = 1; break; } @@ -225,24 +210,23 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent) nni_cv_wait(&tq->tq_cv); } - if (ent->tqe_tq != tq) { - nni_mtx_unlock(&tq->tq_mtx); - return (NNG_ENOENT); - } - if (nni_list_active(&tq->tq_ents, ent)) { - nni_list_remove(&tq->tq_ents, ent); + if (nni_list_active(&tq->tq_tasks, task)) { + nni_list_remove(&tq->tq_tasks, task); } nni_mtx_unlock(&tq->tq_mtx); return (0); } void -nni_taskq_ent_init(nni_taskq_ent *ent, nni_cb cb, void *arg) +nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg) { - NNI_LIST_NODE_INIT(&ent->tqe_node); - ent->tqe_cb = cb; - ent->tqe_arg = arg; - ent->tqe_tq = NULL; + if (tq == NULL) { + tq = nni_taskq_systq; + } + NNI_LIST_NODE_INIT(&task->task_node); + task->task_cb = cb; + task->task_arg = arg; + task->task_tq = tq; } int |
