aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-10 00:10:50 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-10 00:10:50 -0700
commitac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a (patch)
tree49f479185a08e8f4b2538b3fb69ab57319a4ba60
parent9feb54e9c7ab116ba566086a76604338f86e3bc3 (diff)
downloadnng-ac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a.tar.gz
nng-ac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a.tar.bz2
nng-ac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a.zip
Thundering herd kills performance.
A little benchmarking showed that we were encountering far too many wakeups, leading to severe performance degradation; we had a bunch of threads all sleeping on the same condition variable (taskqs) and this woke them all up, resulting in heavy mutex contention. Since we only need one of the threads to wake, and we don't care which one, let's just wake only one. This reduced RTT latency from about 240 us down to about 30 s. (1/8 of the former cost.) There's still a bunch of tuning to do; performance remains worse than we would like.
-rw-r--r--perf/perf.c6
-rw-r--r--src/core/aio.c11
-rw-r--r--src/core/aio.h3
-rw-r--r--src/core/platform.h5
-rw-r--r--src/core/taskq.c66
-rw-r--r--src/core/thread.c6
-rw-r--r--src/core/thread.h3
-rw-r--r--src/core/timer.c23
-rw-r--r--src/platform/posix/posix_thread.c11
-rw-r--r--src/platform/windows/win_thread.c7
10 files changed, 84 insertions, 57 deletions
diff --git a/perf/perf.c b/perf/perf.c
index 4827d4a2..2e631a01 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -219,13 +219,15 @@ do_inproc_lat(int argc, char **argv)
ia.count = parse_int(argv[1], "count");
ia.func = latency_server;
- // Sleep a bit.
- nng_usleep(100000);
if ((rv = nni_thr_init(&thr, do_inproc, &ia)) != 0) {
die("Cannot create thread: %s", nng_strerror(rv));
}
nni_thr_run(&thr);
+
+ // Sleep a bit.
+ nng_usleep(100000);
+
latency_client("inproc://latency_test", ia.msgsize, ia.count);
nni_thr_fini(&thr);
}
diff --git a/src/core/aio.c b/src/core/aio.c
index 3c85b78d..a39c0118 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -124,6 +124,7 @@ nni_aio_wait(nni_aio *aio)
{
nni_mtx_lock(&nni_aio_lk);
while ((aio->a_active) && (!aio->a_done)) {
+ aio->a_waiting = 1;
nni_cv_wait(&aio->a_cv);
}
nni_mtx_unlock(&nni_aio_lk);
@@ -201,7 +202,10 @@ nni_aio_finish_impl(
// still holding the reference.
if (!aio->a_expiring) {
aio->a_done = 1;
- nni_cv_wake(&aio->a_cv);
+ if (aio->a_waiting) {
+ aio->a_waiting = 0;
+ nni_cv_wake(&aio->a_cv);
+ }
nni_task_dispatch(&aio->a_task);
}
nni_mtx_unlock(&nni_aio_lk);
@@ -337,7 +341,10 @@ nni_aio_expire_loop(void *arg)
NNI_ASSERT(aio->a_prov_cancel == NULL);
aio->a_expiring = 0;
aio->a_done = 1;
- nni_cv_wake(&aio->a_cv);
+ if (aio->a_waiting) {
+ aio->a_waiting = 0;
+ nni_cv_wake(&aio->a_cv);
+ }
nni_task_dispatch(&aio->a_task);
nni_mtx_unlock(&nni_aio_lk);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index 0f41c01f..d48442eb 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -34,7 +34,8 @@ struct nni_aio {
unsigned a_pend : 1; // completion routine pending
unsigned a_active : 1; // aio was started
unsigned a_expiring : 1; // expiration callback in progress
- unsigned a_pad : 27; // ensure 32-bit alignment
+ unsigned a_waiting : 1; // a thread is waiting for this to finish
+ unsigned a_pad : 26; // ensure 32-bit alignment
nni_task a_task;
// Read/write operations.
diff --git a/src/core/platform.h b/src/core/platform.h
index fa93916c..7acf16ef 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -122,6 +123,10 @@ extern void nni_plat_cv_fini(nni_plat_cv *);
// called with the lock held.
extern void nni_plat_cv_wake(nni_plat_cv *);
+// nni_plat_cv_wake1 wakes only a single waiter. Use with caution
+// to avoid losing the wakeup when multiple waiters may be present.
+extern void nni_plat_cv_wake1(nni_plat_cv *);
+
// nni_plat_cv_wait waits for a wake up on the condition variable. The
// associated lock is atomically released and reacquired upon wake up.
// Callers can be spuriously woken. The associated lock must be held.
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e32983e9..8d42701e 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -20,7 +20,8 @@ struct nni_taskq_thr {
struct nni_taskq {
nni_list tq_tasks;
nni_mtx tq_mtx;
- nni_cv tq_cv;
+ nni_cv tq_sched_cv;
+ nni_cv tq_wait_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
int tq_run;
@@ -45,13 +46,10 @@ nni_taskq_thread(void *self)
task->task_cb(task->task_arg);
nni_mtx_lock(&tq->tq_mtx);
thr->tqt_running = NULL;
- if (thr->tqt_wait) {
- thr->tqt_wait = 0;
- nni_cv_wake(&tq->tq_cv);
- }
- if (tq->tq_waiting) {
+ if (thr->tqt_wait || tq->tq_waiting) {
+ thr->tqt_wait = 0;
tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_wait_cv);
}
continue;
@@ -59,12 +57,12 @@ nni_taskq_thread(void *self)
if (tq->tq_waiting) {
tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_wait_cv);
}
if (!tq->tq_run) {
break;
}
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_sched_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -79,46 +77,37 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) {
- NNI_FREE_STRUCT(tq);
- return (rv);
- }
- if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) {
- nni_mtx_fini(&tq->tq_mtx);
+ if ((tq->tq_threads = NNI_ALLOC_STRUCTS(tq->tq_threads, nthr)) ==
+ NULL) {
NNI_FREE_STRUCT(tq);
- return (rv);
+ return (NNG_ENOMEM);
}
+ tq->tq_nthreads = nthr;
NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
- tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr);
- if (tq->tq_threads == NULL) {
- nni_cv_fini(&tq->tq_cv);
- nni_mtx_fini(&tq->tq_mtx);
- NNI_FREE_STRUCT(tq);
- return (NNG_ENOMEM);
+ if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) ||
+ ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) ||
+ ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) {
+ nni_taskq_fini(tq);
+ return (rv);
}
- tq->tq_nthreads = nthr;
+
for (i = 0; i < nthr; i++) {
tq->tq_threads[i].tqt_tq = tq;
tq->tq_threads[i].tqt_running = NULL;
rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
- goto fail;
+ nni_taskq_fini(tq);
+ return (rv);
}
}
- tq->tq_nthreads = nthr;
- tq->tq_run = 1;
+ tq->tq_run = 1;
for (i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
*tqp = tq;
return (0);
-
-fail:
-
- nni_taskq_fini(tq);
- return (rv);
}
static void
@@ -142,7 +131,7 @@ nni_taskq_drain_locked(nni_taskq *tq)
break;
}
tq->tq_waiting++;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
}
@@ -166,15 +155,16 @@ nni_taskq_fini(nni_taskq *tq)
nni_taskq_drain_locked(tq);
tq->tq_run = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_sched_cv);
nni_mtx_unlock(&tq->tq_mtx);
}
for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_fini(&tq->tq_threads[i].tqt_thread);
}
- nni_free(tq->tq_threads, tq->tq_nthreads * sizeof(nni_taskq_thr));
- nni_cv_fini(&tq->tq_cv);
+ nni_cv_fini(&tq->tq_wait_cv);
+ nni_cv_fini(&tq->tq_sched_cv);
nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCTS(tq->tq_threads, tq->tq_nthreads);
NNI_FREE_STRUCT(tq);
}
@@ -193,7 +183,7 @@ nni_task_dispatch(nni_task *task)
if (!nni_list_active(&tq->tq_tasks, task)) {
nni_list_append(&tq->tq_tasks, task);
}
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -221,7 +211,7 @@ nni_task_wait(nni_task *task)
}
tq->tq_waiting = 1;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -248,7 +238,7 @@ nni_task_cancel(nni_task *task)
}
// tq->tq_threads[i].tqt_wait = 1;
tq->tq_waiting++;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
if (nni_list_active(&tq->tq_tasks, task)) {
diff --git a/src/core/thread.c b/src/core/thread.c
index 3cfdd83f..6c3bd9f3 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -73,6 +73,12 @@ nni_cv_wake(nni_cv *cv)
nni_plat_cv_wake(cv);
}
+void
+nni_cv_wake1(nni_cv *cv)
+{
+ nni_plat_cv_wake1(cv);
+}
+
static void
nni_thr_wrap(void *arg)
{
diff --git a/src/core/thread.h b/src/core/thread.h
index b528af2c..94b2a984 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -52,6 +52,9 @@ extern void nni_cv_fini(nni_cv *cv);
// nni_cv_wake wakes all waiters on the condition variable.
extern void nni_cv_wake(nni_cv *cv);
+// nni_cv_wake wakes just one waiter on the condition variable.
+extern void nni_cv_wake1(nni_cv *cv);
+
// nni_cv_wait waits until nni_cv_wake is called on the condition variable.
// The wait is indefinite. Premature wakeups are possible, so the caller
// must verify any related condition.
diff --git a/src/core/timer.c b/src/core/timer.c
index e8aa563c..73bc7604 100644
--- a/src/core/timer.c
+++ b/src/core/timer.c
@@ -18,7 +18,8 @@ static void nni_timer_loop(void *);
// XXX: replace this timer list with a minHeap based priority queue.
struct nni_timer {
nni_mtx t_mx;
- nni_cv t_cv;
+ nni_cv t_wait_cv;
+ nni_cv t_sched_cv;
nni_list t_entries;
nni_thr t_thr;
int t_run;
@@ -40,7 +41,8 @@ nni_timer_sys_init(void)
NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node);
if (((rv = nni_mtx_init(&timer->t_mx)) != 0) ||
- ((rv = nni_cv_init(&timer->t_cv, &timer->t_mx)) != 0) ||
+ ((rv = nni_cv_init(&timer->t_sched_cv, &timer->t_mx)) != 0) ||
+ ((rv = nni_cv_init(&timer->t_wait_cv, &timer->t_mx)) != 0) ||
((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) {
nni_timer_sys_fini();
return (rv);
@@ -58,12 +60,13 @@ nni_timer_sys_fini(void)
if (timer->t_run) {
nni_mtx_lock(&timer->t_mx);
timer->t_run = 0;
- nni_cv_wake(&timer->t_cv);
+ nni_cv_wake(&timer->t_sched_cv);
nni_mtx_unlock(&timer->t_mx);
}
nni_thr_fini(&timer->t_thr);
- nni_cv_fini(&timer->t_cv);
+ nni_cv_fini(&timer->t_wait_cv);
+ nni_cv_fini(&timer->t_sched_cv);
nni_mtx_fini(&timer->t_mx);
}
@@ -88,7 +91,7 @@ nni_timer_cancel(nni_timer_node *node)
nni_mtx_lock(&timer->t_mx);
while (timer->t_active == node) {
timer->t_waiting = 1;
- nni_cv_wait(&timer->t_cv);
+ nni_cv_wait(&timer->t_wait_cv);
}
if (nni_list_active(&timer->t_entries, node)) {
nni_list_remove(&timer->t_entries, node);
@@ -121,7 +124,7 @@ nni_timer_schedule(nni_timer_node *node, nni_time when)
nni_list_append(&timer->t_entries, node);
}
if (wake) {
- nni_cv_wake(&timer->t_cv);
+ nni_cv_wake1(&timer->t_sched_cv);
}
nni_mtx_unlock(&timer->t_mx);
}
@@ -137,8 +140,8 @@ nni_timer_loop(void *arg)
nni_mtx_lock(&timer->t_mx);
timer->t_active = NULL;
if (timer->t_waiting) {
- timer->t_waiting = 1;
- nni_cv_wake(&timer->t_cv);
+ timer->t_waiting = 0;
+ nni_cv_wake(&timer->t_wait_cv);
}
if (!timer->t_run) {
nni_mtx_unlock(&timer->t_mx);
@@ -147,13 +150,13 @@ nni_timer_loop(void *arg)
now = nni_clock();
if ((node = nni_list_first(&timer->t_entries)) == NULL) {
- nni_cv_wait(&timer->t_cv);
+ nni_cv_wait(&timer->t_sched_cv);
nni_mtx_unlock(&timer->t_mx);
continue;
}
if (now < node->t_expire) {
// End of run, we have to wait for next.
- nni_cv_until(&timer->t_cv, node->t_expire);
+ nni_cv_until(&timer->t_sched_cv, node->t_expire);
nni_mtx_unlock(&timer->t_mx);
continue;
}
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 071e6007..0ef4754c 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -116,11 +117,13 @@ nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
void
nni_plat_cv_wake(nni_plat_cv *cv)
{
- int rv;
+ (void) pthread_cond_broadcast(&cv->cv);
+}
- if ((rv = pthread_cond_broadcast(&cv->cv)) != 0) {
- nni_panic("pthread_cond_broadcast: %s", strerror(rv));
- }
+void
+nni_plat_cv_wake1(nni_plat_cv *cv)
+{
+ (void) pthread_cond_signal(&cv->cv);
}
void
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index 0859b4a4..f78bfd9a 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -70,6 +71,12 @@ nni_plat_cv_wake(nni_plat_cv *cv)
}
void
+nni_plat_cv_wake1(nni_plat_cv *cv)
+{
+ WakeConditionVariable(&cv->cv);
+}
+
+void
nni_plat_cv_wait(nni_plat_cv *cv)
{
(void) SleepConditionVariableSRW(&cv->cv, cv->srl, INFINITE, 0);