diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 84 |
1 files changed, 33 insertions, 51 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index a9fbc50d..620a865d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -12,10 +12,9 @@ #include <string.h> enum nni_aio_flags { - NNI_AIO_WAKE = 0x1, - NNI_AIO_DONE = 0x2, - NNI_AIO_FINI = 0x4, - NNI_AIO_START = 0x8, + NNI_AIO_WAKE = 0x1, + NNI_AIO_DONE = 0x2, + NNI_AIO_FINI = 0x4, }; // These are used for expiration. @@ -49,7 +48,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) aio->a_cb = cb; aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; - aio->a_flags = NNI_AIO_START; + aio->a_flags = 0; nni_taskq_ent_init(&aio->a_tqe, cb, arg); return (0); @@ -58,49 +57,39 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) void nni_aio_fini(nni_aio *aio) { - void (*cancelfn)(nni_aio *); + nni_aio_stop(aio); - nni_mtx_lock(&aio->a_lk); - aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled - if ((aio->a_flags & NNI_AIO_DONE) == 0) { - aio->a_flags |= NNI_AIO_DONE; - aio->a_result = NNG_ECANCELED; - cancelfn = aio->a_prov_cancel; - if (aio->a_flags & NNI_AIO_START) { - aio->a_flags &= ~NNI_AIO_START; - nni_taskq_dispatch(NULL, &aio->a_tqe); - } + // At this point the AIO is done. + nni_cv_fini(&aio->a_cv); + nni_mtx_fini(&aio->a_lk); - } else { - cancelfn = NULL; + if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) { + NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs); } - nni_cv_wake(&aio->a_cv); +} - while (aio->a_refcnt != 0) { - nni_cv_wait(&aio->a_cv); +// nni_aio_stop cancels any oustanding operation, and waits for the +// callback to complete, if still running. It also marks the AIO as +// stopped, preventing further calls to nni_aio_start from succeeding. +// To correctly tear down an AIO, call stop, and make sure any other +// calles are not also stopped, before calling nni_aio_fini to release +// actual memory. +void +nni_aio_stop(nni_aio *aio) +{ + if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) { + // Never initialized, so nothing should have happened. + return; } + nni_mtx_lock(&aio->a_lk); + aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled nni_mtx_unlock(&aio->a_lk); - // Stop any timeouts. If one was in flight, we wait until it - // completes (it could fire the completion callback.) - nni_aio_expire_remove(aio); - - // Cancel the AIO if it was scheduled. - if (cancelfn != NULL) { - cancelfn(aio); - } + nni_aio_cancel(aio, NNG_ECANCELED); // Wait for any outstanding task to complete. We won't schedule // new stuff because nni_aio_start will fail (due to AIO_FINI). nni_taskq_wait(NULL, &aio->a_tqe); - - // At this point the AIO is done. - nni_cv_fini(&aio->a_cv); - nni_mtx_fini(&aio->a_lk); - - if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) { - NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs); - } } int @@ -166,24 +155,23 @@ nni_aio_cancel(nni_aio *aio, int rv) void (*cancelfn)(nni_aio *); nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { + if (aio->a_flags & NNI_AIO_DONE) { // The operation already completed - so there's nothing // left for us to do. nni_mtx_unlock(&aio->a_lk); return; } aio->a_flags |= NNI_AIO_DONE; - aio->a_flags &= ~NNI_AIO_START; aio->a_result = rv; cancelfn = aio->a_prov_cancel; aio->a_prov_cancel = NULL; - // Guaraneed to just be a list operation. - nni_aio_expire_remove(aio); - aio->a_refcnt++; nni_mtx_unlock(&aio->a_lk); + // Guaraneed to just be a list operation. + nni_aio_expire_remove(aio); + // Stop any I/O at the provider level. if (cancelfn != NULL) { cancelfn(aio); @@ -200,10 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv) aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; - if (!(aio->a_flags & NNI_AIO_FINI)) { - // If we are finalizing, then we are done. - nni_taskq_dispatch(NULL, &aio->a_tqe); - } + nni_taskq_dispatch(NULL, &aio->a_tqe); nni_mtx_unlock(&aio->a_lk); } @@ -213,13 +198,12 @@ int nni_aio_finish(nni_aio *aio, int result, size_t count) { nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { + if (aio->a_flags & NNI_AIO_DONE) { // Operation already done (canceled or timed out?) nni_mtx_unlock(&aio->a_lk); return (NNG_ESTATE); } aio->a_flags |= NNI_AIO_DONE; - aio->a_flags &= ~NNI_AIO_START; aio->a_result = result; aio->a_count = count; @@ -240,13 +224,12 @@ int nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe) { nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { + if (aio->a_flags & NNI_AIO_DONE) { // Operation already done (canceled or timed out?) nni_mtx_unlock(&aio->a_lk); return (NNG_ESTATE); } aio->a_flags |= NNI_AIO_DONE; - aio->a_flags &= ~NNI_AIO_START; aio->a_result = result; aio->a_count = 0; @@ -392,7 +375,6 @@ nni_aio_expire_loop(void *arg) } aio->a_flags |= NNI_AIO_DONE; - aio->a_flags &= ~NNI_AIO_START; aio->a_result = NNG_ETIMEDOUT; cancelfn = aio->a_prov_cancel; |
