diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 20 | ||||
| -rw-r--r-- | src/core/taskq.c | 28 |
2 files changed, 46 insertions, 2 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 99da356a..87755b1b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -130,7 +130,21 @@ void nni_aio_fini(nni_aio *aio) { if (aio != NULL) { - nni_aio_stop(aio); + + nni_aio_cancelfn cancelfn; + + // This is like aio_close, but we don't want to dispatch + // the task. And unlike aio_stop, we don't want to wait + // for the task. (Because we implicitly do task_fini.) + nni_mtx_lock(&nni_aio_lk); + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + aio->a_stop = true; + nni_mtx_unlock(&nni_aio_lk); + + if (cancelfn != NULL) { + cancelfn(aio, NNG_ECLOSED); + } // Wait for the aio to be "done"; this ensures that we don't // destroy an aio from a "normal" completion callback while @@ -138,6 +152,10 @@ nni_aio_fini(nni_aio *aio) nni_mtx_lock(&nni_aio_lk); while (nni_aio_expire_aio == aio) { + if (nni_thr_is_self(&nni_aio_expire_thr)) { + nni_aio_expire_aio = NULL; + break; + } nni_cv_wait(&nni_aio_expire_cv); } nni_mtx_unlock(&nni_aio_lk); diff --git a/src/core/taskq.c b/src/core/taskq.c index f0712e59..ae66ec67 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -16,8 +16,10 @@ struct nni_task { void * task_arg; nni_cb task_cb; nni_taskq * task_tq; + nni_thr * task_thr; // non-NULL if the task is running unsigned task_busy; bool task_prep; + bool task_reap; // reap task on completion nni_mtx task_mtx; nni_cv task_cv; }; @@ -47,19 +49,28 @@ nni_taskq_thread(void *self) nni_mtx_lock(&tq->tq_mtx); for (;;) { if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { + bool reap; + nni_list_remove(&tq->tq_tasks, task); + task->task_thr = &thr->tqt_thread; nni_mtx_unlock(&tq->tq_mtx); task->task_cb(task->task_arg); nni_mtx_lock(&task->task_mtx); task->task_busy--; + task->task_thr = NULL; + reap = task->task_reap; if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); + if (reap) { + nni_task_fini(task); + } nni_mtx_lock(&tq->tq_mtx); + continue; } @@ -134,6 +145,7 @@ nni_taskq_fini(nni_taskq *tq) void nni_task_exec(nni_task *task) { + bool reap; nni_mtx_lock(&task->task_mtx); if (task->task_prep) { task->task_prep = false; @@ -151,7 +163,12 @@ nni_task_exec(nni_task *task) if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } + reap = task->task_reap; nni_mtx_unlock(&task->task_mtx); + + if (reap) { + nni_task_fini(task); + } } void @@ -210,6 +227,7 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) nni_mtx_init(&task->task_mtx); nni_cv_init(&task->task_cv, &task->task_mtx); task->task_prep = false; + task->task_reap = false; task->task_busy = 0; task->task_cb = cb; task->task_arg = arg; @@ -221,8 +239,16 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) void nni_task_fini(nni_task *task) { - NNI_ASSERT(!nni_list_node_active(&task->task_node)); nni_mtx_lock(&task->task_mtx); + + // If we are being called from the task function, then + // defer the reap until after the callback has finished. + if (task->task_busy && (task->task_thr != NULL) && + nni_thr_is_self(task->task_thr)) { + task->task_reap = true; + nni_mtx_unlock(&task->task_mtx); + return; + } while (task->task_busy) { nni_cv_wait(&task->task_cv); } |
