aboutsummaryrefslogtreecommitdiff
path: root/src/core/aio.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-04 17:17:42 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-04 21:20:00 -0700
commitdc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch)
tree1eebf2773745a3a25e8a071fbe4f51cd5490d4e4 /src/core/aio.c
parent6887900ae033add30ee0151b72abe927c5239588 (diff)
downloadnng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz
nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2
nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak checks. This represents a complete rethink of how the AIOs work, and much simpler synchronization; the provider API is a bit simpler to boot, as a number of failure modes have been simply eliminated. While here a few other minor bugs were squashed.
Diffstat (limited to 'src/core/aio.c')
-rw-r--r--src/core/aio.c320
1 files changed, 141 insertions, 179 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index e6157786..792b63f2 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -11,22 +11,50 @@
#include "core/nng_impl.h"
#include <string.h>
-enum nni_aio_flags {
- NNI_AIO_INIT = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
-};
-
+static nni_mtx nni_aio_lk;
// These are used for expiration.
-static nni_mtx nni_aio_expire_mtx;
static nni_cv nni_aio_expire_cv;
static int nni_aio_expire_exit;
-static nni_list nni_aio_expire_aios;
static nni_thr nni_aio_expire_thr;
-static nni_aio *nni_aio_expire_current;
+static nni_list nni_aio_expire_aios;
+
+// Design notes.
+//
+// AIOs are only ever "completed" by the provider, which must call
+// one of the nni_aio_finish variants. Until this occurs, the provider
+// guarantees that the AIO is valid. The provider must guarantee that
+// an AIO will be "completed" (with a call to nni_aio_finish & friends)
+// exactly once.
+//
+// Note that the cancellation routine may be called by the framework
+// several times. The framework (or the consumer) guarantees that the
+// AIO will remain valid across these calls, so that the provider is
+// free to examine the aio for list membership, etc. The provider must
+// not call finish more than once though.
+//
+// A single lock, nni_aio_lk, is used to protect the flags on the AIO,
+// as well as the expire list on the AIOs. We will not permit an AIO
+// to be marked done if an expiration is outstanding.
+//
+// In order to synchronize with the expiration, we set a flag when we
+// are going to cancel due to expiration, and then let the expiration
+// thread dispatch the notification to the user (after ensuring that
+// the provider is done with the aio.) This ensures that the completion
+// task will be dispatch *exactly* once, and only after nothing in
+// the provider or the framework is using it further. (The consumer
+// will probably still be using, but if the consumer calls nni_aio_wait
+// or nni_aio_stop, then the consumer will have exclusive access to it.
+// Provided, of course, that the consumer does not reuse the aio for
+// another operation in the callback.)
+//
+// In order to guard against aio reuse during teardown, we set a fini
+// flag. Any attempt to initialize for a new operation after that point
+// will fail and the caller will get NNG_ESTATE indicating this. The
+// provider that calls nni_aio_start() MUST check the return value, and
+// if it comes back nonzero (NNG_ESTATE) then it must simply discard the
+// request and return.
static void nni_aio_expire_add(nni_aio *);
-static void nni_aio_expire_remove(nni_aio *);
int
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
@@ -34,15 +62,11 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
int rv;
memset(aio, 0, sizeof(*aio));
- if ((rv = nni_mtx_init(&aio->a_lk)) != 0) {
- return (rv);
- }
- if ((rv = nni_cv_init(&aio->a_cv, &aio->a_lk)) != 0) {
- nni_mtx_fini(&aio->a_lk);
+ if ((rv = nni_cv_init(&aio->a_cv, &nni_aio_lk)) != 0) {
return (rv);
}
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = NNI_AIO_INIT;
+ aio->a_init = 1;
nni_task_init(NULL, &aio->a_task, cb, arg);
return (0);
@@ -55,7 +79,6 @@ nni_aio_fini(nni_aio *aio)
// At this point the AIO is done.
nni_cv_fini(&aio->a_cv);
- nni_mtx_fini(&aio->a_lk);
if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
@@ -71,30 +94,23 @@ nni_aio_fini(nni_aio *aio)
void
nni_aio_stop(nni_aio *aio)
{
- if ((aio->a_flags & NNI_AIO_INIT) == 0) {
+ if (!aio->a_init) {
// Never initialized, so nothing should have happened.
return;
}
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
- nni_mtx_unlock(&aio->a_lk);
+ nni_mtx_lock(&nni_aio_lk);
+ aio->a_fini = 1;
+ nni_mtx_unlock(&nni_aio_lk);
nni_aio_cancel(aio, NNG_ECANCELED);
- // Wait for any outstanding task to complete. We won't schedule
- // new stuff because nni_aio_start will fail (due to AIO_FINI).
- nni_task_wait(&aio->a_task);
+ nni_aio_wait(aio);
}
int
nni_aio_result(nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&aio->a_lk);
- rv = aio->a_result;
- nni_mtx_unlock(&aio->a_lk);
- return (rv);
+ return (aio->a_result);
}
size_t
@@ -106,131 +122,116 @@ nni_aio_count(nni_aio *aio)
void
nni_aio_wait(nni_aio *aio)
{
- nni_mtx_lock(&aio->a_lk);
- while ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) == 0) {
+ nni_mtx_lock(&nni_aio_lk);
+ while ((aio->a_active) && (!aio->a_done)) {
nni_cv_wait(&aio->a_cv);
}
- nni_mtx_unlock(&aio->a_lk);
+ nni_mtx_unlock(&nni_aio_lk);
nni_task_wait(&aio->a_task);
}
int
-nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
+nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags &= ~NNI_AIO_DONE;
- if (aio->a_flags & NNI_AIO_FINI) {
+ nni_mtx_lock(&nni_aio_lk);
+ if (aio->a_fini) {
// We should not reschedule anything at this point.
- nni_mtx_unlock(&aio->a_lk);
+ aio->a_active = 0;
+ aio->a_result = NNG_ECANCELED;
+ nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
+ aio->a_done = 0;
+ aio->a_pend = 0;
aio->a_result = 0;
aio->a_count = 0;
- aio->a_prov_cancel = cancel;
+ aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
+ aio->a_active = 1;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
- nni_mtx_unlock(&aio->a_lk);
+ nni_mtx_unlock(&nni_aio_lk);
return (0);
}
+// nni_aio_cancel is called by a consumer which guarantees that the aio
+// is still valid.
void
nni_aio_cancel(nni_aio *aio, int rv)
{
- void (*cancelfn)(nni_aio *);
-
- nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & NNI_AIO_DONE) {
- // The operation already completed - so there's nothing
- // left for us to do.
- nni_mtx_unlock(&aio->a_lk);
- return;
- }
- aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = rv;
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
-
- aio->a_refcnt++;
- nni_mtx_unlock(&aio->a_lk);
+ nni_aio_cancelfn cancelfn;
- // Guaraneed to just be a list operation.
- nni_aio_expire_remove(aio);
+ nni_mtx_lock(&nni_aio_lk);
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&nni_aio_lk);
// Stop any I/O at the provider level.
if (cancelfn != NULL) {
- cancelfn(aio);
+ cancelfn(aio, rv);
}
-
- nni_mtx_lock(&aio->a_lk);
-
- aio->a_refcnt--;
- nni_cv_wake(&aio->a_cv);
-
- // These should have already been cleared by the cancel function.
- aio->a_prov_data = NULL;
- aio->a_prov_cancel = NULL;
-
- nni_task_dispatch(&aio->a_task);
- nni_mtx_unlock(&aio->a_lk);
}
// I/O provider related functions.
-int
-nni_aio_finish(nni_aio *aio, int result, size_t count)
+static void
+nni_aio_finish_impl(
+ nni_aio *aio, int result, size_t count, void *pipe, nni_msg *msg)
{
- nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & NNI_AIO_DONE) {
- // Operation already done (canceled or timed out?)
- nni_mtx_unlock(&aio->a_lk);
- return (NNG_ESTATE);
- }
- aio->a_flags |= NNI_AIO_DONE;
+ nni_mtx_lock(&nni_aio_lk);
+ NNI_ASSERT(aio->a_pend == 0); // provider only calls us *once*
+
+ nni_list_node_remove(&aio->a_expire_node);
+ aio->a_pend = 1;
aio->a_result = result;
aio->a_count = count;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
+ if (pipe) {
+ aio->a_pipe = pipe;
+ }
+ if (msg) {
+ aio->a_msg = msg;
+ }
- // This is guaranteed to just be a list operation at this point,
- // because done wasn't set.
- nni_aio_expire_remove(aio);
aio->a_expire = NNI_TIME_NEVER;
- nni_cv_wake(&aio->a_cv);
- nni_task_dispatch(&aio->a_task);
- nni_mtx_unlock(&aio->a_lk);
- return (0);
+ // If we are expiring, then we rely on the expiration thread to
+ // complete this; we must not because the expiration thread is
+ // still holding the reference.
+ if (!aio->a_expiring) {
+ aio->a_done = 1;
+ nni_cv_wake(&aio->a_cv);
+ nni_task_dispatch(&aio->a_task);
+ }
+ nni_mtx_unlock(&nni_aio_lk);
}
-int
-nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
+void
+nni_aio_finish(nni_aio *aio, int result, size_t count)
{
- nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & NNI_AIO_DONE) {
- // Operation already done (canceled or timed out?)
- nni_mtx_unlock(&aio->a_lk);
- return (NNG_ESTATE);
- }
- aio->a_flags |= NNI_AIO_DONE;
+ nni_aio_finish_impl(aio, result, count, NULL, NULL);
+}
- aio->a_result = result;
- aio->a_count = 0;
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
- aio->a_pipe = pipe;
+void
+nni_aio_finish_error(nni_aio *aio, int result)
+{
+ nni_aio_finish_impl(aio, result, 0, NULL, NULL);
+}
- // This is guaranteed to just be a list operation at this point,
- // because done wasn't set.
- nni_aio_expire_remove(aio);
- aio->a_expire = NNI_TIME_NEVER;
- nni_cv_wake(&aio->a_cv);
+void
+nni_aio_finish_pipe(nni_aio *aio, void *pipe)
+{
+ NNI_ASSERT(pipe != NULL);
+ nni_aio_finish_impl(aio, 0, 0, pipe, NULL);
+}
- nni_task_dispatch(&aio->a_task);
- nni_mtx_unlock(&aio->a_lk);
- return (0);
+void
+nni_aio_finish_msg(nni_aio *aio, nni_msg *msg)
+{
+ NNI_ASSERT(msg != NULL);
+ nni_aio_finish_impl(aio, 0, nni_msg_len(msg), NULL, msg);
}
void
@@ -261,12 +262,9 @@ nni_aio_list_active(nni_aio *aio)
static void
nni_aio_expire_add(nni_aio *aio)
{
- nni_mtx * mtx = &nni_aio_expire_mtx;
- nni_cv * cv = &nni_aio_expire_cv;
nni_list *list = &nni_aio_expire_aios;
nni_aio * naio;
- nni_mtx_lock(mtx);
// This is a reverse walk of the list. We're more likely to find
// a match at the end of the list.
for (naio = nni_list_last(list); naio != NULL;
@@ -280,105 +278,69 @@ nni_aio_expire_add(nni_aio *aio)
// This has the shortest time, so insert at the start.
nni_list_prepend(list, aio);
// And, as we are the latest, kick the thing.
- nni_cv_wake(cv);
+ nni_cv_wake(&nni_aio_expire_cv);
}
- nni_mtx_unlock(mtx);
-}
-
-static void
-nni_aio_expire_remove(nni_aio *aio)
-{
- nni_mtx * mtx = &nni_aio_expire_mtx;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_list *list = &nni_aio_expire_aios;
-
- nni_mtx_lock(mtx);
- if (nni_list_active(list, aio)) {
- nni_list_remove(list, aio);
- }
- while (aio == nni_aio_expire_current) {
- nni_cv_wait(cv);
- }
- nni_mtx_unlock(mtx);
}
static void
nni_aio_expire_loop(void *arg)
{
- nni_mtx * mtx = &nni_aio_expire_mtx;
- nni_cv * cv = &nni_aio_expire_cv;
- nni_list *aios = &nni_aio_expire_aios;
- nni_aio * aio;
- nni_time now;
-
- void (*cancelfn)(nni_aio *);
+ nni_list * aios = &nni_aio_expire_aios;
+ nni_aio * aio;
+ nni_time now;
+ nni_aio_cancelfn cancelfn;
NNI_ARG_UNUSED(arg);
for (;;) {
- nni_mtx_lock(mtx);
-
- // If we are resuming this loop after processing an AIO,
- // note that we are done with it, and wake anyone waiting
- // for that to clear up.
- if ((aio = nni_aio_expire_current) != NULL) {
- nni_aio_expire_current = NULL;
- nni_cv_wake(cv);
- }
+ nni_mtx_lock(&nni_aio_lk);
if (nni_aio_expire_exit) {
- nni_mtx_unlock(mtx);
+ nni_mtx_unlock(&nni_aio_lk);
return;
}
if ((aio = nni_list_first(aios)) == NULL) {
- nni_cv_wait(cv);
- nni_mtx_unlock(mtx);
+ nni_cv_wait(&nni_aio_expire_cv);
+ nni_mtx_unlock(&nni_aio_lk);
continue;
}
now = nni_clock();
if (now < aio->a_expire) {
// Unexpired; the list is ordered, so we just wait.
- nni_cv_until(cv, aio->a_expire);
- nni_mtx_unlock(mtx);
+ nni_cv_until(&nni_aio_expire_cv, aio->a_expire);
+ nni_mtx_unlock(&nni_aio_lk);
continue;
}
// This aio's time has come. Expire it, canceling any
// outstanding I/O.
-
nni_list_remove(aios, aio);
- nni_aio_expire_current = aio;
- nni_mtx_unlock(mtx);
-
- cancelfn = NULL;
- nni_mtx_lock(&aio->a_lk);
- if ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) != 0) {
- nni_mtx_unlock(&aio->a_lk);
- continue;
- }
+ // Mark it as expiring. This acts as a hold on
+ // the aio, similar to the consumers. The actual taskq
+ // dispatch on completion won't occur until this is cleared,
+ // and the done flag won't be set either.
+ aio->a_expiring = 1;
+ cancelfn = aio->a_prov_cancel;
- aio->a_flags |= NNI_AIO_DONE;
-
- aio->a_result = NNG_ETIMEDOUT;
- cancelfn = aio->a_prov_cancel;
- aio->a_prov_cancel = NULL;
- nni_cv_wake(&aio->a_cv);
- nni_mtx_unlock(&aio->a_lk);
-
- // Cancel any outstanding activity.
+ // Cancel any outstanding activity. This is always non-NULL
+ // for a valid aio, and becomes NULL only when an AIO is
+ // already being canceled or finished.
if (cancelfn != NULL) {
- cancelfn(aio);
+ nni_mtx_unlock(&nni_aio_lk);
+ cancelfn(aio, NNG_ETIMEDOUT);
+ nni_mtx_lock(&nni_aio_lk);
}
- // Arguably we could avoid dispatching, and execute the
- // callback inline here as we are already on a separate
- // thread. But keeping it separate is clearer, and more
- // consistent with other uses. And this should not be a
- // hot code path.
+ NNI_ASSERT(aio->a_pend); // nni_aio_finish was run
+ NNI_ASSERT(aio->a_prov_cancel == NULL);
+ aio->a_expiring = 0;
+ aio->a_done = 1;
+ nni_cv_wake(&aio->a_cv);
nni_task_dispatch(&aio->a_task);
+ nni_mtx_unlock(&nni_aio_lk);
}
}
@@ -386,7 +348,7 @@ int
nni_aio_sys_init(void)
{
int rv;
- nni_mtx *mtx = &nni_aio_expire_mtx;
+ nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
@@ -409,7 +371,7 @@ fail:
void
nni_aio_sys_fini(void)
{
- nni_mtx *mtx = &nni_aio_expire_mtx;
+ nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;