aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c2
-rw-r--r--src/core/taskq.c64
-rw-r--r--src/core/taskq.h2
3 files changed, 50 insertions, 18 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 2f871f73..11aadcb7 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -41,7 +41,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
- nni_taskq_cancel(&aio->a_tqe);
+ nni_taskq_cancel(NULL, &aio->a_tqe);
nni_cv_fini(&aio->a_cv);
nni_mtx_fini(&aio->a_lk);
}
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 00b6d3cb..f33a68bf 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -9,11 +9,18 @@
#include "core/nng_impl.h"
+typedef struct nni_taskq_thr nni_taskq_thr;
+struct nni_taskq_thr {
+ nni_taskq * tqt_tq;
+ nni_thr tqt_thread;
+ nni_taskq_ent * tqt_running;
+ int tqt_wait;
+};
struct nni_taskq {
nni_list tq_ents;
nni_mtx tq_mtx;
nni_cv tq_cv;
- nni_thr * tq_threads;
+ nni_taskq_thr * tq_threads;
int tq_nthreads;
int tq_close;
};
@@ -23,7 +30,8 @@ static nni_taskq *nni_taskq_systq = NULL;
static void
nni_taskq_thread(void *self)
{
- nni_taskq *tq = self;
+ nni_taskq_thr *thr = self;
+ nni_taskq *tq = thr->tqt_tq;
nni_taskq_ent *ent;
nni_mtx_lock(&tq->tq_mtx);
@@ -31,9 +39,15 @@ nni_taskq_thread(void *self)
if ((ent = nni_list_first(&tq->tq_ents)) != NULL) {
nni_list_remove(&tq->tq_ents, ent);
ent->tqe_tq = NULL;
+ thr->tqt_running = ent;
nni_mtx_unlock(&tq->tq_mtx);
ent->tqe_cb(ent->tqe_arg);
nni_mtx_lock(&tq->tq_mtx);
+ thr->tqt_running = NULL;
+ if (thr->tqt_wait) {
+ thr->tqt_wait = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
continue;
}
@@ -69,19 +83,26 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
tq->tq_close = 0;
NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node);
- if ((tq->tq_threads = nni_alloc(sizeof (nni_thr) * nthr)) == NULL) {
+ tq->tq_threads = nni_alloc(sizeof (nni_taskq_thr) * nthr);
+ if (tq->tq_threads == NULL) {
+ nni_cv_fini(&tq->tq_cv);
+ nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCT(tq);
return (NNG_ENOMEM);
}
tq->tq_nthreads = nthr;
for (i = 0; i < nthr; i++) {
- rv = nni_thr_init(&tq->tq_threads[i], nni_taskq_thread, tq);
+ tq->tq_threads[i].tqt_tq = tq;
+ tq->tq_threads[i].tqt_running = NULL;
+ rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
+ nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
goto fail;
}
}
tq->tq_nthreads = nthr;
for (i = 0; i < tq->tq_nthreads; i++) {
- nni_thr_run(&tq->tq_threads[i]);
+ nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
*tqp = tq;
return (0);
@@ -103,9 +124,9 @@ nni_taskq_fini(nni_taskq *tq)
nni_cv_wake(&tq->tq_cv);
nni_mtx_unlock(&tq->tq_mtx);
for (i = 0; i < tq->tq_nthreads; i++) {
- nni_thr_fini(&tq->tq_threads[i]);
+ nni_thr_fini(&tq->tq_threads[i].tqt_thread);
}
- nni_free(tq->tq_threads, tq->tq_nthreads * sizeof (nni_thr));
+ nni_free(tq->tq_threads, tq->tq_nthreads * sizeof (nni_taskq_thr));
nni_cv_fini(&tq->tq_cv);
nni_mtx_fini(&tq->tq_mtx);
NNI_FREE_STRUCT(tq);
@@ -136,22 +157,33 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
int
-nni_taskq_cancel(nni_taskq_ent *ent)
+nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
{
- nni_taskq *tq;
+ int i;
+ int running;
- if ((tq = ent->tqe_tq) == NULL) {
- return (NNG_ENOENT);
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
}
+
nni_mtx_lock(&tq->tq_mtx);
- if (ent->tqe_tq == NULL) {
+ running = 1;
+ do {
+ running = 0;
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ if (tq->tq_threads[i].tqt_running == ent) {
+ tq->tq_threads[i].tqt_wait = 1;
+ nni_cv_wait(&tq->tq_cv);
+ running = 1;
+ break;
+ }
+ }
+ } while (running != 0);
+
+ if (ent->tqe_tq != tq) {
nni_mtx_unlock(&tq->tq_mtx);
return (NNG_ENOENT);
}
- if ((ent->tqe_tq) != tq) {
- nni_mtx_unlock(&tq->tq_mtx);
- return (NNG_EBUSY);
- }
nni_list_remove(&tq->tq_ents, ent);
nni_mtx_unlock(&tq->tq_mtx);
return (0);
diff --git a/src/core/taskq.h b/src/core/taskq.h
index 931e6c77..7f717c3c 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -27,7 +27,7 @@ extern int nni_taskq_init(nni_taskq **, int);
extern void nni_taskq_fini(nni_taskq *);
extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *);
-extern int nni_taskq_cancel(nni_taskq_ent *);
+extern int nni_taskq_cancel(nni_taskq *, nni_taskq_ent *);
extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *);
extern int nni_taskq_sys_init(void);