diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 35 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 6 |
2 files changed, 41 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 41127c50..3af4e081 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -198,6 +198,41 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } +// nni_msgqueue_putback will attempt to put a single message back +// to the head of the queue. It never blocks. Message queues always +// have room for at least one putback. +int +nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg) +{ + nni_mutex_enter(&mq->mq_lock); + + // if closed, we don't put more... this check is first! + if (mq->mq_closed) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_ECLOSED); + } + + // room in the queue? + if (mq->mq_len >= mq->mq_cap) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_EAGAIN); + } + + // Subtract one from the get index, possibly wrapping. + mq->mq_get--; + if (mq->mq_get == 0) { + mq->mq_get = mq->mq_cap; + } + mq->mq_msgs[mq->mq_get] = msg; + mq->mq_len++; + if (mq->mq_rwait) { + nni_cond_broadcast(&mq->mq_readable); + } + nni_mutex_exit(&mq->mq_lock); + return (0); +} + + static int nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, nni_time expire, nni_signal *signal) diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 5efbc245..900fed2b 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -45,6 +45,12 @@ extern void nni_msgqueue_destroy(nni_msgqueue *); // the caller is not permitted to access it further. extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *); +// nni_msgqueue_putback returns a message to the head of the queue. +// This is a non-blocking operation, and it returns EAGAIN if there +// is no room. There is always at least room for one putback after +// a message is retried with nni_msgqueue_get. +extern int nni_msgqueue_putback(nni_msgqueue *, nni_msg *); + // nni_msgqueue_get gets the message from the queue. It blocks until a // message is available, or the queue is closed, returning either 0 on // success or NNG_ECLOSED if the queue was closed. If a message is |
