diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 46 |
1 files changed, 40 insertions, 6 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index b8d11e8c..266a8e26 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -18,6 +18,7 @@ struct nni_msgqueue { nni_mutex mq_lock; nni_cond mq_readable; nni_cond mq_writeable; + nni_cond mq_drained; int mq_cap; int mq_len; int mq_get; @@ -52,7 +53,14 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } + if ((rv = nni_cond_init(&mq->mq_drained, &mq->mq_lock)) != 0) { + nni_cond_fini(&mq->mq_writeable); + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); + return (NNG_ENOMEM); + } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) { + nni_cond_fini(&mq->mq_drained); nni_cond_fini(&mq->mq_writeable); nni_cond_fini(&mq->mq_readable); nni_mutex_fini(&mq->mq_lock); @@ -75,6 +83,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq) { nni_msg *msg; + nni_cond_fini(&mq->mq_drained); nni_cond_fini(&mq->mq_writeable); nni_cond_fini(&mq->mq_readable); nni_mutex_fini(&mq->mq_lock); @@ -153,7 +162,7 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } mq->mq_len++; if (mq->mq_len == 1) { - (void) nni_cond_signal(&mq->mq_readable); + nni_cond_signal(&mq->mq_readable); } nni_mutex_exit(&mq->mq_lock); return (0); @@ -181,6 +190,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, // provide it, so that the reader can drain. if (mq->mq_len == 0) { if (mq->mq_closed) { + nni_cond_signal(&mq->mq_drained); nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } @@ -198,9 +208,8 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, if (mq->mq_get == mq->mq_cap) { mq->mq_get = 0; } - mq->mq_len++; if (mq->mq_len == (mq->mq_cap - 1)) { - (void) nni_cond_signal(&mq->mq_writeable); + nni_cond_signal(&mq->mq_writeable); } nni_mutex_exit(&mq->mq_lock); return (0); @@ -258,10 +267,35 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) void -nni_msgqueue_close(nni_msgqueue *mq) +nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) { - nni_msg *msg; + nni_mutex_enter(&mq->mq_lock); + mq->mq_closed = 1; + nni_cond_broadcast(&mq->mq_writeable); + nni_cond_broadcast(&mq->mq_readable); + while (mq->mq_len > 0) { + if (nni_cond_waituntil(&mq->mq_drained, expire) == + NNG_ETIMEDOUT) { + break; + } + } + // If we timedout, free any remaining messages in the queue. + while (mq->mq_len > 0) { + nni_msg *msg = mq->mq_msgs[mq->mq_get]; + mq->mq_get++; + if (mq->mq_get > mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len--; + nni_msg_free(msg); + } + nni_mutex_exit(&mq->mq_lock); +} + +void +nni_msgqueue_close(nni_msgqueue *mq) +{ nni_mutex_enter(&mq->mq_lock); mq->mq_closed = 1; nni_cond_broadcast(&mq->mq_writeable); @@ -269,7 +303,7 @@ nni_msgqueue_close(nni_msgqueue *mq) // Free the messages orphaned in the queue. while (mq->mq_len > 0) { - msg = mq->mq_msgs[mq->mq_get]; + nni_msg *msg = mq->mq_msgs[mq->mq_get]; mq->mq_get++; if (mq->mq_get > mq->mq_cap) { mq->mq_get = 0; |
