aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/aio.c1
-rw-r--r--src/core/aio.h1
-rw-r--r--src/core/init.c8
-rw-r--r--src/core/msgqueue.c35
-rw-r--r--src/core/msgqueue.h2
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/timer.c161
-rw-r--r--src/core/timer.h33
9 files changed, 240 insertions, 4 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 36d9e8a7..61ebe5ac 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -70,6 +70,8 @@ set (NNG_SOURCES
core/taskq.h
core/thread.c
core/thread.h
+ core/timer.c
+ core/timer.h
core/transport.c
core/transport.h
diff --git a/src/core/aio.c b/src/core/aio.c
index ce60385e..a3f4fb28 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -24,6 +24,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
nni_cv_init(&aio->a_cv, &aio->a_lk);
aio->a_cb = cb;
aio->a_cbarg = arg;
+ aio->a_expire = NNI_TIME_NEVER;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index 76ebb7c3..f6096ca7 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -25,6 +25,7 @@ struct nni_aio {
size_t a_count; // Bytes transferred (I/O only)
nni_cb a_cb; // User specified callback.
void * a_cbarg; // Callback argument.
+ nni_time a_expire;
// These fields are private to the aio framework.
nni_mtx a_lk;
diff --git a/src/core/init.c b/src/core/init.c
index 0e27e289..8dbe50db 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -29,12 +29,18 @@ nni_init_helper(void)
if ((rv = nni_taskq_sys_init()) != 0) {
return (rv);
}
+ if ((rv = nni_timer_sys_init()) != 0) {
+ nni_taskq_sys_fini();
+ return (rv);
+ }
if ((rv = nni_random_sys_init()) != 0) {
+ nni_timer_sys_fini();
nni_taskq_sys_fini();
return (rv);
}
if ((rv = nni_mtx_init(&nni_idlock_x)) != 0) {
nni_random_sys_fini();
+ nni_timer_sys_fini();
nni_taskq_sys_fini();
return (rv);
}
@@ -43,6 +49,7 @@ nni_init_helper(void)
((rv = nni_idhash_init(&nni_sockets_x)) != 0)) {
nni_mtx_fini(&nni_idlock_x);
nni_random_sys_fini();
+ nni_timer_sys_fini();
nni_taskq_sys_fini();
return (rv);
}
@@ -77,6 +84,7 @@ nni_fini(void)
nni_mtx_fini(&nni_idlock_x);
nni_tran_sys_fini();
nni_random_sys_fini();
+ nni_timer_sys_fini();
nni_taskq_sys_fini();
nni_plat_fini();
}
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 66f246f8..8b9a7e1a 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -277,7 +277,6 @@ nni_msgq_run_putq(nni_msgq *mq)
size_t len;
while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
-
msg = waio->a_msg;
len = nni_msg_len(msg);
@@ -324,7 +323,6 @@ nni_msgq_run_getq(nni_msgq *mq)
size_t len;
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
-
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
msg = mq->mq_msgs[mq->mq_get++];
@@ -389,6 +387,39 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
}
+void
+nni_msgq_run_timeout(nni_msgq *mq)
+{
+ nni_time now;
+ nni_aio *aio;
+ nni_aio *naio;
+ int rv;
+
+ now = nni_clock();
+
+ nni_mtx_lock(&mq->mq_lock);
+ naio = nni_list_first(&mq->mq_aio_getq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ if (now >= aio->a_expire) {
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
+ }
+ }
+
+ naio = nni_list_first(&mq->mq_aio_putq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_putq, aio);
+ if (now >= aio->a_expire) {
+ nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
+ }
+ }
+
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index a350c3b3..a0af12be 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -37,8 +37,6 @@ extern void nni_msgq_fini(nni_msgq *);
extern int nni_msgq_aio_put(nni_msgq *, nni_aio *);
extern int nni_msgq_aio_get(nni_msgq *, nni_aio *);
-extern int nni_msgq_aio_put_until(nni_msgq *, nni_aio *, nni_time);
-extern int nni_msgq_aio_get_until(nni_msgq *, nni_aio *, nni_time);
extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);
extern int nni_msgq_aio_cancel(nni_msgq *, nni_aio *);
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index d3e3e5c6..2a72766c 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -39,6 +39,7 @@
#include "core/random.h"
#include "core/taskq.h"
#include "core/thread.h"
+#include "core/timer.h"
#include "core/transport.h"
// These have to come after the others - particularly transport.h
diff --git a/src/core/timer.c b/src/core/timer.c
new file mode 100644
index 00000000..59966822
--- /dev/null
+++ b/src/core/timer.c
@@ -0,0 +1,161 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+extern void nni_timer_schedule(nni_timer_node *);
+extern void nni_timer_cancel(nni_timer_node *);
+extern int nni_timer_init(void);
+extern void nni_timer_fini(void);
+static void nni_timer_loop(void *);
+
+struct nni_timer {
+ // We use two mutexes. One protects the list, and the other ensures
+ // that cancel is blocked if we are running a timeout calllback.
+ // The callback(s) are allowed to reschedule a timeout. The list
+ // mutex is *always* acquired before the run mutex.
+ nni_mtx t_list_mx;
+ nni_mtx t_run_mx;
+ nni_cv t_cv;
+ nni_list t_entries;
+ nni_thr t_thr;
+ int t_close;
+};
+
+typedef struct nni_timer nni_timer;
+
+static nni_timer nni_global_timer;
+
+
+int
+nni_timer_sys_init(void)
+{
+ int rv;
+ nni_timer *timer = &nni_global_timer;
+
+ memset(timer, 0, sizeof (*timer));
+ NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node);
+ timer->t_close = 0;
+
+ if (((rv = nni_mtx_init(&timer->t_list_mx)) != 0) ||
+ ((rv = nni_mtx_init(&timer->t_run_mx)) != 0) ||
+ ((rv = nni_cv_init(&timer->t_cv, &timer->t_list_mx)) != 0) ||
+ ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) {
+ nni_timer_sys_fini();
+ return (rv);
+ }
+ nni_thr_run(&timer->t_thr);
+ return (0);
+}
+
+
+void
+nni_timer_sys_fini(void)
+{
+ nni_timer *timer = &nni_global_timer;
+
+ nni_mtx_lock(&timer->t_list_mx);
+ timer->t_close = 1;
+ nni_cv_wake(&timer->t_cv);
+ nni_mtx_unlock(&timer->t_list_mx);
+
+ nni_thr_fini(&timer->t_thr);
+ nni_cv_fini(&timer->t_cv);
+ nni_mtx_fini(&timer->t_list_mx);
+ nni_mtx_fini(&timer->t_run_mx);
+}
+
+
+void
+nni_timer_cancel(nni_timer_node *node)
+{
+ nni_timer *timer = &nni_global_timer;
+
+ nni_mtx_lock(&timer->t_list_mx);
+ nni_mtx_lock(&timer->t_run_mx);
+ if (node->t_sched) {
+ nni_list_remove(&timer->t_entries, node);
+ node->t_sched = 0;
+ }
+ nni_mtx_unlock(&timer->t_run_mx);
+ nni_mtx_unlock(&timer->t_list_mx);
+}
+
+
+void
+nni_timer_schedule(nni_timer_node *node)
+{
+ nni_timer *timer = &nni_global_timer;
+ nni_timer_node *srch;
+ int wake = 1;
+
+ nni_mtx_lock(&timer->t_list_mx);
+
+ srch = nni_list_first(&timer->t_entries);
+ while ((srch != NULL) && (srch->t_expire < node->t_expire)) {
+ srch = nni_list_next(&timer->t_entries, srch);
+ wake = 0;
+ }
+ if (srch != NULL) {
+ nni_list_insert_before(&timer->t_entries, node, srch);
+ } else {
+ nni_list_append(&timer->t_entries, node);
+ }
+ node->t_sched = 1;
+ if (wake) {
+ nni_cv_wake(&timer->t_cv);
+ }
+ nni_mtx_unlock(&timer->t_list_mx);
+}
+
+
+static void
+nni_timer_loop(void *arg)
+{
+ nni_timer *timer = arg;
+ nni_time now;
+ nni_time expire;
+ nni_timer_node *node;
+
+ for (;;) {
+ nni_mtx_lock(&timer->t_list_mx);
+ if (timer->t_close) {
+ nni_mtx_unlock(&timer->t_list_mx);
+ break;
+ }
+
+ now = nni_clock();
+ if ((node = nni_list_first(&timer->t_entries)) == NULL) {
+ nni_cv_wait(&timer->t_cv);
+ nni_mtx_unlock(&timer->t_list_mx);
+ continue;
+ }
+ if (now < node->t_expire) {
+ // End of run, we have to wait for next.
+ nni_cv_until(&timer->t_cv, node->t_expire);
+ nni_mtx_unlock(&timer->t_list_mx);
+ continue;
+ }
+
+ nni_list_remove(&timer->t_entries, node);
+ node->t_sched = 0;
+
+ // The lock ordering here is important. We acquire the run
+ // lock before dropping the list lock. One the run is done,
+ // we can drop the run lock too. The reason for the second
+ // lock is so that the callback can reschedule itself.
+ nni_mtx_lock(&timer->t_run_mx);
+ nni_mtx_unlock(&timer->t_list_mx);
+ node->t_cb(node->t_arg);
+ nni_mtx_unlock(&timer->t_run_mx);
+ }
+}
diff --git a/src/core/timer.h b/src/core/timer.h
new file mode 100644
index 00000000..a5bd4633
--- /dev/null
+++ b/src/core/timer.h
@@ -0,0 +1,33 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_TIMER_H
+#define CORE_TIMER_H
+
+#include "core/defs.h"
+#include "core/list.h"
+
+// For the sake of simplicity, we just maintain a single global timer thread.
+
+struct nni_timer_node {
+ nni_time t_expire;
+ nni_cb t_cb;
+ void * t_arg;
+ nni_list_node t_node;
+ int t_sched;
+};
+
+typedef struct nni_timer_node nni_timer_node;
+
+extern void nni_timer_schedule(nni_timer_node *);
+extern void nni_timer_cancel(nni_timer_node *);
+extern int nni_timer_sys_init(void);
+extern void nni_timer_sys_fini(void);
+
+#endif // CORE_TIMER_H