summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 17:38:46 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 17:38:46 -0800
commit89c847f1f52969ee2ae6ed35018eef40366ca061 (patch)
treeba3f3c2da64e4a74c69d315b2198df59bcd4441b /src/core/msgqueue.c
parent934c1316ae47754a2e368c65228c3cbfe552680f (diff)
downloadnng-89c847f1f52969ee2ae6ed35018eef40366ca061.tar.gz
nng-89c847f1f52969ee2ae6ed35018eef40366ca061.tar.bz2
nng-89c847f1f52969ee2ae6ed35018eef40366ca061.zip
Work on endpoints. More C99 & type cleanups.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c32
1 files changed, 17 insertions, 15 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 7d892b88..b8d11e8c 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -52,7 +52,7 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
- if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) {
+ if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) {
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
nni_mutex_fini(&mq->mq_lock);
@@ -90,7 +90,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
nni_msg_free(msg);
}
- nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg_t));
+ nni_free(mq->mq_msgs, mq->mq_cap * sizeof (nng_msg *));
nni_free(mq, sizeof (*mq));
}
@@ -130,6 +130,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
(void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
+ // Once a queue is closed, you can't write to it. It can still be
+ // read from, at least until its empty.
if (mq->mq_closed) {
nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
@@ -175,19 +177,19 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
(void) nni_cond_waituntil(&mq->mq_readable, expire);
}
- if (mq->mq_closed) {
- nni_mutex_exit(&mq->mq_lock);
- return (NNG_ECLOSED);
- }
+ // If there is any data left in the message queue, we will still
+ // provide it, so that the reader can drain.
+ if (mq->mq_len == 0) {
+ if (mq->mq_closed) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
- if ((mq->mq_len == 0) && (*signal)) {
- // We are being interrupted. We only allow an interrupt
- // if there is no data though, because we'd really prefer
- // to give back the data. Otherwise our failure to deal
- // with the data could lead to starvation; also lingering
- // relies on this not interrupting if data is pending.
- nni_mutex_exit(&mq->mq_lock);
- return (NNG_EINTR);
+ if (*signal) {
+ // We are being interrupted.
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_EINTR);
+ }
}
*msgp = mq->mq_msgs[mq->mq_get];
@@ -258,7 +260,7 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
void
nni_msgqueue_close(nni_msgqueue *mq)
{
- nni_msg_t msg;
+ nni_msg *msg;
nni_mutex_enter(&mq->mq_lock);
mq->mq_closed = 1;