aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msgqueue.c106
-rw-r--r--src/core/msgqueue.h17
2 files changed, 55 insertions, 68 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 47b98629..60744ba1 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -217,27 +217,43 @@ nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
void
-nni_msgq_set_put_error(nni_msgq *mq, int error)
+nni_msgq_set_get_error(nni_msgq *mq, int error)
{
+ nni_aio *naio;
+ nni_aio *aio;
+
+ // Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
- mq->mq_puterr = error;
- if (error) {
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_writeable);
+ if (error != 0) {
+ naio = nni_list_first(&mq->mq_aio_getq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, error, 0);
+ }
}
+ mq->mq_geterr = error;
nni_mtx_unlock(&mq->mq_lock);
}
void
-nni_msgq_set_get_error(nni_msgq *mq, int error)
+nni_msgq_set_put_error(nni_msgq *mq, int error)
{
+ nni_aio *naio;
+ nni_aio *aio;
+
+ // Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
- mq->mq_geterr = error;
- if (error) {
- mq->mq_rwait = 0;
- nni_cv_wake(&mq->mq_readable);
+ if (error != 0) {
+ naio = nni_list_first(&mq->mq_aio_putq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, error, 0);
+ }
}
+ mq->mq_puterr = error;
nni_mtx_unlock(&mq->mq_lock);
}
@@ -245,34 +261,27 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
void
nni_msgq_set_error(nni_msgq *mq, int error)
{
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_geterr = error;
- mq->mq_puterr = error;
- if (error) {
- mq->mq_rwait = 0;
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_writeable);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
+ nni_aio *naio;
+ nni_aio *aio;
-// nni_msgq_signal raises a signal on the signal object. This allows a
-// waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
-// Note that the signal object must be *zero* if no signal is raised.
-void
-nni_msgq_signal(nni_msgq *mq, int *signal)
-{
+ // Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
- *signal = 1;
-
- // We have to wake everyone.
- mq->mq_rwait = 0;
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_writeable);
- nni_cv_wake(&mq->mq_notify_cv);
+ if (error != 0) {
+ naio = nni_list_first(&mq->mq_aio_getq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, error, 0);
+ }
+ naio = nni_list_first(&mq->mq_aio_putq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, error, 0);
+ }
+ }
+ mq->mq_puterr = error;
+ mq->mq_geterr = error;
nni_mtx_unlock(&mq->mq_lock);
}
@@ -393,6 +402,11 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ if (mq->mq_puterr) {
+ nni_aio_finish(aio, mq->mq_puterr, 0);
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -416,6 +430,11 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ if (mq->mq_geterr) {
+ nni_aio_finish(aio, mq->mq_geterr, 0);
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -755,14 +774,6 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
}
-// XXX Remove this later.
-int
-nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
-{
- return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, signal));
-}
-
-
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
@@ -789,13 +800,6 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg)
}
-int
-nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal)
-{
- return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, signal));
-}
-
-
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index de23ebac..51143706 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -72,23 +72,6 @@ extern int nni_msgq_put_until(nni_msgq *, nni_msg *, nni_time);
// a message from the queue, it will return NNG_ETIMEDOUT.
extern int nni_msgq_get_until(nni_msgq *, nni_msg **, nni_time);
-// nni_msgq_put_sig is an enhanced version of nni_msgq_put, but it
-// can be interrupted by nni_msgqueue_signal using the same final pointer,
-// which can be thought of as a turnstile. If interrupted it returns EINTR.
-// The turnstile should be initialized to zero.
-extern int nni_msgq_put_sig(nni_msgq *, nni_msg *, nni_signal *);
-
-// nni_msgq_get_sig is an enhanced version of nni_msgq_get_t, but it
-// can be interrupted by nni_msgq_signal using the same final pointer,
-// which can be thought of as a turnstile. If interrupted it returns EINTR.
-// The turnstile should be initialized to zero.
-extern int nni_msgq_get_sig(nni_msgq *, nni_msg **, nni_signal *);
-
-// nni_msgq_signal delivers a signal / interrupt to waiters blocked in
-// the msgq, if they have registered an interest in the same turnstile.
-// It modifies the turnstile's value under the lock to a non-zero value.
-extern void nni_msgq_signal(nni_msgq *, nni_signal *);
-
// nni_msgq_set_error sets an error condition on the message queue,
// which causes all current and future readers/writes to return the
// given error condition (if non-zero). Threads waiting to put or get