summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c87
1 files changed, 15 insertions, 72 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 9f7ff7fd..71c6dff4 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -32,13 +33,8 @@ struct nni_msgq {
nni_list mq_aio_getq;
nni_list mq_aio_notify_get;
nni_list mq_aio_notify_put;
-
- nni_timer_node mq_timer;
- nni_time mq_expire;
};
-static void nni_msgq_run_timeout(void *);
-
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -76,8 +72,6 @@ nni_msgq_init(nni_msgq **mqp, int cap)
goto fail;
}
- nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq);
-
mq->mq_cap = cap;
mq->mq_alloc = alloc;
mq->mq_len = 0;
@@ -86,7 +80,6 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_closed = 0;
mq->mq_puterr = 0;
mq->mq_geterr = 0;
- mq->mq_expire = NNI_TIME_NEVER;
mq->mq_draining = 0;
*mqp = mq;
@@ -110,7 +103,6 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
- nni_timer_cancel(&mq->mq_timer);
nni_cv_fini(&mq->mq_drained);
nni_mtx_fini(&mq->mq_lock);
@@ -348,8 +340,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- nni_time expire = aio->a_expire;
-
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
nni_mtx_unlock(&mq->mq_lock);
@@ -368,21 +358,21 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
- nni_msgq_run_notify(mq);
- // XXX: handle this using a generic aio timeout.
- if (expire < mq->mq_expire) {
- mq->mq_expire = expire;
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ // if this was a non-blocking operation, and we couldn't finish
+ // it synchronously in the above run_putq, then abort.
+ if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, NNG_EAGAIN, 0);
}
+ nni_msgq_run_notify(mq);
+
nni_mtx_unlock(&mq->mq_lock);
}
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- nni_time expire = aio->a_expire;
-
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
nni_mtx_unlock(&mq->mq_lock);
@@ -401,13 +391,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_aio_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
- nni_msgq_run_notify(mq);
- // XXX: handle this using a generic aio timeout.
- if (expire < mq->mq_expire) {
- mq->mq_expire = expire;
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ // if this was a non-blocking operation, and we couldn't finish
+ // it synchronously in the above run_getq, then abort.
+ if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, NNG_EAGAIN, 0);
}
+ nni_msgq_run_notify(mq);
+
nni_mtx_unlock(&mq->mq_lock);
}
@@ -483,55 +475,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_EAGAIN);
}
-// XXX: Move this to generic AIO timeout...
-void
-nni_msgq_run_timeout(void *arg)
-{
- nni_msgq *mq = arg;
- nni_time now;
- nni_time exp;
- nni_aio * aio;
- nni_aio * naio;
-
- now = nni_clock();
- exp = NNI_TIME_NEVER;
-
- nni_mtx_lock(&mq->mq_lock);
- naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- if (aio->a_expire == NNI_TIME_ZERO) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_EAGAIN, 0);
- } else if (now >= aio->a_expire) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
- } else if (exp > aio->a_expire) {
- exp = aio->a_expire;
- }
- }
-
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
- if (aio->a_expire == NNI_TIME_ZERO) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_EAGAIN, 0);
- } else if (now >= aio->a_expire) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
- } else if (exp > aio->a_expire) {
- exp = aio->a_expire;
- }
- }
-
- mq->mq_expire = exp;
- if (mq->mq_expire != NNI_TIME_NEVER) {
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{