summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-10 21:02:57 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-10 21:02:57 -0800
commit97fb819ccfd0d4cb7f02d7fc521d9478ba050776 (patch)
treef9517083e33473cfa790611c6cbec4650ceed6f4 /src/core/msgqueue.c
parentb89a02106c3388f40ab4cd3610de102259fa5da0 (diff)
downloadnng-97fb819ccfd0d4cb7f02d7fc521d9478ba050776.tar.gz
nng-97fb819ccfd0d4cb7f02d7fc521d9478ba050776.tar.bz2
nng-97fb819ccfd0d4cb7f02d7fc521d9478ba050776.zip
Surveyor pattern callback-driven.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c106
1 files changed, 55 insertions, 51 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 47b98629..60744ba1 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -217,27 +217,43 @@ nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
void
-nni_msgq_set_put_error(nni_msgq *mq, int error)
+nni_msgq_set_get_error(nni_msgq *mq, int error)
{
+ nni_aio *naio;
+ nni_aio *aio;
+
+ // Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
- mq->mq_puterr = error;
- if (error) {
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_writeable);
+ if (error != 0) {
+ naio = nni_list_first(&mq->mq_aio_getq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, error, 0);
+ }
}
+ mq->mq_geterr = error;
nni_mtx_unlock(&mq->mq_lock);
}
void
-nni_msgq_set_get_error(nni_msgq *mq, int error)
+nni_msgq_set_put_error(nni_msgq *mq, int error)
{
+ nni_aio *naio;
+ nni_aio *aio;
+
+ // Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
- mq->mq_geterr = error;
- if (error) {
- mq->mq_rwait = 0;
- nni_cv_wake(&mq->mq_readable);
+ if (error != 0) {
+ naio = nni_list_first(&mq->mq_aio_putq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, error, 0);
+ }
}
+ mq->mq_puterr = error;
nni_mtx_unlock(&mq->mq_lock);
}
@@ -245,34 +261,27 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
void
nni_msgq_set_error(nni_msgq *mq, int error)
{
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_geterr = error;
- mq->mq_puterr = error;
- if (error) {
- mq->mq_rwait = 0;
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_writeable);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
+ nni_aio *naio;
+ nni_aio *aio;
-// nni_msgq_signal raises a signal on the signal object. This allows a
-// waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
-// Note that the signal object must be *zero* if no signal is raised.
-void
-nni_msgq_signal(nni_msgq *mq, int *signal)
-{
+ // Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
- *signal = 1;
-
- // We have to wake everyone.
- mq->mq_rwait = 0;
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_writeable);
- nni_cv_wake(&mq->mq_notify_cv);
+ if (error != 0) {
+ naio = nni_list_first(&mq->mq_aio_getq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, error, 0);
+ }
+ naio = nni_list_first(&mq->mq_aio_putq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, error, 0);
+ }
+ }
+ mq->mq_puterr = error;
+ mq->mq_geterr = error;
nni_mtx_unlock(&mq->mq_lock);
}
@@ -393,6 +402,11 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ if (mq->mq_puterr) {
+ nni_aio_finish(aio, mq->mq_puterr, 0);
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -416,6 +430,11 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ if (mq->mq_geterr) {
+ nni_aio_finish(aio, mq->mq_geterr, 0);
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -755,14 +774,6 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
}
-// XXX Remove this later.
-int
-nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
-{
- return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, signal));
-}
-
-
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
@@ -789,13 +800,6 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg)
}
-int
-nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal)
-{
- return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, signal));
-}
-
-
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{