aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c90
1 files changed, 37 insertions, 53 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 64179790..cf722596 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -12,13 +12,13 @@
typedef struct nni_taskq_thr nni_taskq_thr;
struct nni_taskq_thr {
- nni_taskq * tqt_tq;
- nni_thr tqt_thread;
- nni_taskq_ent *tqt_running;
- int tqt_wait;
+ nni_taskq *tqt_tq;
+ nni_thr tqt_thread;
+ nni_task * tqt_running;
+ int tqt_wait;
};
struct nni_taskq {
- nni_list tq_ents;
+ nni_list tq_tasks;
nni_mtx tq_mtx;
nni_cv tq_cv;
nni_taskq_thr *tq_threads;
@@ -34,16 +34,15 @@ nni_taskq_thread(void *self)
{
nni_taskq_thr *thr = self;
nni_taskq * tq = thr->tqt_tq;
- nni_taskq_ent *ent;
+ nni_task * task;
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
- if ((ent = nni_list_first(&tq->tq_ents)) != NULL) {
- nni_list_remove(&tq->tq_ents, ent);
- ent->tqe_tq = NULL;
- thr->tqt_running = ent;
+ if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ nni_list_remove(&tq->tq_tasks, task);
+ thr->tqt_running = task;
nni_mtx_unlock(&tq->tq_mtx);
- ent->tqe_cb(ent->tqe_arg);
+ task->task_cb(task->task_arg);
nni_mtx_lock(&tq->tq_mtx);
thr->tqt_running = NULL;
if (thr->tqt_wait) {
@@ -91,7 +90,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (rv);
}
tq->tq_close = 0;
- NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node);
+ NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr);
if (tq->tq_threads == NULL) {
@@ -141,46 +140,35 @@ nni_taskq_fini(nni_taskq *tq)
NNI_FREE_STRUCT(tq);
}
-int
-nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
+void
+nni_task_dispatch(nni_task *task)
{
- if (tq == NULL) {
- tq = nni_taskq_systq;
- }
+ nni_taskq *tq = task->task_tq;
nni_mtx_lock(&tq->tq_mtx);
- if (tq->tq_close) {
- nni_mtx_unlock(&tq->tq_mtx);
- return (NNG_ECLOSED);
- }
// It might already be scheduled... if so don't redo it.
- if (!nni_list_active(&tq->tq_ents, ent)) {
- ent->tqe_tq = tq;
- nni_list_append(&tq->tq_ents, ent);
+ if (!nni_list_active(&tq->tq_tasks, task)) {
+ nni_list_append(&tq->tq_tasks, task);
}
nni_cv_wake(&tq->tq_cv);
nni_mtx_unlock(&tq->tq_mtx);
- return (0);
}
void
-nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent)
+nni_task_wait(nni_task *task)
{
- int i;
- int running;
-
- if (tq == NULL) {
- tq = nni_taskq_systq;
- }
+ nni_taskq *tq = task->task_tq;
+ int i;
+ int running;
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
running = 0;
- if (nni_list_active(&tq->tq_ents, ent)) {
+ if (nni_list_active(&tq->tq_tasks, task)) {
running = 1;
} else {
for (i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == ent) {
+ if (tq->tq_threads[i].tqt_running == task) {
running = 1;
break;
}
@@ -197,21 +185,18 @@ nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent)
}
int
-nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
+nni_task_cancel(nni_task *task)
{
- int i;
- int running;
-
- if (tq == NULL) {
- tq = nni_taskq_systq;
- }
+ nni_taskq *tq = task->task_tq;
+ int i;
+ int running;
nni_mtx_lock(&tq->tq_mtx);
running = 1;
for (;;) {
running = 0;
for (i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == ent) {
+ if (tq->tq_threads[i].tqt_running == task) {
running = 1;
break;
}
@@ -225,24 +210,23 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
nni_cv_wait(&tq->tq_cv);
}
- if (ent->tqe_tq != tq) {
- nni_mtx_unlock(&tq->tq_mtx);
- return (NNG_ENOENT);
- }
- if (nni_list_active(&tq->tq_ents, ent)) {
- nni_list_remove(&tq->tq_ents, ent);
+ if (nni_list_active(&tq->tq_tasks, task)) {
+ nni_list_remove(&tq->tq_tasks, task);
}
nni_mtx_unlock(&tq->tq_mtx);
return (0);
}
void
-nni_taskq_ent_init(nni_taskq_ent *ent, nni_cb cb, void *arg)
+nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg)
{
- NNI_LIST_NODE_INIT(&ent->tqe_node);
- ent->tqe_cb = cb;
- ent->tqe_arg = arg;
- ent->tqe_tq = NULL;
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
+ }
+ NNI_LIST_NODE_INIT(&task->task_node);
+ task->task_cb = cb;
+ task->task_arg = arg;
+ task->task_tq = tq;
}
int