aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c103
-rw-r--r--src/core/aio.h2
-rw-r--r--src/core/device.c15
-rw-r--r--src/core/msgqueue.c4
4 files changed, 72 insertions, 52 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 0a9c744d..15abfe8d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -95,7 +95,8 @@ struct nng_aio {
void *a_outputs[4];
// Provider-use fields.
- nni_aio_cancelfn a_prov_cancel;
+ nni_aio_cancelfn a_cancel_fn;
+ void * a_cancel_arg;
void * a_prov_data;
nni_list_node a_prov_node;
void * a_prov_extra[4]; // Extra data used by provider
@@ -136,19 +137,22 @@ nni_aio_fini(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
// This is like aio_close, but we don't want to dispatch
// the task. And unlike aio_stop, we don't want to wait
// for the task. (Because we implicitly do task_fini.)
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECLOSED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
}
// Wait for the aio to be "done"; this ensures that we don't
@@ -217,16 +221,19 @@ void
nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECANCELED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECANCELED);
}
nni_aio_wait(aio);
@@ -237,16 +244,19 @@ void
nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECLOSED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
}
}
}
@@ -347,18 +357,19 @@ nni_aio_begin(nni_aio *aio)
aio->a_result = NNG_ECANCELED;
aio->a_count = 0;
nni_list_node_remove(&aio->a_expire_node);
- aio->a_prov_cancel = NULL;
- aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = false;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_sleep = false;
nni_mtx_unlock(&nni_aio_lk);
nni_task_dispatch(aio->a_task);
return (NNG_ECANCELED);
}
- aio->a_result = 0;
- aio->a_count = 0;
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
+ aio->a_result = 0;
+ aio->a_count = 0;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
@@ -391,9 +402,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
return (NNG_ECLOSED);
}
- NNI_ASSERT(aio->a_prov_cancel == NULL);
- aio->a_prov_cancel = cancelfn;
- aio->a_prov_data = data;
+ NNI_ASSERT(aio->a_cancel_fn == NULL);
+ aio->a_cancel_fn = cancelfn;
+ aio->a_cancel_arg = data;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
@@ -407,16 +418,19 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
void
nni_aio_abort(nni_aio *aio, int rv)
{
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
nni_mtx_unlock(&nni_aio_lk);
// Stop any I/O at the provider level.
- if (cancelfn != NULL) {
- cancelfn(aio, rv);
+ if (fn != NULL) {
+ fn(aio, arg, rv);
}
}
@@ -430,9 +444,10 @@ nni_aio_finish_impl(
nni_list_node_remove(&aio->a_expire_node);
- aio->a_result = rv;
- aio->a_count = count;
- aio->a_prov_cancel = NULL;
+ aio->a_result = rv;
+ aio->a_count = count;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
if (msg) {
aio->a_msg = msg;
}
@@ -536,7 +551,7 @@ nni_aio_expire_loop(void *arg)
NNI_ARG_UNUSED(arg);
for (;;) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
nni_time now;
nni_aio * aio;
int rv;
@@ -569,8 +584,10 @@ nni_aio_expire_loop(void *arg)
nni_list_remove(aios, aio);
rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT;
- if ((cancelfn = aio->a_prov_cancel) != NULL) {
-
+ if ((fn = aio->a_cancel_fn) != NULL) {
+ void *arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
// Place a temporary hold on the aio. This prevents it
// from being destroyed.
nni_aio_expire_aio = aio;
@@ -580,7 +597,7 @@ nni_aio_expire_loop(void *arg)
// terminate the aio - we've tried, but it has to run
// to it's natural conclusion.
nni_mtx_unlock(&nni_aio_lk);
- cancelfn(aio, rv);
+ fn(aio, arg, rv);
nni_mtx_lock(&nni_aio_lk);
nni_aio_expire_aio = NULL;
@@ -666,8 +683,10 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
}
static void
-nni_sleep_cancel(nng_aio *aio, int rv)
+nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
{
+ NNI_ARG_UNUSED(arg);
+
nni_mtx_lock(&nni_aio_lk);
if (!aio->a_sleep) {
nni_mtx_unlock(&nni_aio_lk);
diff --git a/src/core/aio.h b/src/core/aio.h
index 5462c40b..0febc85e 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -18,7 +18,7 @@
typedef struct nni_aio_ops nni_aio_ops;
-typedef void (*nni_aio_cancelfn)(nni_aio *, int);
+typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int);
// nni_aio_init initializes an aio object. The callback is called with
// the supplied argument when the operation is complete. If NULL is
diff --git a/src/core/device.c b/src/core/device.c
index 0fd23add..fee108a8 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -30,28 +30,29 @@ typedef struct nni_device_data {
int npath;
nni_device_path paths[2];
nni_mtx mtx;
- int running;
+ bool running;
} nni_device_data;
typedef struct nni_device_pair nni_device_pair;
static void
-nni_device_cancel(nni_aio *aio, int rv)
+nni_device_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_device_data *dd = nni_aio_get_prov_data(aio);
+ nni_device_data *dd = arg;
// cancellation is the only path to shutting it down.
nni_mtx_lock(&dd->mtx);
- if (dd->running == 0) {
+ if ((!dd->running) || (dd->user != aio)) {
nni_mtx_unlock(&dd->mtx);
return;
}
- dd->running = 0;
+ dd->running = false;
+ dd->user = NULL;
nni_mtx_unlock(&dd->mtx);
nni_sock_shutdown(dd->paths[0].src);
nni_sock_shutdown(dd->paths[0].dst);
- nni_aio_finish_error(dd->user, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -209,7 +210,7 @@ nni_device_start(nni_device_data *dd, nni_aio *user)
p->state = NNI_DEVICE_STATE_RECV;
nni_sock_recv(p->src, p->aio);
}
- dd->running = 1;
+ dd->running = true;
nni_mtx_unlock(&dd->mtx);
}
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index e58fe6f5..62a3893b 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -315,9 +315,9 @@ nni_msgq_run_notify(nni_msgq *mq)
}
static void
-nni_msgq_cancel(nni_aio *aio, int rv)
+nni_msgq_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_msgq *mq = nni_aio_get_prov_data(aio);
+ nni_msgq *mq = arg;
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_list_active(aio)) {