aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c84
1 files changed, 33 insertions, 51 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a9fbc50d..620a865d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,10 +12,9 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
- NNI_AIO_START = 0x8,
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
};
// These are used for expiration.
@@ -49,7 +48,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = NNI_AIO_START;
+ aio->a_flags = 0;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -58,49 +57,39 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
- void (*cancelfn)(nni_aio *);
+ nni_aio_stop(aio);
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
- if ((aio->a_flags & NNI_AIO_DONE) == 0) {
- aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = NNG_ECANCELED;
- cancelfn = aio->a_prov_cancel;
- if (aio->a_flags & NNI_AIO_START) {
- aio->a_flags &= ~NNI_AIO_START;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ // At this point the AIO is done.
+ nni_cv_fini(&aio->a_cv);
+ nni_mtx_fini(&aio->a_lk);
- } else {
- cancelfn = NULL;
+ if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
+ NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
}
- nni_cv_wake(&aio->a_cv);
+}
- while (aio->a_refcnt != 0) {
- nni_cv_wait(&aio->a_cv);
+// nni_aio_stop cancels any oustanding operation, and waits for the
+// callback to complete, if still running. It also marks the AIO as
+// stopped, preventing further calls to nni_aio_start from succeeding.
+// To correctly tear down an AIO, call stop, and make sure any other
+// calles are not also stopped, before calling nni_aio_fini to release
+// actual memory.
+void
+nni_aio_stop(nni_aio *aio)
+{
+ if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) {
+ // Never initialized, so nothing should have happened.
+ return;
}
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
nni_mtx_unlock(&aio->a_lk);
- // Stop any timeouts. If one was in flight, we wait until it
- // completes (it could fire the completion callback.)
- nni_aio_expire_remove(aio);
-
- // Cancel the AIO if it was scheduled.
- if (cancelfn != NULL) {
- cancelfn(aio);
- }
+ nni_aio_cancel(aio, NNG_ECANCELED);
// Wait for any outstanding task to complete. We won't schedule
// new stuff because nni_aio_start will fail (due to AIO_FINI).
nni_taskq_wait(NULL, &aio->a_tqe);
-
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
- nni_mtx_fini(&aio->a_lk);
-
- if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
- NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
- }
}
int
@@ -166,24 +155,23 @@ nni_aio_cancel(nni_aio *aio, int rv)
void (*cancelfn)(nni_aio *);
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// The operation already completed - so there's nothing
// left for us to do.
nni_mtx_unlock(&aio->a_lk);
return;
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
- // Guaraneed to just be a list operation.
- nni_aio_expire_remove(aio);
-
aio->a_refcnt++;
nni_mtx_unlock(&aio->a_lk);
+ // Guaraneed to just be a list operation.
+ nni_aio_expire_remove(aio);
+
// Stop any I/O at the provider level.
if (cancelfn != NULL) {
cancelfn(aio);
@@ -200,10 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
aio->a_prov_data = NULL;
aio->a_prov_cancel = NULL;
- if (!(aio->a_flags & NNI_AIO_FINI)) {
- // If we are finalizing, then we are done.
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
nni_mtx_unlock(&aio->a_lk);
}
@@ -213,13 +198,12 @@ int
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = count;
@@ -240,13 +224,12 @@ int
nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = 0;
@@ -392,7 +375,6 @@ nni_aio_expire_loop(void *arg)
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;