aboutsummaryrefslogtreecommitdiff
path: root/src/core/msqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msqueue.c')
-rw-r--r--src/core/msqueue.c120
1 files changed, 120 insertions, 0 deletions
diff --git a/src/core/msqueue.c b/src/core/msqueue.c
index 3e4ade79..7becabfb 100644
--- a/src/core/msqueue.c
+++ b/src/core/msqueue.c
@@ -109,3 +109,123 @@ nni_msgqueue_destroy(nni_msgqueue_t mq)
nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t));
nni_free(mq, sizeof (*mq));
}
+
+int
+nni_msgqueue_put(nni_msgqueue_t mq, nng_msg_t msg, int tmout)
+{
+ uint64_t expire, now;
+
+ if (tmout > 0) {
+ expire = nni_clock() + tmout;
+ }
+
+ nni_mutex_enter(mq->mq_lock);
+
+ while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap)) {
+ if (tmout == 0) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
+
+ if (tmout < 0) {
+ (void) nni_cond_wait(mq->mq_writeable);
+ continue;
+ }
+
+ now = nni_clock();
+ if (now >= expire) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_ETIMEDOUT);
+ }
+ (void) nni_cond_timedwait(mq->mq_writeable, (expire - now));
+ }
+
+ if (mq->mq_closed) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
+
+ mq->mq_msgs[mq->mq_put] = msg;
+ mq->mq_put++;
+ if (mq->mq_put == mq->mq_cap) {
+ mq->mq_put = 0;
+ }
+ mq->mq_len++;
+ if (mq->mq_len == 1) {
+ (void) nni_cond_signal(mq->mq_readable);
+ }
+ nni_mutex_exit(mq->mq_lock);
+ return (0);
+}
+
+int
+nni_msgqueue_get(nni_msgqueue_t mq, nng_msg_t *msgp, int tmout)
+{
+ uint64_t expire, now;
+
+ if (tmout > 0) {
+ expire = nni_clock() + tmout;
+ }
+
+ nni_mutex_enter(mq->mq_lock);
+
+ while ((!mq->mq_closed) && (mq->mq_len == 0)) {
+ if (tmout == 0) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
+
+ if (tmout < 0) {
+ (void) nni_cond_wait(mq->mq_readable);
+ continue;
+ }
+
+ now = nni_clock();
+ if (now >= expire) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_ETIMEDOUT);
+ }
+ (void) nni_cond_timedwait(mq->mq_readable, (expire - now));
+ }
+
+ if (mq->mq_closed) {
+ nni_mutex_exit(mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
+
+ *msgp = mq->mq_msgs[mq->mq_get];
+ mq->mq_len--;
+ mq->mq_get++;
+ if (mq->mq_get == mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len++;
+ if (mq->mq_len == (mq->mq_cap - 1)) {
+ (void) nni_cond_signal(mq->mq_writeable);
+ }
+ nni_mutex_exit(mq->mq_lock);
+ return (0);
+}
+
+void
+nni_msgqueue_close(nni_msgqueue_t mq)
+{
+ nng_msg_t msg;
+
+ nni_mutex_enter(mq->mq_lock);
+ mq->mq_closed = 1;
+ nni_cond_broadcast(mq->mq_writeable);
+ nni_cond_broadcast(mq->mq_readable);
+
+ /* Free the messages orphaned in the queue. */
+ while (mq->mq_len > 0) {
+ msg = mq->mq_msgs[mq->mq_get];
+ mq->mq_get++;
+ if (mq->mq_get > mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nng_msg_free(msg);
+ }
+ nni_mutex_exit(mq->mq_lock);
+}