Diffstat (limited to 'src')
 src/core/aio.c   | 20 +++++++++++++++++++-
 src/core/taskq.c | 28 +++++++++++++++++++++++++++-
 2 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/src/core/aio.c b/src/core/aio.c
index 99da356a..87755b1b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -130,7 +130,21 @@ void
nni_aio_fini(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_stop(aio);
+
+ nni_aio_cancelfn cancelfn;
+
+ // This is like aio_close, but we don't want to dispatch
+ // the task. And unlike aio_stop, we don't want to wait
+ // for the task. (Because we implicitly do task_fini.)
+ nni_mtx_lock(&nni_aio_lk);
+ cancelfn = aio->a_prov_cancel;
+ aio->a_prov_cancel = NULL;
+ aio->a_stop = true;
+ nni_mtx_unlock(&nni_aio_lk);
+
+ if (cancelfn != NULL) {
+ cancelfn(aio, NNG_ECLOSED);
+ }
// Wait for the aio to be "done"; this ensures that we don't
// destroy an aio from a "normal" completion callback while
@@ -138,6 +152,10 @@ nni_aio_fini(nni_aio *aio)
nni_mtx_lock(&nni_aio_lk);
while (nni_aio_expire_aio == aio) {
+ if (nni_thr_is_self(&nni_aio_expire_thr)) {
+ nni_aio_expire_aio = NULL;
+ break;
+ }
nni_cv_wait(&nni_aio_expire_cv);
}
nni_mtx_unlock(&nni_aio_lk);
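
The aio.c hunks above replace the blocking nni_aio_stop() call with an
in-place cancel, and let the expiration thread fall out of its own wait
loop, so an aio finalized from its expiration path no longer deadlocks
waiting on itself. A minimal sketch of that guard, written against plain
pthreads rather than nng's nni_thr/nni_cv wrappers (the names
wait_not_active, active_aio and expire_thr are illustrative, not part of
nng):

    #include <pthread.h>

    static pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cv = PTHREAD_COND_INITIALIZER;
    static pthread_t       expire_thr;  /* thread running expirations */
    static void           *active_aio;  /* object it is working on, or NULL */

    /* Block until the expire thread has let go of `aio`.  If the caller
     * *is* the expire thread (the aio is being finalized from its own
     * expiration callback), clear the marker and return instead of
     * waiting on ourselves. */
    static void
    wait_not_active(void *aio)
    {
        pthread_mutex_lock(&lk);
        while (active_aio == aio) {
            if (pthread_equal(pthread_self(), expire_thr)) {
                active_aio = NULL;
                break;
            }
            pthread_cond_wait(&cv, &lk);
        }
        pthread_mutex_unlock(&lk);
    }
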
diff --git a/src/core/taskq.c b/src/core/taskq.c
index f0712e59..ae66ec67 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -16,8 +16,10 @@ struct nni_task {
void * task_arg;
nni_cb task_cb;
nni_taskq * task_tq;
+ nni_thr * task_thr; // non-NULL if the task is running
unsigned task_busy;
bool task_prep;
+ bool task_reap; // reap task on completion
nni_mtx task_mtx;
nni_cv task_cv;
};
@@ -47,19 +49,28 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ bool reap;
+
nni_list_remove(&tq->tq_tasks, task);
+ task->task_thr = &thr->tqt_thread;
nni_mtx_unlock(&tq->tq_mtx);
task->task_cb(task->task_arg);
nni_mtx_lock(&task->task_mtx);
task->task_busy--;
+ task->task_thr = NULL;
+ reap = task->task_reap;
if (task->task_busy == 0) {
nni_cv_wake(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
+ if (reap) {
+ nni_task_fini(task);
+ }
nni_mtx_lock(&tq->tq_mtx);
+
continue;
}
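
In the worker loop above, the running thread records itself in task_thr
before invoking the callback, and after the callback returns it samples
task_reap under the task lock, finalizing the task only once that lock
has been dropped. Together with the nni_task_fini hunk at the end of
this diff, that lets a callback destroy its own task safely. A sketch of
the whole pattern under plain pthreads (struct job, job_run and
job_destroy are illustrative names, not nng API):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdlib.h>

    struct job {
        void          (*cb)(void *);
        void           *arg;
        bool            running;  /* callback currently executing   */
        bool            reap;     /* destroy requested mid-callback */
        pthread_t       runner;   /* thread executing the callback  */
        pthread_mutex_t mtx;
        pthread_cond_t  cv;
    };

    /* Worker side: run the callback, then honor a reap request the
     * callback may have raised via job_destroy().  The free happens
     * only after the callback has returned and the lock is dropped. */
    static void
    job_run(struct job *j)
    {
        bool reap;

        pthread_mutex_lock(&j->mtx);
        j->running = true;
        j->runner  = pthread_self();
        pthread_mutex_unlock(&j->mtx);

        j->cb(j->arg);

        pthread_mutex_lock(&j->mtx);
        j->running = false;
        reap       = j->reap;
        pthread_cond_broadcast(&j->cv);
        pthread_mutex_unlock(&j->mtx);

        if (reap) {
            pthread_cond_destroy(&j->cv);
            pthread_mutex_destroy(&j->mtx);
            free(j);
        }
    }

    /* Caller side (mirrors the nni_task_fini hunk below): if invoked
     * from inside the job's own callback, mark it and let job_run()
     * free it, since tearing it down here would destroy the mutex the
     * worker still has to re-take.  Otherwise wait for the callback to
     * finish and destroy the job directly. */
    static void
    job_destroy(struct job *j)
    {
        pthread_mutex_lock(&j->mtx);
        if (j->running && pthread_equal(pthread_self(), j->runner)) {
            j->reap = true;
            pthread_mutex_unlock(&j->mtx);
            return;
        }
        while (j->running) {
            pthread_cond_wait(&j->cv, &j->mtx);
        }
        pthread_mutex_unlock(&j->mtx);
        pthread_cond_destroy(&j->cv);
        pthread_mutex_destroy(&j->mtx);
        free(j);
    }
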
@@ -134,6 +145,7 @@ nni_taskq_fini(nni_taskq *tq)
void
nni_task_exec(nni_task *task)
{
+ bool reap;
nni_mtx_lock(&task->task_mtx);
if (task->task_prep) {
task->task_prep = false;
@@ -151,7 +163,12 @@ nni_task_exec(nni_task *task)
if (task->task_busy == 0) {
nni_cv_wake(&task->task_cv);
}
+ reap = task->task_reap;
nni_mtx_unlock(&task->task_mtx);
+
+ if (reap) {
+ nni_task_fini(task);
+ }
}
void
@@ -210,6 +227,7 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
nni_mtx_init(&task->task_mtx);
nni_cv_init(&task->task_cv, &task->task_mtx);
task->task_prep = false;
+ task->task_reap = false;
task->task_busy = 0;
task->task_cb = cb;
task->task_arg = arg;
@@ -221,8 +239,16 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
void
nni_task_fini(nni_task *task)
{
- NNI_ASSERT(!nni_list_node_active(&task->task_node));
nni_mtx_lock(&task->task_mtx);
+
+ // If we are being called from the task function, then
+ // defer the reap until after the callback has finished.
+ if (task->task_busy && (task->task_thr != NULL) &&
+ nni_thr_is_self(task->task_thr)) {
+ task->task_reap = true;
+ nni_mtx_unlock(&task->task_mtx);
+ return;
+ }
while (task->task_busy) {
nni_cv_wait(&task->task_cv);
}