aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pubsub0/sub.c23
1 files changed, 10 insertions, 13 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index a2c40373..c56c8b79 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -330,7 +330,6 @@ sub0_recv_cb(void *arg)
uint8_t * body;
nni_list finish;
nng_aio * aio;
- bool submatch;
nni_msg * dup_msg;
if (nni_aio_result(&p->aio_recv) != 0) {
@@ -344,14 +343,14 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- body = nni_msg_body(msg);
- len = nni_msg_len(msg);
- submatch = false;
- dup_msg = NULL;
+ body = nni_msg_body(msg);
+ len = nni_msg_len(msg);
+ dup_msg = NULL;
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
NNI_LIST_FOREACH (&sock->contexts, ctx) {
+ bool queued = false;
if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
// Cannot deliver here, as receive buffer is full.
@@ -376,10 +375,6 @@ sub0_recv_cb(void *arg)
dup_msg = msg;
}
- // If we got to this point, we are capable of receiving this
- // message and it is intended for us.
- submatch = true;
-
if (!nni_list_empty(&ctx->recv_queue)) {
aio = nni_list_first(&ctx->recv_queue);
nni_list_remove(&ctx->recv_queue, aio);
@@ -394,8 +389,14 @@ sub0_recv_cb(void *arg)
nni_msg_free(old);
(void) nni_lmq_putq(&ctx->lmq, dup_msg);
+ queued = true;
+
} else {
(void) nni_lmq_putq(&ctx->lmq, dup_msg);
+ queued = true;
+ }
+ if (queued && ctx == &sock->master) {
+ nni_pollable_raise(&sock->readable);
}
}
nni_mtx_unlock(&sock->lk);
@@ -416,10 +417,6 @@ sub0_recv_cb(void *arg)
nni_aio_finish_sync(aio, 0, len);
}
- if (submatch) {
- nni_pollable_raise(&sock->readable);
- }
-
nni_pipe_recv(p->pipe, &p->aio_recv);
}