diff options
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 103 |
1 files changed, 61 insertions, 42 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 0a9c744d..15abfe8d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -95,7 +95,8 @@ struct nng_aio { void *a_outputs[4]; // Provider-use fields. - nni_aio_cancelfn a_prov_cancel; + nni_aio_cancelfn a_cancel_fn; + void * a_cancel_arg; void * a_prov_data; nni_list_node a_prov_node; void * a_prov_extra[4]; // Extra data used by provider @@ -136,19 +137,22 @@ nni_aio_fini(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; // This is like aio_close, but we don't want to dispatch // the task. And unlike aio_stop, we don't want to wait // for the task. (Because we implicitly do task_fini.) nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } // Wait for the aio to be "done"; this ensures that we don't @@ -217,16 +221,19 @@ void nni_aio_stop(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECANCELED); + if (fn != NULL) { + fn(aio, arg, NNG_ECANCELED); } nni_aio_wait(aio); @@ -237,16 +244,19 @@ void nni_aio_close(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } } } @@ -347,18 +357,19 @@ nni_aio_begin(nni_aio *aio) aio->a_result = NNG_ECANCELED; aio->a_count = 0; nni_list_node_remove(&aio->a_expire_node); - aio->a_prov_cancel = NULL; - aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_expire = NNI_TIME_NEVER; + aio->a_sleep = false; nni_mtx_unlock(&nni_aio_lk); nni_task_dispatch(aio->a_task); return (NNG_ECANCELED); } - aio->a_result = 0; - aio->a_count = 0; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; + aio->a_result = 0; + aio->a_count = 0; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } @@ -391,9 +402,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) return (NNG_ECLOSED); } - NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_prov_cancel = cancelfn; - aio->a_prov_data = data; + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancelfn; + aio->a_cancel_arg = data; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); @@ -407,16 +418,19 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) void nni_aio_abort(nni_aio *aio, int rv) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. - if (cancelfn != NULL) { - cancelfn(aio, rv); + if (fn != NULL) { + fn(aio, arg, rv); } } @@ -430,9 +444,10 @@ nni_aio_finish_impl( nni_list_node_remove(&aio->a_expire_node); - aio->a_result = rv; - aio->a_count = count; - aio->a_prov_cancel = NULL; + aio->a_result = rv; + aio->a_count = count; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; if (msg) { aio->a_msg = msg; } @@ -536,7 +551,7 @@ nni_aio_expire_loop(void *arg) NNI_ARG_UNUSED(arg); for (;;) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; nni_time now; nni_aio * aio; int rv; @@ -569,8 +584,10 @@ nni_aio_expire_loop(void *arg) nni_list_remove(aios, aio); rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT; - if ((cancelfn = aio->a_prov_cancel) != NULL) { - + if ((fn = aio->a_cancel_fn) != NULL) { + void *arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; // Place a temporary hold on the aio. This prevents it // from being destroyed. nni_aio_expire_aio = aio; @@ -580,7 +597,7 @@ nni_aio_expire_loop(void *arg) // terminate the aio - we've tried, but it has to run // to it's natural conclusion. nni_mtx_unlock(&nni_aio_lk); - cancelfn(aio, rv); + fn(aio, arg, rv); nni_mtx_lock(&nni_aio_lk); nni_aio_expire_aio = NULL; @@ -666,8 +683,10 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) } static void -nni_sleep_cancel(nng_aio *aio, int rv) +nni_sleep_cancel(nng_aio *aio, void *arg, int rv) { + NNI_ARG_UNUSED(arg); + nni_mtx_lock(&nni_aio_lk); if (!aio->a_sleep) { nni_mtx_unlock(&nni_aio_lk); |
