diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 258 | ||||
| -rw-r--r-- | src/core/aio.h | 5 | ||||
| -rw-r--r-- | src/core/init.c | 13 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 87 |
4 files changed, 240 insertions, 123 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index ba09b608..73c1dd9b 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -11,10 +11,22 @@ #include "core/nng_impl.h" #include <string.h> -#define NNI_AIO_WAKE (1 << 0) -#define NNI_AIO_DONE (1 << 1) -#define NNI_AIO_FINI (1 << 2) -#define NNI_AIO_STOP (1 << 3) +enum nni_aio_flags { + NNI_AIO_WAKE = 0x1, + NNI_AIO_DONE = 0x2, + NNI_AIO_FINI = 0x4, +}; + +// These are used for expiration. +static nni_mtx nni_aio_expire_mtx; +static nni_cv nni_aio_expire_cv; +static int nni_aio_expire_exit; +static nni_list nni_aio_expire_aios; +static nni_thr nni_aio_expire_thr; +static nni_aio *nni_aio_expire_current; + +static void nni_aio_expire_add(nni_aio *); +static void nni_aio_expire_remove(nni_aio *); int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) @@ -53,12 +65,20 @@ nni_aio_fini(nni_aio *aio) aio->a_flags |= NNI_AIO_DONE; aio->a_result = NNG_ECANCELED; cancelfn = aio->a_prov_cancel; + } else { cancelfn = NULL; } - nni_cv_wake(&aio->a_cv); // XXX: why? aio_wait? We shouldn't have any + nni_cv_wake(&aio->a_cv); + + while (aio->a_refcnt != 0) { + nni_cv_wait(&aio->a_cv); + } nni_mtx_unlock(&aio->a_lk); + // just a list operation at this point. + nni_aio_expire_remove(aio); + // Cancel the AIO if it was scheduled. if (cancelfn != NULL) { cancelfn(aio); @@ -122,7 +142,7 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) nni_mtx_lock(&aio->a_lk); aio->a_flags &= ~(NNI_AIO_DONE | NNI_AIO_WAKE); - if (aio->a_flags & (NNI_AIO_FINI | NNI_AIO_STOP)) { + if (aio->a_flags & NNI_AIO_FINI) { // We should not reschedule anything at this point. nni_mtx_unlock(&aio->a_lk); return (NNG_ECANCELED); @@ -131,40 +151,11 @@ nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) aio->a_count = 0; aio->a_prov_cancel = cancel; aio->a_prov_data = data; - nni_mtx_unlock(&aio->a_lk); - return (0); -} - -// XXX: REMOVE ME... -void -nni_aio_stop(nni_aio *aio) -{ - void (*cancelfn)(nni_aio *); - - nni_mtx_lock(&aio->a_lk); - if (aio->a_flags & (NNI_AIO_STOP | NNI_AIO_FINI | NNI_AIO_DONE)) { - nni_mtx_unlock(&aio->a_lk); - return; - } - aio->a_result = NNG_ECANCELED; - aio->a_flags |= NNI_AIO_DONE | NNI_AIO_STOP; - cancelfn = aio->a_prov_cancel; - nni_mtx_unlock(&aio->a_lk); - - // This unregisters the AIO from the provider. - if (cancelfn != NULL) { - cancelfn(aio); + if (aio->a_expire != NNI_TIME_NEVER) { + nni_aio_expire_add(aio); } - - nni_mtx_lock(&aio->a_lk); - aio->a_prov_data = NULL; - aio->a_prov_cancel = NULL; - nni_cv_wake(&aio->a_cv); nni_mtx_unlock(&aio->a_lk); - - // This either aborts the task, or waits for it to complete if already - // dispatched. - nni_taskq_cancel(NULL, &aio->a_tqe); + return (0); } void @@ -180,10 +171,14 @@ nni_aio_cancel(nni_aio *aio, int rv) return; } aio->a_flags |= NNI_AIO_DONE; - aio->a_result = rv; - cancelfn = aio->a_prov_cancel; + aio->a_result = rv; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + + // Guaraneed to just be a list operation. + nni_aio_expire_remove(aio); - // XXX: Think about the synchronization with nni_aio_fini... + aio->a_refcnt++; nni_mtx_unlock(&aio->a_lk); // Stop any I/O at the provider level. @@ -192,16 +187,20 @@ nni_aio_cancel(nni_aio *aio, int rv) } nni_mtx_lock(&aio->a_lk); + + aio->a_refcnt--; + if (aio->a_refcnt == 0) { + nni_cv_wake(&aio->a_cv); + } + // These should have already been cleared by the cancel function. aio->a_prov_data = NULL; aio->a_prov_cancel = NULL; - // XXX: mark unbusy if (!(aio->a_flags & NNI_AIO_FINI)) { // If we are finalizing, then we are done. nni_taskq_dispatch(NULL, &aio->a_tqe); } - // XXX: else wake aio_cv.. because there is someone watching. nni_mtx_unlock(&aio->a_lk); } @@ -222,10 +221,12 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; - // XXX: cleanup the NNI_AIO_STOP flag, it's kind of pointless I think. - if (!(aio->a_flags & NNI_AIO_STOP)) { - nni_taskq_dispatch(NULL, &aio->a_tqe); - } + // This is guaranteed to just be a list operation at this point, + // because done wasn't set. + nni_aio_expire_remove(aio); + aio->a_expire = NNI_TIME_NEVER; + + nni_taskq_dispatch(NULL, &aio->a_tqe); nni_mtx_unlock(&aio->a_lk); } @@ -253,3 +254,166 @@ nni_aio_list_active(nni_aio *aio) { return (nni_list_node_active(&aio->a_prov_node)); } + +static void +nni_aio_expire_add(nni_aio *aio) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *list = &nni_aio_expire_aios; + nni_aio * naio; + + nni_mtx_lock(mtx); + // This is a reverse walk of the list. We're more likely to find + // a match at the end of the list. + for (naio = nni_list_last(list); naio != NULL; + naio = nni_list_prev(list, naio)) { + if (aio->a_expire >= naio->a_expire) { + nni_list_insert_after(list, aio, naio); + break; + } + } + if (naio == NULL) { + // This has the shortest time, so insert at the start. + nni_list_prepend(list, aio); + // And, as we are the latest, kick the thing. + nni_cv_wake(cv); + } + nni_mtx_unlock(mtx); +} + +static void +nni_aio_expire_remove(nni_aio *aio) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *list = &nni_aio_expire_aios; + + nni_mtx_lock(mtx); + if (nni_list_active(list, aio)) { + nni_list_remove(list, aio); + } + while (aio == nni_aio_expire_current) { + nni_cv_wait(cv); + } + nni_mtx_unlock(mtx); +} + +static void +nni_aio_expire_loop(void *arg) +{ + nni_mtx * mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_list *aios = &nni_aio_expire_aios; + nni_aio * aio; + nni_time now; + int rv; + void (*cancelfn)(nni_aio *); + + NNI_ARG_UNUSED(arg); + + for (;;) { + nni_mtx_lock(mtx); + + // If we are resuming this loop after processing an AIO, + // note that we are done with it, and wake anyone waiting + // for that to clear up. + if ((aio = nni_aio_expire_current) != NULL) { + nni_aio_expire_current = NULL; + nni_cv_wake(cv); + } + + if (nni_aio_expire_exit) { + nni_mtx_unlock(mtx); + return; + } + + if ((aio = nni_list_first(aios)) == NULL) { + nni_cv_wait(cv); + nni_mtx_unlock(mtx); + continue; + } + + now = nni_clock(); + if (now < aio->a_expire) { + // Unexpired; the list is ordered, so we just wait. + nni_cv_until(cv, aio->a_expire); + nni_mtx_unlock(mtx); + continue; + } + + // This aio's time has come. Expire it, canceling any + // outstanding I/O. + + nni_list_remove(aios, aio); + nni_aio_expire_current = aio; + nni_mtx_unlock(mtx); + + cancelfn = NULL; + + nni_mtx_lock(&aio->a_lk); + if ((aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) != 0) { + nni_mtx_unlock(&aio->a_lk); + continue; + } + + aio->a_flags |= NNI_AIO_DONE; + aio->a_result = NNG_ETIMEDOUT; + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + nni_mtx_unlock(&aio->a_lk); + + // Cancel any outstanding activity. + if (cancelfn != NULL) { + cancelfn(aio); + } + + // Arguably we could avoid dispatching, and execute the + // callback inline here as we are already on a separate + // thread. But keeping it separate is clearer, and more + // consistent with other uses. And this should not be a + // hot code path. + nni_taskq_dispatch(NULL, &aio->a_tqe); + } +} + +int +nni_aio_sys_init(void) +{ + int rv; + nni_mtx *mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_thr *thr = &nni_aio_expire_thr; + + if (((rv = nni_mtx_init(mtx)) != 0) || + ((rv = nni_cv_init(cv, mtx)) != 0) || + ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0)) { + goto fail; + } + NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node); + nni_thr_run(thr); + return (0); + +fail: + nni_thr_fini(thr); + nni_cv_fini(cv); + nni_mtx_fini(mtx); + return (rv); +} + +void +nni_aio_sys_fini(void) +{ + nni_mtx *mtx = &nni_aio_expire_mtx; + nni_cv * cv = &nni_aio_expire_cv; + nni_thr *thr = &nni_aio_expire_thr; + + nni_mtx_lock(mtx); + nni_aio_expire_exit = 1; + nni_cv_wake(cv); + nni_mtx_unlock(mtx); + + nni_thr_fini(thr); + nni_cv_fini(cv); + nni_mtx_fini(mtx); +}
\ No newline at end of file diff --git a/src/core/aio.h b/src/core/aio.h index 296b0682..09923d7f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -52,6 +52,9 @@ struct nni_aio { void (*a_prov_cancel)(nni_aio *); void * a_prov_data; nni_list_node a_prov_node; + + // Expire node. + nni_list_node a_expire_node; }; // nni_aio_init initializes an aio object. The callback is called with @@ -118,4 +121,6 @@ extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *); // nni_fini?) // extern void nni_aio_stop(nni_aio *); +extern int nni_aio_sys_init(void); +extern void nni_aio_sys_fini(void); #endif // CORE_AIO_H diff --git a/src/core/init.c b/src/core/init.c index 4292146d..88c80f3f 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -23,21 +23,25 @@ nni_init_helper(void) nni_taskq_sys_fini(); return (rv); } + if ((rv = nni_aio_sys_init()) != 0) { + nni_taskq_sys_fini(); + return (rv); + } if ((rv = nni_random_sys_init()) != 0) { - nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_taskq_sys_fini(); return (rv); } if ((rv = nni_sock_sys_init()) != 0) { nni_random_sys_fini(); - nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_taskq_sys_fini(); return (rv); } if ((rv = nni_ep_sys_init()) != 0) { nni_sock_sys_fini(); nni_random_sys_fini(); - nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_taskq_sys_fini(); return (rv); } @@ -45,7 +49,7 @@ nni_init_helper(void) nni_ep_sys_fini(); nni_sock_sys_fini(); nni_random_sys_fini(); - nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_taskq_sys_fini(); return (rv); } @@ -71,5 +75,6 @@ nni_fini(void) nni_sock_sys_fini(); nni_random_sys_fini(); nni_timer_sys_fini(); + nni_aio_sys_fini(); nni_plat_fini(); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 9f7ff7fd..71c6dff4 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -32,13 +33,8 @@ struct nni_msgq { nni_list mq_aio_getq; nni_list mq_aio_notify_get; nni_list mq_aio_notify_put; - - nni_timer_node mq_timer; - nni_time mq_expire; }; -static void nni_msgq_run_timeout(void *); - int nni_msgq_init(nni_msgq **mqp, int cap) { @@ -76,8 +72,6 @@ nni_msgq_init(nni_msgq **mqp, int cap) goto fail; } - nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq); - mq->mq_cap = cap; mq->mq_alloc = alloc; mq->mq_len = 0; @@ -86,7 +80,6 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_closed = 0; mq->mq_puterr = 0; mq->mq_geterr = 0; - mq->mq_expire = NNI_TIME_NEVER; mq->mq_draining = 0; *mqp = mq; @@ -110,7 +103,6 @@ nni_msgq_fini(nni_msgq *mq) if (mq == NULL) { return; } - nni_timer_cancel(&mq->mq_timer); nni_cv_fini(&mq->mq_drained); nni_mtx_fini(&mq->mq_lock); @@ -348,8 +340,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { - nni_time expire = aio->a_expire; - nni_mtx_lock(&mq->mq_lock); if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { nni_mtx_unlock(&mq->mq_lock); @@ -368,21 +358,21 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); - nni_msgq_run_notify(mq); - // XXX: handle this using a generic aio timeout. - if (expire < mq->mq_expire) { - mq->mq_expire = expire; - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + // if this was a non-blocking operation, and we couldn't finish + // it synchronously in the above run_putq, then abort. + if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) { + nni_aio_list_remove(aio); + nni_aio_finish(aio, NNG_EAGAIN, 0); } + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); } void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { - nni_time expire = aio->a_expire; - nni_mtx_lock(&mq->mq_lock); if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { nni_mtx_unlock(&mq->mq_lock); @@ -401,13 +391,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_aio_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); - nni_msgq_run_notify(mq); - // XXX: handle this using a generic aio timeout. - if (expire < mq->mq_expire) { - mq->mq_expire = expire; - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + // if this was a non-blocking operation, and we couldn't finish + // it synchronously in the above run_getq, then abort. + if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) { + nni_aio_list_remove(aio); + nni_aio_finish(aio, NNG_EAGAIN, 0); } + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); } @@ -483,55 +475,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_EAGAIN); } -// XXX: Move this to generic AIO timeout... -void -nni_msgq_run_timeout(void *arg) -{ - nni_msgq *mq = arg; - nni_time now; - nni_time exp; - nni_aio * aio; - nni_aio * naio; - - now = nni_clock(); - exp = NNI_TIME_NEVER; - - nni_mtx_lock(&mq->mq_lock); - naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - if (aio->a_expire == NNI_TIME_ZERO) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_EAGAIN, 0); - } else if (now >= aio->a_expire) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ETIMEDOUT, 0); - } else if (exp > aio->a_expire) { - exp = aio->a_expire; - } - } - - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); - if (aio->a_expire == NNI_TIME_ZERO) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_EAGAIN, 0); - } else if (now >= aio->a_expire) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ETIMEDOUT, 0); - } else if (exp > aio->a_expire) { - exp = aio->a_expire; - } - } - - mq->mq_expire = exp; - if (mq->mq_expire != NNI_TIME_NEVER) { - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); - } - nni_mtx_unlock(&mq->mq_lock); -} - int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { |
