From 97fb819ccfd0d4cb7f02d7fc521d9478ba050776 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 10 Mar 2017 21:02:57 -0800 Subject: Surveyor pattern callback-driven. --- src/core/msgqueue.c | 106 +++++++++++++++++++++++++++------------------------- 1 file changed, 55 insertions(+), 51 deletions(-) (limited to 'src/core/msgqueue.c') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 47b98629..60744ba1 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -217,27 +217,43 @@ nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg) void -nni_msgq_set_put_error(nni_msgq *mq, int error) +nni_msgq_set_get_error(nni_msgq *mq, int error) { + nni_aio *naio; + nni_aio *aio; + + // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); - mq->mq_puterr = error; - if (error) { - mq->mq_wwait = 0; - nni_cv_wake(&mq->mq_writeable); + if (error != 0) { + naio = nni_list_first(&mq->mq_aio_getq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, error, 0); + } } + mq->mq_geterr = error; nni_mtx_unlock(&mq->mq_lock); } void -nni_msgq_set_get_error(nni_msgq *mq, int error) +nni_msgq_set_put_error(nni_msgq *mq, int error) { + nni_aio *naio; + nni_aio *aio; + + // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); - mq->mq_geterr = error; - if (error) { - mq->mq_rwait = 0; - nni_cv_wake(&mq->mq_readable); + if (error != 0) { + naio = nni_list_first(&mq->mq_aio_putq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, error, 0); + } } + mq->mq_puterr = error; nni_mtx_unlock(&mq->mq_lock); } @@ -245,34 +261,27 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) void nni_msgq_set_error(nni_msgq *mq, int error) { - nni_mtx_lock(&mq->mq_lock); - mq->mq_geterr = error; - mq->mq_puterr = error; - if (error) { - mq->mq_rwait = 0; - mq->mq_wwait = 0; - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_writeable); - } - nni_mtx_unlock(&mq->mq_lock); -} - + nni_aio *naio; + nni_aio *aio; -// nni_msgq_signal raises a signal on the signal object. This allows a -// waiter to be signaled, so that it can be woken e.g. due to a pipe closing. -// Note that the signal object must be *zero* if no signal is raised. -void -nni_msgq_signal(nni_msgq *mq, int *signal) -{ + // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); - *signal = 1; - - // We have to wake everyone. - mq->mq_rwait = 0; - mq->mq_wwait = 0; - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_writeable); - nni_cv_wake(&mq->mq_notify_cv); + if (error != 0) { + naio = nni_list_first(&mq->mq_aio_getq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, error, 0); + } + naio = nni_list_first(&mq->mq_aio_putq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, error, 0); + } + } + mq->mq_puterr = error; + mq->mq_geterr = error; nni_mtx_unlock(&mq->mq_lock); } @@ -393,6 +402,11 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } + if (mq->mq_puterr) { + nni_aio_finish(aio, mq->mq_puterr, 0); + nni_mtx_unlock(&mq->mq_lock); + return; + } nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -416,6 +430,11 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } + if (mq->mq_geterr) { + nni_aio_finish(aio, mq->mq_geterr, 0); + nni_mtx_unlock(&mq->mq_lock); + return; + } nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); @@ -755,14 +774,6 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp) } -// XXX Remove this later. -int -nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) -{ - return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, signal)); -} - - int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { @@ -789,13 +800,6 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) } -int -nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) -{ - return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, signal)); -} - - void nni_msgq_drain(nni_msgq *mq, nni_time expire) { -- cgit v1.2.3-70-g09d2