diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-04 17:17:42 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-04 21:20:00 -0700 |
| commit | dc334d7193a2a0bc0194221b853a37e1be7f5b9a (patch) | |
| tree | 1eebf2773745a3a25e8a071fbe4f51cd5490d4e4 /src/core/aio.c | |
| parent | 6887900ae033add30ee0151b72abe927c5239588 (diff) | |
| download | nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.gz nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.tar.bz2 nng-dc334d7193a2a0bc0194221b853a37e1be7f5b9a.zip | |
Refactor AIO logic to close numerous races and reduce complexity.
This passes valgrind 100% clean for both helgrind and deep leak
checks. This represents a complete rethink of how the AIOs work,
and much simpler synchronization; the provider API is a bit simpler
to boot, as a number of failure modes have been simply eliminated.
While here a few other minor bugs were squashed.
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 320 |
1 files changed, 141 insertions, 179 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index e6157786..792b63f2 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -11,22 +11,50 @@ #include "core/nng_impl.h" #include <string.h> -enum nni_aio_flags { - NNI_AIO_INIT = 0x1, - NNI_AIO_DONE = 0x2, - NNI_AIO_FINI = 0x4, -}; - +static nni_mtx nni_aio_lk; // These are used for expiration. -static nni_mtx nni_aio_expire_mtx; static nni_cv nni_aio_expire_cv; static int nni_aio_expire_exit; -static nni_list nni_aio_expire_aios; static nni_thr nni_aio_expire_thr; -static nni_aio *nni_aio_expire_current; +static nni_list nni_aio_expire_aios; + +// Design notes. +// +// AIOs are only ever "completed" by the provider, which must call +// one of the nni_aio_finish variants. Until this occurs, the provider +// guarantees that the AIO is valid. The provider must guarantee that +// an AIO will be "completed" (with a call to nni_aio_finish & friends) +// exactly once. +// +// Note that the cancellation routine may be called by the framework +// several times. The framework (or the consumer) guarantees that the +// AIO will remain valid across these calls, so that the provider is +// free to examine the aio for list membership, etc. The provider must +// not call finish more than once though. +// +// A single lock, nni_aio_lk, is used to protect the flags on the AIO, +// as well as the expire list on the AIOs. We will not permit an AIO +// to be marked done if an expiration is outstanding. +// +// In order to synchronize with the expiration, we set a flag when we +// are going to cancel due to expiration, and then let the expiration +// thread dispatch the notification to the user (after ensuring that +// the provider is done with the aio.) This ensures that the completion +// task will be dispatch *exactly* once, and only after nothing in +// the provider or the framework is using it further. (The consumer +// will probably still be using, but if the consumer calls nni_aio_wait +// or nni_aio_stop, then the consumer will have exclusive access to it. +// Provided, of course, that the consumer does not reuse the aio for +// another operation in the callback.) +// +// In order to guard against aio reuse during teardown, we set a fini +// flag. Any attempt to initialize for a new operation after that point +// will fail and the caller will get NNG_ESTATE indicating this. The +// provider that calls nni_aio_start() MUST check the return value, and +// if it comes back nonzero (NNG_ESTATE) then it must simply discard the +// request and return. static void nni_aio_expire_add(nni_aio *); -static void nni_aio_expire_remove(nni_aio *); int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) @@ -34,15 +62,11 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) int rv; memset(aio, 0, sizeof(*aio)); - if ((rv = nni_mtx_init(&aio->a_lk)) != 0) { - return (rv); - } - if ((rv = nni_cv_init(&aio->a_cv, &aio->a_lk)) != 0) { - nni_mtx_fini(&aio->a_lk); + if ((rv = nni_cv_init(&aio->a_cv, &nni_aio_lk)) != 0) { return (rv); } aio->a_expire = NNI_TIME_NEVER; - aio->a_flags = NNI_AIO_INIT; + aio->a_init = 1; nni_task_init(NULL, &aio->a_task, cb, arg); return (0); @@ -55,7 +79,6 @@ nni_aio_fini(nni_aio *aio) // At this point the AIO is done. nni_cv_fini(&aio->a_cv); - nni_mtx_fini(&aio->a_lk); if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) { NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs); @@ -71,30 +94,23 @@ nni_aio_fini(nni_aio *aio) void nni_aio_stop(nni_aio *aio) { - if ((aio->a_flags & NNI_AIO_INIT) == 0) { + if (!aio->a_init) { // Never initialized, so nothing should have happened. return; } - nni_mtx_lock(&aio->a_lk); - aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled - nni_mtx_unlock(&aio->a_lk); + nni_mtx_lock(&nni_aio_lk); + aio->a_fini = 1; + nni_mtx_unlock(&nni_aio_lk); nni_aio_cancel(aio, NNG_ECANCELED); - // Wait for any outstanding task to complete. We won't schedule - // new stuff because nni_aio_start will fail (due to AIO_FINI). - nni_task_wait(&aio->a_task); + nni_aio_wait(aio); } int nni_aio_result(nni_aio *aio) { - int rv; - - nni_mtx_lock(&aio->a_lk); - rv = aio->a_result; - nni_mtx_unlock(&aio->a_lk); - return (rv); + return (aio->a_result); } size_t @@ -106,131 +122,116 @@ nni_aio_count(nni_aio *aio) void nni_aio_wait(nni_aio *aio) { - nni_mtx_lock(&aio->a_lk); - while ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) == 0) { + nni_mtx_lock(&nni_aio_lk); + while ((aio->a_active) && (!aio->a_done)) { nni_cv_wait(&aio->a_cv); } - nni_mtx_unlock(&aio->a_lk); + nni_mtx_unlock(&nni_aio_lk); nni_task_wait(&aio->a_task); } int -nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) +nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) { - nni_mtx_lock(&aio->a_lk); - aio->a_flags &= ~NNI_AIO_DONE; - if (aio->a_flags & NNI_AIO_FINI) { + nni_mtx_lock(&nni_aio_lk); + if (aio->a_fini) { // We should not reschedule anything at this point. - nni_mtx_unlock(&aio->a_lk); + aio->a_active = 0; + aio->a_result = NNG_ECANCELED; + nni_mtx_unlock(&nni_aio_lk); return (NNG_ECANCELED); } + aio->a_done = 0; + aio->a_pend = 0; aio->a_result = 0; aio->a_count = 0; - aio->a_prov_cancel = cancel; + aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; + aio->a_active = 1; if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } - nni_mtx_unlock(&aio->a_lk); + nni_mtx_unlock(&nni_aio_lk); return (0); } +// nni_aio_cancel is called by a consumer which guarantees that the aio +// is still valid. void nni_aio_cancel(nni_aio *aio, int rv) { - void (*cancelfn)(nni_aio *); - - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & NNI_AIO_DONE) { - // The operation already completed - so there's nothing - // left for us to do. - nni_mtx_unlock(&aio->a_lk); - return; - } - aio->a_flags |= NNI_AIO_DONE; - aio->a_result = rv; - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - - aio->a_refcnt++; - nni_mtx_unlock(&aio->a_lk); + nni_aio_cancelfn cancelfn; - // Guaraneed to just be a list operation. - nni_aio_expire_remove(aio); + nni_mtx_lock(&nni_aio_lk); + cancelfn = aio->a_prov_cancel; + nni_mtx_unlock(&nni_aio_lk); // Stop any I/O at the provider level. if (cancelfn != NULL) { - cancelfn(aio); + cancelfn(aio, rv); } - - nni_mtx_lock(&aio->a_lk); - - aio->a_refcnt--; - nni_cv_wake(&aio->a_cv); - - // These should have already been cleared by the cancel function. - aio->a_prov_data = NULL; - aio->a_prov_cancel = NULL; - - nni_task_dispatch(&aio->a_task); - nni_mtx_unlock(&aio->a_lk); } // I/O provider related functions. -int -nni_aio_finish(nni_aio *aio, int result, size_t count) +static void +nni_aio_finish_impl( + nni_aio *aio, int result, size_t count, void *pipe, nni_msg *msg) { - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & NNI_AIO_DONE) { - // Operation already done (canceled or timed out?) - nni_mtx_unlock(&aio->a_lk); - return (NNG_ESTATE); - } - aio->a_flags |= NNI_AIO_DONE; + nni_mtx_lock(&nni_aio_lk); + NNI_ASSERT(aio->a_pend == 0); // provider only calls us *once* + + nni_list_node_remove(&aio->a_expire_node); + aio->a_pend = 1; aio->a_result = result; aio->a_count = count; aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; + if (pipe) { + aio->a_pipe = pipe; + } + if (msg) { + aio->a_msg = msg; + } - // This is guaranteed to just be a list operation at this point, - // because done wasn't set. - nni_aio_expire_remove(aio); aio->a_expire = NNI_TIME_NEVER; - nni_cv_wake(&aio->a_cv); - nni_task_dispatch(&aio->a_task); - nni_mtx_unlock(&aio->a_lk); - return (0); + // If we are expiring, then we rely on the expiration thread to + // complete this; we must not because the expiration thread is + // still holding the reference. + if (!aio->a_expiring) { + aio->a_done = 1; + nni_cv_wake(&aio->a_cv); + nni_task_dispatch(&aio->a_task); + } + nni_mtx_unlock(&nni_aio_lk); } -int -nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe) +void +nni_aio_finish(nni_aio *aio, int result, size_t count) { - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & NNI_AIO_DONE) { - // Operation already done (canceled or timed out?) - nni_mtx_unlock(&aio->a_lk); - return (NNG_ESTATE); - } - aio->a_flags |= NNI_AIO_DONE; + nni_aio_finish_impl(aio, result, count, NULL, NULL); +} - aio->a_result = result; - aio->a_count = 0; - aio->a_prov_cancel = NULL; - aio->a_prov_data = NULL; - aio->a_pipe = pipe; +void +nni_aio_finish_error(nni_aio *aio, int result) +{ + nni_aio_finish_impl(aio, result, 0, NULL, NULL); +} - // This is guaranteed to just be a list operation at this point, - // because done wasn't set. - nni_aio_expire_remove(aio); - aio->a_expire = NNI_TIME_NEVER; - nni_cv_wake(&aio->a_cv); +void +nni_aio_finish_pipe(nni_aio *aio, void *pipe) +{ + NNI_ASSERT(pipe != NULL); + nni_aio_finish_impl(aio, 0, 0, pipe, NULL); +} - nni_task_dispatch(&aio->a_task); - nni_mtx_unlock(&aio->a_lk); - return (0); +void +nni_aio_finish_msg(nni_aio *aio, nni_msg *msg) +{ + NNI_ASSERT(msg != NULL); + nni_aio_finish_impl(aio, 0, nni_msg_len(msg), NULL, msg); } void @@ -261,12 +262,9 @@ nni_aio_list_active(nni_aio *aio) static void nni_aio_expire_add(nni_aio *aio) { - nni_mtx * mtx = &nni_aio_expire_mtx; - nni_cv * cv = &nni_aio_expire_cv; nni_list *list = &nni_aio_expire_aios; nni_aio * naio; - nni_mtx_lock(mtx); // This is a reverse walk of the list. We're more likely to find // a match at the end of the list. for (naio = nni_list_last(list); naio != NULL; @@ -280,105 +278,69 @@ nni_aio_expire_add(nni_aio *aio) // This has the shortest time, so insert at the start. nni_list_prepend(list, aio); // And, as we are the latest, kick the thing. - nni_cv_wake(cv); + nni_cv_wake(&nni_aio_expire_cv); } - nni_mtx_unlock(mtx); -} - -static void -nni_aio_expire_remove(nni_aio *aio) -{ - nni_mtx * mtx = &nni_aio_expire_mtx; - nni_cv * cv = &nni_aio_expire_cv; - nni_list *list = &nni_aio_expire_aios; - - nni_mtx_lock(mtx); - if (nni_list_active(list, aio)) { - nni_list_remove(list, aio); - } - while (aio == nni_aio_expire_current) { - nni_cv_wait(cv); - } - nni_mtx_unlock(mtx); } static void nni_aio_expire_loop(void *arg) { - nni_mtx * mtx = &nni_aio_expire_mtx; - nni_cv * cv = &nni_aio_expire_cv; - nni_list *aios = &nni_aio_expire_aios; - nni_aio * aio; - nni_time now; - - void (*cancelfn)(nni_aio *); + nni_list * aios = &nni_aio_expire_aios; + nni_aio * aio; + nni_time now; + nni_aio_cancelfn cancelfn; NNI_ARG_UNUSED(arg); for (;;) { - nni_mtx_lock(mtx); - - // If we are resuming this loop after processing an AIO, - // note that we are done with it, and wake anyone waiting - // for that to clear up. - if ((aio = nni_aio_expire_current) != NULL) { - nni_aio_expire_current = NULL; - nni_cv_wake(cv); - } + nni_mtx_lock(&nni_aio_lk); if (nni_aio_expire_exit) { - nni_mtx_unlock(mtx); + nni_mtx_unlock(&nni_aio_lk); return; } if ((aio = nni_list_first(aios)) == NULL) { - nni_cv_wait(cv); - nni_mtx_unlock(mtx); + nni_cv_wait(&nni_aio_expire_cv); + nni_mtx_unlock(&nni_aio_lk); continue; } now = nni_clock(); if (now < aio->a_expire) { // Unexpired; the list is ordered, so we just wait. - nni_cv_until(cv, aio->a_expire); - nni_mtx_unlock(mtx); + nni_cv_until(&nni_aio_expire_cv, aio->a_expire); + nni_mtx_unlock(&nni_aio_lk); continue; } // This aio's time has come. Expire it, canceling any // outstanding I/O. - nni_list_remove(aios, aio); - nni_aio_expire_current = aio; - nni_mtx_unlock(mtx); - - cancelfn = NULL; - nni_mtx_lock(&aio->a_lk); - if ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) != 0) { - nni_mtx_unlock(&aio->a_lk); - continue; - } + // Mark it as expiring. This acts as a hold on + // the aio, similar to the consumers. The actual taskq + // dispatch on completion won't occur until this is cleared, + // and the done flag won't be set either. + aio->a_expiring = 1; + cancelfn = aio->a_prov_cancel; - aio->a_flags |= NNI_AIO_DONE; - - aio->a_result = NNG_ETIMEDOUT; - cancelfn = aio->a_prov_cancel; - aio->a_prov_cancel = NULL; - nni_cv_wake(&aio->a_cv); - nni_mtx_unlock(&aio->a_lk); - - // Cancel any outstanding activity. + // Cancel any outstanding activity. This is always non-NULL + // for a valid aio, and becomes NULL only when an AIO is + // already being canceled or finished. if (cancelfn != NULL) { - cancelfn(aio); + nni_mtx_unlock(&nni_aio_lk); + cancelfn(aio, NNG_ETIMEDOUT); + nni_mtx_lock(&nni_aio_lk); } - // Arguably we could avoid dispatching, and execute the - // callback inline here as we are already on a separate - // thread. But keeping it separate is clearer, and more - // consistent with other uses. And this should not be a - // hot code path. + NNI_ASSERT(aio->a_pend); // nni_aio_finish was run + NNI_ASSERT(aio->a_prov_cancel == NULL); + aio->a_expiring = 0; + aio->a_done = 1; + nni_cv_wake(&aio->a_cv); nni_task_dispatch(&aio->a_task); + nni_mtx_unlock(&nni_aio_lk); } } @@ -386,7 +348,7 @@ int nni_aio_sys_init(void) { int rv; - nni_mtx *mtx = &nni_aio_expire_mtx; + nni_mtx *mtx = &nni_aio_lk; nni_cv * cv = &nni_aio_expire_cv; nni_thr *thr = &nni_aio_expire_thr; @@ -409,7 +371,7 @@ fail: void nni_aio_sys_fini(void) { - nni_mtx *mtx = &nni_aio_expire_mtx; + nni_mtx *mtx = &nni_aio_lk; nni_cv * cv = &nni_aio_expire_cv; nni_thr *thr = &nni_aio_expire_thr; |
