aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/msgqueue.c35
-rw-r--r--src/core/msgqueue.h6
2 files changed, 41 insertions, 0 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 41127c50..3af4e081 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -198,6 +198,41 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
+// nni_msgqueue_putback will attempt to put a single message back
+// to the head of the queue. It never blocks. Message queues always
+// have room for at least one putback.
+int
+nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg)
+{
+ nni_mutex_enter(&mq->mq_lock);
+
+ // if closed, we don't put more... this check is first!
+ if (mq->mq_closed) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
+
+ // room in the queue?
+ if (mq->mq_len >= mq->mq_cap) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
+
+ // Subtract one from the get index, possibly wrapping.
+ mq->mq_get--;
+ if (mq->mq_get == 0) {
+ mq->mq_get = mq->mq_cap;
+ }
+ mq->mq_msgs[mq->mq_get] = msg;
+ mq->mq_len++;
+ if (mq->mq_rwait) {
+ nni_cond_broadcast(&mq->mq_readable);
+ }
+ nni_mutex_exit(&mq->mq_lock);
+ return (0);
+}
+
+
static int
nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
nni_time expire, nni_signal *signal)
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 5efbc245..900fed2b 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -45,6 +45,12 @@ extern void nni_msgqueue_destroy(nni_msgqueue *);
// the caller is not permitted to access it further.
extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *);
+// nni_msgqueue_putback returns a message to the head of the queue.
+// This is a non-blocking operation, and it returns EAGAIN if there
+// is no room. There is always at least room for one putback after
+// a message is retried with nni_msgqueue_get.
+extern int nni_msgqueue_putback(nni_msgqueue *, nni_msg *);
+
// nni_msgqueue_get gets the message from the queue. It blocks until a
// message is available, or the queue is closed, returning either 0 on
// success or NNG_ECLOSED if the queue was closed. If a message is