aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/msgqueue.c56
-rw-r--r--src/core/msgqueue.h13
2 files changed, 2 insertions, 67 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 3419f029..773fb9aa 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -22,7 +22,6 @@ struct nni_msgq {
int mq_len;
int mq_get;
int mq_put;
- int mq_puterr;
int mq_geterr;
bool mq_closed;
nni_msg **mq_msgs;
@@ -81,7 +80,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
mq->mq_get = 0;
mq->mq_put = 0;
mq->mq_closed = 0;
- mq->mq_puterr = 0;
mq->mq_geterr = 0;
*mqp = mq;
@@ -147,50 +145,6 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
}
void
-nni_msgq_set_put_error(nni_msgq *mq, int error)
-{
- // Let all pending blockers know we are closing the queue.
- nni_mtx_lock(&mq->mq_lock);
- if (mq->mq_closed) {
- // If we were closed, then this error trumps all others.
- error = NNG_ECLOSED;
- }
- if (error != 0) {
- nni_aio *aio;
- while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, error);
- }
- }
- mq->mq_puterr = error;
- nni_msgq_run_notify(mq);
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
-nni_msgq_set_error(nni_msgq *mq, int error)
-{
- // Let all pending blockers know we are closing the queue.
- nni_mtx_lock(&mq->mq_lock);
- if (mq->mq_closed) {
- // If we were closed, then this error trumps all others.
- error = NNG_ECLOSED;
- }
- if (error != 0) {
- nni_aio *aio;
- while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
- ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, error);
- }
- }
- mq->mq_puterr = error;
- mq->mq_geterr = error;
- nni_msgq_run_notify(mq);
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
nni_msgq_flush(nni_msgq *mq)
{
nni_mtx_lock(&mq->mq_lock);
@@ -375,20 +329,14 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
return;
}
nni_mtx_lock(&mq->mq_lock);
- if (mq->mq_puterr) {
- nni_atomic_inc64(&mq->mq_put_errs, 1);
- nni_aio_finish_error(aio, mq->mq_puterr);
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
// If this is an instantaneous poll operation, and the queue has
// no room, nobody is waiting to receive, then report NNG_ETIMEDOUT.
rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
- nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_mtx_unlock(&mq->mq_lock);
+ nni_atomic_inc64(&mq->mq_put_errs, 1);
nni_aio_finish_error(aio, rv);
return;
}
@@ -492,7 +440,7 @@ nni_msgq_close(nni_msgq *mq)
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = true;
- mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED;
+ mq->mq_geterr = NNG_ECLOSED;
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 1af8461c..3340e846 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -44,19 +44,6 @@ extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
// the message queue.
extern int nni_msgq_tryput(nni_msgq *, nni_msg *);
-// nni_msgq_set_error sets an error condition on the message queue,
-// which causes all current and future readers/writes to return the
-// given error condition (if non-zero). Threads waiting to put or get
-// are woken as well, if non-zero. If zero, then any present error
-// condition is cleared, and waiters are not woken (there shouldn't be
-// any waiters unless it was already zero.)
-extern void nni_msgq_set_error(nni_msgq *, int);
-
-// nni_msgq_set_put_error sets an error condition on the put side of the
-// message queue, and for that side behaves like nni_msgq_set_error.
-// Readers (nni_msgq_get*) are unaffected.
-extern void nni_msgq_set_put_error(nni_msgq *, int);
-
// nni_msgq_set_get_error sets an error condition on the get side of the
// message queue, and for that side behaves like nni_msgq_set_error.
// Readers (nni_msgq_put*) are unaffected.