From 69190eb962f6a74be3d693f584320e42a79462d1 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 8 Jun 2018 17:08:03 -0700 Subject: fixes #511 Want to be able to have deferred destroy of tasks and aios Essentially, if we're destroying an aio, and we are doing so from the thread that is running the callback, then we should defer the destruction of the task until it returns. Note that calling nni_aio_wait() or anything else that calls it from the callback is still verboten and will result in a single party deadlock. --- src/core/aio.c | 20 +++++++++++++++++++- src/core/taskq.c | 28 +++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/aio.c b/src/core/aio.c index 99da356a..87755b1b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -130,7 +130,21 @@ void nni_aio_fini(nni_aio *aio) { if (aio != NULL) { - nni_aio_stop(aio); + + nni_aio_cancelfn cancelfn; + + // This is like aio_close, but we don't want to dispatch + // the task. And unlike aio_stop, we don't want to wait + // for the task. (Because we implicitly do task_fini.) + nni_mtx_lock(&nni_aio_lk); + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + aio->a_stop = true; + nni_mtx_unlock(&nni_aio_lk); + + if (cancelfn != NULL) { + cancelfn(aio, NNG_ECLOSED); + } // Wait for the aio to be "done"; this ensures that we don't // destroy an aio from a "normal" completion callback while @@ -138,6 +152,10 @@ nni_aio_fini(nni_aio *aio) nni_mtx_lock(&nni_aio_lk); while (nni_aio_expire_aio == aio) { + if (nni_thr_is_self(&nni_aio_expire_thr)) { + nni_aio_expire_aio = NULL; + break; + } nni_cv_wait(&nni_aio_expire_cv); } nni_mtx_unlock(&nni_aio_lk); diff --git a/src/core/taskq.c b/src/core/taskq.c index f0712e59..ae66ec67 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -16,8 +16,10 @@ struct nni_task { void * task_arg; nni_cb task_cb; nni_taskq * task_tq; + nni_thr * task_thr; // non-NULL if the task is running unsigned task_busy; bool task_prep; + bool task_reap; // reap task on completion nni_mtx task_mtx; nni_cv task_cv; }; @@ -47,19 +49,28 @@ nni_taskq_thread(void *self) nni_mtx_lock(&tq->tq_mtx); for (;;) { if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { + bool reap; + nni_list_remove(&tq->tq_tasks, task); + task->task_thr = &thr->tqt_thread; nni_mtx_unlock(&tq->tq_mtx); task->task_cb(task->task_arg); nni_mtx_lock(&task->task_mtx); task->task_busy--; + task->task_thr = NULL; + reap = task->task_reap; if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); + if (reap) { + nni_task_fini(task); + } nni_mtx_lock(&tq->tq_mtx); + continue; } @@ -134,6 +145,7 @@ nni_taskq_fini(nni_taskq *tq) void nni_task_exec(nni_task *task) { + bool reap; nni_mtx_lock(&task->task_mtx); if (task->task_prep) { task->task_prep = false; @@ -151,7 +163,12 @@ nni_task_exec(nni_task *task) if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } + reap = task->task_reap; nni_mtx_unlock(&task->task_mtx); + + if (reap) { + nni_task_fini(task); + } } void @@ -210,6 +227,7 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) nni_mtx_init(&task->task_mtx); nni_cv_init(&task->task_cv, &task->task_mtx); task->task_prep = false; + task->task_reap = false; task->task_busy = 0; task->task_cb = cb; task->task_arg = arg; @@ -221,8 +239,16 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) void nni_task_fini(nni_task *task) { - NNI_ASSERT(!nni_list_node_active(&task->task_node)); nni_mtx_lock(&task->task_mtx); + + // If we are being called from the task function, then + // defer the reap until after the callback has finished. + if (task->task_busy && (task->task_thr != NULL) && + nni_thr_is_self(task->task_thr)) { + task->task_reap = true; + nni_mtx_unlock(&task->task_mtx); + return; + } while (task->task_busy) { nni_cv_wait(&task->task_cv); } -- cgit v1.2.3-70-g09d2