From 97fb819ccfd0d4cb7f02d7fc521d9478ba050776 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 10 Mar 2017 21:02:57 -0800 Subject: Surveyor pattern callback-driven. --- src/core/msgqueue.c | 106 +++++++++++++++++++++++++++------------------------- src/core/msgqueue.h | 17 --------- 2 files changed, 55 insertions(+), 68 deletions(-) (limited to 'src/core') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 47b98629..60744ba1 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -217,27 +217,43 @@ nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg) void -nni_msgq_set_put_error(nni_msgq *mq, int error) +nni_msgq_set_get_error(nni_msgq *mq, int error) { + nni_aio *naio; + nni_aio *aio; + + // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); - mq->mq_puterr = error; - if (error) { - mq->mq_wwait = 0; - nni_cv_wake(&mq->mq_writeable); + if (error != 0) { + naio = nni_list_first(&mq->mq_aio_getq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, error, 0); + } } + mq->mq_geterr = error; nni_mtx_unlock(&mq->mq_lock); } void -nni_msgq_set_get_error(nni_msgq *mq, int error) +nni_msgq_set_put_error(nni_msgq *mq, int error) { + nni_aio *naio; + nni_aio *aio; + + // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); - mq->mq_geterr = error; - if (error) { - mq->mq_rwait = 0; - nni_cv_wake(&mq->mq_readable); + if (error != 0) { + naio = nni_list_first(&mq->mq_aio_putq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, error, 0); + } } + mq->mq_puterr = error; nni_mtx_unlock(&mq->mq_lock); } @@ -245,34 +261,27 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) void nni_msgq_set_error(nni_msgq *mq, int error) { - nni_mtx_lock(&mq->mq_lock); - mq->mq_geterr = error; - mq->mq_puterr = error; - if (error) { - mq->mq_rwait = 0; - mq->mq_wwait = 0; - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_writeable); - } - nni_mtx_unlock(&mq->mq_lock); -} - + nni_aio *naio; + nni_aio *aio; -// nni_msgq_signal raises a signal on the signal object. This allows a -// waiter to be signaled, so that it can be woken e.g. due to a pipe closing. -// Note that the signal object must be *zero* if no signal is raised. -void -nni_msgq_signal(nni_msgq *mq, int *signal) -{ + // Let all pending blockers know we are closing the queue. nni_mtx_lock(&mq->mq_lock); - *signal = 1; - - // We have to wake everyone. - mq->mq_rwait = 0; - mq->mq_wwait = 0; - nni_cv_wake(&mq->mq_readable); - nni_cv_wake(&mq->mq_writeable); - nni_cv_wake(&mq->mq_notify_cv); + if (error != 0) { + naio = nni_list_first(&mq->mq_aio_getq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, error, 0); + } + naio = nni_list_first(&mq->mq_aio_putq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, error, 0); + } + } + mq->mq_puterr = error; + mq->mq_geterr = error; nni_mtx_unlock(&mq->mq_lock); } @@ -393,6 +402,11 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } + if (mq->mq_puterr) { + nni_aio_finish(aio, mq->mq_puterr, 0); + nni_mtx_unlock(&mq->mq_lock); + return; + } nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -416,6 +430,11 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } + if (mq->mq_geterr) { + nni_aio_finish(aio, mq->mq_geterr, 0); + nni_mtx_unlock(&mq->mq_lock); + return; + } nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); @@ -755,14 +774,6 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp) } -// XXX Remove this later. -int -nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) -{ - return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, signal)); -} - - int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { @@ -789,13 +800,6 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) } -int -nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) -{ - return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, signal)); -} - - void nni_msgq_drain(nni_msgq *mq, nni_time expire) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index de23ebac..51143706 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -72,23 +72,6 @@ extern int nni_msgq_put_until(nni_msgq *, nni_msg *, nni_time); // a message from the queue, it will return NNG_ETIMEDOUT. extern int nni_msgq_get_until(nni_msgq *, nni_msg **, nni_time); -// nni_msgq_put_sig is an enhanced version of nni_msgq_put, but it -// can be interrupted by nni_msgqueue_signal using the same final pointer, -// which can be thought of as a turnstile. If interrupted it returns EINTR. -// The turnstile should be initialized to zero. -extern int nni_msgq_put_sig(nni_msgq *, nni_msg *, nni_signal *); - -// nni_msgq_get_sig is an enhanced version of nni_msgq_get_t, but it -// can be interrupted by nni_msgq_signal using the same final pointer, -// which can be thought of as a turnstile. If interrupted it returns EINTR. -// The turnstile should be initialized to zero. -extern int nni_msgq_get_sig(nni_msgq *, nni_msg **, nni_signal *); - -// nni_msgq_signal delivers a signal / interrupt to waiters blocked in -// the msgq, if they have registered an interest in the same turnstile. -// It modifies the turnstile's value under the lock to a non-zero value. -extern void nni_msgq_signal(nni_msgq *, nni_signal *); - // nni_msgq_set_error sets an error condition on the message queue, // which causes all current and future readers/writes to return the // given error condition (if non-zero). Threads waiting to put or get -- cgit v1.2.3-70-g09d2