From 89c4ae87f3d927c8a6a760b6706b3a8e386dfb55 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 16 Jul 2017 01:02:09 -0700 Subject: AIO timeouts work correctly now, using their own timer logic. We closed a few subtle races in the AIO subsystem as well, and now we were able to eliminate the separate timer handling the MQ code. There appear to be some opportunities to further enhance the code for MQs as well -- eventually probably the only access to MQs will be with AIOs. --- src/core/aio.c | 258 ++++++++++++++++++++++++++++++++++++++++++---------- src/core/aio.h | 5 + src/core/init.c | 13 ++- src/core/msgqueue.c | 87 +++--------------- 4 files changed, 240 insertions(+), 123 deletions(-) (limited to 'src') diff --git a/src/core/aio.c b/src/core/aio.c index ba09b608..73c1dd9b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -11,10 +11,22 @@ #include "core/nng_impl.h" #include -#define NNI_AIO_WAKE (1 << 0) -#define NNI_AIO_DONE (1 << 1) -#define NNI_AIO_FINI (1 << 2) -#define NNI_AIO_STOP (1 << 3) +enum nni_aio_flags { + NNI_AIO_WAKE = 0x1, + NNI_AIO_DONE = 0x2, + NNI_AIO_FINI = 0x4, +}; + +// These are used for expiration. +static nni_mtx nni_aio_expire_mtx; +static nni_cv nni_aio_expire_cv; +static int nni_aio_expire_exit; +static nni_list nni_aio_expire_aios; +static nni_thr nni_aio_expire_thr; +static nni_aio *nni_aio_expire_current; + +static void nni_aio_expire_add(nni_aio *); +static void nni_aio_expire_remove(nni_aio *); int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) @@ -53,12 +65,20 @@ nni_aio_fini(nni_aio *aio) aio->a_flags |= NNI_AIO_DONE; aio->a_result = NNG_ECANCELED; cancelfn = aio->a_prov_cancel; + } else { cancelfn = NULL; } - nni_cv_wake(&aio->a_cv); // XXX: why? aio_wait? We shouldn't have any + nni_cv_wake(&aio->a_cv); + + while (aio->a_refcnt != 0) { + nni_cv_wait(&aio->a_cv); + } nni_mtx_unlock(&aio->a_lk); + // just a list operation at this point. + nni_aio_expire_remove(aio); + // Cancel the AIO if it was scheduled. if (cancelfn != NULL) { cancelfn(aio); @@ -122,7 +142,7 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) nni_mtx_lock(&aio->a_lk); aio->a_flags &= ~(NNI_AIO_DONE | NNI_AIO_WAKE); - if (aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP)) { + if (aio->a_flags & NNI_AIO_FINI) { // We should not reschedule anything at this point. nni_mtx_unlock(&aio->a_lk); return (NNG_ECANCELED); @@ -131,40 +151,11 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) aio->a_count = 0; aio->a_prov_cancel = cancel; aio->a_prov_data = data; - nni_mtx_unlock(&aio->a_lk); - return (0); -} - -// XXX: REMOVE ME... -void -nni_aio_stop(nni_aio *aio) -{ - void (*cancelfn)(nni_aio *); - - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_STOP | NNI_AIO_FINI | NNI_AIO_DONE)) { - nni_mtx_unlock(&aio->a_lk); - return; - } - aio->a_result = NNG_ECANCELED; - aio->a_flags |= NNI_AIO_DONE | NNI_AIO_STOP; - cancelfn = aio->a_prov_cancel; - nni_mtx_unlock(&aio->a_lk); - - // This unregisters the AIO from the provider. - if (cancelfn != NULL) { - cancelfn(aio); + if (aio->a_expire != NNI_TIME_NEVER) { + nni_aio_expire_add(aio); } - - nni_mtx_lock(&aio->a_lk); - aio->a_prov_data = NULL; - aio->a_prov_cancel = NULL; - nni_cv_wake(&aio->a_cv); nni_mtx_unlock(&aio->a_lk); - - // This either aborts the task, or waits for it to complete if already - // dispatched. - nni_taskq_cancel(NULL, &aio->a_tqe); + return (0); } void @@ -180,10 +171,14 @@ nni_aio_cancel(nni_aio *aio, int rv) return; } aio->a_flags |= NNI_AIO_DONE; - aio->a_result = rv; - cancelfn = aio->a_prov_cancel; + aio->a_result = rv; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + + // Guaraneed to just be a list operation. + nni_aio_expire_remove(aio); - // XXX: Think about the synchronization with nni_aio_fini... + aio->a_refcnt++; nni_mtx_unlock(&aio->a_lk); // Stop any I/O at the provider level. @@ -192,16 +187,20 @@ nni_aio_cancel(nni_aio *aio, int rv) } nni_mtx_lock(&aio->a_lk); + + aio->a_refcnt--; + if (aio->a_refcnt == 0) { + nni_cv_wake(&aio->a_cv); + } + // These should have already been cleared by the cancel function. aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; - // XXX: mark unbusy if (!(aio->a_flags & NNI_AIO_FINI)) { // If we are finalizing, then we are done. nni_taskq_dispatch(NULL, &aio->a_tqe); } - // XXX: else wake aio_cv.. because there is someone watching. nni_mtx_unlock(&aio->a_lk); } @@ -222,10 +221,12 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; - // XXX: cleanup the NNI_AIO_STOP flag, it's kind of pointless I think. - if (!(aio->a_flags & NNI_AIO_STOP)) { - nni_taskq_dispatch(NULL, &aio->a_tqe); - } + // This is guaranteed to just be a list operation at this point, + // because done wasn't set. + nni_aio_expire_remove(aio); + aio->a_expire = NNI_TIME_NEVER; + + nni_taskq_dispatch(NULL, &aio->a_tqe); nni_mtx_unlock(&aio->a_lk); } @@ -253,3 +254,166 @@ nni_aio_list_active(nni_aio *aio) { return (nni_list_node_active(&aio->a_prov_node)); } + +static void +nni_aio_expire_add(nni_aio *aio) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *list = &nni_aio_expire_aios; + nni_aio * naio; + + nni_mtx_lock(mtx); + // This is a reverse walk of the list. We're more likely to find + // a match at the end of the list. + for (naio = nni_list_last(list); naio != NULL; + naio = nni_list_prev(list, naio)) { + if (aio->a_expire >= naio->a_expire) { + nni_list_insert_after(list, aio, naio); + break; + } + } + if (naio == NULL) { + // This has the shortest time, so insert at the start. + nni_list_prepend(list, aio); + // And, as we are the latest, kick the thing. + nni_cv_wake(cv); + } + nni_mtx_unlock(mtx); +} + +static void +nni_aio_expire_remove(nni_aio *aio) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *list = &nni_aio_expire_aios; + + nni_mtx_lock(mtx); + if (nni_list_active(list, aio)) { + nni_list_remove(list, aio); + } + while (aio == nni_aio_expire_current) { + nni_cv_wait(cv); + } + nni_mtx_unlock(mtx); +} + +static void +nni_aio_expire_loop(void *arg) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *aios = &nni_aio_expire_aios; + nni_aio * aio; + nni_time now; + int rv; + void (*cancelfn)(nni_aio *); + + NNI_ARG_UNUSED(arg); + + for (;;) { + nni_mtx_lock(mtx); + + // If we are resuming this loop after processing an AIO, + // note that we are done with it, and wake anyone waiting + // for that to clear up. + if ((aio = nni_aio_expire_current) != NULL) { + nni_aio_expire_current = NULL; + nni_cv_wake(cv); + } + + if (nni_aio_expire_exit) { + nni_mtx_unlock(mtx); + return; + } + + if ((aio = nni_list_first(aios)) == NULL) { + nni_cv_wait(cv); + nni_mtx_unlock(mtx); + continue; + } + + now = nni_clock(); + if (now < aio->a_expire) { + // Unexpired; the list is ordered, so we just wait. + nni_cv_until(cv, aio->a_expire); + nni_mtx_unlock(mtx); + continue; + } + + // This aio's time has come. Expire it, canceling any + // outstanding I/O. + + nni_list_remove(aios, aio); + nni_aio_expire_current = aio; + nni_mtx_unlock(mtx); + + cancelfn = NULL; + + nni_mtx_lock(&aio->a_lk); + if ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) != 0) { + nni_mtx_unlock(&aio->a_lk); + continue; + } + + aio->a_flags |= NNI_AIO_DONE; + aio->a_result = NNG_ETIMEDOUT; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + nni_mtx_unlock(&aio->a_lk); + + // Cancel any outstanding activity. + if (cancelfn != NULL) { + cancelfn(aio); + } + + // Arguably we could avoid dispatching, and execute the + // callback inline here as we are already on a separate + // thread. But keeping it separate is clearer, and more + // consistent with other uses. And this should not be a + // hot code path. + nni_taskq_dispatch(NULL, &aio->a_tqe); + } +} + +int +nni_aio_sys_init(void) +{ + int rv; + nni_mtx *mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_thr *thr = &nni_aio_expire_thr; + + if (((rv = nni_mtx_init(mtx)) != 0) || + ((rv = nni_cv_init(cv, mtx)) != 0) || + ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0)) { + goto fail; + } + NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node); + nni_thr_run(thr); + return (0); + +fail: + nni_thr_fini(thr); + nni_cv_fini(cv); + nni_mtx_fini(mtx); + return (rv); +} + +void +nni_aio_sys_fini(void) +{ + nni_mtx *mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_thr *thr = &nni_aio_expire_thr; + + nni_mtx_lock(mtx); + nni_aio_expire_exit = 1; + nni_cv_wake(cv); + nni_mtx_unlock(mtx); + + nni_thr_fini(thr); + nni_cv_fini(cv); + nni_mtx_fini(mtx); +} \ No newline at end of file diff --git a/src/core/aio.h b/src/core/aio.h index 296b0682..09923d7f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -52,6 +52,9 @@ struct nni_aio { void (*a_prov_cancel)(nni_aio *); void * a_prov_data; nni_list_node a_prov_node; + + // Expire node. + nni_list_node a_expire_node; }; // nni_aio_init initializes an aio object. The callback is called with @@ -118,4 +121,6 @@ extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *); // nni_fini?) // extern void nni_aio_stop(nni_aio *); +extern int nni_aio_sys_init(void); +extern void nni_aio_sys_fini(void); #endif // CORE_AIO_H diff --git a/src/core/init.c b/src/core/init.c index 4292146d..88c80f3f 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -23,21 +23,25 @@ nni_init_helper(void) nni_taskq_sys_fini(); return (rv); } + if ((rv = nni_aio_sys_init()) != 0) { + nni_taskq_sys_fini(); + return (rv); + } if ((rv = nni_random_sys_init()) != 0) { - nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_taskq_sys_fini(); return (rv); } if ((rv = nni_sock_sys_init()) != 0) { nni_random_sys_fini(); - nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_taskq_sys_fini(); return (rv); } if ((rv = nni_ep_sys_init()) != 0) { nni_sock_sys_fini(); nni_random_sys_fini(); - nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_taskq_sys_fini(); return (rv); } @@ -45,7 +49,7 @@ nni_init_helper(void) nni_ep_sys_fini(); nni_sock_sys_fini(); nni_random_sys_fini(); - nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_taskq_sys_fini(); return (rv); } @@ -71,5 +75,6 @@ nni_fini(void) nni_sock_sys_fini(); nni_random_sys_fini(); nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_plat_fini(); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 9f7ff7fd..71c6dff4 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -32,13 +33,8 @@ struct nni_msgq { nni_list mq_aio_getq; nni_list mq_aio_notify_get; nni_list mq_aio_notify_put; - - nni_timer_node mq_timer; - nni_time mq_expire; }; -static void nni_msgq_run_timeout(void *); - int nni_msgq_init(nni_msgq **mqp, int cap) { @@ -76,8 +72,6 @@ nni_msgq_init(nni_msgq **mqp, int cap) goto fail; } - nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq); - mq->mq_cap = cap; mq->mq_alloc = alloc; mq->mq_len = 0; @@ -86,7 +80,6 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_closed = 0; mq->mq_puterr = 0; mq->mq_geterr = 0; - mq->mq_expire = NNI_TIME_NEVER; mq->mq_draining = 0; *mqp = mq; @@ -110,7 +103,6 @@ nni_msgq_fini(nni_msgq *mq) if (mq == NULL) { return; } - nni_timer_cancel(&mq->mq_timer); nni_cv_fini(&mq->mq_drained); nni_mtx_fini(&mq->mq_lock); @@ -348,8 +340,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { - nni_time expire = aio->a_expire; - nni_mtx_lock(&mq->mq_lock); if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { nni_mtx_unlock(&mq->mq_lock); @@ -368,21 +358,21 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); - nni_msgq_run_notify(mq); - // XXX: handle this using a generic aio timeout. - if (expire < mq->mq_expire) { - mq->mq_expire = expire; - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + // if this was a non-blocking operation, and we couldn't finish + // it synchronously in the above run_putq, then abort. + if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) { + nni_aio_list_remove(aio); + nni_aio_finish(aio, NNG_EAGAIN, 0); } + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); } void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { - nni_time expire = aio->a_expire; - nni_mtx_lock(&mq->mq_lock); if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { nni_mtx_unlock(&mq->mq_lock); @@ -401,13 +391,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_aio_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); - nni_msgq_run_notify(mq); - // XXX: handle this using a generic aio timeout. - if (expire < mq->mq_expire) { - mq->mq_expire = expire; - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + // if this was a non-blocking operation, and we couldn't finish + // it synchronously in the above run_getq, then abort. + if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) { + nni_aio_list_remove(aio); + nni_aio_finish(aio, NNG_EAGAIN, 0); } + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); } @@ -483,55 +475,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_EAGAIN); } -// XXX: Move this to generic AIO timeout... -void -nni_msgq_run_timeout(void *arg) -{ - nni_msgq *mq = arg; - nni_time now; - nni_time exp; - nni_aio * aio; - nni_aio * naio; - - now = nni_clock(); - exp = NNI_TIME_NEVER; - - nni_mtx_lock(&mq->mq_lock); - naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - if (aio->a_expire == NNI_TIME_ZERO) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_EAGAIN, 0); - } else if (now >= aio->a_expire) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ETIMEDOUT, 0); - } else if (exp > aio->a_expire) { - exp = aio->a_expire; - } - } - - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); - if (aio->a_expire == NNI_TIME_ZERO) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_EAGAIN, 0); - } else if (now >= aio->a_expire) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ETIMEDOUT, 0); - } else if (exp > aio->a_expire) { - exp = aio->a_expire; - } - } - - mq->mq_expire = exp; - if (mq->mq_expire != NNI_TIME_NEVER) { - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); - } - nni_mtx_unlock(&mq->mq_lock); -} - int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { -- cgit v1.2.3-70-g09d2