diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 87 |
1 files changed, 15 insertions, 72 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 9f7ff7fd..71c6dff4 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -32,13 +33,8 @@ struct nni_msgq { nni_list mq_aio_getq; nni_list mq_aio_notify_get; nni_list mq_aio_notify_put; - - nni_timer_node mq_timer; - nni_time mq_expire; }; -static void nni_msgq_run_timeout(void *); - int nni_msgq_init(nni_msgq **mqp, int cap) { @@ -76,8 +72,6 @@ nni_msgq_init(nni_msgq **mqp, int cap) goto fail; } - nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq); - mq->mq_cap = cap; mq->mq_alloc = alloc; mq->mq_len = 0; @@ -86,7 +80,6 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_closed = 0; mq->mq_puterr = 0; mq->mq_geterr = 0; - mq->mq_expire = NNI_TIME_NEVER; mq->mq_draining = 0; *mqp = mq; @@ -110,7 +103,6 @@ nni_msgq_fini(nni_msgq *mq) if (mq == NULL) { return; } - nni_timer_cancel(&mq->mq_timer); nni_cv_fini(&mq->mq_drained); nni_mtx_fini(&mq->mq_lock); @@ -348,8 +340,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { - nni_time expire = aio->a_expire; - nni_mtx_lock(&mq->mq_lock); if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { nni_mtx_unlock(&mq->mq_lock); @@ -368,21 +358,21 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); - nni_msgq_run_notify(mq); - // XXX: handle this using a generic aio timeout. - if (expire < mq->mq_expire) { - mq->mq_expire = expire; - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + // if this was a non-blocking operation, and we couldn't finish + // it synchronously in the above run_putq, then abort. + if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) { + nni_aio_list_remove(aio); + nni_aio_finish(aio, NNG_EAGAIN, 0); } + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); } void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { - nni_time expire = aio->a_expire; - nni_mtx_lock(&mq->mq_lock); if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { nni_mtx_unlock(&mq->mq_lock); @@ -401,13 +391,15 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_aio_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); - nni_msgq_run_notify(mq); - // XXX: handle this using a generic aio timeout. - if (expire < mq->mq_expire) { - mq->mq_expire = expire; - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + // if this was a non-blocking operation, and we couldn't finish + // it synchronously in the above run_getq, then abort. + if ((aio->a_expire == NNI_TIME_ZERO) && (nni_aio_list_active(aio))) { + nni_aio_list_remove(aio); + nni_aio_finish(aio, NNG_EAGAIN, 0); } + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); } @@ -483,55 +475,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_EAGAIN); } -// XXX: Move this to generic AIO timeout... -void -nni_msgq_run_timeout(void *arg) -{ - nni_msgq *mq = arg; - nni_time now; - nni_time exp; - nni_aio * aio; - nni_aio * naio; - - now = nni_clock(); - exp = NNI_TIME_NEVER; - - nni_mtx_lock(&mq->mq_lock); - naio = nni_list_first(&mq->mq_aio_getq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_getq, aio); - if (aio->a_expire == NNI_TIME_ZERO) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_EAGAIN, 0); - } else if (now >= aio->a_expire) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ETIMEDOUT, 0); - } else if (exp > aio->a_expire) { - exp = aio->a_expire; - } - } - - naio = nni_list_first(&mq->mq_aio_putq); - while ((aio = naio) != NULL) { - naio = nni_list_next(&mq->mq_aio_putq, aio); - if (aio->a_expire == NNI_TIME_ZERO) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_EAGAIN, 0); - } else if (now >= aio->a_expire) { - nni_aio_list_remove(aio); - nni_aio_finish(aio, NNG_ETIMEDOUT, 0); - } else if (exp > aio->a_expire) { - exp = aio->a_expire; - } - } - - mq->mq_expire = exp; - if (mq->mq_expire != NNI_TIME_NEVER) { - nni_timer_schedule(&mq->mq_timer, mq->mq_expire); - } - nni_mtx_unlock(&mq->mq_lock); -} - int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { |
