aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c60
1 files changed, 41 insertions, 19 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 212b52c2..1bb5a762 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -22,10 +22,10 @@ struct nni_msgq {
int mq_len;
int mq_get;
int mq_put;
- int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_besteffort;
+ bool mq_besteffort;
+ bool mq_closed;
nni_msg **mq_msgs;
nni_list mq_aio_putq;
@@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
nni_aio_list_remove(aio);
@@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_aio_list_remove(aio);
@@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
@@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
void
-nni_msgq_set_best_effort(nni_msgq *mq, int on)
+nni_msgq_set_best_effort(nni_msgq *mq, bool on)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_besteffort = on;
@@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ // If this is an instantaneous poll operation, and the queue has
+ // no room, nobody is waiting to receive, and we're not best effort
+ // (best effort discards), then report the error (NNG_ETIMEDOUT).
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
+ (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
- if (mq->mq_geterr) {
- nni_aio_finish_error(aio, mq->mq_geterr);
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len == 0) &&
+ (nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq)
nni_aio *aio;
nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
+ mq->mq_closed = true;
+ mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED;
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {