aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-10 00:10:50 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-10 00:10:50 -0700
commitac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a (patch)
tree49f479185a08e8f4b2538b3fb69ab57319a4ba60 /src/core/taskq.c
parent9feb54e9c7ab116ba566086a76604338f86e3bc3 (diff)
downloadnng-ac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a.tar.gz
nng-ac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a.tar.bz2
nng-ac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a.zip
Thundering herd kills performance.
A little benchmarking showed that we were encountering far too many wakeups, leading to severe performance degradation; we had a bunch of threads all sleeping on the same condition variable (taskqs) and this woke them all up, resulting in heavy mutex contention. Since we only need one of the threads to wake, and we don't care which one, let's just wake only one. This reduced RTT latency from about 240 us down to about 30 s. (1/8 of the former cost.) There's still a bunch of tuning to do; performance remains worse than we would like.
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c66
1 files changed, 28 insertions, 38 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e32983e9..8d42701e 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -20,7 +20,8 @@ struct nni_taskq_thr {
struct nni_taskq {
nni_list tq_tasks;
nni_mtx tq_mtx;
- nni_cv tq_cv;
+ nni_cv tq_sched_cv;
+ nni_cv tq_wait_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
int tq_run;
@@ -45,13 +46,10 @@ nni_taskq_thread(void *self)
task->task_cb(task->task_arg);
nni_mtx_lock(&tq->tq_mtx);
thr->tqt_running = NULL;
- if (thr->tqt_wait) {
- thr->tqt_wait = 0;
- nni_cv_wake(&tq->tq_cv);
- }
- if (tq->tq_waiting) {
+ if (thr->tqt_wait || tq->tq_waiting) {
+ thr->tqt_wait = 0;
tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_wait_cv);
}
continue;
@@ -59,12 +57,12 @@ nni_taskq_thread(void *self)
if (tq->tq_waiting) {
tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_wait_cv);
}
if (!tq->tq_run) {
break;
}
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_sched_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -79,46 +77,37 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) {
- NNI_FREE_STRUCT(tq);
- return (rv);
- }
- if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) {
- nni_mtx_fini(&tq->tq_mtx);
+ if ((tq->tq_threads = NNI_ALLOC_STRUCTS(tq->tq_threads, nthr)) ==
+ NULL) {
NNI_FREE_STRUCT(tq);
- return (rv);
+ return (NNG_ENOMEM);
}
+ tq->tq_nthreads = nthr;
NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
- tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr);
- if (tq->tq_threads == NULL) {
- nni_cv_fini(&tq->tq_cv);
- nni_mtx_fini(&tq->tq_mtx);
- NNI_FREE_STRUCT(tq);
- return (NNG_ENOMEM);
+ if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) ||
+ ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) ||
+ ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) {
+ nni_taskq_fini(tq);
+ return (rv);
}
- tq->tq_nthreads = nthr;
+
for (i = 0; i < nthr; i++) {
tq->tq_threads[i].tqt_tq = tq;
tq->tq_threads[i].tqt_running = NULL;
rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
- goto fail;
+ nni_taskq_fini(tq);
+ return (rv);
}
}
- tq->tq_nthreads = nthr;
- tq->tq_run = 1;
+ tq->tq_run = 1;
for (i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
*tqp = tq;
return (0);
-
-fail:
-
- nni_taskq_fini(tq);
- return (rv);
}
static void
@@ -142,7 +131,7 @@ nni_taskq_drain_locked(nni_taskq *tq)
break;
}
tq->tq_waiting++;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
}
@@ -166,15 +155,16 @@ nni_taskq_fini(nni_taskq *tq)
nni_taskq_drain_locked(tq);
tq->tq_run = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_sched_cv);
nni_mtx_unlock(&tq->tq_mtx);
}
for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_fini(&tq->tq_threads[i].tqt_thread);
}
- nni_free(tq->tq_threads, tq->tq_nthreads * sizeof(nni_taskq_thr));
- nni_cv_fini(&tq->tq_cv);
+ nni_cv_fini(&tq->tq_wait_cv);
+ nni_cv_fini(&tq->tq_sched_cv);
nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCTS(tq->tq_threads, tq->tq_nthreads);
NNI_FREE_STRUCT(tq);
}
@@ -193,7 +183,7 @@ nni_task_dispatch(nni_task *task)
if (!nni_list_active(&tq->tq_tasks, task)) {
nni_list_append(&tq->tq_tasks, task);
}
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -221,7 +211,7 @@ nni_task_wait(nni_task *task)
}
tq->tq_waiting = 1;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -248,7 +238,7 @@ nni_task_cancel(nni_task *task)
}
// tq->tq_threads[i].tqt_wait = 1;
tq->tq_waiting++;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
if (nni_list_active(&tq->tq_tasks, task)) {