aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c96
-rw-r--r--src/core/aio.h1
2 files changed, 53 insertions, 44 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 8f405ac7..d003c251 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -87,6 +87,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
nni_task_init(&aio->a_task, NULL, cb, arg);
aio->a_expire = NNI_TIME_NEVER;
aio->a_timeout = NNG_DURATION_INFINITE;
+ aio->a_init = true;
aio->a_expire_q =
nni_aio_expire_q_list[nni_random() % nni_aio_expire_q_cnt];
}
@@ -94,33 +95,35 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
- nni_aio_cancel_fn fn;
- void *arg;
- nni_aio_expire_q *eq = aio->a_expire_q;
+ if (aio != NULL && aio->a_init) {
+ nni_aio_cancel_fn fn;
+ void *arg;
+ nni_aio_expire_q *eq = aio->a_expire_q;
- // This is like aio_close, but we don't want to dispatch
- // the task. And unlike aio_stop, we don't want to wait
- // for the task. (Because we implicitly do task_fini.)
- // We also wait if the aio is being expired.
- nni_mtx_lock(&eq->eq_mtx);
- aio->a_stop = true;
- while (aio->a_expiring) {
- nni_cv_wait(&eq->eq_cv);
- }
- nni_aio_expire_rm(aio);
- fn = aio->a_cancel_fn;
- arg = aio->a_cancel_arg;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- nni_mtx_unlock(&eq->eq_mtx);
+ // This is like aio_close, but we don't want to dispatch
+ // the task. And unlike aio_stop, we don't want to wait
+ // for the task. (Because we implicitly do task_fini.)
+ // We also wait if the aio is being expired.
+ nni_mtx_lock(&eq->eq_mtx);
+ aio->a_stop = true;
+ while (aio->a_expiring) {
+ nni_cv_wait(&eq->eq_cv);
+ }
+ nni_aio_expire_rm(aio);
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ nni_mtx_unlock(&eq->eq_mtx);
- if (fn != NULL) {
- fn(aio, arg, NNG_ECLOSED);
- } else {
- nni_task_abort(&aio->a_task);
- }
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
+ } else {
+ nni_task_abort(&aio->a_task);
+ }
- nni_task_fini(&aio->a_task);
+ nni_task_fini(&aio->a_task);
+ }
}
int
@@ -148,7 +151,7 @@ nni_aio_free(nni_aio *aio)
void
nni_aio_reap(nni_aio *aio)
{
- if (aio != NULL) {
+ if (aio != NULL && aio->a_init) {
nni_reap(&aio_reap_list, aio);
}
}
@@ -181,13 +184,13 @@ nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov)
void
nni_aio_stop(nni_aio *aio)
{
- if (aio != NULL) {
+ if (aio != NULL && aio->a_init) {
nni_aio_cancel_fn fn;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
nni_mtx_lock(&eq->eq_mtx);
- aio->a_stop = true;
+ aio->a_stop = true;
while (aio->a_expiring) {
nni_cv_wait(&eq->eq_cv);
}
@@ -211,7 +214,7 @@ nni_aio_stop(nni_aio *aio)
void
nni_aio_close(nni_aio *aio)
{
- if (aio != NULL) {
+ if (aio != NULL && aio->a_init) {
nni_aio_cancel_fn fn;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;
@@ -314,7 +317,9 @@ nni_aio_count(nni_aio *aio)
void
nni_aio_wait(nni_aio *aio)
{
- nni_task_wait(&aio->a_task);
+ if (aio != NULL && aio->a_expire_q != NULL) {
+ nni_task_wait(&aio->a_task);
+ }
}
bool
@@ -333,6 +338,7 @@ nni_aio_begin(nni_aio *aio)
// checks may wish ignore or suppress these checks.
nni_aio_expire_q *eq = aio->a_expire_q;
+ NNI_ASSERT(aio->a_init);
nni_mtx_lock(&eq->eq_mtx);
NNI_ASSERT(!nni_aio_list_active(aio));
NNI_ASSERT(aio->a_cancel_fn == NULL);
@@ -409,23 +415,25 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
void
nni_aio_abort(nni_aio *aio, int rv)
{
- nni_aio_cancel_fn fn;
- void *arg;
- nni_aio_expire_q *eq = aio->a_expire_q;
+ if (aio != NULL && aio->a_init) {
+ nni_aio_cancel_fn fn;
+ void *arg;
+ nni_aio_expire_q *eq = aio->a_expire_q;
- nni_mtx_lock(&eq->eq_mtx);
- nni_aio_expire_rm(aio);
- fn = aio->a_cancel_fn;
- arg = aio->a_cancel_arg;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- nni_mtx_unlock(&eq->eq_mtx);
+ nni_mtx_lock(&eq->eq_mtx);
+ nni_aio_expire_rm(aio);
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ nni_mtx_unlock(&eq->eq_mtx);
- // Stop any I/O at the provider level.
- if (fn != NULL) {
- fn(aio, arg, rv);
- } else {
- nni_task_abort(&aio->a_task);
+ // Stop any I/O at the provider level.
+ if (fn != NULL) {
+ fn(aio, arg, rv);
+ } else {
+ nni_task_abort(&aio->a_task);
+ }
}
}
diff --git a/src/core/aio.h b/src/core/aio.h
index f56d2f58..5f7ddb69 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -214,6 +214,7 @@ struct nng_aio {
bool a_expire_ok; // Expire from sleep is ok
bool a_expiring; // Expiration in progress
bool a_use_expire; // Use expire instead of timeout
+ bool a_init; // Initialized this
nni_task a_task;
// Read/write operations.