aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c103
1 files changed, 61 insertions, 42 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 0a9c744d..15abfe8d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -95,7 +95,8 @@ struct nng_aio {
void *a_outputs[4];
// Provider-use fields.
- nni_aio_cancelfn a_prov_cancel;
+ nni_aio_cancelfn a_cancel_fn;
+ void * a_cancel_arg;
void * a_prov_data;
nni_list_node a_prov_node;
void * a_prov_extra[4]; // Extra data used by provider
@@ -136,19 +137,22 @@ nni_aio_fini(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
// This is like aio_close, but we don't want to dispatch
// the task. And unlike aio_stop, we don't want to wait
// for the task. (Because we implicitly do task_fini.)
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECLOSED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
}
// Wait for the aio to be "done"; this ensures that we don't
@@ -217,16 +221,19 @@ void
nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECANCELED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECANCELED);
}
nni_aio_wait(aio);
@@ -237,16 +244,19 @@ void
nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECLOSED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
}
}
}
@@ -347,18 +357,19 @@ nni_aio_begin(nni_aio *aio)
aio->a_result = NNG_ECANCELED;
aio->a_count = 0;
nni_list_node_remove(&aio->a_expire_node);
- aio->a_prov_cancel = NULL;
- aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = false;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_sleep = false;
nni_mtx_unlock(&nni_aio_lk);
nni_task_dispatch(aio->a_task);
return (NNG_ECANCELED);
}
- aio->a_result = 0;
- aio->a_count = 0;
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
+ aio->a_result = 0;
+ aio->a_count = 0;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
@@ -391,9 +402,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
return (NNG_ECLOSED);
}
- NNI_ASSERT(aio->a_prov_cancel == NULL);
- aio->a_prov_cancel = cancelfn;
- aio->a_prov_data = data;
+ NNI_ASSERT(aio->a_cancel_fn == NULL);
+ aio->a_cancel_fn = cancelfn;
+ aio->a_cancel_arg = data;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
@@ -407,16 +418,19 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
void
nni_aio_abort(nni_aio *aio, int rv)
{
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
nni_mtx_unlock(&nni_aio_lk);
// Stop any I/O at the provider level.
- if (cancelfn != NULL) {
- cancelfn(aio, rv);
+ if (fn != NULL) {
+ fn(aio, arg, rv);
}
}
@@ -430,9 +444,10 @@ nni_aio_finish_impl(
nni_list_node_remove(&aio->a_expire_node);
- aio->a_result = rv;
- aio->a_count = count;
- aio->a_prov_cancel = NULL;
+ aio->a_result = rv;
+ aio->a_count = count;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
if (msg) {
aio->a_msg = msg;
}
@@ -536,7 +551,7 @@ nni_aio_expire_loop(void *arg)
NNI_ARG_UNUSED(arg);
for (;;) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
nni_time now;
nni_aio * aio;
int rv;
@@ -569,8 +584,10 @@ nni_aio_expire_loop(void *arg)
nni_list_remove(aios, aio);
rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT;
- if ((cancelfn = aio->a_prov_cancel) != NULL) {
-
+ if ((fn = aio->a_cancel_fn) != NULL) {
+ void *arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
// Place a temporary hold on the aio. This prevents it
// from being destroyed.
nni_aio_expire_aio = aio;
@@ -580,7 +597,7 @@ nni_aio_expire_loop(void *arg)
// terminate the aio - we've tried, but it has to run
// to it's natural conclusion.
nni_mtx_unlock(&nni_aio_lk);
- cancelfn(aio, rv);
+ fn(aio, arg, rv);
nni_mtx_lock(&nni_aio_lk);
nni_aio_expire_aio = NULL;
@@ -666,8 +683,10 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
}
static void
-nni_sleep_cancel(nng_aio *aio, int rv)
+nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
{
+ NNI_ARG_UNUSED(arg);
+
nni_mtx_lock(&nni_aio_lk);
if (!aio->a_sleep) {
nni_mtx_unlock(&nni_aio_lk);