aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c52
1 files changed, 9 insertions, 43 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 92213265..697ca503 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -32,10 +32,6 @@ struct nni_msgq {
// Pollable status.
nni_pollable *mq_sendable;
nni_pollable *mq_recvable;
-
- // Filters.
- nni_msgq_filter mq_filter_fn;
- void * mq_filter_arg;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -143,13 +139,6 @@ nni_msgq_flush(nni_msgq *mq)
nni_mtx_unlock(&mq->mq_lock);
}
-void
-nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg)
-{
- mq->mq_filter_fn = filter;
- mq->mq_filter_arg = arg;
-}
-
static void
nni_msgq_run_putq(nni_msgq *mq)
{
@@ -167,15 +156,8 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
- if (mq->mq_filter_fn != NULL) {
- msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
- }
- if (msg != NULL) {
- len = nni_msg_len(msg);
- nni_aio_list_remove(raio);
- nni_aio_finish_msg(raio, msg);
- }
-
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
nni_aio_finish(waio, 0, len);
continue;
}
@@ -213,13 +195,8 @@ nni_msgq_run_getq(nni_msgq *mq)
}
mq->mq_len--;
- if (mq->mq_filter_fn != NULL) {
- msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
- }
- if (msg != NULL) {
- nni_aio_list_remove(raio);
- nni_aio_finish_msg(raio, msg);
- }
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -234,14 +211,8 @@ nni_msgq_run_getq(nni_msgq *mq)
nni_aio_list_remove(waio);
nni_aio_finish(waio, 0, len);
- if (mq->mq_filter_fn != NULL) {
- msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
- }
- if (msg != NULL) {
- len = nni_msg_len(msg);
- nni_aio_list_remove(raio);
- nni_aio_finish_msg(raio, msg);
- }
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -352,14 +323,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- if (mq->mq_filter_fn != NULL) {
- msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
- }
- if (msg != NULL) {
- nni_list_remove(&mq->mq_aio_getq, raio);
- nni_aio_finish_msg(raio, msg);
- nni_msgq_run_notify(mq);
- }
+ nni_list_remove(&mq->mq_aio_getq, raio);
+ nni_aio_finish_msg(raio, msg);
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}