| | | |
|---|---|---|
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-09 17:21:27 -0700 |
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-14 17:09:20 -0700 |
| commit | 16b4c4019c7b7904de171c588ed8c72ca732d2cf (patch) | |
| tree | 9e5a8416470631cfb48f5a6ebdd4b16e4b1be3d6 /src/core/taskq.c | |
| parent | e0beb13b066d27ce32347a1c18c9d441828dc553 (diff) | |
fixes #352 aio lock is burning hot
fixes #326 consider nni_taskq_exec_synch()
fixes #410 kqueue implementation could be smarter
fixes #411 epoll implementation could be smarter
fixes #426 synchronous completion can lead to panic
fixes #421 pipe close race condition/duplicate destroy
This is a major refactoring of two significant parts of the code base,
which are closely interrelated.
First, the aio and taskq frameworks have undergone a number of simplifications
and improvements. We have ditched a few parts of the internal API (for
example, tasks no longer support cancellation) that weren't terribly useful
but added a lot of complexity, and we've made aio_schedule something that
now checks for cancellation or other "premature" completions. The
aio framework now uses the tasks more tightly, so that aio wait can
devolve into just nni_task_wait(). We did have to add a "task_prep()"
step to prevent race conditions.
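As a rough illustration of the resulting lifecycle (a sketch only, not code from
this change; `my_cb`, `op_task`, `start_op`, `complete_op`, and `wait_op` are
hypothetical), the point of the prep step is that the task is marked busy before
the underlying operation is armed, so a concurrent nni_task_wait() cannot slip
through and return early:

```c
#include "core/nng_impl.h"

// Hypothetical completion callback; in nng proper this would be the aio's
// completion routine, run on a taskq thread once the task is dispatched.
static void
my_cb(void *arg)
{
	(void) arg;
}

static nni_task *op_task; // illustrative only

static int
start_op(nni_taskq *tq, void *arg)
{
	int rv;

	if ((rv = nni_task_init(&op_task, tq, my_cb, arg)) != 0) {
		return (rv);
	}
	nni_task_prep(op_task); // mark the task busy *before* arming the I/O
	// ... arm the underlying operation here ...
	return (0);
}

static void
complete_op(void)
{
	nni_task_dispatch(op_task); // completion path: queue the callback
}

static void
wait_op(void)
{
	// Safe even if complete_op() has not run yet: prep already marked
	// the task as scheduled, so this blocks until the callback finishes.
	nni_task_wait(op_task);
}
```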
Second, the entire POSIX poller framework has been simplified and made
more robust and more scalable. There were some fairly inherent race
conditions around the shutdown/close code, where we *thought* we were
synchronizing against the other thread, but weren't doing so adequately.
With a cleaner design, we've been able to tighten up the implementation
to remove these race conditions, while substantially reducing the chance
for lock contention, thereby improving scalability. The illumos poller
also got a performance boost by polling for multiple events.
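The batching idea is generic; the following is a minimal sketch (not nng's
actual pollq code) of draining many ready events per wakeup using epoll, which
is what keeps the poller thread out of its lock and out of the kernel most of
the time. port_getn() on illumos and kevent() on the BSDs allow the same
pattern. The batch size and dispatch details here are assumptions for
illustration:

```c
#include <sys/epoll.h>

#define NEVENTS 64 // batch size is illustrative, not nng's tunable

// Sketch of a poller loop that retrieves a batch of ready events per
// wakeup instead of handling one event at a time.
static void
poll_loop(int epfd)
{
	struct epoll_event evs[NEVENTS];

	for (;;) {
		int n = epoll_wait(epfd, evs, NEVENTS, -1);
		if (n < 0) {
			continue; // e.g. EINTR; a real loop would check errno
		}
		for (int i = 0; i < n; i++) {
			// Dispatch each ready descriptor; in nng this is
			// where the pfd callback would be invoked.
			void *pfd = evs[i].data.ptr;
			(void) pfd;
		}
	}
}
```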
In highly "busy" systems, we expect to see vast reductions in lock
contention, and therefore greater scalability, in addition to overall
improved reliability.
One area where we can still do better is that only a single poller thread is
run. Scaling this out is a task that has to be done differently for each
poller, and done carefully to ensure that close conditions are safe on all
pollers and that there is no chance of deadlock/livelock while waiting for
pfd finalizers.
Diffstat (limited to 'src/core/taskq.c')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/core/taskq.c | 263 |
1 file changed, 149 insertions, 114 deletions
```diff
diff --git a/src/core/taskq.c b/src/core/taskq.c
index b0fe160b..526fa0b4 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -11,11 +11,22 @@
 #include "core/nng_impl.h"
 
 typedef struct nni_taskq_thr nni_taskq_thr;
+struct nni_task {
+    nni_list_node task_node;
+    void * task_arg;
+    nni_cb task_cb;
+    nni_taskq * task_tq;
+    bool task_sched;
+    bool task_run;
+    bool task_done;
+    bool task_exec;
+    bool task_fini;
+    nni_mtx task_mtx;
+    nni_cv task_cv;
+};
 struct nni_taskq_thr {
     nni_taskq *tqt_tq;
     nni_thr tqt_thread;
-    nni_task * tqt_running;
-    int tqt_wait;
 };
 struct nni_taskq {
     nni_list tq_tasks;
@@ -24,8 +35,7 @@ struct nni_taskq {
     nni_cv tq_wait_cv;
     nni_taskq_thr *tq_threads;
     int tq_nthreads;
-    int tq_run;
-    int tq_waiting;
+    bool tq_run;
 };
 
 static nni_taskq *nni_taskq_systq = NULL;
@@ -40,25 +50,37 @@ nni_taskq_thread(void *self)
     nni_mtx_lock(&tq->tq_mtx);
     for (;;) {
         if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+            nni_mtx_lock(&task->task_mtx);
+            task->task_run = true;
+            task->task_sched = false;
+            nni_mtx_unlock(&task->task_mtx);
             nni_list_remove(&tq->tq_tasks, task);
-            thr->tqt_running = task;
             nni_mtx_unlock(&tq->tq_mtx);
+
             task->task_cb(task->task_arg);
-            nni_mtx_lock(&tq->tq_mtx);
-            thr->tqt_running = NULL;
-            if (thr->tqt_wait || tq->tq_waiting) {
-                thr->tqt_wait = 0;
-                tq->tq_waiting = 0;
-                nni_cv_wake(&tq->tq_wait_cv);
-            }
+            nni_mtx_lock(&task->task_mtx);
+            if (task->task_sched || task->task_exec) {
+                // task resubmitted itself most likely.
+                // We cannot touch the rest of the flags,
+                // since the called function has taken control.
+                nni_mtx_unlock(&task->task_mtx);
+            } else {
+                task->task_done = true;
+                nni_cv_wake(&task->task_cv);
+
+                if (task->task_fini) {
+                    task->task_fini = false;
+                    nni_mtx_unlock(&task->task_mtx);
+                    nni_task_fini(task);
+                } else {
+                    nni_mtx_unlock(&task->task_mtx);
+                }
+            }
+            nni_mtx_lock(&tq->tq_mtx);
             continue;
         }
-        if (tq->tq_waiting) {
-            tq->tq_waiting = 0;
-            nni_cv_wake(&tq->tq_wait_cv);
-        }
         if (!tq->tq_run) {
             break;
         }
@@ -89,8 +111,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
     for (int i = 0; i < nthr; i++) {
         int rv;
-        tq->tq_threads[i].tqt_tq = tq;
-        tq->tq_threads[i].tqt_running = NULL;
+        tq->tq_threads[i].tqt_tq = tq;
         rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
             nni_taskq_thread, &tq->tq_threads[i]);
         if (rv != 0) {
@@ -98,7 +119,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
             return (rv);
         }
     }
-    tq->tq_run = 1;
+    tq->tq_run = true;
     for (int i = 0; i < tq->tq_nthreads; i++) {
         nni_thr_run(&tq->tq_threads[i].tqt_thread);
     }
@@ -106,53 +127,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
     return (0);
 }
 
-static void
-nni_taskq_drain_locked(nni_taskq *tq)
-{
-    // We need to first let the taskq completely drain.
-    for (;;) {
-        int busy = 0;
-        if (!nni_list_empty(&tq->tq_tasks)) {
-            busy = 1;
-        } else {
-            int i;
-            for (i = 0; i < tq->tq_nthreads; i++) {
-                if (tq->tq_threads[i].tqt_running != 0) {
-                    busy = 1;
-                    break;
-                }
-            }
-        }
-        if (!busy) {
-            break;
-        }
-        tq->tq_waiting++;
-        nni_cv_wait(&tq->tq_wait_cv);
-    }
-}
-
-void
-nni_taskq_drain(nni_taskq *tq)
-{
-    nni_mtx_lock(&tq->tq_mtx);
-    nni_taskq_drain_locked(tq);
-    nni_mtx_unlock(&tq->tq_mtx);
-}
-
 void
 nni_taskq_fini(nni_taskq *tq)
 {
-    // First drain the taskq completely. This is necessary since some
-    // tasks that are presently running may need to schedule additional
-    // tasks, and we don't want those to block.
     if (tq == NULL) {
         return;
     }
     if (tq->tq_run) {
         nni_mtx_lock(&tq->tq_mtx);
-        nni_taskq_drain_locked(tq);
-
-        tq->tq_run = 0;
+        tq->tq_run = false;
         nni_cv_wake(&tq->tq_sched_cv);
         nni_mtx_unlock(&tq->tq_mtx);
     }
@@ -174,90 +157,142 @@ nni_task_dispatch(nni_task *task)
     // If there is no callback to perform, then do nothing!
     // The user will be none the wiser.
     if (task->task_cb == NULL) {
+        nni_mtx_lock(&task->task_mtx);
+        task->task_sched = false;
+        task->task_run = false;
+        task->task_exec = false;
+        task->task_done = true;
+        nni_cv_wake(&task->task_cv);
+        nni_mtx_unlock(&task->task_mtx);
         return;
     }
     nni_mtx_lock(&tq->tq_mtx);
-    // It might already be scheduled... if so don't redo it.
-    if (!nni_list_active(&tq->tq_tasks, task)) {
-        nni_list_append(&tq->tq_tasks, task);
-    }
+    nni_mtx_lock(&task->task_mtx);
+    task->task_sched = true;
+    task->task_run = false;
+    task->task_done = false;
+    nni_mtx_unlock(&task->task_mtx);
+
+    nni_list_append(&tq->tq_tasks, task);
     nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
     nni_mtx_unlock(&tq->tq_mtx);
 }
 
 void
-nni_task_wait(nni_task *task)
+nni_task_exec(nni_task *task)
 {
-    nni_taskq *tq = task->task_tq;
-
     if (task->task_cb == NULL) {
+        nni_mtx_lock(&task->task_mtx);
+        task->task_sched = false;
+        task->task_run = false;
+        task->task_exec = false;
+        task->task_done = true;
+        nni_cv_wake(&task->task_cv);
+        nni_mtx_unlock(&task->task_mtx);
         return;
     }
-    nni_mtx_lock(&tq->tq_mtx);
-    for (;;) {
-        bool running = false;
-        if (nni_list_active(&tq->tq_tasks, task)) {
-            running = true;
-        } else {
-            for (int i = 0; i < tq->tq_nthreads; i++) {
-                if (tq->tq_threads[i].tqt_running == task) {
-                    running = true;
-                    break;
-                }
-            }
-        }
-        if (!running) {
-            break;
-        }
+    nni_mtx_lock(&task->task_mtx);
+    if (task->task_exec) {
+        // recursive taskq_exec, run it asynchronously
+        nni_mtx_unlock(&task->task_mtx);
+        nni_task_dispatch(task);
+        return;
+    }
+    task->task_exec = true;
+    task->task_sched = false;
+    task->task_done = false;
+    nni_mtx_unlock(&task->task_mtx);
 
-        tq->tq_waiting = 1;
-        nni_cv_wait(&tq->tq_wait_cv);
+    task->task_cb(task->task_arg);
+
+    nni_mtx_lock(&task->task_mtx);
+    task->task_exec = false;
+    if (task->task_sched || task->task_run) {
+        // cb scheduled a task
+        nni_mtx_unlock(&task->task_mtx);
+        return;
+    }
+    task->task_done = true;
+    nni_cv_wake(&task->task_cv);
+    if (task->task_fini) {
+        task->task_fini = false;
+        nni_mtx_unlock(&task->task_mtx);
+        nni_task_fini(task);
+    } else {
+        nni_mtx_unlock(&task->task_mtx);
     }
-    nni_mtx_unlock(&tq->tq_mtx);
 }
 
-int
-nni_task_cancel(nni_task *task)
+void
+nni_task_prep(nni_task *task)
 {
-    nni_taskq *tq = task->task_tq;
-    bool running;
+    nni_mtx_lock(&task->task_mtx);
+    task->task_sched = true;
+    task->task_done = false;
+    task->task_run = false;
+    nni_mtx_unlock(&task->task_mtx);
+}
 
-    nni_mtx_lock(&tq->tq_mtx);
-    running = true;
-    for (;;) {
-        running = false;
-        for (int i = 0; i < tq->tq_nthreads; i++) {
-            if (tq->tq_threads[i].tqt_running == task) {
-                running = true;
-                break;
-            }
-        }
+void
+nni_task_unprep(nni_task *task)
+{
+    nni_mtx_lock(&task->task_mtx);
+    task->task_sched = false;
+    task->task_done = false;
+    task->task_run = false;
+    nni_cv_wake(&task->task_cv);
+    nni_mtx_unlock(&task->task_mtx);
+}
 
-        if (!running) {
-            break;
-        }
-        // tq->tq_threads[i].tqt_wait = 1;
-        tq->tq_waiting++;
-        nni_cv_wait(&tq->tq_wait_cv);
+void
+nni_task_wait(nni_task *task)
+{
+    nni_mtx_lock(&task->task_mtx);
+    while ((task->task_sched || task->task_run || task->task_exec) &&
+        (!task->task_done)) {
+        nni_cv_wait(&task->task_cv);
     }
+    nni_mtx_unlock(&task->task_mtx);
+}
 
-    if (nni_list_active(&tq->tq_tasks, task)) {
-        nni_list_remove(&tq->tq_tasks, task);
+int
+nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
+{
+    nni_task *task;
+
+    if ((task = NNI_ALLOC_STRUCT(task)) == NULL) {
+        return (NNG_ENOMEM);
     }
-    nni_mtx_unlock(&tq->tq_mtx);
+    NNI_LIST_NODE_INIT(&task->task_node);
+    nni_mtx_init(&task->task_mtx);
+    nni_cv_init(&task->task_cv, &task->task_mtx);
+    task->task_sched = false;
+    task->task_done = false;
+    task->task_run = false;
+    task->task_sched = false;
+    task->task_exec = false;
+    task->task_cb = cb;
+    task->task_arg = arg;
+    task->task_tq = tq != NULL ? tq : nni_taskq_systq;
+    *taskp = task;
     return (0);
 }
 
 void
-nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg)
+nni_task_fini(nni_task *task)
 {
-    if (tq == NULL) {
-        tq = nni_taskq_systq;
+    NNI_ASSERT(!nni_list_node_active(&task->task_node));
+    nni_mtx_lock(&task->task_mtx);
+    if (task->task_run || task->task_exec) {
+        // destroy later.
+        task->task_fini = true;
+        nni_mtx_unlock(&task->task_mtx);
+        return;
     }
-    NNI_LIST_NODE_INIT(&task->task_node);
-    task->task_cb = cb;
-    task->task_arg = arg;
-    task->task_tq = tq;
+    nni_mtx_unlock(&task->task_mtx);
+    nni_cv_fini(&task->task_cv);
+    nni_mtx_fini(&task->task_mtx);
+    NNI_FREE_STRUCT(task);
 }
 
 int
```
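The new nni_task_exec() shown above is what enables synchronous completion
(#326): the callback runs on the caller's stack, and the exec/sched flags let
it detect re-entry and fall back to the queue. A small sketch of the call
pattern follows; the `complete()` helper is hypothetical, not part of this
change:

```c
#include "core/nng_impl.h"

// Hypothetical completion helper: finish a task inline when the caller says
// it is safe, otherwise hand it to a taskq thread.  nni_task_exec() itself
// re-dispatches if the task is already executing on this stack.
static void
complete(nni_task *task, bool inline_ok)
{
	if (inline_ok) {
		nni_task_exec(task);     // run the callback synchronously
	} else {
		nni_task_dispatch(task); // defer to a taskq thread
	}
}
```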
