aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-08 22:56:45 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-08 22:56:45 -0700
commit23898a85c30d8f4f230303c362e91074a2b29b71 (patch)
tree4777d63a25cf229ea65465978013721b36633b10 /src/core/taskq.c
parent402c355eee701976a6c8ee00f6a915d1e417e163 (diff)
downloadnng-23898a85c30d8f4f230303c362e91074a2b29b71.tar.gz
nng-23898a85c30d8f4f230303c362e91074a2b29b71.tar.bz2
nng-23898a85c30d8f4f230303c362e91074a2b29b71.zip
Fix taskq_cancel race.
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c64
1 files changed, 48 insertions, 16 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 00b6d3cb..f33a68bf 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -9,11 +9,18 @@
#include "core/nng_impl.h"
+typedef struct nni_taskq_thr nni_taskq_thr;
+struct nni_taskq_thr {
+ nni_taskq * tqt_tq;
+ nni_thr tqt_thread;
+ nni_taskq_ent * tqt_running;
+ int tqt_wait;
+};
struct nni_taskq {
nni_list tq_ents;
nni_mtx tq_mtx;
nni_cv tq_cv;
- nni_thr * tq_threads;
+ nni_taskq_thr * tq_threads;
int tq_nthreads;
int tq_close;
};
@@ -23,7 +30,8 @@ static nni_taskq *nni_taskq_systq = NULL;
static void
nni_taskq_thread(void *self)
{
- nni_taskq *tq = self;
+ nni_taskq_thr *thr = self;
+ nni_taskq *tq = thr->tqt_tq;
nni_taskq_ent *ent;
nni_mtx_lock(&tq->tq_mtx);
@@ -31,9 +39,15 @@ nni_taskq_thread(void *self)
if ((ent = nni_list_first(&tq->tq_ents)) != NULL) {
nni_list_remove(&tq->tq_ents, ent);
ent->tqe_tq = NULL;
+ thr->tqt_running = ent;
nni_mtx_unlock(&tq->tq_mtx);
ent->tqe_cb(ent->tqe_arg);
nni_mtx_lock(&tq->tq_mtx);
+ thr->tqt_running = NULL;
+ if (thr->tqt_wait) {
+ thr->tqt_wait = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
continue;
}
@@ -69,19 +83,26 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
tq->tq_close = 0;
NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node);
- if ((tq->tq_threads = nni_alloc(sizeof (nni_thr) * nthr)) == NULL) {
+ tq->tq_threads = nni_alloc(sizeof (nni_taskq_thr) * nthr);
+ if (tq->tq_threads == NULL) {
+ nni_cv_fini(&tq->tq_cv);
+ nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCT(tq);
return (NNG_ENOMEM);
}
tq->tq_nthreads = nthr;
for (i = 0; i < nthr; i++) {
- rv = nni_thr_init(&tq->tq_threads[i], nni_taskq_thread, tq);
+ tq->tq_threads[i].tqt_tq = tq;
+ tq->tq_threads[i].tqt_running = NULL;
+ rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
+ nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
goto fail;
}
}
tq->tq_nthreads = nthr;
for (i = 0; i < tq->tq_nthreads; i++) {
- nni_thr_run(&tq->tq_threads[i]);
+ nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
*tqp = tq;
return (0);
@@ -103,9 +124,9 @@ nni_taskq_fini(nni_taskq *tq)
nni_cv_wake(&tq->tq_cv);
nni_mtx_unlock(&tq->tq_mtx);
for (i = 0; i < tq->tq_nthreads; i++) {
- nni_thr_fini(&tq->tq_threads[i]);
+ nni_thr_fini(&tq->tq_threads[i].tqt_thread);
}
- nni_free(tq->tq_threads, tq->tq_nthreads * sizeof (nni_thr));
+ nni_free(tq->tq_threads, tq->tq_nthreads * sizeof (nni_taskq_thr));
nni_cv_fini(&tq->tq_cv);
nni_mtx_fini(&tq->tq_mtx);
NNI_FREE_STRUCT(tq);
@@ -136,22 +157,33 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
int
-nni_taskq_cancel(nni_taskq_ent *ent)
+nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
{
- nni_taskq *tq;
+ int i;
+ int running;
- if ((tq = ent->tqe_tq) == NULL) {
- return (NNG_ENOENT);
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
}
+
nni_mtx_lock(&tq->tq_mtx);
- if (ent->tqe_tq == NULL) {
+ running = 1;
+ do {
+ running = 0;
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ if (tq->tq_threads[i].tqt_running == ent) {
+ tq->tq_threads[i].tqt_wait = 1;
+ nni_cv_wait(&tq->tq_cv);
+ running = 1;
+ break;
+ }
+ }
+ } while (running != 0);
+
+ if (ent->tqe_tq != tq) {
nni_mtx_unlock(&tq->tq_mtx);
return (NNG_ENOENT);
}
- if ((ent->tqe_tq) != tq) {
- nni_mtx_unlock(&tq->tq_mtx);
- return (NNG_EBUSY);
- }
nni_list_remove(&tq->tq_ents, ent);
nni_mtx_unlock(&tq->tq_mtx);
return (0);