aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--perf/perf.c6
-rw-r--r--src/core/aio.c11
-rw-r--r--src/core/aio.h3
-rw-r--r--src/core/platform.h5
-rw-r--r--src/core/taskq.c66
-rw-r--r--src/core/thread.c6
-rw-r--r--src/core/thread.h3
-rw-r--r--src/core/timer.c23
-rw-r--r--src/platform/posix/posix_thread.c11
-rw-r--r--src/platform/windows/win_thread.c7
10 files changed, 84 insertions, 57 deletions
diff --git a/perf/perf.c b/perf/perf.c
index 4827d4a2..2e631a01 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -219,13 +219,15 @@ do_inproc_lat(int argc, char **argv)
ia.count = parse_int(argv[1], "count");
ia.func = latency_server;
- // Sleep a bit.
- nng_usleep(100000);
if ((rv = nni_thr_init(&thr, do_inproc, &ia)) != 0) {
die("Cannot create thread: %s", nng_strerror(rv));
}
nni_thr_run(&thr);
+
+ // Sleep a bit.
+ nng_usleep(100000);
+
latency_client("inproc://latency_test", ia.msgsize, ia.count);
nni_thr_fini(&thr);
}
diff --git a/src/core/aio.c b/src/core/aio.c
index 3c85b78d..a39c0118 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -124,6 +124,7 @@ nni_aio_wait(nni_aio *aio)
{
nni_mtx_lock(&nni_aio_lk);
while ((aio->a_active) && (!aio->a_done)) {
+ aio->a_waiting = 1;
nni_cv_wait(&aio->a_cv);
}
nni_mtx_unlock(&nni_aio_lk);
@@ -201,7 +202,10 @@ nni_aio_finish_impl(
// still holding the reference.
if (!aio->a_expiring) {
aio->a_done = 1;
- nni_cv_wake(&aio->a_cv);
+ if (aio->a_waiting) {
+ aio->a_waiting = 0;
+ nni_cv_wake(&aio->a_cv);
+ }
nni_task_dispatch(&aio->a_task);
}
nni_mtx_unlock(&nni_aio_lk);
@@ -337,7 +341,10 @@ nni_aio_expire_loop(void *arg)
NNI_ASSERT(aio->a_prov_cancel == NULL);
aio->a_expiring = 0;
aio->a_done = 1;
- nni_cv_wake(&aio->a_cv);
+ if (aio->a_waiting) {
+ aio->a_waiting = 0;
+ nni_cv_wake(&aio->a_cv);
+ }
nni_task_dispatch(&aio->a_task);
nni_mtx_unlock(&nni_aio_lk);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index 0f41c01f..d48442eb 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -34,7 +34,8 @@ struct nni_aio {
unsigned a_pend : 1; // completion routine pending
unsigned a_active : 1; // aio was started
unsigned a_expiring : 1; // expiration callback in progress
- unsigned a_pad : 27; // ensure 32-bit alignment
+ unsigned a_waiting : 1; // a thread is waiting for this to finish
+ unsigned a_pad : 26; // ensure 32-bit alignment
nni_task a_task;
// Read/write operations.
diff --git a/src/core/platform.h b/src/core/platform.h
index fa93916c..7acf16ef 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -122,6 +123,10 @@ extern void nni_plat_cv_fini(nni_plat_cv *);
// called with the lock held.
extern void nni_plat_cv_wake(nni_plat_cv *);
+// nni_plat_cv_wake1 wakes only a single waiter. Use with caution
+// to avoid losing the wakeup when multiple waiters may be present.
+extern void nni_plat_cv_wake1(nni_plat_cv *);
+
// nni_plat_cv_wait waits for a wake up on the condition variable. The
// associated lock is atomically released and reacquired upon wake up.
// Callers can be spuriously woken. The associated lock must be held.
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e32983e9..8d42701e 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -20,7 +20,8 @@ struct nni_taskq_thr {
struct nni_taskq {
nni_list tq_tasks;
nni_mtx tq_mtx;
- nni_cv tq_cv;
+ nni_cv tq_sched_cv;
+ nni_cv tq_wait_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
int tq_run;
@@ -45,13 +46,10 @@ nni_taskq_thread(void *self)
task->task_cb(task->task_arg);
nni_mtx_lock(&tq->tq_mtx);
thr->tqt_running = NULL;
- if (thr->tqt_wait) {
- thr->tqt_wait = 0;
- nni_cv_wake(&tq->tq_cv);
- }
- if (tq->tq_waiting) {
+ if (thr->tqt_wait || tq->tq_waiting) {
+ thr->tqt_wait = 0;
tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_wait_cv);
}
continue;
@@ -59,12 +57,12 @@ nni_taskq_thread(void *self)
if (tq->tq_waiting) {
tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_wait_cv);
}
if (!tq->tq_run) {
break;
}
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_sched_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -79,46 +77,37 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) {
- NNI_FREE_STRUCT(tq);
- return (rv);
- }
- if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) {
- nni_mtx_fini(&tq->tq_mtx);
+ if ((tq->tq_threads = NNI_ALLOC_STRUCTS(tq->tq_threads, nthr)) ==
+ NULL) {
NNI_FREE_STRUCT(tq);
- return (rv);
+ return (NNG_ENOMEM);
}
+ tq->tq_nthreads = nthr;
NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
- tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr);
- if (tq->tq_threads == NULL) {
- nni_cv_fini(&tq->tq_cv);
- nni_mtx_fini(&tq->tq_mtx);
- NNI_FREE_STRUCT(tq);
- return (NNG_ENOMEM);
+ if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) ||
+ ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) ||
+ ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) {
+ nni_taskq_fini(tq);
+ return (rv);
}
- tq->tq_nthreads = nthr;
+
for (i = 0; i < nthr; i++) {
tq->tq_threads[i].tqt_tq = tq;
tq->tq_threads[i].tqt_running = NULL;
rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
- goto fail;
+ nni_taskq_fini(tq);
+ return (rv);
}
}
- tq->tq_nthreads = nthr;
- tq->tq_run = 1;
+ tq->tq_run = 1;
for (i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
*tqp = tq;
return (0);
-
-fail:
-
- nni_taskq_fini(tq);
- return (rv);
}
static void
@@ -142,7 +131,7 @@ nni_taskq_drain_locked(nni_taskq *tq)
break;
}
tq->tq_waiting++;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
}
@@ -166,15 +155,16 @@ nni_taskq_fini(nni_taskq *tq)
nni_taskq_drain_locked(tq);
tq->tq_run = 0;
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake(&tq->tq_sched_cv);
nni_mtx_unlock(&tq->tq_mtx);
}
for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_fini(&tq->tq_threads[i].tqt_thread);
}
- nni_free(tq->tq_threads, tq->tq_nthreads * sizeof(nni_taskq_thr));
- nni_cv_fini(&tq->tq_cv);
+ nni_cv_fini(&tq->tq_wait_cv);
+ nni_cv_fini(&tq->tq_sched_cv);
nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCTS(tq->tq_threads, tq->tq_nthreads);
NNI_FREE_STRUCT(tq);
}
@@ -193,7 +183,7 @@ nni_task_dispatch(nni_task *task)
if (!nni_list_active(&tq->tq_tasks, task)) {
nni_list_append(&tq->tq_tasks, task);
}
- nni_cv_wake(&tq->tq_cv);
+ nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -221,7 +211,7 @@ nni_task_wait(nni_task *task)
}
tq->tq_waiting = 1;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -248,7 +238,7 @@ nni_task_cancel(nni_task *task)
}
// tq->tq_threads[i].tqt_wait = 1;
tq->tq_waiting++;
- nni_cv_wait(&tq->tq_cv);
+ nni_cv_wait(&tq->tq_wait_cv);
}
if (nni_list_active(&tq->tq_tasks, task)) {
diff --git a/src/core/thread.c b/src/core/thread.c
index 3cfdd83f..6c3bd9f3 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -73,6 +73,12 @@ nni_cv_wake(nni_cv *cv)
nni_plat_cv_wake(cv);
}
+void
+nni_cv_wake1(nni_cv *cv)
+{
+ nni_plat_cv_wake1(cv);
+}
+
static void
nni_thr_wrap(void *arg)
{
diff --git a/src/core/thread.h b/src/core/thread.h
index b528af2c..94b2a984 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -52,6 +52,9 @@ extern void nni_cv_fini(nni_cv *cv);
// nni_cv_wake wakes all waiters on the condition variable.
extern void nni_cv_wake(nni_cv *cv);
+// nni_cv_wake wakes just one waiter on the condition variable.
+extern void nni_cv_wake1(nni_cv *cv);
+
// nni_cv_wait waits until nni_cv_wake is called on the condition variable.
// The wait is indefinite. Premature wakeups are possible, so the caller
// must verify any related condition.
diff --git a/src/core/timer.c b/src/core/timer.c
index e8aa563c..73bc7604 100644
--- a/src/core/timer.c
+++ b/src/core/timer.c
@@ -18,7 +18,8 @@ static void nni_timer_loop(void *);
// XXX: replace this timer list with a minHeap based priority queue.
struct nni_timer {
nni_mtx t_mx;
- nni_cv t_cv;
+ nni_cv t_wait_cv;
+ nni_cv t_sched_cv;
nni_list t_entries;
nni_thr t_thr;
int t_run;
@@ -40,7 +41,8 @@ nni_timer_sys_init(void)
NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node);
if (((rv = nni_mtx_init(&timer->t_mx)) != 0) ||
- ((rv = nni_cv_init(&timer->t_cv, &timer->t_mx)) != 0) ||
+ ((rv = nni_cv_init(&timer->t_sched_cv, &timer->t_mx)) != 0) ||
+ ((rv = nni_cv_init(&timer->t_wait_cv, &timer->t_mx)) != 0) ||
((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) {
nni_timer_sys_fini();
return (rv);
@@ -58,12 +60,13 @@ nni_timer_sys_fini(void)
if (timer->t_run) {
nni_mtx_lock(&timer->t_mx);
timer->t_run = 0;
- nni_cv_wake(&timer->t_cv);
+ nni_cv_wake(&timer->t_sched_cv);
nni_mtx_unlock(&timer->t_mx);
}
nni_thr_fini(&timer->t_thr);
- nni_cv_fini(&timer->t_cv);
+ nni_cv_fini(&timer->t_wait_cv);
+ nni_cv_fini(&timer->t_sched_cv);
nni_mtx_fini(&timer->t_mx);
}
@@ -88,7 +91,7 @@ nni_timer_cancel(nni_timer_node *node)
nni_mtx_lock(&timer->t_mx);
while (timer->t_active == node) {
timer->t_waiting = 1;
- nni_cv_wait(&timer->t_cv);
+ nni_cv_wait(&timer->t_wait_cv);
}
if (nni_list_active(&timer->t_entries, node)) {
nni_list_remove(&timer->t_entries, node);
@@ -121,7 +124,7 @@ nni_timer_schedule(nni_timer_node *node, nni_time when)
nni_list_append(&timer->t_entries, node);
}
if (wake) {
- nni_cv_wake(&timer->t_cv);
+ nni_cv_wake1(&timer->t_sched_cv);
}
nni_mtx_unlock(&timer->t_mx);
}
@@ -137,8 +140,8 @@ nni_timer_loop(void *arg)
nni_mtx_lock(&timer->t_mx);
timer->t_active = NULL;
if (timer->t_waiting) {
- timer->t_waiting = 1;
- nni_cv_wake(&timer->t_cv);
+ timer->t_waiting = 0;
+ nni_cv_wake(&timer->t_wait_cv);
}
if (!timer->t_run) {
nni_mtx_unlock(&timer->t_mx);
@@ -147,13 +150,13 @@ nni_timer_loop(void *arg)
now = nni_clock();
if ((node = nni_list_first(&timer->t_entries)) == NULL) {
- nni_cv_wait(&timer->t_cv);
+ nni_cv_wait(&timer->t_sched_cv);
nni_mtx_unlock(&timer->t_mx);
continue;
}
if (now < node->t_expire) {
// End of run, we have to wait for next.
- nni_cv_until(&timer->t_cv, node->t_expire);
+ nni_cv_until(&timer->t_sched_cv, node->t_expire);
nni_mtx_unlock(&timer->t_mx);
continue;
}
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 071e6007..0ef4754c 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -116,11 +117,13 @@ nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
void
nni_plat_cv_wake(nni_plat_cv *cv)
{
- int rv;
+ (void) pthread_cond_broadcast(&cv->cv);
+}
- if ((rv = pthread_cond_broadcast(&cv->cv)) != 0) {
- nni_panic("pthread_cond_broadcast: %s", strerror(rv));
- }
+void
+nni_plat_cv_wake1(nni_plat_cv *cv)
+{
+ (void) pthread_cond_signal(&cv->cv);
}
void
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index 0859b4a4..f78bfd9a 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -70,6 +71,12 @@ nni_plat_cv_wake(nni_plat_cv *cv)
}
void
+nni_plat_cv_wake1(nni_plat_cv *cv)
+{
+ WakeConditionVariable(&cv->cv);
+}
+
+void
nni_plat_cv_wait(nni_plat_cv *cv)
{
(void) SleepConditionVariableSRW(&cv->cv, cv->srl, INFINITE, 0);