From b45f876d005371f62fc261a5584c4d7dafd7a0f7 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 7 Dec 2020 21:51:44 -0800 Subject: fixes #1368 Incorrect recv pollfd handing for SUB. This hopefully addresses the problem of both missed poll events (meaning we don't flag the descriptor as pollable), and spurious poll events (which might have happened when a message was delivered synchronously.) --- src/protocol/pubsub0/sub.c | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index a2c40373..c56c8b79 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -330,7 +330,6 @@ sub0_recv_cb(void *arg) uint8_t * body; nni_list finish; nng_aio * aio; - bool submatch; nni_msg * dup_msg; if (nni_aio_result(&p->aio_recv) != 0) { @@ -344,14 +343,14 @@ sub0_recv_cb(void *arg) nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - body = nni_msg_body(msg); - len = nni_msg_len(msg); - submatch = false; - dup_msg = NULL; + body = nni_msg_body(msg); + len = nni_msg_len(msg); + dup_msg = NULL; nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { + bool queued = false; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. @@ -376,10 +375,6 @@ sub0_recv_cb(void *arg) dup_msg = msg; } - // If we got to this point, we are capable of receiving this - // message and it is intended for us. - submatch = true; - if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); @@ -394,8 +389,14 @@ sub0_recv_cb(void *arg) nni_msg_free(old); (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + } else { (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + } + if (queued && ctx == &sock->master) { + nni_pollable_raise(&sock->readable); } } nni_mtx_unlock(&sock->lk); @@ -416,10 +417,6 @@ sub0_recv_cb(void *arg) nni_aio_finish_sync(aio, 0, len); } - if (submatch) { - nni_pollable_raise(&sock->readable); - } - nni_pipe_recv(p->pipe, &p->aio_recv); } -- cgit v1.2.3-70-g09d2