aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-20 14:34:51 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-20 14:34:51 -0700
commita37093079b492e966344416445aae354b147d30e (patch)
tree2f21fc2bc716f2423ba02f4713b25038c429ec4e /src/core/aio.c
parent88fb04f61918b06e6e269c1960058c3df5e0a0ef (diff)
downloadnng-a37093079b492e966344416445aae354b147d30e.tar.gz
nng-a37093079b492e966344416445aae354b147d30e.tar.bz2
nng-a37093079b492e966344416445aae354b147d30e.zip
Yet more race condition fixes.
We need to remember that protocol stops can run synchronously, and therefore we need to wait for the aio to complete. Further, we need to break apart shutting down aio activity from deallocation, as we need to shut down *all* async activity before deallocating *anything*. Noticed that we had a pipe race in the surveyor pattern too.
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c84
1 files changed, 33 insertions, 51 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a9fbc50d..620a865d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,10 +12,9 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
- NNI_AIO_START = 0x8,
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
};
// These are used for expiration.
@@ -49,7 +48,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = NNI_AIO_START;
+ aio->a_flags = 0;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -58,49 +57,39 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
- void (*cancelfn)(nni_aio *);
+ nni_aio_stop(aio);
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
- if ((aio->a_flags & NNI_AIO_DONE) == 0) {
- aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = NNG_ECANCELED;
- cancelfn = aio->a_prov_cancel;
- if (aio->a_flags & NNI_AIO_START) {
- aio->a_flags &= ~NNI_AIO_START;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ // At this point the AIO is done.
+ nni_cv_fini(&aio->a_cv);
+ nni_mtx_fini(&aio->a_lk);
- } else {
- cancelfn = NULL;
+ if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
+ NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
}
- nni_cv_wake(&aio->a_cv);
+}
- while (aio->a_refcnt != 0) {
- nni_cv_wait(&aio->a_cv);
+// nni_aio_stop cancels any oustanding operation, and waits for the
+// callback to complete, if still running. It also marks the AIO as
+// stopped, preventing further calls to nni_aio_start from succeeding.
+// To correctly tear down an AIO, call stop, and make sure any other
+// calles are not also stopped, before calling nni_aio_fini to release
+// actual memory.
+void
+nni_aio_stop(nni_aio *aio)
+{
+ if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) {
+ // Never initialized, so nothing should have happened.
+ return;
}
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
nni_mtx_unlock(&aio->a_lk);
- // Stop any timeouts. If one was in flight, we wait until it
- // completes (it could fire the completion callback.)
- nni_aio_expire_remove(aio);
-
- // Cancel the AIO if it was scheduled.
- if (cancelfn != NULL) {
- cancelfn(aio);
- }
+ nni_aio_cancel(aio, NNG_ECANCELED);
// Wait for any outstanding task to complete. We won't schedule
// new stuff because nni_aio_start will fail (due to AIO_FINI).
nni_taskq_wait(NULL, &aio->a_tqe);
-
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
- nni_mtx_fini(&aio->a_lk);
-
- if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
- NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
- }
}
int
@@ -166,24 +155,23 @@ nni_aio_cancel(nni_aio *aio, int rv)
void (*cancelfn)(nni_aio *);
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// The operation already completed - so there's nothing
// left for us to do.
nni_mtx_unlock(&aio->a_lk);
return;
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
- // Guaraneed to just be a list operation.
- nni_aio_expire_remove(aio);
-
aio->a_refcnt++;
nni_mtx_unlock(&aio->a_lk);
+ // Guaraneed to just be a list operation.
+ nni_aio_expire_remove(aio);
+
// Stop any I/O at the provider level.
if (cancelfn != NULL) {
cancelfn(aio);
@@ -200,10 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
aio->a_prov_data = NULL;
aio->a_prov_cancel = NULL;
- if (!(aio->a_flags & NNI_AIO_FINI)) {
- // If we are finalizing, then we are done.
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
nni_mtx_unlock(&aio->a_lk);
}
@@ -213,13 +198,12 @@ int
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = count;
@@ -240,13 +224,12 @@ int
nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = 0;
@@ -392,7 +375,6 @@ nni_aio_expire_loop(void *arg)
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;