summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-16 01:02:09 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-16 01:02:09 -0700
commit89c4ae87f3d927c8a6a760b6706b3a8e386dfb55 (patch)
tree7fa84fbd8076ac262dcc6836653968276738ad18 /src/core/msgqueue.c
parenta1d237059c652c9e36117eed3e6387dcae128174 (diff)
downloadnng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.tar.gz
nng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.tar.bz2
nng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.zip
AIO timeouts work correctly now, using their own timer logic.
We closed a few subtle races in the AIO subsystem as well, and now we were able to eliminate the separate timer handling the MQ code. There appear to be some opportunities to further enhance the code for MQs as well -- eventually probably the only access to MQs will be with AIOs.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c87
1 files changed, 15 insertions, 72 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 9f7ff7fd..71c6dff4 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -32,13 +33,8 @@ struct nni_msgq {
nni_list mq_aio_getq;
nni_list mq_aio_notify_get;
nni_list mq_aio_notify_put;
-
- nni_timer_node mq_timer;
- nni_time mq_expire;
};
-static void nni_msgq_run_timeout(void *);
-
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -76,8 +72,6 @@ nni_msgq_init(nni_msgq **mqp, int cap)
goto fail;
}
- nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq);
-
mq->mq_cap = cap;
mq->mq_alloc = alloc;
mq->mq_len = 0;
@@ -86,7 +80,6 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_closed = 0;
mq->mq_puterr = 0;
mq->mq_geterr = 0;
- mq->mq_expire = NNI_TIME_NEVER;
mq->mq_draining = 0;
*mqp = mq;
@@ -110,7 +103,6 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
- nni_timer_cancel(&mq->mq_timer);
nni_cv_fini(&mq->mq_drained);
nni_mtx_fini(&mq->mq_lock);
@@ -348,8 +340,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- nni_time expire = aio->a_expire;
-
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
nni_mtx_unlock(&mq->mq_lock);
@@ -368,21 +358,21 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
- nni_msgq_run_notify(mq);
- // XXX: handle this using a generic aio timeout.
- if (expire < mq->mq_expire) {
- mq->mq_expire = expire;
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ // if this was a non-blocking operation, and we couldn't finish
+ // it synchronously in the above run_putq, then abort.
+ if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, NNG_EAGAIN, 0);
}
+ nni_msgq_run_notify(mq);
+
nni_mtx_unlock(&mq->mq_lock);
}
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- nni_time expire = aio->a_expire;
-
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
nni_mtx_unlock(&mq->mq_lock);
@@ -401,13 +391,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_aio_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
- nni_msgq_run_notify(mq);
- // XXX: handle this using a generic aio timeout.
- if (expire < mq->mq_expire) {
- mq->mq_expire = expire;
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ // if this was a non-blocking operation, and we couldn't finish
+ // it synchronously in the above run_getq, then abort.
+ if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, NNG_EAGAIN, 0);
}
+ nni_msgq_run_notify(mq);
+
nni_mtx_unlock(&mq->mq_lock);
}
@@ -483,55 +475,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_EAGAIN);
}
-// XXX: Move this to generic AIO timeout...
-void
-nni_msgq_run_timeout(void *arg)
-{
- nni_msgq *mq = arg;
- nni_time now;
- nni_time exp;
- nni_aio * aio;
- nni_aio * naio;
-
- now = nni_clock();
- exp = NNI_TIME_NEVER;
-
- nni_mtx_lock(&mq->mq_lock);
- naio = nni_list_first(&mq->mq_aio_getq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_getq, aio);
- if (aio->a_expire == NNI_TIME_ZERO) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_EAGAIN, 0);
- } else if (now >= aio->a_expire) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
- } else if (exp > aio->a_expire) {
- exp = aio->a_expire;
- }
- }
-
- naio = nni_list_first(&mq->mq_aio_putq);
- while ((aio = naio) != NULL) {
- naio = nni_list_next(&mq->mq_aio_putq, aio);
- if (aio->a_expire == NNI_TIME_ZERO) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_EAGAIN, 0);
- } else if (now >= aio->a_expire) {
- nni_aio_list_remove(aio);
- nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
- } else if (exp > aio->a_expire) {
- exp = aio->a_expire;
- }
- }
-
- mq->mq_expire = exp;
- if (mq->mq_expire != NNI_TIME_NEVER) {
- nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{