aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-06-08 17:08:03 -0700
committerGarrett D'Amore <garrett@damore.org>2018-06-09 09:21:48 -0700
commit69190eb962f6a74be3d693f584320e42a79462d1 (patch)
tree12299f6c6eb331def5062b9a234330818aa4bb09
parent55e0fa520794aec071691814b5b9ced6db0ea3e4 (diff)
downloadnng-69190eb962f6a74be3d693f584320e42a79462d1.tar.gz
nng-69190eb962f6a74be3d693f584320e42a79462d1.tar.bz2
nng-69190eb962f6a74be3d693f584320e42a79462d1.zip
fixes #511 Want to be able to have deferred destroy of tasks and aios
Essentially, if we're destroying an aio, and we are doing so from the thread that is running the callback, then we should defer the destruction of the task until it returns. Note that calling nni_aio_wait() or anything else that calls it from the callback is still verboten and will result in a single party deadlock.
-rw-r--r--src/core/aio.c20
-rw-r--r--src/core/taskq.c28
2 files changed, 46 insertions, 2 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 99da356a..87755b1b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -130,7 +130,21 @@ void
nni_aio_fini(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_stop(aio);
+
+ nni_aio_cancelfn cancelfn;
+
+ // This is like aio_close, but we don't want to dispatch
+ // the task. And unlike aio_stop, we don't want to wait
+ // for the task. (Because we implicitly do task_fini.)
+ nni_mtx_lock(&nni_aio_lk);
+ cancelfn = aio->a_prov_cancel;
+ aio->a_prov_cancel = NULL;
+ aio->a_stop = true;
+ nni_mtx_unlock(&nni_aio_lk);
+
+ if (cancelfn != NULL) {
+ cancelfn(aio, NNG_ECLOSED);
+ }
// Wait for the aio to be "done"; this ensures that we don't
// destroy an aio from a "normal" completion callback while
@@ -138,6 +152,10 @@ nni_aio_fini(nni_aio *aio)
nni_mtx_lock(&nni_aio_lk);
while (nni_aio_expire_aio == aio) {
+ if (nni_thr_is_self(&nni_aio_expire_thr)) {
+ nni_aio_expire_aio = NULL;
+ break;
+ }
nni_cv_wait(&nni_aio_expire_cv);
}
nni_mtx_unlock(&nni_aio_lk);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index f0712e59..ae66ec67 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -16,8 +16,10 @@ struct nni_task {
void * task_arg;
nni_cb task_cb;
nni_taskq * task_tq;
+ nni_thr * task_thr; // non-NULL if the task is running
unsigned task_busy;
bool task_prep;
+ bool task_reap; // reap task on completion
nni_mtx task_mtx;
nni_cv task_cv;
};
@@ -47,19 +49,28 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ bool reap;
+
nni_list_remove(&tq->tq_tasks, task);
+ task->task_thr = &thr->tqt_thread;
nni_mtx_unlock(&tq->tq_mtx);
task->task_cb(task->task_arg);
nni_mtx_lock(&task->task_mtx);
task->task_busy--;
+ task->task_thr = NULL;
+ reap = task->task_reap;
if (task->task_busy == 0) {
nni_cv_wake(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
+ if (reap) {
+ nni_task_fini(task);
+ }
nni_mtx_lock(&tq->tq_mtx);
+
continue;
}
@@ -134,6 +145,7 @@ nni_taskq_fini(nni_taskq *tq)
void
nni_task_exec(nni_task *task)
{
+ bool reap;
nni_mtx_lock(&task->task_mtx);
if (task->task_prep) {
task->task_prep = false;
@@ -151,7 +163,12 @@ nni_task_exec(nni_task *task)
if (task->task_busy == 0) {
nni_cv_wake(&task->task_cv);
}
+ reap = task->task_reap;
nni_mtx_unlock(&task->task_mtx);
+
+ if (reap) {
+ nni_task_fini(task);
+ }
}
void
@@ -210,6 +227,7 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
nni_mtx_init(&task->task_mtx);
nni_cv_init(&task->task_cv, &task->task_mtx);
task->task_prep = false;
+ task->task_reap = false;
task->task_busy = 0;
task->task_cb = cb;
task->task_arg = arg;
@@ -221,8 +239,16 @@ nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
void
nni_task_fini(nni_task *task)
{
- NNI_ASSERT(!nni_list_node_active(&task->task_node));
nni_mtx_lock(&task->task_mtx);
+
+ // If we are being called from the task function, then
+ // defer the reap until after the callback has finished.
+ if (task->task_busy && (task->task_thr != NULL) &&
+ nni_thr_is_self(task->task_thr)) {
+ task->task_reap = true;
+ nni_mtx_unlock(&task->task_mtx);
+ return;
+ }
while (task->task_busy) {
nni_cv_wait(&task->task_cv);
}