summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-03 20:36:47 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-03 20:36:47 -0800
commitc17a1dd3f5333da59355ecc3f8788a0396a8f72d (patch)
tree77e6790df0fa45b57fafa749f3380fc4b8aa230e /src/core/msgqueue.c
parent97614393e450b6c6813021f0e733b864a6265872 (diff)
downloadnng-c17a1dd3f5333da59355ecc3f8788a0396a8f72d.tar.gz
nng-c17a1dd3f5333da59355ecc3f8788a0396a8f72d.tar.bz2
nng-c17a1dd3f5333da59355ecc3f8788a0396a8f72d.zip
Timer implementation. Operations can timeout now?
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c35
1 files changed, 33 insertions, 2 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 66f246f8..8b9a7e1a 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -277,7 +277,6 @@ nni_msgq_run_putq(nni_msgq *mq)
size_t len;
while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
-
msg = waio->a_msg;
len = nni_msg_len(msg);
@@ -324,7 +323,6 @@ nni_msgq_run_getq(nni_msgq *mq)
size_t len;
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
-
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
msg = mq->mq_msgs[mq->mq_get++];
@@ -389,6 +387,39 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
}
+void
+nni_msgq_run_timeout(nni_msgq *mq)
+{
+ nni_time now;
+ nni_aio *aio;
+ nni_aio *naio;
+ int rv;
+
+ now = nni_clock();
+
+ nni_mtx_lock(&mq->mq_lock);
+ naio = nni_list_first(&mq->mq_aio_getq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ if (now >= aio->a_expire) {
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
+ }
+ }
+
+ naio = nni_list_first(&mq->mq_aio_putq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_putq, aio);
+ if (now >= aio->a_expire) {
+ nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
+ }
+ }
+
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{