diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-16 01:02:09 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-16 01:02:09 -0700 |
| commit | 89c4ae87f3d927c8a6a760b6706b3a8e386dfb55 (patch) | |
| tree | 7fa84fbd8076ac262dcc6836653968276738ad18 /src/core/aio.c | |
| parent | a1d237059c652c9e36117eed3e6387dcae128174 (diff) | |
| download | nng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.tar.gz nng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.tar.bz2 nng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.zip | |
AIO timeouts work correctly now, using their own timer logic.
We closed a few subtle races in the AIO subsystem as well, and now
we were able to eliminate the separate timer handling the MQ code.
There appear to be some opportunities to further enhance the code
for MQs as well -- eventually probably the only access to MQs will
be with AIOs.
Diffstat (limited to 'src/core/aio.c')
| -rw-r--r-- | src/core/aio.c | 258 |
1 files changed, 211 insertions, 47 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index ba09b608..73c1dd9b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -11,10 +11,22 @@ #include "core/nng_impl.h" #include <string.h> -#define NNI_AIO_WAKE (1 << 0) -#define NNI_AIO_DONE (1 << 1) -#define NNI_AIO_FINI (1 << 2) -#define NNI_AIO_STOP (1 << 3) +enum nni_aio_flags { + NNI_AIO_WAKE = 0x1, + NNI_AIO_DONE = 0x2, + NNI_AIO_FINI = 0x4, +}; + +// These are used for expiration. +static nni_mtx nni_aio_expire_mtx; +static nni_cv nni_aio_expire_cv; +static int nni_aio_expire_exit; +static nni_list nni_aio_expire_aios; +static nni_thr nni_aio_expire_thr; +static nni_aio *nni_aio_expire_current; + +static void nni_aio_expire_add(nni_aio *); +static void nni_aio_expire_remove(nni_aio *); int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) @@ -53,12 +65,20 @@ nni_aio_fini(nni_aio *aio) aio->a_flags |= NNI_AIO_DONE; aio->a_result = NNG_ECANCELED; cancelfn = aio->a_prov_cancel; + } else { cancelfn = NULL; } - nni_cv_wake(&aio->a_cv); // XXX: why? aio_wait? We shouldn't have any + nni_cv_wake(&aio->a_cv); + + while (aio->a_refcnt != 0) { + nni_cv_wait(&aio->a_cv); + } nni_mtx_unlock(&aio->a_lk); + // just a list operation at this point. + nni_aio_expire_remove(aio); + // Cancel the AIO if it was scheduled. if (cancelfn != NULL) { cancelfn(aio); @@ -122,7 +142,7 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) nni_mtx_lock(&aio->a_lk); aio->a_flags &= ~(NNI_AIO_DONE | NNI_AIO_WAKE); - if (aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP)) { + if (aio->a_flags & NNI_AIO_FINI) { // We should not reschedule anything at this point. nni_mtx_unlock(&aio->a_lk); return (NNG_ECANCELED); @@ -131,40 +151,11 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) aio->a_count = 0; aio->a_prov_cancel = cancel; aio->a_prov_data = data; - nni_mtx_unlock(&aio->a_lk); - return (0); -} - -// XXX: REMOVE ME... -void -nni_aio_stop(nni_aio *aio) -{ - void (*cancelfn)(nni_aio *); - - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_STOP | NNI_AIO_FINI | NNI_AIO_DONE)) { - nni_mtx_unlock(&aio->a_lk); - return; - } - aio->a_result = NNG_ECANCELED; - aio->a_flags |= NNI_AIO_DONE | NNI_AIO_STOP; - cancelfn = aio->a_prov_cancel; - nni_mtx_unlock(&aio->a_lk); - - // This unregisters the AIO from the provider. - if (cancelfn != NULL) { - cancelfn(aio); + if (aio->a_expire != NNI_TIME_NEVER) { + nni_aio_expire_add(aio); } - - nni_mtx_lock(&aio->a_lk); - aio->a_prov_data = NULL; - aio->a_prov_cancel = NULL; - nni_cv_wake(&aio->a_cv); nni_mtx_unlock(&aio->a_lk); - - // This either aborts the task, or waits for it to complete if already - // dispatched. - nni_taskq_cancel(NULL, &aio->a_tqe); + return (0); } void @@ -180,10 +171,14 @@ nni_aio_cancel(nni_aio *aio, int rv) return; } aio->a_flags |= NNI_AIO_DONE; - aio->a_result = rv; - cancelfn = aio->a_prov_cancel; + aio->a_result = rv; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + + // Guaraneed to just be a list operation. + nni_aio_expire_remove(aio); - // XXX: Think about the synchronization with nni_aio_fini... + aio->a_refcnt++; nni_mtx_unlock(&aio->a_lk); // Stop any I/O at the provider level. @@ -192,16 +187,20 @@ nni_aio_cancel(nni_aio *aio, int rv) } nni_mtx_lock(&aio->a_lk); + + aio->a_refcnt--; + if (aio->a_refcnt == 0) { + nni_cv_wake(&aio->a_cv); + } + // These should have already been cleared by the cancel function. aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; - // XXX: mark unbusy if (!(aio->a_flags & NNI_AIO_FINI)) { // If we are finalizing, then we are done. nni_taskq_dispatch(NULL, &aio->a_tqe); } - // XXX: else wake aio_cv.. because there is someone watching. nni_mtx_unlock(&aio->a_lk); } @@ -222,10 +221,12 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; - // XXX: cleanup the NNI_AIO_STOP flag, it's kind of pointless I think. - if (!(aio->a_flags & NNI_AIO_STOP)) { - nni_taskq_dispatch(NULL, &aio->a_tqe); - } + // This is guaranteed to just be a list operation at this point, + // because done wasn't set. + nni_aio_expire_remove(aio); + aio->a_expire = NNI_TIME_NEVER; + + nni_taskq_dispatch(NULL, &aio->a_tqe); nni_mtx_unlock(&aio->a_lk); } @@ -253,3 +254,166 @@ nni_aio_list_active(nni_aio *aio) { return (nni_list_node_active(&aio->a_prov_node)); } + +static void +nni_aio_expire_add(nni_aio *aio) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *list = &nni_aio_expire_aios; + nni_aio * naio; + + nni_mtx_lock(mtx); + // This is a reverse walk of the list. We're more likely to find + // a match at the end of the list. + for (naio = nni_list_last(list); naio != NULL; + naio = nni_list_prev(list, naio)) { + if (aio->a_expire >= naio->a_expire) { + nni_list_insert_after(list, aio, naio); + break; + } + } + if (naio == NULL) { + // This has the shortest time, so insert at the start. + nni_list_prepend(list, aio); + // And, as we are the latest, kick the thing. + nni_cv_wake(cv); + } + nni_mtx_unlock(mtx); +} + +static void +nni_aio_expire_remove(nni_aio *aio) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *list = &nni_aio_expire_aios; + + nni_mtx_lock(mtx); + if (nni_list_active(list, aio)) { + nni_list_remove(list, aio); + } + while (aio == nni_aio_expire_current) { + nni_cv_wait(cv); + } + nni_mtx_unlock(mtx); +} + +static void +nni_aio_expire_loop(void *arg) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *aios = &nni_aio_expire_aios; + nni_aio * aio; + nni_time now; + int rv; + void (*cancelfn)(nni_aio *); + + NNI_ARG_UNUSED(arg); + + for (;;) { + nni_mtx_lock(mtx); + + // If we are resuming this loop after processing an AIO, + // note that we are done with it, and wake anyone waiting + // for that to clear up. + if ((aio = nni_aio_expire_current) != NULL) { + nni_aio_expire_current = NULL; + nni_cv_wake(cv); + } + + if (nni_aio_expire_exit) { + nni_mtx_unlock(mtx); + return; + } + + if ((aio = nni_list_first(aios)) == NULL) { + nni_cv_wait(cv); + nni_mtx_unlock(mtx); + continue; + } + + now = nni_clock(); + if (now < aio->a_expire) { + // Unexpired; the list is ordered, so we just wait. + nni_cv_until(cv, aio->a_expire); + nni_mtx_unlock(mtx); + continue; + } + + // This aio's time has come. Expire it, canceling any + // outstanding I/O. + + nni_list_remove(aios, aio); + nni_aio_expire_current = aio; + nni_mtx_unlock(mtx); + + cancelfn = NULL; + + nni_mtx_lock(&aio->a_lk); + if ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) != 0) { + nni_mtx_unlock(&aio->a_lk); + continue; + } + + aio->a_flags |= NNI_AIO_DONE; + aio->a_result = NNG_ETIMEDOUT; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + nni_mtx_unlock(&aio->a_lk); + + // Cancel any outstanding activity. + if (cancelfn != NULL) { + cancelfn(aio); + } + + // Arguably we could avoid dispatching, and execute the + // callback inline here as we are already on a separate + // thread. But keeping it separate is clearer, and more + // consistent with other uses. And this should not be a + // hot code path. + nni_taskq_dispatch(NULL, &aio->a_tqe); + } +} + +int +nni_aio_sys_init(void) +{ + int rv; + nni_mtx *mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_thr *thr = &nni_aio_expire_thr; + + if (((rv = nni_mtx_init(mtx)) != 0) || + ((rv = nni_cv_init(cv, mtx)) != 0) || + ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0)) { + goto fail; + } + NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node); + nni_thr_run(thr); + return (0); + +fail: + nni_thr_fini(thr); + nni_cv_fini(cv); + nni_mtx_fini(mtx); + return (rv); +} + +void +nni_aio_sys_fini(void) +{ + nni_mtx *mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_thr *thr = &nni_aio_expire_thr; + + nni_mtx_lock(mtx); + nni_aio_expire_exit = 1; + nni_cv_wake(cv); + nni_mtx_unlock(mtx); + + nni_thr_fini(thr); + nni_cv_fini(cv); + nni_mtx_fini(mtx); +}
\ No newline at end of file |
