aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c123
1 files changed, 114 insertions, 9 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 11aadcb7..96e7c950 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -10,7 +10,10 @@
#include <string.h>
#include "core/nng_impl.h"
-#define NNI_AIO_WAKE (1<<0)
+#define NNI_AIO_WAKE (1<<0)
+#define NNI_AIO_DONE (1<<1)
+#define NNI_AIO_FINI (1<<2)
+#define NNI_AIO_STOP (1<<3)
int
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
@@ -32,6 +35,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
+ aio->a_flags = 0;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -41,7 +45,24 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // Cancel the AIO if it was scheduled.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ // if the task is already dispatched, cancel it (or wait for it to
+ // complete). No further dispatches will happen because of the
+ // above logic to set NNI_AIO_FINI.
nni_taskq_cancel(NULL, &aio->a_tqe);
+
+ // At this point the AIO is done.
nni_cv_fini(&aio->a_cv);
nni_mtx_fini(&aio->a_lk);
}
@@ -82,21 +103,105 @@ nni_aio_wait(nni_aio *aio)
}
+int
+nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
+{
+ NNI_ASSERT(aio->a_prov_data == NULL);
+ NNI_ASSERT(aio->a_prov_cancel == NULL);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags &= ~(NNI_AIO_DONE|NNI_AIO_WAKE);
+ if (aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP)) {
+ // We should not reschedule anything at this point.
+ nni_mtx_unlock(&aio->a_lk);
+ return (NNG_ECANCELED);
+ }
+ aio->a_prov_cancel = cancel;
+ aio->a_prov_data = data;
+ nni_mtx_unlock(&aio->a_lk);
+ return (0);
+}
+
+
+void
+nni_aio_stop(nni_aio *aio)
+{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_DONE|NNI_AIO_STOP;
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This unregisters the AIO from the provider.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_prov_data = NULL;
+ aio->a_prov_cancel = NULL;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This either aborts the task, or waits for it to complete if already
+ // dispatched.
+ nni_taskq_cancel(NULL, &aio->a_tqe);
+}
+
+
+void
+nni_aio_cancel(nni_aio *aio)
+{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ if (aio->a_flags & NNI_AIO_DONE) {
+ // The operation already completed - so there's nothing
+ // left for us to do.
+ nni_mtx_unlock(&aio->a_lk);
+ return;
+ }
+ aio->a_flags |= NNI_AIO_DONE;
+ aio->a_result = NNG_ECANCELED;
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This unregisters the AIO from the provider.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ nni_mtx_lock(&aio->a_lk);
+ // These should have already been cleared by the cancel function.
+ aio->a_prov_data = NULL;
+ aio->a_prov_cancel = NULL;
+
+ if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) {
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
+ nni_mtx_unlock(&aio->a_lk);
+}
+
+
// I/O provider related functions.
void
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
- nni_cb cb;
- void *arg;
-
nni_mtx_lock(&aio->a_lk);
+ if (aio->a_flags & NNI_AIO_DONE) {
+ // Operation already done (canceled or timed out?)
+ nni_mtx_unlock(&aio->a_lk);
+ return;
+ }
+ aio->a_flags |= NNI_AIO_DONE;
aio->a_result = result;
aio->a_count = count;
- cb = aio->a_cb;
- arg = aio->a_cbarg;
- nni_cv_wake(&aio->a_cv);
- nni_mtx_unlock(&aio->a_lk);
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
+ if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) {
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
+ nni_mtx_unlock(&aio->a_lk);
}