aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-19 18:34:59 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-19 18:34:59 -0700
commit88fb04f61918b06e6e269c1960058c3df5e0a0ef (patch)
tree447c44bae8e2fd93d95b04fe8ab83bf02314eaef /src/core
parentfa24bad0f7f82b4718cc2f13f60fcdd9b0cf86fe (diff)
downloadnng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.tar.gz
nng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.tar.bz2
nng-88fb04f61918b06e6e269c1960058c3df5e0a0ef.zip
Always run the AIO completion logic.
We have seen some yet another weird situation where we had an orphaned pipe, which was caused by not completing the callback. If we are going to run nni_aio_fini, we should still run the callback (albeit with a return value of NNG_ECANCELED or somesuch) to be sure that we can't orphan stuff.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c30
-rw-r--r--src/core/taskq.c56
-rw-r--r--src/core/taskq.h1
3 files changed, 74 insertions, 13 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 238522b0..a9fbc50d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,9 +12,10 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
+ NNI_AIO_START = 0x8,
};
// These are used for expiration.
@@ -48,7 +49,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = 0;
+ aio->a_flags = NNI_AIO_START;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -65,6 +66,10 @@ nni_aio_fini(nni_aio *aio)
aio->a_flags |= NNI_AIO_DONE;
aio->a_result = NNG_ECANCELED;
cancelfn = aio->a_prov_cancel;
+ if (aio->a_flags & NNI_AIO_START) {
+ aio->a_flags &= ~NNI_AIO_START;
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
} else {
cancelfn = NULL;
@@ -76,7 +81,8 @@ nni_aio_fini(nni_aio *aio)
}
nni_mtx_unlock(&aio->a_lk);
- // just a list operation at this point.
+ // Stop any timeouts. If one was in flight, we wait until it
+ // completes (it could fire the completion callback.)
nni_aio_expire_remove(aio);
// Cancel the AIO if it was scheduled.
@@ -84,10 +90,9 @@ nni_aio_fini(nni_aio *aio)
cancelfn(aio);
}
- // if the task is already dispatched, cancel it (or wait for it to
- // complete). No further dispatches will happen because of the
- // above logic to set NNI_AIO_FINI.
- nni_taskq_cancel(NULL, &aio->a_tqe);
+ // Wait for any outstanding task to complete. We won't schedule
+ // new stuff because nni_aio_start will fail (due to AIO_FINI).
+ nni_taskq_wait(NULL, &aio->a_tqe);
// At this point the AIO is done.
nni_cv_fini(&aio->a_cv);
@@ -168,6 +173,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
return;
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
@@ -213,6 +219,8 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = result;
aio->a_count = count;
aio->a_prov_cancel = NULL;
@@ -238,6 +246,8 @@ nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = result;
aio->a_count = 0;
aio->a_prov_cancel = NULL;
@@ -382,6 +392,8 @@ nni_aio_expire_loop(void *arg)
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e45819b5..36129bfd 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -23,6 +23,7 @@ struct nni_taskq {
nni_taskq_thr *tq_threads;
int tq_nthreads;
int tq_close;
+ int tq_waiting;
};
static nni_taskq *nni_taskq_systq = NULL;
@@ -48,12 +49,21 @@ nni_taskq_thread(void *self)
thr->tqt_wait = 0;
nni_cv_wake(&tq->tq_cv);
}
+ if (tq->tq_waiting) {
+ tq->tq_waiting = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
+
continue;
}
if (tq->tq_close) {
break;
}
+ if (tq->tq_waiting) {
+ tq->tq_waiting = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
nni_cv_wait(&tq->tq_cv);
}
@@ -152,6 +162,39 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
return (0);
}
+void
+nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent)
+{
+ int i;
+ int running;
+
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
+ }
+
+ nni_mtx_lock(&tq->tq_mtx);
+ for (;;) {
+ running = 0;
+ if (nni_list_active(&tq->tq_ents, ent)) {
+ running = 1;
+ } else {
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ if (tq->tq_threads[i].tqt_running == ent) {
+ running = 1;
+ break;
+ }
+ }
+ }
+ if (!running) {
+ break;
+ }
+
+ tq->tq_waiting = 1;
+ nni_cv_wait(&tq->tq_cv);
+ }
+ nni_mtx_unlock(&tq->tq_mtx);
+}
+
int
nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
{
@@ -164,17 +207,22 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
nni_mtx_lock(&tq->tq_mtx);
running = 1;
- do {
+ for (;;) {
running = 0;
for (i = 0; i < tq->tq_nthreads; i++) {
if (tq->tq_threads[i].tqt_running == ent) {
- tq->tq_threads[i].tqt_wait = 1;
- nni_cv_wait(&tq->tq_cv);
running = 1;
break;
}
}
- } while (running != 0);
+
+ if (!running) {
+ break;
+ }
+ // tq->tq_threads[i].tqt_wait = 1;
+ tq->tq_waiting++;
+ nni_cv_wait(&tq->tq_cv);
+ }
if (ent->tqe_tq != tq) {
nni_mtx_unlock(&tq->tq_mtx);
diff --git a/src/core/taskq.h b/src/core/taskq.h
index f1e29a34..e6399706 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -28,6 +28,7 @@ extern void nni_taskq_fini(nni_taskq *);
extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *);
extern int nni_taskq_cancel(nni_taskq *, nni_taskq_ent *);
+extern void nni_taskq_wait(nni_taskq *, nni_taskq_ent *);
extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *);
extern int nni_taskq_sys_init(void);