diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 103 | ||||
| -rw-r--r-- | src/core/aio.h | 2 | ||||
| -rw-r--r-- | src/core/device.c | 15 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 4 |
4 files changed, 72 insertions, 52 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 0a9c744d..15abfe8d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -95,7 +95,8 @@ struct nng_aio { void *a_outputs[4]; // Provider-use fields. - nni_aio_cancelfn a_prov_cancel; + nni_aio_cancelfn a_cancel_fn; + void * a_cancel_arg; void * a_prov_data; nni_list_node a_prov_node; void * a_prov_extra[4]; // Extra data used by provider @@ -136,19 +137,22 @@ nni_aio_fini(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; // This is like aio_close, but we don't want to dispatch // the task. And unlike aio_stop, we don't want to wait // for the task. (Because we implicitly do task_fini.) nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } // Wait for the aio to be "done"; this ensures that we don't @@ -217,16 +221,19 @@ void nni_aio_stop(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECANCELED); + if (fn != NULL) { + fn(aio, arg, NNG_ECANCELED); } nni_aio_wait(aio); @@ -237,16 +244,19 @@ void nni_aio_close(nni_aio *aio) { if (aio != NULL) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - aio->a_stop = true; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); - if (cancelfn != NULL) { - cancelfn(aio, NNG_ECLOSED); + if (fn != NULL) { + fn(aio, arg, NNG_ECLOSED); } } } @@ -347,18 +357,19 @@ nni_aio_begin(nni_aio *aio) aio->a_result = NNG_ECANCELED; aio->a_count = 0; nni_list_node_remove(&aio->a_expire_node); - aio->a_prov_cancel = NULL; - aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; + aio->a_expire = NNI_TIME_NEVER; + aio->a_sleep = false; nni_mtx_unlock(&nni_aio_lk); nni_task_dispatch(aio->a_task); return (NNG_ECANCELED); } - aio->a_result = 0; - aio->a_count = 0; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; + aio->a_result = 0; + aio->a_count = 0; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } @@ -391,9 +402,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) return (NNG_ECLOSED); } - NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_prov_cancel = cancelfn; - aio->a_prov_data = data; + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancelfn; + aio->a_cancel_arg = data; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); @@ -407,16 +418,19 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) void nni_aio_abort(nni_aio *aio, int rv) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; + void * arg; nni_mtx_lock(&nni_aio_lk); - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; + fn = aio->a_cancel_fn; + arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. - if (cancelfn != NULL) { - cancelfn(aio, rv); + if (fn != NULL) { + fn(aio, arg, rv); } } @@ -430,9 +444,10 @@ nni_aio_finish_impl( nni_list_node_remove(&aio->a_expire_node); - aio->a_result = rv; - aio->a_count = count; - aio->a_prov_cancel = NULL; + aio->a_result = rv; + aio->a_count = count; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; if (msg) { aio->a_msg = msg; } @@ -536,7 +551,7 @@ nni_aio_expire_loop(void *arg) NNI_ARG_UNUSED(arg); for (;;) { - nni_aio_cancelfn cancelfn; + nni_aio_cancelfn fn; nni_time now; nni_aio * aio; int rv; @@ -569,8 +584,10 @@ nni_aio_expire_loop(void *arg) nni_list_remove(aios, aio); rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT; - if ((cancelfn = aio->a_prov_cancel) != NULL) { - + if ((fn = aio->a_cancel_fn) != NULL) { + void *arg = aio->a_cancel_arg; + aio->a_cancel_fn = NULL; + aio->a_cancel_arg = NULL; // Place a temporary hold on the aio. This prevents it // from being destroyed. nni_aio_expire_aio = aio; @@ -580,7 +597,7 @@ nni_aio_expire_loop(void *arg) // terminate the aio - we've tried, but it has to run // to it's natural conclusion. nni_mtx_unlock(&nni_aio_lk); - cancelfn(aio, rv); + fn(aio, arg, rv); nni_mtx_lock(&nni_aio_lk); nni_aio_expire_aio = NULL; @@ -666,8 +683,10 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) } static void -nni_sleep_cancel(nng_aio *aio, int rv) +nni_sleep_cancel(nng_aio *aio, void *arg, int rv) { + NNI_ARG_UNUSED(arg); + nni_mtx_lock(&nni_aio_lk); if (!aio->a_sleep) { nni_mtx_unlock(&nni_aio_lk); diff --git a/src/core/aio.h b/src/core/aio.h index 5462c40b..0febc85e 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -18,7 +18,7 @@ typedef struct nni_aio_ops nni_aio_ops; -typedef void (*nni_aio_cancelfn)(nni_aio *, int); +typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int); // nni_aio_init initializes an aio object. The callback is called with // the supplied argument when the operation is complete. If NULL is diff --git a/src/core/device.c b/src/core/device.c index 0fd23add..fee108a8 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -30,28 +30,29 @@ typedef struct nni_device_data { int npath; nni_device_path paths[2]; nni_mtx mtx; - int running; + bool running; } nni_device_data; typedef struct nni_device_pair nni_device_pair; static void -nni_device_cancel(nni_aio *aio, int rv) +nni_device_cancel(nni_aio *aio, void *arg, int rv) { - nni_device_data *dd = nni_aio_get_prov_data(aio); + nni_device_data *dd = arg; // cancellation is the only path to shutting it down. nni_mtx_lock(&dd->mtx); - if (dd->running == 0) { + if ((!dd->running) || (dd->user != aio)) { nni_mtx_unlock(&dd->mtx); return; } - dd->running = 0; + dd->running = false; + dd->user = NULL; nni_mtx_unlock(&dd->mtx); nni_sock_shutdown(dd->paths[0].src); nni_sock_shutdown(dd->paths[0].dst); - nni_aio_finish_error(dd->user, rv); + nni_aio_finish_error(aio, rv); } static void @@ -209,7 +210,7 @@ nni_device_start(nni_device_data *dd, nni_aio *user) p->state = NNI_DEVICE_STATE_RECV; nni_sock_recv(p->src, p->aio); } - dd->running = 1; + dd->running = true; nni_mtx_unlock(&dd->mtx); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index e58fe6f5..62a3893b 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -315,9 +315,9 @@ nni_msgq_run_notify(nni_msgq *mq) } static void -nni_msgq_cancel(nni_aio *aio, int rv) +nni_msgq_cancel(nni_aio *aio, void *arg, int rv) { - nni_msgq *mq = nni_aio_get_prov_data(aio); + nni_msgq *mq = arg; nni_mtx_lock(&mq->mq_lock); if (nni_aio_list_active(aio)) { |
