diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 123 |
1 files changed, 114 insertions, 9 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 11aadcb7..96e7c950 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -10,7 +10,10 @@ #include <string.h> #include "core/nng_impl.h" -#define NNI_AIO_WAKE (1<<0) +#define NNI_AIO_WAKE (1<<0) +#define NNI_AIO_DONE (1<<1) +#define NNI_AIO_FINI (1<<2) +#define NNI_AIO_STOP (1<<3) int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) @@ -32,6 +35,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) aio->a_cb = cb; aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; + aio->a_flags = 0; nni_taskq_ent_init(&aio->a_tqe, cb, arg); return (0); @@ -41,7 +45,24 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) void nni_aio_fini(nni_aio *aio) { + void (*cancelfn)(nni_aio *); + + nni_mtx_lock(&aio->a_lk); + aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled + cancelfn = aio->a_prov_cancel; + nni_mtx_unlock(&aio->a_lk); + + // Cancel the AIO if it was scheduled. + if (cancelfn != NULL) { + cancelfn(aio); + } + + // if the task is already dispatched, cancel it (or wait for it to + // complete). No further dispatches will happen because of the + // above logic to set NNI_AIO_FINI. nni_taskq_cancel(NULL, &aio->a_tqe); + + // At this point the AIO is done. nni_cv_fini(&aio->a_cv); nni_mtx_fini(&aio->a_lk); } @@ -82,21 +103,105 @@ nni_aio_wait(nni_aio *aio) } +int +nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) +{ + NNI_ASSERT(aio->a_prov_data == NULL); + NNI_ASSERT(aio->a_prov_cancel == NULL); + + nni_mtx_lock(&aio->a_lk); + aio->a_flags &= ~(NNI_AIO_DONE|NNI_AIO_WAKE); + if (aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP)) { + // We should not reschedule anything at this point. + nni_mtx_unlock(&aio->a_lk); + return (NNG_ECANCELED); + } + aio->a_prov_cancel = cancel; + aio->a_prov_data = data; + nni_mtx_unlock(&aio->a_lk); + return (0); +} + + +void +nni_aio_stop(nni_aio *aio) +{ + void (*cancelfn)(nni_aio *); + + nni_mtx_lock(&aio->a_lk); + aio->a_flags |= NNI_AIO_DONE|NNI_AIO_STOP; + cancelfn = aio->a_prov_cancel; + nni_mtx_unlock(&aio->a_lk); + + // This unregisters the AIO from the provider. + if (cancelfn != NULL) { + cancelfn(aio); + } + + nni_mtx_lock(&aio->a_lk); + aio->a_prov_data = NULL; + aio->a_prov_cancel = NULL; + nni_mtx_unlock(&aio->a_lk); + + // This either aborts the task, or waits for it to complete if already + // dispatched. + nni_taskq_cancel(NULL, &aio->a_tqe); +} + + +void +nni_aio_cancel(nni_aio *aio) +{ + void (*cancelfn)(nni_aio *); + + nni_mtx_lock(&aio->a_lk); + if (aio->a_flags & NNI_AIO_DONE) { + // The operation already completed - so there's nothing + // left for us to do. + nni_mtx_unlock(&aio->a_lk); + return; + } + aio->a_flags |= NNI_AIO_DONE; + aio->a_result = NNG_ECANCELED; + cancelfn = aio->a_prov_cancel; + nni_mtx_unlock(&aio->a_lk); + + // This unregisters the AIO from the provider. + if (cancelfn != NULL) { + cancelfn(aio); + } + + nni_mtx_lock(&aio->a_lk); + // These should have already been cleared by the cancel function. + aio->a_prov_data = NULL; + aio->a_prov_cancel = NULL; + + if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) { + nni_taskq_dispatch(NULL, &aio->a_tqe); + } + nni_mtx_unlock(&aio->a_lk); +} + + // I/O provider related functions. void nni_aio_finish(nni_aio *aio, int result, size_t count) { - nni_cb cb; - void *arg; - nni_mtx_lock(&aio->a_lk); + if (aio->a_flags & NNI_AIO_DONE) { + // Operation already done (canceled or timed out?) + nni_mtx_unlock(&aio->a_lk); + return; + } + aio->a_flags |= NNI_AIO_DONE; aio->a_result = result; aio->a_count = count; - cb = aio->a_cb; - arg = aio->a_cbarg; - nni_cv_wake(&aio->a_cv); - nni_mtx_unlock(&aio->a_lk); + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; - nni_taskq_dispatch(NULL, &aio->a_tqe); + if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) { + nni_taskq_dispatch(NULL, &aio->a_tqe); + } + nni_mtx_unlock(&aio->a_lk); } |
