diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 110 |
1 files changed, 104 insertions, 6 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 859c0bd9..efa0bb72 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -311,9 +311,8 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) } // If we timedout, free any remaining messages in the queue. while (mq->mq_len > 0) { - nni_msg *msg = mq->mq_msgs[mq->mq_get]; - mq->mq_get++; - if (mq->mq_get > mq->mq_cap) { + nni_msg *msg = mq->mq_msgs[mq->mq_get++]; + if (mq->mq_get > mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; @@ -333,13 +332,112 @@ nni_msgqueue_close(nni_msgqueue *mq) // Free the messages orphaned in the queue. while (mq->mq_len > 0) { - nni_msg *msg = mq->mq_msgs[mq->mq_get]; - mq->mq_get++; - if (mq->mq_get > mq->mq_cap) { + nni_msg *msg = mq->mq_msgs[mq->mq_get++]; + if (mq->mq_get > mq->mq_alloc) { + mq->mq_get = 0; + } + mq->mq_len--; + nni_msg_free(msg); + } + nni_mutex_exit(&mq->mq_lock); +} + + +int +nni_msgqueue_len(nni_msgqueue *mq) +{ + int rv; + + nni_mutex_enter(&mq->mq_lock); + rv = mq->mq_len; + nni_mutex_exit(&mq->mq_lock); + return (rv); +} + + +int +nni_msgqueue_cap(nni_msgqueue *mq) +{ + int rv; + + nni_mutex_enter(&mq->mq_lock); + rv = mq->mq_cap; + nni_mutex_exit(&mq->mq_lock); + return (rv); +} + + +int +nni_msgqueue_resize(nni_msgqueue *mq, int cap) +{ + int alloc; + nni_msg *msg; + nni_msg **newq, **oldq; + int oldget; + int oldput; + int oldcap; + int oldlen; + int oldalloc; + + alloc = cap + 2; + + if (alloc > mq->mq_alloc) { + newq = nni_alloc(sizeof (nni_msg *) * alloc); + if (newq == NULL) { + return (NNG_ENOMEM); + } + } else { + newq = NULL; + } + + nni_mutex_enter(&mq->mq_lock); + while (mq->mq_len > (cap + 1)) { + // too many messages -- we allow that one for + // the case of pushback or cap == 0. + // we delete the oldest messages first + msg = mq->mq_msgs[mq->mq_get++]; + if (mq->mq_get > mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; nni_msg_free(msg); } + if (newq == NULL) { + // Just shrinking the queue, no changes + mq->mq_cap = cap; + goto out; + } + + oldq = mq->mq_msgs; + oldget = mq->mq_get; + oldput = mq->mq_put; + oldcap = mq->mq_cap; + oldalloc = mq->mq_alloc; + oldlen = mq->mq_len; + + mq->mq_msgs = newq; + mq->mq_len = mq->mq_get = mq->mq_put = 0; + mq->mq_cap = cap; + mq->mq_alloc = alloc; + + while (oldlen) { + mq->mq_msgs[mq->mq_put++] = oldq[oldget++]; + if (oldget == oldalloc) { + oldget = 0; + } + if (mq->mq_put == mq->mq_alloc) { + mq->mq_put = 0; + } + mq->mq_len++; + oldlen--; + } + nni_free(oldq, sizeof (nni_msg *) * oldalloc); + +out: + // Wake everyone up -- we changed everything. + nni_cond_broadcast(&mq->mq_readable); + nni_cond_broadcast(&mq->mq_writeable); + nni_cond_broadcast(&mq->mq_drained); nni_mutex_exit(&mq->mq_lock); + return (0); } |
