aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c70
1 files changed, 38 insertions, 32 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index efa0bb72..41127c50 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -26,6 +26,7 @@ struct nni_msgqueue {
int mq_put;
int mq_closed;
int mq_rwait; // readers waiting (unbuffered)
+ int mq_wwait;
nni_msg ** mq_msgs;
};
@@ -136,6 +137,8 @@ int
nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
nni_time expire, nni_signal *signal)
{
+ int rv;
+
nni_mutex_enter(&mq->mq_lock);
for (;;) {
@@ -151,7 +154,9 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
// unbuffered, room for one, and a reader waiting?
- if (mq->mq_rwait && (mq->mq_len == mq->mq_cap)) {
+ if (mq->mq_rwait &&
+ (mq->mq_cap == 0) &&
+ (mq->mq_len == mq->mq_cap)) {
break;
}
@@ -167,14 +172,14 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
return (NNG_EAGAIN);
}
- // timedout?
- if (expire <= nni_clock()) {
+ // not writeable, so wait until something changes
+ mq->mq_wwait++;
+ rv = nni_cond_waituntil(&mq->mq_writeable, expire);
+ mq->mq_wwait--;
+ if (rv == NNG_ETIMEDOUT) {
nni_mutex_exit(&mq->mq_lock);
return (NNG_ETIMEDOUT);
}
-
- // not writeable, so wait until something changes
- (void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
// Writeable! Yay!!
@@ -185,8 +190,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
mq->mq_put = 0;
}
mq->mq_len++;
- if (mq->mq_len == 1) {
- nni_cond_signal(&mq->mq_readable);
+ if (mq->mq_rwait) {
+ nni_cond_broadcast(&mq->mq_readable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -197,49 +202,50 @@ static int
nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
nni_time expire, nni_signal *signal)
{
+ int rv;
+
nni_mutex_enter(&mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
- if (expire <= nni_clock()) {
- nni_mutex_exit(&mq->mq_lock);
- if (expire == NNI_TIME_ZERO) {
- return (NNG_EAGAIN);
- }
- return (NNG_ETIMEDOUT);
- }
- mq->mq_rwait++;
- if (mq->mq_cap == 0) {
- // If a writer is blocked, unblock him.
- nni_cond_signal(&mq->mq_writeable);
+ for (;;) {
+ // always prefer to deliver data if its there
+ if (mq->mq_len != 0) {
+ break;
}
- (void) nni_cond_waituntil(&mq->mq_readable, expire);
- mq->mq_rwait--;
- }
-
- // If there is any data left in the message queue, we will still
- // provide it, so that the reader can drain.
- if (mq->mq_len == 0) {
if (mq->mq_closed) {
- nni_cond_signal(&mq->mq_drained);
nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
-
+ if (expire == NNI_TIME_ZERO) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
if (*signal) {
- // We are being interrupted.
nni_mutex_exit(&mq->mq_lock);
return (NNG_EINTR);
}
+ if ((mq->mq_cap == 0) & (mq->mq_wwait)) {
+ // let a write waiter know we are ready
+ nni_cond_broadcast(&mq->mq_writeable);
+ }
+ mq->mq_rwait++;
+ rv = nni_cond_waituntil(&mq->mq_readable, expire);
+ mq->mq_rwait--;
+ if (rv == NNG_ETIMEDOUT) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_ETIMEDOUT);
+ }
}
+ // Readable! Yay!!
+
*msgp = mq->mq_msgs[mq->mq_get];
mq->mq_len--;
mq->mq_get++;
if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
- if (mq->mq_len == (mq->mq_cap - 1)) {
- nni_cond_signal(&mq->mq_writeable);
+ if (mq->mq_wwait) {
+ nni_cond_broadcast(&mq->mq_writeable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);