diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 216 |
1 files changed, 99 insertions, 117 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 3606aa14..a5b6c088 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -17,6 +17,7 @@ static nni_cv nni_aio_expire_cv; static int nni_aio_expire_run; static nni_thr nni_aio_expire_thr; static nni_list nni_aio_expire_aios; +static nni_aio *nni_aio_expire_aio; // Design notes. // @@ -62,16 +63,11 @@ struct nng_aio { nni_duration a_timeout; // Relative timeout // These fields are private to the aio framework. - nni_cv a_cv; - bool a_fini : 1; // shutting down (no new operations) - bool a_done : 1; // operation has completed - bool a_pend : 1; // completion routine pending - bool a_active : 1; // aio was started - bool a_expiring : 1; // expiration callback in progress - bool a_waiting : 1; // a thread is waiting for this to finish - bool a_synch : 1; // run completion synchronously - bool a_sleep : 1; // sleeping with no action - nni_task a_task; + bool a_stop; // shutting down (no new operations) + bool a_sleep; // sleeping with no action + int a_sleeprv; // result when sleep wakes + int a_cancelrv; // if canceled between begin and schedule + nni_task *a_task; // Read/write operations. nni_iov *a_iov; @@ -109,12 +105,16 @@ int nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) { nni_aio *aio; + int rv; if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { return (NNG_ENOMEM); } memset(aio, 0, sizeof(*aio)); - nni_cv_init(&aio->a_cv, &nni_aio_lk); + if ((rv = nni_task_init(&aio->a_task, NULL, cb, arg)) != 0) { + NNI_FREE_STRUCT(aio); + return (rv); + } aio->a_expire = NNI_TIME_NEVER; aio->a_timeout = NNG_DURATION_INFINITE; aio->a_iov = aio->a_iovinl; @@ -122,7 +122,6 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) if (arg == NULL) { arg = aio; } - nni_task_init(NULL, &aio->a_task, cb, arg); *aiop = aio; return (0); } @@ -133,9 +132,19 @@ nni_aio_fini(nni_aio *aio) if (aio != NULL) { nni_aio_stop(aio); - // At this point the AIO is done. - nni_cv_fini(&aio->a_cv); + // Wait for the aio to be "done"; this ensures that we don't + // destroy an aio from a "normal" completion callback while + // the expiration thread is working. + + nni_mtx_lock(&nni_aio_lk); + while (nni_aio_expire_aio == aio) { + nni_cv_wait(&nni_aio_expire_cv); + } + nni_mtx_unlock(&nni_aio_lk); + nni_task_fini(aio->a_task); + + // At this point the AIO is done. if (aio->a_niovalloc > 0) { NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc); } @@ -186,7 +195,7 @@ nni_aio_stop(nni_aio *aio) { if (aio != NULL) { nni_mtx_lock(&nni_aio_lk); - aio->a_fini = true; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); nni_aio_abort(aio, NNG_ECANCELED); @@ -279,15 +288,7 @@ nni_aio_count(nni_aio *aio) void nni_aio_wait(nni_aio *aio) { - nni_mtx_lock(&nni_aio_lk); - // Wait until we're done, and the synchronous completion flag - // is cleared (meaning any synch completion is finished). - while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) { - aio->a_waiting = true; - nni_cv_wait(&aio->a_cv); - } - nni_mtx_unlock(&nni_aio_lk); - nni_task_wait(&aio->a_task); + nni_task_wait(aio->a_task); } int @@ -295,35 +296,34 @@ nni_aio_begin(nni_aio *aio) { nni_mtx_lock(&nni_aio_lk); // We should not reschedule anything at this point. - if (aio->a_fini) { - aio->a_active = false; + if (aio->a_stop) { + nni_task_unprep(aio->a_task); aio->a_result = NNG_ECANCELED; nni_mtx_unlock(&nni_aio_lk); return (NNG_ECANCELED); } - aio->a_done = false; - aio->a_pend = false; aio->a_result = 0; aio->a_count = 0; aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; - aio->a_active = true; + aio->a_cancelrv = 0; for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } + nni_task_prep(aio->a_task); nni_mtx_unlock(&nni_aio_lk); return (0); } -void +int nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) { + int rv; if (!aio->a_sleep) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: - aio->a_expire = nni_clock(); - break; + return (NNG_ETIMEDOUT); case NNG_DURATION_INFINITE: case NNG_DURATION_DEFAULT: aio->a_expire = NNI_TIME_NEVER; @@ -335,22 +335,26 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) } nni_mtx_lock(&nni_aio_lk); + if (aio->a_stop) { + nni_mtx_unlock(&nni_aio_lk); + return (NNG_ECANCELED); + } + if ((rv = aio->a_cancelrv) != 0) { + nni_mtx_unlock(&nni_aio_lk); + return (rv); + } + + // If cancellation occurred in between "begin" and "schedule", + // then cancel it right now. aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; - if (aio->a_expire != NNI_TIME_NEVER) { + if ((rv = aio->a_cancelrv) != 0) { + aio->a_expire = 0; + nni_aio_expire_add(aio); + } else if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } nni_mtx_unlock(&nni_aio_lk); -} - -int -nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) -{ - - if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) { - return (NNG_ETIMEDOUT); - } - nni_aio_schedule(aio, cancelfn, data); return (0); } @@ -379,11 +383,8 @@ nni_aio_finish_impl( { nni_mtx_lock(&nni_aio_lk); - NNI_ASSERT(!aio->a_pend); // provider only calls us *once* - nni_list_node_remove(&aio->a_expire_node); - aio->a_pend = true; aio->a_result = rv; aio->a_count = count; aio->a_prov_cancel = NULL; @@ -393,38 +394,13 @@ nni_aio_finish_impl( aio->a_expire = NNI_TIME_NEVER; aio->a_sleep = false; - - // If we are expiring, then we rely on the expiration thread to - // complete this; we must not because the expiration thread is - // still holding the reference. - - if (aio->a_expiring) { - nni_mtx_unlock(&nni_aio_lk); - return; - } - - aio->a_done = true; - aio->a_synch = synch; + nni_mtx_unlock(&nni_aio_lk); if (synch) { - if (aio->a_task.task_cb != NULL) { - nni_mtx_unlock(&nni_aio_lk); - aio->a_task.task_cb(aio->a_task.task_arg); - nni_mtx_lock(&nni_aio_lk); - } + nni_task_exec(aio->a_task); } else { - nni_task_dispatch(&aio->a_task); + nni_task_dispatch(aio->a_task); } - aio->a_synch = false; - - if (aio->a_waiting) { - aio->a_waiting = false; - nni_cv_wake(&aio->a_cv); - } - - // This has to be done with the lock still held, in order - // to prevent taskq wait from returning prematurely. - nni_mtx_unlock(&nni_aio_lk); } void @@ -510,25 +486,27 @@ nni_aio_expire_add(nni_aio *aio) static void nni_aio_expire_loop(void *arg) { - nni_list * aios = &nni_aio_expire_aios; - nni_aio * aio; - nni_time now; - nni_aio_cancelfn cancelfn; - int rv; + nni_list *aios = &nni_aio_expire_aios; NNI_ARG_UNUSED(arg); for (;;) { + nni_aio_cancelfn cancelfn; + nni_time now; + nni_aio * aio; + int rv; + now = nni_clock(); nni_mtx_lock(&nni_aio_lk); - if (nni_aio_expire_run == 0) { - nni_mtx_unlock(&nni_aio_lk); - return; - } - if ((aio = nni_list_first(aios)) == NULL) { + + if (nni_aio_expire_run == 0) { + nni_mtx_unlock(&nni_aio_lk); + return; + } + nni_cv_wait(&nni_aio_expire_cv); nni_mtx_unlock(&nni_aio_lk); continue; @@ -544,38 +522,24 @@ nni_aio_expire_loop(void *arg) // This aio's time has come. Expire it, canceling any // outstanding I/O. nni_list_remove(aios, aio); + rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT; + + if ((cancelfn = aio->a_prov_cancel) != NULL) { + + // Place a temporary hold on the aio. This prevents it + // from being destroyed. + nni_aio_expire_aio = aio; - // Mark it as expiring. This acts as a hold on - // the aio, similar to the consumers. The actual taskq - // dispatch on completion won't occur until this is cleared, - // and the done flag won't be set either. - aio->a_expiring = true; - cancelfn = aio->a_prov_cancel; - rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT; - aio->a_sleep = false; - - // Cancel any outstanding activity. This is always non-NULL - // for a valid aio, and becomes NULL only when an AIO is - // already being canceled or finished. - if (cancelfn != NULL) { + // We let the cancel function handle the completion. + // If there is no cancellation function, then we cannot + // terminate the aio - we've tried, but it has to run + // to it's natural conclusion. nni_mtx_unlock(&nni_aio_lk); cancelfn(aio, rv); nni_mtx_lock(&nni_aio_lk); - } else { - aio->a_pend = true; - aio->a_result = rv; - } - - NNI_ASSERT(aio->a_pend); // nni_aio_finish was run - NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_expiring = false; - aio->a_done = true; - nni_task_dispatch(&aio->a_task); - - if (aio->a_waiting) { - aio->a_waiting = false; - nni_cv_wake(&aio->a_cv); + nni_aio_expire_aio = NULL; + nni_cv_wake(&nni_aio_expire_cv); } nni_mtx_unlock(&nni_aio_lk); } @@ -656,12 +620,31 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) return (resid); // we might not have used all of n for this iov } +static void +nni_sleep_cancel(nng_aio *aio, int rv) +{ + nni_mtx_lock(&nni_aio_lk); + if (!aio->a_sleep) { + nni_mtx_unlock(&nni_aio_lk); + return; + } + + aio->a_sleep = false; + nni_list_node_remove(&aio->a_expire_node); + nni_mtx_unlock(&nni_aio_lk); + + nni_aio_finish_error(aio, rv); +} + void nni_sleep_aio(nng_duration ms, nng_aio *aio) { + int rv; if (nni_aio_begin(aio) != 0) { return; } + aio->a_sleeprv = 0; + aio->a_sleep = true; switch (aio->a_timeout) { case NNG_DURATION_DEFAULT: case NNG_DURATION_INFINITE: @@ -671,16 +654,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) // If the timeout on the aio is shorter than our sleep time, // then let it still wake up early, but with NNG_ETIMEDOUT. if (ms > aio->a_timeout) { - aio->a_sleep = false; - (void) nni_aio_schedule(aio, NULL, NULL); - return; + aio->a_sleeprv = NNG_ETIMEDOUT; + ms = aio->a_timeout; } } - aio->a_sleep = true; aio->a_expire = nni_clock() + ms; - // There is no cancellation, apart from just unexpiring. - nni_aio_schedule(aio, NULL, NULL); + if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { + nni_aio_finish_error(aio, rv); + } } void |
