From 6e5f6a26beec0a44d25625cacb5095cdc7a94146 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 20 Aug 2018 09:00:46 -0700 Subject: fixes #664 aio cancellation could be better This changes the signature of the aio cancellation routines to take the argument for cancellation directly, so we do not need to lookup the argument using the nni_aio_get_prov_data. We should probably consider eliminating nni_aio_get_prov_data, and co, and changing the prov_extra to reflect prov_data. Later. --- src/core/aio.c | 103 +++++++++++++++++++++++++++++++--------------------- src/core/aio.h | 2 +- src/core/device.c | 15 ++++---- src/core/msgqueue.c | 4 +- 4 files changed, 72 insertions(+), 52 deletions(-) (limited to 'src/core') diff --git a/src/core/aio.c b/src/core/aio.c index 0a9c744d..15abfe8d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -95,7 +95,8 @@ struct nng_aio { void *a_outputs[4]; // Provider-use fields. - nni_aio_cancelfn a_prov_cancel; + nni_aio_cancelfn a_cancel_fn; + void * a_cancel_arg; void * a_prov_data; nni_list_node a_prov_node; void * a_prov_extra[4]; // Extra data used by provider @@ -136,19 +137,22 @@ nni_aio_fini(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; // This is like aio_close, but we don't want to dispatch // the task. And unlike aio_stop, we don't want to wait // for the task. (Because we implicitly do task_fini.) nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } // Wait for the aio to be "done"; this ensures that we don't @@ -217,16 +221,19 @@ void nni_aio_stop(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECANCELED); + if (fn != NULL) { + fn(aio, arg, NNG_ECANCELED); } nni_aio_wait(aio); @@ -237,16 +244,19 @@ void nni_aio_close(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } } } @@ -347,18 +357,19 @@ nni_aio_begin(nni_aio *aio) aio->a_result = NNG_ECANCELED; aio->a_count = 0; nni_list_node_remove(&aio->a_expire_node); - aio->a_prov_cancel = NULL; - aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_expire = NNI_TIME_NEVER; + aio->a_sleep = false; nni_mtx_unlock(&nni_aio_lk); nni_task_dispatch(aio->a_task); return (NNG_ECANCELED); } - aio->a_result = 0; - aio->a_count = 0; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; + aio->a_result = 0; + aio->a_count = 0; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } @@ -391,9 +402,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) return (NNG_ECLOSED); } - NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_prov_cancel = cancelfn; - aio->a_prov_data = data; + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancelfn; + aio->a_cancel_arg = data; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); @@ -407,16 +418,19 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) void nni_aio_abort(nni_aio *aio, int rv) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. - if (cancelfn != NULL) { - cancelfn(aio, rv); + if (fn != NULL) { + fn(aio, arg, rv); } } @@ -430,9 +444,10 @@ nni_aio_finish_impl( nni_list_node_remove(&aio->a_expire_node); - aio->a_result = rv; - aio->a_count = count; - aio->a_prov_cancel = NULL; + aio->a_result = rv; + aio->a_count = count; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; if (msg) { aio->a_msg = msg; } @@ -536,7 +551,7 @@ nni_aio_expire_loop(void *arg) NNI_ARG_UNUSED(arg); for (;;) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; nni_time now; nni_aio * aio; int rv; @@ -569,8 +584,10 @@ nni_aio_expire_loop(void *arg) nni_list_remove(aios, aio); rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT; - if ((cancelfn = aio->a_prov_cancel) != NULL) { - + if ((fn = aio->a_cancel_fn) != NULL) { + void *arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; // Place a temporary hold on the aio. This prevents it // from being destroyed. nni_aio_expire_aio = aio; @@ -580,7 +597,7 @@ nni_aio_expire_loop(void *arg) // terminate the aio - we've tried, but it has to run // to it's natural conclusion. nni_mtx_unlock(&nni_aio_lk); - cancelfn(aio, rv); + fn(aio, arg, rv); nni_mtx_lock(&nni_aio_lk); nni_aio_expire_aio = NULL; @@ -666,8 +683,10 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) } static void -nni_sleep_cancel(nng_aio *aio, int rv) +nni_sleep_cancel(nng_aio *aio, void *arg, int rv) { + NNI_ARG_UNUSED(arg); + nni_mtx_lock(&nni_aio_lk); if (!aio->a_sleep) { nni_mtx_unlock(&nni_aio_lk); diff --git a/src/core/aio.h b/src/core/aio.h index 5462c40b..0febc85e 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -18,7 +18,7 @@ typedef struct nni_aio_ops nni_aio_ops; -typedef void (*nni_aio_cancelfn)(nni_aio *, int); +typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int); // nni_aio_init initializes an aio object. The callback is called with // the supplied argument when the operation is complete. If NULL is diff --git a/src/core/device.c b/src/core/device.c index 0fd23add..fee108a8 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -30,28 +30,29 @@ typedef struct nni_device_data { int npath; nni_device_path paths[2]; nni_mtx mtx; - int running; + bool running; } nni_device_data; typedef struct nni_device_pair nni_device_pair; static void -nni_device_cancel(nni_aio *aio, int rv) +nni_device_cancel(nni_aio *aio, void *arg, int rv) { - nni_device_data *dd = nni_aio_get_prov_data(aio); + nni_device_data *dd = arg; // cancellation is the only path to shutting it down. nni_mtx_lock(&dd->mtx); - if (dd->running == 0) { + if ((!dd->running) || (dd->user != aio)) { nni_mtx_unlock(&dd->mtx); return; } - dd->running = 0; + dd->running = false; + dd->user = NULL; nni_mtx_unlock(&dd->mtx); nni_sock_shutdown(dd->paths[0].src); nni_sock_shutdown(dd->paths[0].dst); - nni_aio_finish_error(dd->user, rv); + nni_aio_finish_error(aio, rv); } static void @@ -209,7 +210,7 @@ nni_device_start(nni_device_data *dd, nni_aio *user) p->state = NNI_DEVICE_STATE_RECV; nni_sock_recv(p->src, p->aio); } - dd->running = 1; + dd->running = true; nni_mtx_unlock(&dd->mtx); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index e58fe6f5..62a3893b 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -315,9 +315,9 @@ nni_msgq_run_notify(nni_msgq *mq) } static void -nni_msgq_cancel(nni_aio *aio, int rv) +nni_msgq_cancel(nni_aio *aio, void *arg, int rv) { - nni_msgq *mq = nni_aio_get_prov_data(aio); + nni_msgq *mq = arg; nni_mtx_lock(&mq->mq_lock); if (nni_aio_list_active(aio)) { -- cgit v1.2.3-70-g09d2