diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-06-08 17:08:03 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-06-09 09:21:48 -0700 |
| commit | 69190eb962f6a74be3d693f584320e42a79462d1 (patch) | |
| tree | 12299f6c6eb331def5062b9a234330818aa4bb09 /src/core/taskq.c | |
| parent | 55e0fa520794aec071691814b5b9ced6db0ea3e4 (diff) | |
| download | nng-69190eb962f6a74be3d693f584320e42a79462d1.tar.gz nng-69190eb962f6a74be3d693f584320e42a79462d1.tar.bz2 nng-69190eb962f6a74be3d693f584320e42a79462d1.zip | |
fixes #511 Want to be able to have deferred destroy of tasks and aios
Essentially, if we're destroying an aio, and we are doing so from the
thread that is running the callback, then we should defer the destruction
of the task until it returns.
Note that calling nni_aio_wait() or anything else that calls it from
the callback is still verboten and will result in a single party
deadlock.
Diffstat (limited to 'src/core/taskq.c')
| -rw-r--r-- | src/core/taskq.c | 28 |
1 files changed, 27 insertions, 1 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c index f0712e59..ae66ec67 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -16,8 +16,10 @@ struct nni_task { void * task_arg; nni_cb task_cb; nni_taskq * task_tq; + nni_thr * task_thr; // non-NULL if the task is running unsigned task_busy; bool task_prep; + bool task_reap; // reap task on completion nni_mtx task_mtx; nni_cv task_cv; }; @@ -47,19 +49,28 @@ nni_taskq_thread(void *self) nni_mtx_lock(&tq->tq_mtx); for (;;) { if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { + bool reap; + nni_list_remove(&tq->tq_tasks, task); + task->task_thr = &thr->tqt_thread; nni_mtx_unlock(&tq->tq_mtx); task->task_cb(task->task_arg); nni_mtx_lock(&task->task_mtx); task->task_busy--; + task->task_thr = NULL; + reap = task->task_reap; if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); + if (reap) { + nni_task_fini(task); + } nni_mtx_lock(&tq->tq_mtx); + continue; } @@ -134,6 +145,7 @@ nni_taskq_fini(nni_taskq *tq) void nni_task_exec(nni_task *task) { + bool reap; nni_mtx_lock(&task->task_mtx); if (task->task_prep) { task->task_prep = false; @@ -151,7 +163,12 @@ nni_task_exec(nni_task *task) if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } + reap = task->task_reap; nni_mtx_unlock(&task->task_mtx); + + if (reap) { + nni_task_fini(task); + } } void @@ -210,6 +227,7 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) nni_mtx_init(&task->task_mtx); nni_cv_init(&task->task_cv, &task->task_mtx); task->task_prep = false; + task->task_reap = false; task->task_busy = 0; task->task_cb = cb; task->task_arg = arg; @@ -221,8 +239,16 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) void nni_task_fini(nni_task *task) { - NNI_ASSERT(!nni_list_node_active(&task->task_node)); nni_mtx_lock(&task->task_mtx); + + // If we are being called from the task function, then + // defer the reap until after the callback has finished. + if (task->task_busy && (task->task_thr != NULL) && + nni_thr_is_self(task->task_thr)) { + task->task_reap = true; + nni_mtx_unlock(&task->task_mtx); + return; + } while (task->task_busy) { nni_cv_wait(&task->task_cv); } |
