aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c110
1 files changed, 104 insertions, 6 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 859c0bd9..efa0bb72 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -311,9 +311,8 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
}
// If we timedout, free any remaining messages in the queue.
while (mq->mq_len > 0) {
- nni_msg *msg = mq->mq_msgs[mq->mq_get];
- mq->mq_get++;
- if (mq->mq_get > mq->mq_cap) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get++];
+ if (mq->mq_get > mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
@@ -333,13 +332,112 @@ nni_msgqueue_close(nni_msgqueue *mq)
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
- nni_msg *msg = mq->mq_msgs[mq->mq_get];
- mq->mq_get++;
- if (mq->mq_get > mq->mq_cap) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get++];
+ if (mq->mq_get > mq->mq_alloc) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nni_msg_free(msg);
+ }
+ nni_mutex_exit(&mq->mq_lock);
+}
+
+
+int
+nni_msgqueue_len(nni_msgqueue *mq)
+{
+ int rv;
+
+ nni_mutex_enter(&mq->mq_lock);
+ rv = mq->mq_len;
+ nni_mutex_exit(&mq->mq_lock);
+ return (rv);
+}
+
+
+int
+nni_msgqueue_cap(nni_msgqueue *mq)
+{
+ int rv;
+
+ nni_mutex_enter(&mq->mq_lock);
+ rv = mq->mq_cap;
+ nni_mutex_exit(&mq->mq_lock);
+ return (rv);
+}
+
+
+int
+nni_msgqueue_resize(nni_msgqueue *mq, int cap)
+{
+ int alloc;
+ nni_msg *msg;
+ nni_msg **newq, **oldq;
+ int oldget;
+ int oldput;
+ int oldcap;
+ int oldlen;
+ int oldalloc;
+
+ alloc = cap + 2;
+
+ if (alloc > mq->mq_alloc) {
+ newq = nni_alloc(sizeof (nni_msg *) * alloc);
+ if (newq == NULL) {
+ return (NNG_ENOMEM);
+ }
+ } else {
+ newq = NULL;
+ }
+
+ nni_mutex_enter(&mq->mq_lock);
+ while (mq->mq_len > (cap + 1)) {
+ // too many messages -- we allow that one for
+ // the case of pushback or cap == 0.
+ // we delete the oldest messages first
+ msg = mq->mq_msgs[mq->mq_get++];
+ if (mq->mq_get > mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
nni_msg_free(msg);
}
+ if (newq == NULL) {
+ // Just shrinking the queue, no changes
+ mq->mq_cap = cap;
+ goto out;
+ }
+
+ oldq = mq->mq_msgs;
+ oldget = mq->mq_get;
+ oldput = mq->mq_put;
+ oldcap = mq->mq_cap;
+ oldalloc = mq->mq_alloc;
+ oldlen = mq->mq_len;
+
+ mq->mq_msgs = newq;
+ mq->mq_len = mq->mq_get = mq->mq_put = 0;
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+
+ while (oldlen) {
+ mq->mq_msgs[mq->mq_put++] = oldq[oldget++];
+ if (oldget == oldalloc) {
+ oldget = 0;
+ }
+ if (mq->mq_put == mq->mq_alloc) {
+ mq->mq_put = 0;
+ }
+ mq->mq_len++;
+ oldlen--;
+ }
+ nni_free(oldq, sizeof (nni_msg *) * oldalloc);
+
+out:
+ // Wake everyone up -- we changed everything.
+ nni_cond_broadcast(&mq->mq_readable);
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_cond_broadcast(&mq->mq_drained);
nni_mutex_exit(&mq->mq_lock);
+ return (0);
}