diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 11 | ||||
| -rw-r--r-- | src/core/aio.h | 3 | ||||
| -rw-r--r-- | src/core/platform.h | 5 | ||||
| -rw-r--r-- | src/core/taskq.c | 66 | ||||
| -rw-r--r-- | src/core/thread.c | 6 | ||||
| -rw-r--r-- | src/core/thread.h | 3 | ||||
| -rw-r--r-- | src/core/timer.c | 23 |
7 files changed, 66 insertions, 51 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 3c85b78d..a39c0118 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -124,6 +124,7 @@ nni_aio_wait(nni_aio *aio) { nni_mtx_lock(&nni_aio_lk); while ((aio->a_active) && (!aio->a_done)) { + aio->a_waiting = 1; nni_cv_wait(&aio->a_cv); } nni_mtx_unlock(&nni_aio_lk); @@ -201,7 +202,10 @@ nni_aio_finish_impl( // still holding the reference. if (!aio->a_expiring) { aio->a_done = 1; - nni_cv_wake(&aio->a_cv); + if (aio->a_waiting) { + aio->a_waiting = 0; + nni_cv_wake(&aio->a_cv); + } nni_task_dispatch(&aio->a_task); } nni_mtx_unlock(&nni_aio_lk); @@ -337,7 +341,10 @@ nni_aio_expire_loop(void *arg) NNI_ASSERT(aio->a_prov_cancel == NULL); aio->a_expiring = 0; aio->a_done = 1; - nni_cv_wake(&aio->a_cv); + if (aio->a_waiting) { + aio->a_waiting = 0; + nni_cv_wake(&aio->a_cv); + } nni_task_dispatch(&aio->a_task); nni_mtx_unlock(&nni_aio_lk); } diff --git a/src/core/aio.h b/src/core/aio.h index 0f41c01f..d48442eb 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -34,7 +34,8 @@ struct nni_aio { unsigned a_pend : 1; // completion routine pending unsigned a_active : 1; // aio was started unsigned a_expiring : 1; // expiration callback in progress - unsigned a_pad : 27; // ensure 32-bit alignment + unsigned a_waiting : 1; // a thread is waiting for this to finish + unsigned a_pad : 26; // ensure 32-bit alignment nni_task a_task; // Read/write operations. diff --git a/src/core/platform.h b/src/core/platform.h index fa93916c..7acf16ef 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -122,6 +123,10 @@ extern void nni_plat_cv_fini(nni_plat_cv *); // called with the lock held. extern void nni_plat_cv_wake(nni_plat_cv *); +// nni_plat_cv_wake1 wakes only a single waiter. Use with caution +// to avoid losing the wakeup when multiple waiters may be present. +extern void nni_plat_cv_wake1(nni_plat_cv *); + // nni_plat_cv_wait waits for a wake up on the condition variable. The // associated lock is atomically released and reacquired upon wake up. // Callers can be spuriously woken. The associated lock must be held. diff --git a/src/core/taskq.c b/src/core/taskq.c index e32983e9..8d42701e 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -20,7 +20,8 @@ struct nni_taskq_thr { struct nni_taskq { nni_list tq_tasks; nni_mtx tq_mtx; - nni_cv tq_cv; + nni_cv tq_sched_cv; + nni_cv tq_wait_cv; nni_taskq_thr *tq_threads; int tq_nthreads; int tq_run; @@ -45,13 +46,10 @@ nni_taskq_thread(void *self) task->task_cb(task->task_arg); nni_mtx_lock(&tq->tq_mtx); thr->tqt_running = NULL; - if (thr->tqt_wait) { - thr->tqt_wait = 0; - nni_cv_wake(&tq->tq_cv); - } - if (tq->tq_waiting) { + if (thr->tqt_wait || tq->tq_waiting) { + thr->tqt_wait = 0; tq->tq_waiting = 0; - nni_cv_wake(&tq->tq_cv); + nni_cv_wake(&tq->tq_wait_cv); } continue; @@ -59,12 +57,12 @@ nni_taskq_thread(void *self) if (tq->tq_waiting) { tq->tq_waiting = 0; - nni_cv_wake(&tq->tq_cv); + nni_cv_wake(&tq->tq_wait_cv); } if (!tq->tq_run) { break; } - nni_cv_wait(&tq->tq_cv); + nni_cv_wait(&tq->tq_sched_cv); } nni_mtx_unlock(&tq->tq_mtx); } @@ -79,46 +77,37 @@ nni_taskq_init(nni_taskq **tqp, int nthr) if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) { - NNI_FREE_STRUCT(tq); - return (rv); - } - if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) { - nni_mtx_fini(&tq->tq_mtx); + if ((tq->tq_threads = NNI_ALLOC_STRUCTS(tq->tq_threads, nthr)) == + NULL) { NNI_FREE_STRUCT(tq); - return (rv); + return (NNG_ENOMEM); } + tq->tq_nthreads = nthr; NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node); - tq->tq_threads = nni_alloc(sizeof(nni_taskq_thr) * nthr); - if (tq->tq_threads == NULL) { - nni_cv_fini(&tq->tq_cv); - nni_mtx_fini(&tq->tq_mtx); - NNI_FREE_STRUCT(tq); - return (NNG_ENOMEM); + if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) || + ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) || + ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) { + nni_taskq_fini(tq); + return (rv); } - tq->tq_nthreads = nthr; + for (i = 0; i < nthr; i++) { tq->tq_threads[i].tqt_tq = tq; tq->tq_threads[i].tqt_running = NULL; rv = nni_thr_init(&tq->tq_threads[i].tqt_thread, nni_taskq_thread, &tq->tq_threads[i]); if (rv != 0) { - goto fail; + nni_taskq_fini(tq); + return (rv); } } - tq->tq_nthreads = nthr; - tq->tq_run = 1; + tq->tq_run = 1; for (i = 0; i < tq->tq_nthreads; i++) { nni_thr_run(&tq->tq_threads[i].tqt_thread); } *tqp = tq; return (0); - -fail: - - nni_taskq_fini(tq); - return (rv); } static void @@ -142,7 +131,7 @@ nni_taskq_drain_locked(nni_taskq *tq) break; } tq->tq_waiting++; - nni_cv_wait(&tq->tq_cv); + nni_cv_wait(&tq->tq_wait_cv); } } @@ -166,15 +155,16 @@ nni_taskq_fini(nni_taskq *tq) nni_taskq_drain_locked(tq); tq->tq_run = 0; - nni_cv_wake(&tq->tq_cv); + nni_cv_wake(&tq->tq_sched_cv); nni_mtx_unlock(&tq->tq_mtx); } for (int i = 0; i < tq->tq_nthreads; i++) { nni_thr_fini(&tq->tq_threads[i].tqt_thread); } - nni_free(tq->tq_threads, tq->tq_nthreads * sizeof(nni_taskq_thr)); - nni_cv_fini(&tq->tq_cv); + nni_cv_fini(&tq->tq_wait_cv); + nni_cv_fini(&tq->tq_sched_cv); nni_mtx_fini(&tq->tq_mtx); + NNI_FREE_STRUCTS(tq->tq_threads, tq->tq_nthreads); NNI_FREE_STRUCT(tq); } @@ -193,7 +183,7 @@ nni_task_dispatch(nni_task *task) if (!nni_list_active(&tq->tq_tasks, task)) { nni_list_append(&tq->tq_tasks, task); } - nni_cv_wake(&tq->tq_cv); + nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate nni_mtx_unlock(&tq->tq_mtx); } @@ -221,7 +211,7 @@ nni_task_wait(nni_task *task) } tq->tq_waiting = 1; - nni_cv_wait(&tq->tq_cv); + nni_cv_wait(&tq->tq_wait_cv); } nni_mtx_unlock(&tq->tq_mtx); } @@ -248,7 +238,7 @@ nni_task_cancel(nni_task *task) } // tq->tq_threads[i].tqt_wait = 1; tq->tq_waiting++; - nni_cv_wait(&tq->tq_cv); + nni_cv_wait(&tq->tq_wait_cv); } if (nni_list_active(&tq->tq_tasks, task)) { diff --git a/src/core/thread.c b/src/core/thread.c index 3cfdd83f..6c3bd9f3 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -73,6 +73,12 @@ nni_cv_wake(nni_cv *cv) nni_plat_cv_wake(cv); } +void +nni_cv_wake1(nni_cv *cv) +{ + nni_plat_cv_wake1(cv); +} + static void nni_thr_wrap(void *arg) { diff --git a/src/core/thread.h b/src/core/thread.h index b528af2c..94b2a984 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -52,6 +52,9 @@ extern void nni_cv_fini(nni_cv *cv); // nni_cv_wake wakes all waiters on the condition variable. extern void nni_cv_wake(nni_cv *cv); +// nni_cv_wake wakes just one waiter on the condition variable. +extern void nni_cv_wake1(nni_cv *cv); + // nni_cv_wait waits until nni_cv_wake is called on the condition variable. // The wait is indefinite. Premature wakeups are possible, so the caller // must verify any related condition. diff --git a/src/core/timer.c b/src/core/timer.c index e8aa563c..73bc7604 100644 --- a/src/core/timer.c +++ b/src/core/timer.c @@ -18,7 +18,8 @@ static void nni_timer_loop(void *); // XXX: replace this timer list with a minHeap based priority queue. struct nni_timer { nni_mtx t_mx; - nni_cv t_cv; + nni_cv t_wait_cv; + nni_cv t_sched_cv; nni_list t_entries; nni_thr t_thr; int t_run; @@ -40,7 +41,8 @@ nni_timer_sys_init(void) NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node); if (((rv = nni_mtx_init(&timer->t_mx)) != 0) || - ((rv = nni_cv_init(&timer->t_cv, &timer->t_mx)) != 0) || + ((rv = nni_cv_init(&timer->t_sched_cv, &timer->t_mx)) != 0) || + ((rv = nni_cv_init(&timer->t_wait_cv, &timer->t_mx)) != 0) || ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) { nni_timer_sys_fini(); return (rv); @@ -58,12 +60,13 @@ nni_timer_sys_fini(void) if (timer->t_run) { nni_mtx_lock(&timer->t_mx); timer->t_run = 0; - nni_cv_wake(&timer->t_cv); + nni_cv_wake(&timer->t_sched_cv); nni_mtx_unlock(&timer->t_mx); } nni_thr_fini(&timer->t_thr); - nni_cv_fini(&timer->t_cv); + nni_cv_fini(&timer->t_wait_cv); + nni_cv_fini(&timer->t_sched_cv); nni_mtx_fini(&timer->t_mx); } @@ -88,7 +91,7 @@ nni_timer_cancel(nni_timer_node *node) nni_mtx_lock(&timer->t_mx); while (timer->t_active == node) { timer->t_waiting = 1; - nni_cv_wait(&timer->t_cv); + nni_cv_wait(&timer->t_wait_cv); } if (nni_list_active(&timer->t_entries, node)) { nni_list_remove(&timer->t_entries, node); @@ -121,7 +124,7 @@ nni_timer_schedule(nni_timer_node *node, nni_time when) nni_list_append(&timer->t_entries, node); } if (wake) { - nni_cv_wake(&timer->t_cv); + nni_cv_wake1(&timer->t_sched_cv); } nni_mtx_unlock(&timer->t_mx); } @@ -137,8 +140,8 @@ nni_timer_loop(void *arg) nni_mtx_lock(&timer->t_mx); timer->t_active = NULL; if (timer->t_waiting) { - timer->t_waiting = 1; - nni_cv_wake(&timer->t_cv); + timer->t_waiting = 0; + nni_cv_wake(&timer->t_wait_cv); } if (!timer->t_run) { nni_mtx_unlock(&timer->t_mx); @@ -147,13 +150,13 @@ nni_timer_loop(void *arg) now = nni_clock(); if ((node = nni_list_first(&timer->t_entries)) == NULL) { - nni_cv_wait(&timer->t_cv); + nni_cv_wait(&timer->t_sched_cv); nni_mtx_unlock(&timer->t_mx); continue; } if (now < node->t_expire) { // End of run, we have to wait for next. - nni_cv_until(&timer->t_cv, node->t_expire); + nni_cv_until(&timer->t_sched_cv, node->t_expire); nni_mtx_unlock(&timer->t_mx); continue; } |
