summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c111
1 files changed, 66 insertions, 45 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 529e7f4d..212b52c2 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -31,11 +31,9 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
- // Callback - this function is executed with the lock held, and
- // provides information about the current queue state anytime
- // a message enters or leaves the queue, or a waiter is blocked.
- nni_msgq_cb mq_cb_fn;
- void * mq_cb_arg;
+ // Pollable status.
+ nni_pollable *mq_sendable;
+ nni_pollable *mq_recvable;
// Filters.
nni_msgq_filter mq_filter_fn;
@@ -62,20 +60,20 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
NNI_FREE_STRUCT(mq);
return (NNG_ENOMEM);
}
-
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
-
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- mq->mq_puterr = 0;
- mq->mq_geterr = 0;
- *mqp = mq;
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_recvable = NULL;
+ mq->mq_sendable = NULL;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ mq->mq_puterr = 0;
+ mq->mq_geterr = 0;
+ *mqp = mq;
return (0);
}
@@ -101,6 +99,13 @@ nni_msgq_fini(nni_msgq *mq)
nni_msg_free(msg);
}
+ if (mq->mq_sendable) {
+ nni_pollable_free(mq->mq_sendable);
+ }
+ if (mq->mq_recvable) {
+ nni_pollable_free(mq->mq_recvable);
+ }
+
nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *));
NNI_FREE_STRUCT(mq);
}
@@ -292,36 +297,16 @@ nni_msgq_run_getq(nni_msgq *mq)
static void
nni_msgq_run_notify(nni_msgq *mq)
{
- if (mq->mq_cb_fn != NULL) {
- int flags = 0;
-
- if (mq->mq_closed) {
- flags |= nni_msgq_f_closed;
- }
- if (mq->mq_len == 0) {
- flags |= nni_msgq_f_empty;
- } else if (mq->mq_len == mq->mq_cap) {
- flags |= nni_msgq_f_full;
- }
- if (mq->mq_len < mq->mq_cap ||
- !nni_list_empty(&mq->mq_aio_getq)) {
- flags |= nni_msgq_f_can_put;
- }
- if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- flags |= nni_msgq_f_can_get;
- }
- mq->mq_cb_fn(mq->mq_cb_arg, flags);
+ if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) {
+ nni_pollable_raise(mq->mq_sendable);
+ } else {
+ nni_pollable_clear(mq->mq_sendable);
+ }
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
+ nni_pollable_raise(mq->mq_recvable);
+ } else {
+ nni_pollable_clear(mq->mq_recvable);
}
-}
-
-void
-nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg)
-{
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_cb_fn = fn;
- mq->mq_cb_arg = arg;
- nni_msgq_run_notify(mq);
- nni_mtx_unlock(&mq->mq_lock);
}
static void
@@ -543,3 +528,39 @@ out:
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
+
+int
+nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_recvable == NULL) {
+ int rv;
+ if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
+ nni_msgq_run_notify(mq);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+
+ *sp = mq->mq_recvable;
+ return (0);
+}
+
+int
+nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_sendable == NULL) {
+ int rv;
+ if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
+ nni_msgq_run_notify(mq);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+
+ *sp = mq->mq_sendable;
+ return (0);
+}