aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c30
1 files changed, 21 insertions, 9 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 238522b0..a9fbc50d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,9 +12,10 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
+ NNI_AIO_START = 0x8,
};
// These are used for expiration.
@@ -48,7 +49,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = 0;
+ aio->a_flags = NNI_AIO_START;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -65,6 +66,10 @@ nni_aio_fini(nni_aio *aio)
aio->a_flags |= NNI_AIO_DONE;
aio->a_result = NNG_ECANCELED;
cancelfn = aio->a_prov_cancel;
+ if (aio->a_flags & NNI_AIO_START) {
+ aio->a_flags &= ~NNI_AIO_START;
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
} else {
cancelfn = NULL;
@@ -76,7 +81,8 @@ nni_aio_fini(nni_aio *aio)
}
nni_mtx_unlock(&aio->a_lk);
- // just a list operation at this point.
+ // Stop any timeouts. If one was in flight, we wait until it
+ // completes (it could fire the completion callback.)
nni_aio_expire_remove(aio);
// Cancel the AIO if it was scheduled.
@@ -84,10 +90,9 @@ nni_aio_fini(nni_aio *aio)
cancelfn(aio);
}
- // if the task is already dispatched, cancel it (or wait for it to
- // complete). No further dispatches will happen because of the
- // above logic to set NNI_AIO_FINI.
- nni_taskq_cancel(NULL, &aio->a_tqe);
+ // Wait for any outstanding task to complete. We won't schedule
+ // new stuff because nni_aio_start will fail (due to AIO_FINI).
+ nni_taskq_wait(NULL, &aio->a_tqe);
// At this point the AIO is done.
nni_cv_fini(&aio->a_cv);
@@ -168,6 +173,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
return;
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
@@ -213,6 +219,8 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = result;
aio->a_count = count;
aio->a_prov_cancel = NULL;
@@ -238,6 +246,8 @@ nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = result;
aio->a_count = 0;
aio->a_prov_cancel = NULL;
@@ -382,6 +392,8 @@ nni_aio_expire_loop(void *arg)
}
aio->a_flags |= NNI_AIO_DONE;
+ aio->a_flags &= ~NNI_AIO_START;
+
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;