aboutsummaryrefslogtreecommitdiff
path: root/src/core/taskq.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-06-08 17:08:03 -0700
committerGarrett D'Amore <garrett@damore.org>2018-06-09 09:21:48 -0700
commit69190eb962f6a74be3d693f584320e42a79462d1 (patch)
tree12299f6c6eb331def5062b9a234330818aa4bb09 /src/core/taskq.c
parent55e0fa520794aec071691814b5b9ced6db0ea3e4 (diff)
downloadnng-69190eb962f6a74be3d693f584320e42a79462d1.tar.gz
nng-69190eb962f6a74be3d693f584320e42a79462d1.tar.bz2
nng-69190eb962f6a74be3d693f584320e42a79462d1.zip
fixes #511 Want to be able to have deferred destroy of tasks and aios
Essentially, if we're destroying an aio, and we are doing so from the thread that is running the callback, then we should defer the destruction of the task until it returns. Note that calling nni_aio_wait() or anything else that calls it from the callback is still verboten and will result in a single party deadlock.
Diffstat (limited to 'src/core/taskq.c')
-rw-r--r--src/core/taskq.c28
1 files changed, 27 insertions, 1 deletions
diff --git a/src/core/taskq.c b/src/core/taskq.c
index f0712e59..ae66ec67 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -16,8 +16,10 @@ struct nni_task {
void * task_arg;
nni_cb task_cb;
nni_taskq * task_tq;
+ nni_thr * task_thr; // non-NULL if the task is running
unsigned task_busy;
bool task_prep;
+ bool task_reap; // reap task on completion
nni_mtx task_mtx;
nni_cv task_cv;
};
@@ -47,19 +49,28 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ bool reap;
+
nni_list_remove(&tq->tq_tasks, task);
+ task->task_thr = &thr->tqt_thread;
nni_mtx_unlock(&tq->tq_mtx);
task->task_cb(task->task_arg);
nni_mtx_lock(&task->task_mtx);
task->task_busy--;
+ task->task_thr = NULL;
+ reap = task->task_reap;
if (task->task_busy == 0) {
nni_cv_wake(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
+ if (reap) {
+ nni_task_fini(task);
+ }
nni_mtx_lock(&tq->tq_mtx);
+
continue;
}
@@ -134,6 +145,7 @@ nni_taskq_fini(nni_taskq *tq)
void
nni_task_exec(nni_task *task)
{
+ bool reap;
nni_mtx_lock(&task->task_mtx);
if (task->task_prep) {
task->task_prep = false;
@@ -151,7 +163,12 @@ nni_task_exec(nni_task *task)
if (task->task_busy == 0) {
nni_cv_wake(&task->task_cv);
}
+ reap = task->task_reap;
nni_mtx_unlock(&task->task_mtx);
+
+ if (reap) {
+ nni_task_fini(task);
+ }
}
void
@@ -210,6 +227,7 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
nni_mtx_init(&task->task_mtx);
nni_cv_init(&task->task_cv, &task->task_mtx);
task->task_prep = false;
+ task->task_reap = false;
task->task_busy = 0;
task->task_cb = cb;
task->task_arg = arg;
@@ -221,8 +239,16 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
void
nni_task_fini(nni_task *task)
{
- NNI_ASSERT(!nni_list_node_active(&task->task_node));
nni_mtx_lock(&task->task_mtx);
+
+ // If we are being called from the task function, then
+ // defer the reap until after the callback has finished.
+ if (task->task_busy && (task->task_thr != NULL) &&
+ nni_thr_is_self(task->task_thr)) {
+ task->task_reap = true;
+ nni_mtx_unlock(&task->task_mtx);
+ return;
+ }
while (task->task_busy) {
nni_cv_wait(&task->task_cv);
}