diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-12-07 21:51:44 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-12-07 22:03:16 -0800 |
| commit | b45f876d005371f62fc261a5584c4d7dafd7a0f7 (patch) | |
| tree | 31cf33f7c9ddbe07766fc1ec0c297ea36216cf1a /src/protocol/pubsub0 | |
| parent | 69a475cb890736bfd808f2dc58c28db10574551b (diff) | |
| download | nng-b45f876d005371f62fc261a5584c4d7dafd7a0f7.tar.gz nng-b45f876d005371f62fc261a5584c4d7dafd7a0f7.tar.bz2 nng-b45f876d005371f62fc261a5584c4d7dafd7a0f7.zip | |
fixes #1368 Incorrect recv pollfd handing for SUB.
This hopefully addresses the problem of both missed poll
events (meaning we don't flag the descriptor as pollable),
and spurious poll events (which might have happened when a
message was delivered synchronously.)
Diffstat (limited to 'src/protocol/pubsub0')
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 23 |
1 files changed, 10 insertions, 13 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index a2c40373..c56c8b79 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -330,7 +330,6 @@ sub0_recv_cb(void *arg) uint8_t * body; nni_list finish; nng_aio * aio; - bool submatch; nni_msg * dup_msg; if (nni_aio_result(&p->aio_recv) != 0) { @@ -344,14 +343,14 @@ sub0_recv_cb(void *arg) nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - body = nni_msg_body(msg); - len = nni_msg_len(msg); - submatch = false; - dup_msg = NULL; + body = nni_msg_body(msg); + len = nni_msg_len(msg); + dup_msg = NULL; nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { + bool queued = false; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. @@ -376,10 +375,6 @@ sub0_recv_cb(void *arg) dup_msg = msg; } - // If we got to this point, we are capable of receiving this - // message and it is intended for us. - submatch = true; - if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); @@ -394,8 +389,14 @@ sub0_recv_cb(void *arg) nni_msg_free(old); (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + } else { (void) nni_lmq_putq(&ctx->lmq, dup_msg); + queued = true; + } + if (queued && ctx == &sock->master) { + nni_pollable_raise(&sock->readable); } } nni_mtx_unlock(&sock->lk); @@ -416,10 +417,6 @@ sub0_recv_cb(void *arg) nni_aio_finish_sync(aio, 0, len); } - if (submatch) { - nni_pollable_raise(&sock->readable); - } - nni_pipe_recv(p->pipe, &p->aio_recv); } |
