aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c30
-rw-r--r--src/core/taskq.c56
-rw-r--r--src/core/taskq.h1
3 files changed, 74 insertions, 13 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 238522b0..a9fbc50d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,9 +12,10 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
+ NNI_AIO_START = 0x8,
};
// These are used for expiration.
@@ -48,7 +49,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = 0;
+ aio->a_flags = NNI_AIO_START;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -65,6 +66,10 @@ nni_aio_fini(nni_aio *aio)
aio->a_flags |= NNI_AIO_DONE;
aio->a_result = NNG_ECANCELED;
cancelfn = aio->a_prov_cancel;
+ if (aio->a_flags & NNI_AIO_START) {
+ aio->a_flags &= ~NNI_AIO_START;
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
} else {
cancelfn = NULL;
@@ -76,7 +81,8 @@ nni_aio_fini(nni_aio *aio)
}
nni_mtx_unlock(&aio->a_lk);
- // just a list operation at this point.
+ // Stop any timeouts. If one was in flight, we wait until it
+ // completes (it could fire the completion callback.)
nni_aio_expire_remove(aio);
// Cancel the AIO if it was scheduled.
@@ -84,10 +90,9 @@ nni_aio_fini(nni_aio *aio)
cancelfn(aio);
}
- // if the task is already dispatched, cancel it (or wait for it to
- // complete). No further dispatches will happen because of the
- // above logic to set NNI_AIO_FINI.
- nni_taskq_cancel(NULL, &aio->a_tqe);
+ // Wait for any outstanding task to complete. We won't schedule
+ // new stuff because nni_aio_start will fail (due to AIO_FINI).
+ nni_taskq_wait(NULL, &aio->a_tqe);
// At this point the AIO is done.
nni_cv_fini(&aio->a_cv);
@@ -168,6 +173,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
return;
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
@@ -213,6 +219,8 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = result;
aio->a_count = count;
aio->a_prov_cancel = NULL;
@@ -238,6 +246,8 @@ nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = result;
aio->a_count = 0;
aio->a_prov_cancel = NULL;
@@ -382,6 +392,8 @@ nni_aio_expire_loop(void *arg)
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e45819b5..36129bfd 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -23,6 +23,7 @@ struct nni_taskq {
nni_taskq_thr *tq_threads;
int tq_nthreads;
int tq_close;
+ int tq_waiting;
};
static nni_taskq *nni_taskq_systq = NULL;
@@ -48,12 +49,21 @@ nni_taskq_thread(void *self)
thr->tqt_wait = 0;
nni_cv_wake(&tq->tq_cv);
}
+ if (tq->tq_waiting) {
+ tq->tq_waiting = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
+
continue;
}
if (tq->tq_close) {
break;
}
+ if (tq->tq_waiting) {
+ tq->tq_waiting = 0;
+ nni_cv_wake(&tq->tq_cv);
+ }
nni_cv_wait(&tq->tq_cv);
}
@@ -152,6 +162,39 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
return (0);
}
+void
+nni_taskq_wait(nni_taskq *tq, nni_taskq_ent *ent)
+{
+ int i;
+ int running;
+
+ if (tq == NULL) {
+ tq = nni_taskq_systq;
+ }
+
+ nni_mtx_lock(&tq->tq_mtx);
+ for (;;) {
+ running = 0;
+ if (nni_list_active(&tq->tq_ents, ent)) {
+ running = 1;
+ } else {
+ for (i = 0; i < tq->tq_nthreads; i++) {
+ if (tq->tq_threads[i].tqt_running == ent) {
+ running = 1;
+ break;
+ }
+ }
+ }
+ if (!running) {
+ break;
+ }
+
+ tq->tq_waiting = 1;
+ nni_cv_wait(&tq->tq_cv);
+ }
+ nni_mtx_unlock(&tq->tq_mtx);
+}
+
int
nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
{
@@ -164,17 +207,22 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
nni_mtx_lock(&tq->tq_mtx);
running = 1;
- do {
+ for (;;) {
running = 0;
for (i = 0; i < tq->tq_nthreads; i++) {
if (tq->tq_threads[i].tqt_running == ent) {
- tq->tq_threads[i].tqt_wait = 1;
- nni_cv_wait(&tq->tq_cv);
running = 1;
break;
}
}
- } while (running != 0);
+
+ if (!running) {
+ break;
+ }
+ // tq->tq_threads[i].tqt_wait = 1;
+ tq->tq_waiting++;
+ nni_cv_wait(&tq->tq_cv);
+ }
if (ent->tqe_tq != tq) {
nni_mtx_unlock(&tq->tq_mtx);
diff --git a/src/core/taskq.h b/src/core/taskq.h
index f1e29a34..e6399706 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -28,6 +28,7 @@ extern void nni_taskq_fini(nni_taskq *);
extern int nni_taskq_dispatch(nni_taskq *, nni_taskq_ent *);
extern int nni_taskq_cancel(nni_taskq *, nni_taskq_ent *);
+extern void nni_taskq_wait(nni_taskq *, nni_taskq_ent *);
extern void nni_taskq_ent_init(nni_taskq_ent *, nni_cb, void *);
extern int nni_taskq_sys_init(void);