aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c22
1 files changed, 17 insertions, 5 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 5f4bd333..3ec194fc 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -147,6 +147,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
nni_mtx_lock(&mq->mq_lock);
mq->mq_puterr = error;
if (error) {
+ mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_writeable);
}
nni_mtx_unlock(&mq->mq_lock);
@@ -159,6 +160,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
nni_mtx_lock(&mq->mq_lock);
mq->mq_geterr = error;
if (error) {
+ mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_readable);
}
nni_mtx_unlock(&mq->mq_lock);
@@ -172,6 +174,8 @@ nni_msgq_set_error(nni_msgq *mq, int error)
mq->mq_geterr = error;
mq->mq_puterr = error;
if (error) {
+ mq->mq_rwait = 0;
+ mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_writeable);
}
@@ -189,6 +193,8 @@ nni_msgq_signal(nni_msgq *mq, int *signal)
*signal = 1;
// We have to wake everyone.
+ mq->mq_rwait = 0;
+ mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_writeable);
nni_mtx_unlock(&mq->mq_lock);
@@ -242,9 +248,8 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
}
// not writeable, so wait until something changes
- mq->mq_wwait++;
+ mq->mq_wwait = 1;
rv = nni_cv_until(&mq->mq_writeable, expire);
- mq->mq_wwait--;
if (rv == NNG_ETIMEDOUT) {
nni_mtx_unlock(&mq->mq_lock);
return (NNG_ETIMEDOUT);
@@ -259,6 +264,7 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
}
mq->mq_len++;
if (mq->mq_rwait) {
+ mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_readable);
}
notify = NNI_MSGQ_NOTIFY_CANGET;
@@ -309,6 +315,7 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
mq->mq_msgs[mq->mq_get] = msg;
mq->mq_len++;
if (mq->mq_rwait) {
+ mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_readable);
}
@@ -359,13 +366,13 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_mtx_unlock(&mq->mq_lock);
return (NNG_EINTR);
}
- if ((mq->mq_cap == 0) & (mq->mq_wwait)) {
+ if ((mq->mq_cap == 0) && (mq->mq_wwait)) {
// let a write waiter know we are ready
+ mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_writeable);
}
- mq->mq_rwait++;
+ mq->mq_rwait = 1;
rv = nni_cv_until(&mq->mq_readable, expire);
- mq->mq_rwait--;
if (rv == NNG_ETIMEDOUT) {
nni_mtx_unlock(&mq->mq_lock);
return (NNG_ETIMEDOUT);
@@ -381,6 +388,7 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
mq->mq_get = 0;
}
if (mq->mq_wwait) {
+ mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_writeable);
}
notify = NNI_MSGQ_NOTIFY_CANPUT;
@@ -463,6 +471,8 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
+ mq->mq_wwait = 0;
+ mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
while (mq->mq_len > 0) {
@@ -488,6 +498,8 @@ nni_msgq_close(nni_msgq *mq)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
+ mq->mq_wwait = 0;
+ mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);