diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-16 01:02:09 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-16 01:02:09 -0700 |
| commit | 89c4ae87f3d927c8a6a760b6706b3a8e386dfb55 (patch) | |
| tree | 7fa84fbd8076ac262dcc6836653968276738ad18 /src/core/msgqueue.c | |
| parent | a1d237059c652c9e36117eed3e6387dcae128174 (diff) | |
| download | nng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.tar.gz nng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.tar.bz2 nng-89c4ae87f3d927c8a6a760b6706b3a8e386dfb55.zip | |
AIO timeouts work correctly now, using their own timer logic.
We closed a few subtle races in the AIO subsystem as well, and now
we were able to eliminate the separate timer handling the MQ code.
There appear to be some opportunities to further enhance the code
for MQs as well -- eventually probably the only access to MQs will
be with AIOs.
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 87 |
1 files changed, 15 insertions, 72 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 9f7ff7fd..71c6dff4 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -32,13 +33,8 @@ struct nni_msgq { nni_list mq_aio_getq; nni_list mq_aio_notify_get; nni_list mq_aio_notify_put; - - nni_timer_node mq_timer; - nni_time mq_expire; }; -static void nni_msgq_run_timeout(void *); - int nni_msgq_init(nni_msgq **mqp, int cap) { @@ -76,8 +72,6 @@ nni_msgq_init(nni_msgq **mqp, int cap) goto fail; } - nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq); - mq->mq_cap = cap; mq->mq_alloc = alloc; mq->mq_len = 0; @@ -86,7 +80,6 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_closed = 0; mq->mq_puterr = 0; mq->mq_geterr = 0; - mq->mq_expire = NNI_TIME_NEVER; mq->mq_draining = 0; *mqp = mq; @@ -110,7 +103,6 @@ nni_msgq_fini(nni_msgq *mq) if (mq == NULL) { return; } - nni_timer_cancel(&mq->mq_timer); nni_cv_fini(&mq->mq_drained); nni_mtx_fini(&mq->mq_lock); @@ -348,8 +340,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { - nni_time expire = aio->a_expire; - nni_mtx_lock(&mq->mq_lock); if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { nni_mtx_unlock(&mq->mq_lock); @@ -368,21 +358,21 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); - nni_msgq_run_notify(mq); - // XXX: handle this using a generic aio timeout. - if (expire < mq->mq_expire) { - mq->mq_expire = expire; - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + // if this was a non-blocking operation, and we couldn't finish + // it synchronously in the above run_putq, then abort. + if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) { + nni_aio_list_remove(aio); + nni_aio_finish(aio, NNG_EAGAIN, 0); } + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); } void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { - nni_time expire = aio->a_expire; - nni_mtx_lock(&mq->mq_lock); if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { nni_mtx_unlock(&mq->mq_lock); @@ -401,13 +391,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_aio_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); - nni_msgq_run_notify(mq); - // XXX: handle this using a generic aio timeout. - if (expire < mq->mq_expire) { - mq->mq_expire = expire; - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + // if this was a non-blocking operation, and we couldn't finish + // it synchronously in the above run_getq, then abort. + if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) { + nni_aio_list_remove(aio); + nni_aio_finish(aio, NNG_EAGAIN, 0); } + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); } @@ -483,55 +475,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_EAGAIN); } -// XXX: Move this to generic AIO timeout... -void -nni_msgq_run_timeout(void *arg) -{ - nni_msgq *mq = arg; - nni_time now; - nni_time exp; - nni_aio * aio; - nni_aio * naio; - - now = nni_clock(); - exp = NNI_TIME_NEVER; - - nni_mtx_lock(&mq->mq_lock); - naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - if (aio->a_expire == NNI_TIME_ZERO) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_EAGAIN, 0); - } else if (now >= aio->a_expire) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ETIMEDOUT, 0); - } else if (exp > aio->a_expire) { - exp = aio->a_expire; - } - } - - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); - if (aio->a_expire == NNI_TIME_ZERO) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_EAGAIN, 0); - } else if (now >= aio->a_expire) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ETIMEDOUT, 0); - } else if (exp > aio->a_expire) { - exp = aio->a_expire; - } - } - - mq->mq_expire = exp; - if (mq->mq_expire != NNI_TIME_NEVER) { - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); - } - nni_mtx_unlock(&mq->mq_lock); -} - int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { |
