aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c53
-rw-r--r--src/core/aio.h6
-rw-r--r--src/core/stream.c2
3 files changed, 49 insertions, 12 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index f039cdc8..87c79d0d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -356,6 +356,7 @@ nni_aio_begin(nni_aio *aio)
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
aio->a_stop = true;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ESTOPPED);
@@ -365,6 +366,20 @@ nni_aio_begin(nni_aio *aio)
return (0);
}
+void
+nni_aio_reset(nni_aio *aio)
+{
+ aio->a_result = 0;
+ aio->a_count = 0;
+ aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_sleep = false;
+
+ for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
+ aio->a_outputs[i] = NULL;
+ }
+}
+
int
nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
@@ -387,7 +402,8 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
+ aio->a_stop = true;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
return (NNG_ESTOPPED);
}
@@ -437,9 +453,10 @@ nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_mtx_lock(&eq->eq_mtx);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
- aio->a_sleep = false;
- aio->a_result = NNG_ESTOPPED;
+ aio->a_stop = true;
+ aio->a_sleep = false;
+ aio->a_result = NNG_ESTOPPED;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -496,6 +513,10 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
} else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
timeout = true;
}
+ if (!aio->a_sleep) {
+ aio->a_expire_ok = false;
+ }
+ aio->a_result = 0;
// Do this outside the lock. Note that we don't strictly need to have
// done this for the failure cases below (the task framework does the
@@ -504,26 +525,33 @@ nni_aio_start(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
nni_task_prep(&aio->a_task);
nni_mtx_lock(&eq->eq_mtx);
+ NNI_ASSERT(!aio->a_stopped);
if (aio->a_stop || eq->eq_stop) {
- aio->a_stop = true;
- aio->a_sleep = false;
- aio->a_result = NNG_ESTOPPED;
+ aio->a_stop = true;
+ aio->a_sleep = false;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
+ aio->a_result = NNG_ESTOPPED;
+ aio->a_stopped = true;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (aio->a_abort) {
- aio->a_sleep = false;
- aio->a_abort = false;
+ aio->a_sleep = false;
+ aio->a_abort = false;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
NNI_ASSERT(aio->a_result != 0);
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}
if (timeout) {
- aio->a_sleep = false;
- aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
- aio->a_abort = false;
+ aio->a_sleep = false;
+ aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
+ aio->a_expire_ok = false;
+ aio->a_count = 0;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
@@ -901,6 +929,7 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ nni_aio_reset(aio);
aio->a_expire_ok = true;
aio->a_sleep = true;
switch (aio->a_timeout) {
diff --git a/src/core/aio.h b/src/core/aio.h
index b09b6e49..d909853f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -175,6 +175,11 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);
// or was canceled before this call (but after nni_aio_begin).
extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *);
+// nni_aio_reset is called by providers before doing any work -- it resets
+// counts other fields to their initial state. It will not reset the closed
+// state if the aio has been stopped or closed.
+extern void nni_aio_reset(nni_aio *);
+
// nni_aio_start should be called before any asynchronous operation
// is filed. It need not be called for completions that are synchronous
// at job submission.
@@ -230,6 +235,7 @@ struct nng_aio {
bool a_use_expire; // Use expire instead of timeout
bool a_abort; // Task was aborted
bool a_init; // Initialized this
+ bool a_stopped; // Debug - set when we finish stopped
nni_task a_task;
// Read/write operations.
diff --git a/src/core/stream.c b/src/core/stream.c
index 16e11eca..2c7a9ff3 100644
--- a/src/core/stream.c
+++ b/src/core/stream.c
@@ -140,12 +140,14 @@ nng_stream_free(nng_stream *s)
void
nng_stream_send(nng_stream *s, nng_aio *aio)
{
+ nni_aio_reset(aio);
s->s_send(s, aio);
}
void
nng_stream_recv(nng_stream *s, nng_aio *aio)
{
+ nni_aio_reset(aio);
s->s_recv(s, aio);
}