aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-09 13:01:22 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-09 13:01:22 -0800
commit848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3 (patch)
tree24a31a22803355a4a85293cb35c066b32c651b58 /src/core/msgqueue.c
parent02231e715c4cb67d3394ee363a758f6b3e6b53b8 (diff)
downloadnng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.tar.gz
nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.tar.bz2
nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.zip
Add survey test (and fix survey pattern).
As part of this, we've added a way to unblock callers in a message queue with an error, even without a signal channel. This was necessary to interrupt blockers upon survey timeout. They will get NNG_ETIMEDOUT, but afterwards callers get NNG_ESTATE.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c50
1 files changed, 49 insertions, 1 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 931fda68..9ef48f2d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -25,6 +25,8 @@ struct nni_msgq {
int mq_get;
int mq_put;
int mq_closed;
+ int mq_puterr;
+ int mq_geterr;
int mq_rwait; // readers waiting (unbuffered)
int mq_wwait;
nni_msg ** mq_msgs;
@@ -120,6 +122,44 @@ nni_msgq_fini(nni_msgq *mq)
}
+void
+nni_msgq_set_put_error(nni_msgq *mq, int error)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_puterr = error;
+ if (error) {
+ nni_cv_wake(&mq->mq_writeable);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_set_get_error(nni_msgq *mq, int error)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_geterr = error;
+ if (error) {
+ nni_cv_wake(&mq->mq_readable);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_set_error(nni_msgq *mq, int error)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_geterr = error;
+ mq->mq_puterr = error;
+ if (error) {
+ nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_writeable);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
// nni_msgq_signal raises a signal on the signal object. This allows a
// waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
// Note that the signal object must be *zero* if no signal is raised.
@@ -150,6 +190,11 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
return (NNG_ECLOSED);
}
+ if ((rv = mq->mq_puterr) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
+
// room in the queue?
if (mq->mq_len < mq->mq_cap) {
break;
@@ -185,7 +230,6 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
}
// Writeable! Yay!!
-
mq->mq_msgs[mq->mq_put] = msg;
mq->mq_put++;
if (mq->mq_put == mq->mq_alloc) {
@@ -251,6 +295,10 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_mtx_unlock(&mq->mq_lock);
return (NNG_ECLOSED);
}
+ if ((rv = mq->mq_geterr) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
if (expire == NNI_TIME_ZERO) {
nni_mtx_unlock(&mq->mq_lock);
return (NNG_EAGAIN);