aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c74
1 files changed, 32 insertions, 42 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index c1f3701c..3f1ffcb5 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -32,8 +32,12 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
- nni_list mq_aio_notify_get;
- nni_list mq_aio_notify_put;
+
+ // Callback - this function is executed with the lock held, and
+ // provides information about the current queue state anytime
+ // a message enters or leaves the queue, or a waiter is blocked.
+ nni_msgq_cb mq_cb_fn;
+ void * mq_cb_arg;
// Filters.
nni_msgq_filter mq_filter_fn;
@@ -63,9 +67,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
- nni_aio_list_init(&mq->mq_aio_notify_get);
- nni_aio_list_init(&mq->mq_aio_notify_put);
-
nni_mtx_init(&mq->mq_lock);
nni_cv_init(&mq->mq_drained, &mq->mq_lock);
@@ -297,22 +298,25 @@ static void
nni_msgq_run_notify(nni_msgq *mq)
{
nni_aio *aio;
+ if (mq->mq_cb_fn != NULL) {
+ int flags = 0;
- if (mq->mq_closed) {
- return;
- }
- if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) {
- NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
- // This stays on the list.
- nni_aio_finish(aio, 0, 0);
+ if (mq->mq_closed) {
+ flags |= nni_msgq_f_closed;
}
- }
-
- if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) {
- // This stays on the list.
- nni_aio_finish(aio, 0, 0);
+ if (mq->mq_len == 0) {
+ flags |= nni_msgq_f_empty;
+ } else if (mq->mq_len == mq->mq_cap) {
+ flags |= nni_msgq_f_full;
+ }
+ if (mq->mq_len < mq->mq_cap ||
+ !nni_list_empty(&mq->mq_aio_getq)) {
+ flags |= nni_msgq_f_can_put;
+ }
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
+ flags |= nni_msgq_f_can_get;
}
+ mq->mq_cb_fn(mq->mq_cb_arg, flags);
}
if (mq->mq_draining) {
@@ -322,6 +326,16 @@ nni_msgq_run_notify(nni_msgq *mq)
}
}
+void
+nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_cb_fn = fn;
+ mq->mq_cb_arg = arg;
+ nni_msgq_run_notify(mq);
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
static void
nni_msgq_cancel(nni_aio *aio, int rv)
{
@@ -336,30 +350,6 @@ nni_msgq_cancel(nni_aio *aio, int rv)
}
void
-nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
-{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- nni_aio_list_append(&mq->mq_aio_notify_put, aio);
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
-nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
-{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- nni_aio_list_append(&mq->mq_aio_notify_get, aio);
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);