diff options
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 32 |
1 files changed, 17 insertions, 15 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 7d892b88..b8d11e8c 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -52,7 +52,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } - if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) { + if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) { nni_cond_fini(&mq->mq_writeable); nni_cond_fini(&mq->mq_readable); nni_mutex_fini(&mq->mq_lock); @@ -90,7 +90,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq) nni_msg_free(msg); } - nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t)); + nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg *)); nni_free(mq, sizeof (*mq)); } @@ -130,6 +130,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, (void) nni_cond_waituntil(&mq->mq_writeable, expire); } + // Once a queue is closed, you can't write to it. It can still be + // read from, at least until its empty. if (mq->mq_closed) { nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); @@ -175,19 +177,19 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, (void) nni_cond_waituntil(&mq->mq_readable, expire); } - if (mq->mq_closed) { - nni_mutex_exit(&mq->mq_lock); - return (NNG_ECLOSED); - } + // If there is any data left in the message queue, we will still + // provide it, so that the reader can drain. + if (mq->mq_len == 0) { + if (mq->mq_closed) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_ECLOSED); + } - if ((mq->mq_len == 0) && (*signal)) { - // We are being interrupted. We only allow an interrupt - // if there is no data though, because we'd really prefer - // to give back the data. Otherwise our failure to deal - // with the data could lead to starvation; also lingering - // relies on this not interrupting if data is pending. - nni_mutex_exit(&mq->mq_lock); - return (NNG_EINTR); + if (*signal) { + // We are being interrupted. + nni_mutex_exit(&mq->mq_lock); + return (NNG_EINTR); + } } *msgp = mq->mq_msgs[mq->mq_get]; @@ -258,7 +260,7 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) void nni_msgqueue_close(nni_msgqueue *mq) { - nni_msg_t msg; + nni_msg *msg; nni_mutex_enter(&mq->mq_lock); mq->mq_closed = 1; |
