diff options
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 23 |
1 files changed, 10 insertions, 13 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index a2c40373..c56c8b79 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -330,7 +330,6 @@ sub0_recv_cb(void *arg) uint8_t * body; nni_list finish; nng_aio * aio; - bool submatch; nni_msg * dup_msg; if (nni_aio_result(&p->aio_recv) != 0) { @@ -344,14 +343,14 @@ sub0_recv_cb(void *arg) nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - body = nni_msg_body(msg); - len = nni_msg_len(msg); - submatch = false; - dup_msg = NULL; + body = nni_msg_body(msg); + len = nni_msg_len(msg); + dup_msg = NULL; nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { + bool queued = false; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. @@ -376,10 +375,6 @@ sub0_recv_cb(void *arg) dup_msg = msg; } - // If we got to this point, we are capable of receiving this - // message and it is intended for us. - submatch = true; - if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); @@ -394,8 +389,14 @@ sub0_recv_cb(void *arg) nni_msg_free(old); (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + } else { (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + } + if (queued && ctx == &sock->master) { + nni_pollable_raise(&sock->readable); } } nni_mtx_unlock(&sock->lk); @@ -416,10 +417,6 @@ sub0_recv_cb(void *arg) nni_aio_finish_sync(aio, 0, len); } - if (submatch) { - nni_pollable_raise(&sock->readable); - } - nni_pipe_recv(p->pipe, &p->aio_recv); } |
