aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-20 09:00:46 -0700
committerGarrett D'Amore <garrett@damore.org>2018-08-20 09:26:20 -0700
commit6e5f6a26beec0a44d25625cacb5095cdc7a94146 (patch)
tree7d6ef720b239a30d4906f1f2303fed77707b2761 /src/core
parent3e70013111b70f1439b2f9991211c887a8eefff3 (diff)
downloadnng-6e5f6a26beec0a44d25625cacb5095cdc7a94146.tar.gz
nng-6e5f6a26beec0a44d25625cacb5095cdc7a94146.tar.bz2
nng-6e5f6a26beec0a44d25625cacb5095cdc7a94146.zip
fixes #664 aio cancellation could be better
This changes the signature of the aio cancellation routines to take the argument for cancellation directly, so we do not need to lookup the argument using the nni_aio_get_prov_data. We should probably consider eliminating nni_aio_get_prov_data, and co, and changing the prov_extra to reflect prov_data. Later.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c103
-rw-r--r--src/core/aio.h2
-rw-r--r--src/core/device.c15
-rw-r--r--src/core/msgqueue.c4
4 files changed, 72 insertions, 52 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 0a9c744d..15abfe8d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -95,7 +95,8 @@ struct nng_aio {
void *a_outputs[4];
// Provider-use fields.
- nni_aio_cancelfn a_prov_cancel;
+ nni_aio_cancelfn a_cancel_fn;
+ void * a_cancel_arg;
void * a_prov_data;
nni_list_node a_prov_node;
void * a_prov_extra[4]; // Extra data used by provider
@@ -136,19 +137,22 @@ nni_aio_fini(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
// This is like aio_close, but we don't want to dispatch
// the task. And unlike aio_stop, we don't want to wait
// for the task. (Because we implicitly do task_fini.)
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECLOSED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
}
// Wait for the aio to be "done"; this ensures that we don't
@@ -217,16 +221,19 @@ void
nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECANCELED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECANCELED);
}
nni_aio_wait(aio);
@@ -237,16 +244,19 @@ void
nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- aio->a_stop = true;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
- if (cancelfn != NULL) {
- cancelfn(aio, NNG_ECLOSED);
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
}
}
}
@@ -347,18 +357,19 @@ nni_aio_begin(nni_aio *aio)
aio->a_result = NNG_ECANCELED;
aio->a_count = 0;
nni_list_node_remove(&aio->a_expire_node);
- aio->a_prov_cancel = NULL;
- aio->a_expire = NNI_TIME_NEVER;
- aio->a_sleep = false;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_sleep = false;
nni_mtx_unlock(&nni_aio_lk);
nni_task_dispatch(aio->a_task);
return (NNG_ECANCELED);
}
- aio->a_result = 0;
- aio->a_count = 0;
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
+ aio->a_result = 0;
+ aio->a_count = 0;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
@@ -391,9 +402,9 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
return (NNG_ECLOSED);
}
- NNI_ASSERT(aio->a_prov_cancel == NULL);
- aio->a_prov_cancel = cancelfn;
- aio->a_prov_data = data;
+ NNI_ASSERT(aio->a_cancel_fn == NULL);
+ aio->a_cancel_fn = cancelfn;
+ aio->a_cancel_arg = data;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
@@ -407,16 +418,19 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
void
nni_aio_abort(nni_aio *aio, int rv)
{
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
+ void * arg;
nni_mtx_lock(&nni_aio_lk);
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
nni_mtx_unlock(&nni_aio_lk);
// Stop any I/O at the provider level.
- if (cancelfn != NULL) {
- cancelfn(aio, rv);
+ if (fn != NULL) {
+ fn(aio, arg, rv);
}
}
@@ -430,9 +444,10 @@ nni_aio_finish_impl(
nni_list_node_remove(&aio->a_expire_node);
- aio->a_result = rv;
- aio->a_count = count;
- aio->a_prov_cancel = NULL;
+ aio->a_result = rv;
+ aio->a_count = count;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
if (msg) {
aio->a_msg = msg;
}
@@ -536,7 +551,7 @@ nni_aio_expire_loop(void *arg)
NNI_ARG_UNUSED(arg);
for (;;) {
- nni_aio_cancelfn cancelfn;
+ nni_aio_cancelfn fn;
nni_time now;
nni_aio * aio;
int rv;
@@ -569,8 +584,10 @@ nni_aio_expire_loop(void *arg)
nni_list_remove(aios, aio);
rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT;
- if ((cancelfn = aio->a_prov_cancel) != NULL) {
-
+ if ((fn = aio->a_cancel_fn) != NULL) {
+ void *arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
// Place a temporary hold on the aio. This prevents it
// from being destroyed.
nni_aio_expire_aio = aio;
@@ -580,7 +597,7 @@ nni_aio_expire_loop(void *arg)
// terminate the aio - we've tried, but it has to run
// to it's natural conclusion.
nni_mtx_unlock(&nni_aio_lk);
- cancelfn(aio, rv);
+ fn(aio, arg, rv);
nni_mtx_lock(&nni_aio_lk);
nni_aio_expire_aio = NULL;
@@ -666,8 +683,10 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
}
static void
-nni_sleep_cancel(nng_aio *aio, int rv)
+nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
{
+ NNI_ARG_UNUSED(arg);
+
nni_mtx_lock(&nni_aio_lk);
if (!aio->a_sleep) {
nni_mtx_unlock(&nni_aio_lk);
diff --git a/src/core/aio.h b/src/core/aio.h
index 5462c40b..0febc85e 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -18,7 +18,7 @@
typedef struct nni_aio_ops nni_aio_ops;
-typedef void (*nni_aio_cancelfn)(nni_aio *, int);
+typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int);
// nni_aio_init initializes an aio object. The callback is called with
// the supplied argument when the operation is complete. If NULL is
diff --git a/src/core/device.c b/src/core/device.c
index 0fd23add..fee108a8 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -30,28 +30,29 @@ typedef struct nni_device_data {
int npath;
nni_device_path paths[2];
nni_mtx mtx;
- int running;
+ bool running;
} nni_device_data;
typedef struct nni_device_pair nni_device_pair;
static void
-nni_device_cancel(nni_aio *aio, int rv)
+nni_device_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_device_data *dd = nni_aio_get_prov_data(aio);
+ nni_device_data *dd = arg;
// cancellation is the only path to shutting it down.
nni_mtx_lock(&dd->mtx);
- if (dd->running == 0) {
+ if ((!dd->running) || (dd->user != aio)) {
nni_mtx_unlock(&dd->mtx);
return;
}
- dd->running = 0;
+ dd->running = false;
+ dd->user = NULL;
nni_mtx_unlock(&dd->mtx);
nni_sock_shutdown(dd->paths[0].src);
nni_sock_shutdown(dd->paths[0].dst);
- nni_aio_finish_error(dd->user, rv);
+ nni_aio_finish_error(aio, rv);
}
static void
@@ -209,7 +210,7 @@ nni_device_start(nni_device_data *dd, nni_aio *user)
p->state = NNI_DEVICE_STATE_RECV;
nni_sock_recv(p->src, p->aio);
}
- dd->running = 1;
+ dd->running = true;
nni_mtx_unlock(&dd->mtx);
}
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index e58fe6f5..62a3893b 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -315,9 +315,9 @@ nni_msgq_run_notify(nni_msgq *mq)
}
static void
-nni_msgq_cancel(nni_aio *aio, int rv)
+nni_msgq_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_msgq *mq = nni_aio_get_prov_data(aio);
+ nni_msgq *mq = arg;
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_list_active(aio)) {