diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 60 |
1 files changed, 41 insertions, 19 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 212b52c2..1bb5a762 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -22,10 +22,10 @@ struct nni_msgq { int mq_len; int mq_get; int mq_put; - int mq_closed; int mq_puterr; int mq_geterr; - int mq_besteffort; + bool mq_besteffort; + bool mq_closed; nni_msg **mq_msgs; nni_list mq_aio_putq; @@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) { nni_aio_list_remove(aio); @@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { nni_aio_list_remove(aio); @@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error) // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + // If we were closed, then this error trumps all others. + error = NNG_ECLOSED; + } if (error != 0) { while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) || ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) { @@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq) } void -nni_msgq_set_best_effort(nni_msgq *mq, int on) +nni_msgq_set_best_effort(nni_msgq *mq, bool on) { nni_mtx_lock(&mq->mq_lock); mq->mq_besteffort = on; @@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv) void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - if (mq->mq_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); - nni_mtx_unlock(&mq->mq_lock); + int rv; + + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&mq->mq_lock); if (mq->mq_puterr) { nni_aio_finish_error(aio, mq->mq_puterr); nni_mtx_unlock(&mq->mq_lock); return; } + // If this is an instantaneous poll operation, and the queue has + // no room, nobody is waiting to receive, and we're not best effort + // (best effort discards), then report the error (NNG_ETIMEDOUT). + rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq); + if ((rv != 0) && (mq->mq_len >= mq->mq_cap) && + (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) { + nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); + int rv; + + if (nni_aio_begin(aio) != 0) { return; } - if (mq->mq_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_geterr) { nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, mq->mq_geterr); return; } - if (mq->mq_geterr) { - nni_aio_finish_error(aio, mq->mq_geterr); + rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq); + if ((rv != 0) && (mq->mq_len == 0) && + (nni_list_empty(&mq->mq_aio_putq))) { nni_mtx_unlock(&mq->mq_lock); + nni_aio_finish_error(aio, rv); return; } @@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq) nni_aio *aio; nni_mtx_lock(&mq->mq_lock); - mq->mq_closed = 1; + mq->mq_closed = true; + mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED; // Free the messages orphaned in the queue. while (mq->mq_len > 0) { |
