aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c16
1 files changed, 3 insertions, 13 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 116f0907..8118c6c2 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -213,8 +213,6 @@ nni_msgq_cancel(nni_aio *aio, void *arg, int rv)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- int rv;
-
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -222,11 +220,8 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
// If this is an instantaneous poll operation, and the queue has
// no room, nobody is waiting to receive, then report NNG_ETIMEDOUT.
- rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
- if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
- (nni_list_empty(&mq->mq_aio_getq))) {
+ if (!nni_aio_defer(aio, nni_msgq_cancel, mq)) {
nni_mtx_unlock(&mq->mq_lock);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&mq->mq_aio_putq, aio);
@@ -239,17 +234,12 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- int rv;
-
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&mq->mq_lock);
- rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
- if ((rv != 0) && (mq->mq_len == 0) &&
- (nni_list_empty(&mq->mq_aio_putq))) {
+ if (!nni_aio_defer(aio, nni_msgq_cancel, mq)) {
nni_mtx_unlock(&mq->mq_lock);
- nni_aio_finish_error(aio, rv);
return;
}
@@ -359,7 +349,7 @@ nni_msgq_resize(nni_msgq *mq, int cap)
}
nni_mtx_lock(&mq->mq_lock);
- while (mq->mq_len > ((unsigned)cap + 1)) {
+ while (mq->mq_len > ((unsigned) cap + 1)) {
// too many messages -- we allow that one for
// the case of pushback or cap == 0.
// we delete the oldest messages first