diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-03 20:36:47 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-03 20:36:47 -0800 |
| commit | c17a1dd3f5333da59355ecc3f8788a0396a8f72d (patch) | |
| tree | 77e6790df0fa45b57fafa749f3380fc4b8aa230e | |
| parent | 97614393e450b6c6813021f0e733b864a6265872 (diff) | |
| download | nng-c17a1dd3f5333da59355ecc3f8788a0396a8f72d.tar.gz nng-c17a1dd3f5333da59355ecc3f8788a0396a8f72d.tar.bz2 nng-c17a1dd3f5333da59355ecc3f8788a0396a8f72d.zip | |
Timer implementation. Operations can timeout now?
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/aio.c | 1 | ||||
| -rw-r--r-- | src/core/aio.h | 1 | ||||
| -rw-r--r-- | src/core/init.c | 8 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 35 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 2 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/timer.c | 161 | ||||
| -rw-r--r-- | src/core/timer.h | 33 |
9 files changed, 240 insertions, 4 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 36d9e8a7..61ebe5ac 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -70,6 +70,8 @@ set (NNG_SOURCES core/taskq.h core/thread.c core/thread.h + core/timer.c + core/timer.h core/transport.c core/transport.h diff --git a/src/core/aio.c b/src/core/aio.c index ce60385e..a3f4fb28 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -24,6 +24,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) nni_cv_init(&aio->a_cv, &aio->a_lk); aio->a_cb = cb; aio->a_cbarg = arg; + aio->a_expire = NNI_TIME_NEVER; nni_taskq_ent_init(&aio->a_tqe, cb, arg); } diff --git a/src/core/aio.h b/src/core/aio.h index 76ebb7c3..f6096ca7 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -25,6 +25,7 @@ struct nni_aio { size_t a_count; // Bytes transferred (I/O only) nni_cb a_cb; // User specified callback. void * a_cbarg; // Callback argument. + nni_time a_expire; // These fields are private to the aio framework. nni_mtx a_lk; diff --git a/src/core/init.c b/src/core/init.c index 0e27e289..8dbe50db 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -29,12 +29,18 @@ nni_init_helper(void) if ((rv = nni_taskq_sys_init()) != 0) { return (rv); } + if ((rv = nni_timer_sys_init()) != 0) { + nni_taskq_sys_fini(); + return (rv); + } if ((rv = nni_random_sys_init()) != 0) { + nni_timer_sys_fini(); nni_taskq_sys_fini(); return (rv); } if ((rv = nni_mtx_init(&nni_idlock_x)) != 0) { nni_random_sys_fini(); + nni_timer_sys_fini(); nni_taskq_sys_fini(); return (rv); } @@ -43,6 +49,7 @@ nni_init_helper(void) ((rv = nni_idhash_init(&nni_sockets_x)) != 0)) { nni_mtx_fini(&nni_idlock_x); nni_random_sys_fini(); + nni_timer_sys_fini(); nni_taskq_sys_fini(); return (rv); } @@ -77,6 +84,7 @@ nni_fini(void) nni_mtx_fini(&nni_idlock_x); nni_tran_sys_fini(); nni_random_sys_fini(); + nni_timer_sys_fini(); nni_taskq_sys_fini(); nni_plat_fini(); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 66f246f8..8b9a7e1a 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -277,7 +277,6 @@ nni_msgq_run_putq(nni_msgq *mq) size_t len; while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - msg = waio->a_msg; len = nni_msg_len(msg); @@ -324,7 +323,6 @@ nni_msgq_run_getq(nni_msgq *mq) size_t len; while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - // If anything is waiting in the queue, get it first. if (mq->mq_len != 0) { msg = mq->mq_msgs[mq->mq_get++]; @@ -389,6 +387,39 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) } +void +nni_msgq_run_timeout(nni_msgq *mq) +{ + nni_time now; + nni_aio *aio; + nni_aio *naio; + int rv; + + now = nni_clock(); + + nni_mtx_lock(&mq->mq_lock); + naio = nni_list_first(&mq->mq_aio_getq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + if (now >= aio->a_expire) { + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, NNG_ETIMEDOUT, 0); + } + } + + naio = nni_list_first(&mq->mq_aio_putq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_putq, aio); + if (now >= aio->a_expire) { + nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_finish(aio, NNG_ETIMEDOUT, 0); + } + } + + nni_mtx_unlock(&mq->mq_lock); +} + + int nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index a350c3b3..a0af12be 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -37,8 +37,6 @@ extern void nni_msgq_fini(nni_msgq *); extern int nni_msgq_aio_put(nni_msgq *, nni_aio *); extern int nni_msgq_aio_get(nni_msgq *, nni_aio *); -extern int nni_msgq_aio_put_until(nni_msgq *, nni_aio *, nni_time); -extern int nni_msgq_aio_get_until(nni_msgq *, nni_aio *, nni_time); extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); extern int nni_msgq_aio_cancel(nni_msgq *, nni_aio *); diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index d3e3e5c6..2a72766c 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -39,6 +39,7 @@ #include "core/random.h" #include "core/taskq.h" #include "core/thread.h" +#include "core/timer.h" #include "core/transport.h" // These have to come after the others - particularly transport.h diff --git a/src/core/timer.c b/src/core/timer.c new file mode 100644 index 00000000..59966822 --- /dev/null +++ b/src/core/timer.c @@ -0,0 +1,161 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdlib.h> +#include <string.h> + +extern void nni_timer_schedule(nni_timer_node *); +extern void nni_timer_cancel(nni_timer_node *); +extern int nni_timer_init(void); +extern void nni_timer_fini(void); +static void nni_timer_loop(void *); + +struct nni_timer { + // We use two mutexes. One protects the list, and the other ensures + // that cancel is blocked if we are running a timeout calllback. + // The callback(s) are allowed to reschedule a timeout. The list + // mutex is *always* acquired before the run mutex. + nni_mtx t_list_mx; + nni_mtx t_run_mx; + nni_cv t_cv; + nni_list t_entries; + nni_thr t_thr; + int t_close; +}; + +typedef struct nni_timer nni_timer; + +static nni_timer nni_global_timer; + + +int +nni_timer_sys_init(void) +{ + int rv; + nni_timer *timer = &nni_global_timer; + + memset(timer, 0, sizeof (*timer)); + NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node); + timer->t_close = 0; + + if (((rv = nni_mtx_init(&timer->t_list_mx)) != 0) || + ((rv = nni_mtx_init(&timer->t_run_mx)) != 0) || + ((rv = nni_cv_init(&timer->t_cv, &timer->t_list_mx)) != 0) || + ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) { + nni_timer_sys_fini(); + return (rv); + } + nni_thr_run(&timer->t_thr); + return (0); +} + + +void +nni_timer_sys_fini(void) +{ + nni_timer *timer = &nni_global_timer; + + nni_mtx_lock(&timer->t_list_mx); + timer->t_close = 1; + nni_cv_wake(&timer->t_cv); + nni_mtx_unlock(&timer->t_list_mx); + + nni_thr_fini(&timer->t_thr); + nni_cv_fini(&timer->t_cv); + nni_mtx_fini(&timer->t_list_mx); + nni_mtx_fini(&timer->t_run_mx); +} + + +void +nni_timer_cancel(nni_timer_node *node) +{ + nni_timer *timer = &nni_global_timer; + + nni_mtx_lock(&timer->t_list_mx); + nni_mtx_lock(&timer->t_run_mx); + if (node->t_sched) { + nni_list_remove(&timer->t_entries, node); + node->t_sched = 0; + } + nni_mtx_unlock(&timer->t_run_mx); + nni_mtx_unlock(&timer->t_list_mx); +} + + +void +nni_timer_schedule(nni_timer_node *node) +{ + nni_timer *timer = &nni_global_timer; + nni_timer_node *srch; + int wake = 1; + + nni_mtx_lock(&timer->t_list_mx); + + srch = nni_list_first(&timer->t_entries); + while ((srch != NULL) && (srch->t_expire < node->t_expire)) { + srch = nni_list_next(&timer->t_entries, srch); + wake = 0; + } + if (srch != NULL) { + nni_list_insert_before(&timer->t_entries, node, srch); + } else { + nni_list_append(&timer->t_entries, node); + } + node->t_sched = 1; + if (wake) { + nni_cv_wake(&timer->t_cv); + } + nni_mtx_unlock(&timer->t_list_mx); +} + + +static void +nni_timer_loop(void *arg) +{ + nni_timer *timer = arg; + nni_time now; + nni_time expire; + nni_timer_node *node; + + for (;;) { + nni_mtx_lock(&timer->t_list_mx); + if (timer->t_close) { + nni_mtx_unlock(&timer->t_list_mx); + break; + } + + now = nni_clock(); + if ((node = nni_list_first(&timer->t_entries)) == NULL) { + nni_cv_wait(&timer->t_cv); + nni_mtx_unlock(&timer->t_list_mx); + continue; + } + if (now < node->t_expire) { + // End of run, we have to wait for next. + nni_cv_until(&timer->t_cv, node->t_expire); + nni_mtx_unlock(&timer->t_list_mx); + continue; + } + + nni_list_remove(&timer->t_entries, node); + node->t_sched = 0; + + // The lock ordering here is important. We acquire the run + // lock before dropping the list lock. One the run is done, + // we can drop the run lock too. The reason for the second + // lock is so that the callback can reschedule itself. + nni_mtx_lock(&timer->t_run_mx); + nni_mtx_unlock(&timer->t_list_mx); + node->t_cb(node->t_arg); + nni_mtx_unlock(&timer->t_run_mx); + } +} diff --git a/src/core/timer.h b/src/core/timer.h new file mode 100644 index 00000000..a5bd4633 --- /dev/null +++ b/src/core/timer.h @@ -0,0 +1,33 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_TIMER_H +#define CORE_TIMER_H + +#include "core/defs.h" +#include "core/list.h" + +// For the sake of simplicity, we just maintain a single global timer thread. + +struct nni_timer_node { + nni_time t_expire; + nni_cb t_cb; + void * t_arg; + nni_list_node t_node; + int t_sched; +}; + +typedef struct nni_timer_node nni_timer_node; + +extern void nni_timer_schedule(nni_timer_node *); +extern void nni_timer_cancel(nni_timer_node *); +extern int nni_timer_sys_init(void); +extern void nni_timer_sys_fini(void); + +#endif // CORE_TIMER_H |
