aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c56
1 files changed, 33 insertions, 23 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 54ba7000..f039cdc8 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -60,11 +60,10 @@ static int nni_aio_expire_q_cnt;
// when we want to know if the operation itself is complete.
//
// In order to guard against aio reuse during teardown, we set the a_stop
-// flag. Any attempt to initialize for a new operation after that point
-// will fail and the caller will get NNG_ECANCELED indicating this. The
-// provider that calls nni_aio_begin() MUST check the return value, and
-// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the
-// request and return.
+// flag. Any attempt to submit new operation after that point will fail with
+// the status NNG_ESTOPPED indicating this. The provider that calls
+// nni_aio_begin() MUST check the return value, and if it comes back
+// NNG_ESTOPPED then it must simply discard the request and // return.
//
// Calling nni_aio_wait waits for the current outstanding operation to
// complete, but does not block another one from being started on the
@@ -118,7 +117,7 @@ nni_aio_fini(nni_aio *aio)
nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
- fn(aio, arg, NNG_ECLOSED);
+ fn(aio, arg, NNG_ESTOPPED);
}
nni_task_fini(&aio->a_task);
@@ -201,7 +200,7 @@ nni_aio_stop(nni_aio *aio)
nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
- fn(aio, arg, NNG_ECANCELED);
+ fn(aio, arg, NNG_ESTOPPED);
}
nni_aio_wait(aio);
@@ -226,7 +225,7 @@ nni_aio_close(nni_aio *aio)
nni_mtx_unlock(&eq->eq_mtx);
if (fn != NULL) {
- fn(aio, arg, NNG_ECLOSED);
+ fn(aio, arg, NNG_ESTOPPED);
}
}
}
@@ -353,12 +352,13 @@ nni_aio_begin(nni_aio *aio)
// We should not reschedule anything at this point.
if (aio->a_stop || eq->eq_stop) {
- aio->a_result = NNG_ECANCELED;
+ aio->a_result = NNG_ESTOPPED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
+ aio->a_stop = true;
nni_mtx_unlock(&eq->eq_mtx);
- return (NNG_ECANCELED);
+ return (NNG_ESTOPPED);
}
nni_task_prep(&aio->a_task);
nni_mtx_unlock(&eq->eq_mtx);
@@ -386,15 +386,17 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
nni_mtx_lock(&eq->eq_mtx);
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_stop = true;
+ nni_mtx_unlock(&eq->eq_mtx);
+ return (NNG_ESTOPPED);
+ }
if (aio->a_abort) {
int rv = aio->a_result;
nni_mtx_unlock(&eq->eq_mtx);
+ NNI_ASSERT(rv != 0);
return (rv);
}
- if (aio->a_stop || eq->eq_stop) {
- nni_mtx_unlock(&eq->eq_mtx);
- return (NNG_ECLOSED);
- }
NNI_ASSERT(aio->a_cancel_fn == NULL);
aio->a_cancel_fn = cancel;
@@ -434,22 +436,25 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
}
nni_mtx_lock(&eq->eq_mtx);
- if (timeout) {
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_stop = true;
aio->a_sleep = false;
- aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_result = NNG_ESTOPPED;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (aio->a_abort) {
aio->a_sleep = false;
+ aio->a_abort = false;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
- if (aio->a_stop || eq->eq_stop) {
+ if (timeout) {
aio->a_sleep = false;
- aio->a_result = NNG_ECLOSED;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_abort = false;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -499,22 +504,26 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_task_prep(&aio->a_task);
nni_mtx_lock(&eq->eq_mtx);
- if (timeout) {
+ if (aio->a_stop || eq->eq_stop) {
+ aio->a_stop = true;
aio->a_sleep = false;
- aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_result = NNG_ESTOPPED;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (aio->a_abort) {
aio->a_sleep = false;
+ aio->a_abort = false;
+ NNI_ASSERT(aio->a_result != 0);
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
- if (aio->a_stop || eq->eq_stop) {
+ if (timeout) {
aio->a_sleep = false;
- aio->a_result = NNG_ECLOSED;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_abort = false;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -767,7 +776,8 @@ nni_aio_expire_loop(void *arg)
for (uint32_t i = 0; i < exp_idx; i++) {
aio = expires[i];
if (q->eq_stop) {
- rv = NNG_ECANCELED;
+ rv = NNG_ESTOPPED;
+ aio->a_stop = true;
} else if (aio->a_expire_ok) {
aio->a_expire_ok = false;
rv = 0;