diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-09 13:01:22 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-09 13:01:22 -0800 |
| commit | 848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3 (patch) | |
| tree | 24a31a22803355a4a85293cb35c066b32c651b58 /src/core/msgqueue.c | |
| parent | 02231e715c4cb67d3394ee363a758f6b3e6b53b8 (diff) | |
| download | nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.tar.gz nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.tar.bz2 nng-848f8f62d7c6d6ea061dd0513a6bffc1ef358ff3.zip | |
Add survey test (and fix survey pattern).
As part of this, we've added a way to unblock callers in a message
queue with an error, even without a signal channel. This was necessary
to interrupt blockers upon survey timeout. They will get NNG_ETIMEDOUT,
but afterwards callers get NNG_ESTATE.
Diffstat (limited to 'src/core/msgqueue.c')
| -rw-r--r-- | src/core/msgqueue.c | 50 |
1 files changed, 49 insertions, 1 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 931fda68..9ef48f2d 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -25,6 +25,8 @@ struct nni_msgq { int mq_get; int mq_put; int mq_closed; + int mq_puterr; + int mq_geterr; int mq_rwait; // readers waiting (unbuffered) int mq_wwait; nni_msg ** mq_msgs; @@ -120,6 +122,44 @@ nni_msgq_fini(nni_msgq *mq) } +void +nni_msgq_set_put_error(nni_msgq *mq, int error) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_puterr = error; + if (error) { + nni_cv_wake(&mq->mq_writeable); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_set_get_error(nni_msgq *mq, int error) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_geterr = error; + if (error) { + nni_cv_wake(&mq->mq_readable); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_set_error(nni_msgq *mq, int error) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_geterr = error; + mq->mq_puterr = error; + if (error) { + nni_cv_wake(&mq->mq_readable); + nni_cv_wake(&mq->mq_writeable); + } + nni_mtx_unlock(&mq->mq_lock); +} + + // nni_msgq_signal raises a signal on the signal object. This allows a // waiter to be signaled, so that it can be woken e.g. due to a pipe closing. // Note that the signal object must be *zero* if no signal is raised. @@ -150,6 +190,11 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) return (NNG_ECLOSED); } + if ((rv = mq->mq_puterr) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + // room in the queue? if (mq->mq_len < mq->mq_cap) { break; @@ -185,7 +230,6 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) } // Writeable! Yay!! - mq->mq_msgs[mq->mq_put] = msg; mq->mq_put++; if (mq->mq_put == mq->mq_alloc) { @@ -251,6 +295,10 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) nni_mtx_unlock(&mq->mq_lock); return (NNG_ECLOSED); } + if ((rv = mq->mq_geterr) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } if (expire == NNI_TIME_ZERO) { nni_mtx_unlock(&mq->mq_lock); return (NNG_EAGAIN); |
