summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/msgqueue.c11
-rw-r--r--src/protocol/pubsub0/sub.c30
2 files changed, 19 insertions, 22 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index fa94e32f..62f57553 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -403,10 +403,15 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- nni_list_remove(&mq->mq_aio_getq, raio);
- nni_aio_finish_msg(raio, msg);
- nni_msgq_run_notify(mq);
+ if (mq->mq_filter_fn != NULL) {
+ msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
+ }
+ if (msg != NULL) {
+ nni_list_remove(&mq->mq_aio_getq, raio);
+ nni_aio_finish_msg(raio, msg);
+ nni_msgq_run_notify(mq);
+ }
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b4fa2e2a..a7d6b9f5 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -31,7 +31,6 @@ typedef struct sub0_sock sub0_sock;
typedef struct sub0_topic sub0_topic;
static void sub0_recv_cb(void *);
-static void sub0_putq_cb(void *);
static void sub0_pipe_fini(void *);
struct sub0_topic {
@@ -52,7 +51,6 @@ struct sub0_pipe {
nni_pipe * pipe;
sub0_sock *sub;
nni_aio * aio_recv;
- nni_aio * aio_putq;
};
static int
@@ -103,7 +101,6 @@ sub0_pipe_stop(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_stop(p->aio_putq);
nni_aio_stop(p->aio_recv);
}
@@ -112,7 +109,6 @@ sub0_pipe_fini(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_recv);
NNI_FREE_STRUCT(p);
}
@@ -126,8 +122,7 @@ sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) {
+ if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) {
sub0_pipe_fini(p);
return (rv);
}
@@ -157,7 +152,6 @@ sub0_pipe_close(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_close(p->aio_putq);
nni_aio_close(p->aio_recv);
}
@@ -177,22 +171,20 @@ sub0_recv_cb(void *arg)
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(urq, p->aio_putq);
-}
-static void
-sub0_putq_cb(void *arg)
-{
- sub0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
+ switch (nni_msgq_tryput(urq, msg)) {
+ case 0:
+ break;
+ case NNG_EAGAIN:
+ nni_msg_free(msg);
+ break;
+ default:
+ // Any other error we stop the pipe for. It's probably
+ // NNG_ECLOSED anyway.
+ nng_msg_free(msg);
nni_pipe_stop(p->pipe);
return;
}
-
nni_pipe_recv(p->pipe, p->aio_recv);
}