aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-12-07 21:51:44 -0800
committerGarrett D'Amore <garrett@damore.org>2020-12-07 22:03:16 -0800
commitb45f876d005371f62fc261a5584c4d7dafd7a0f7 (patch)
tree31cf33f7c9ddbe07766fc1ec0c297ea36216cf1a /src/protocol
parent69a475cb890736bfd808f2dc58c28db10574551b (diff)
downloadnng-b45f876d005371f62fc261a5584c4d7dafd7a0f7.tar.gz
nng-b45f876d005371f62fc261a5584c4d7dafd7a0f7.tar.bz2
nng-b45f876d005371f62fc261a5584c4d7dafd7a0f7.zip
fixes #1368 Incorrect recv pollfd handing for SUB.
This hopefully addresses the problem of both missed poll events (meaning we don't flag the descriptor as pollable), and spurious poll events (which might have happened when a message was delivered synchronously.)
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pubsub0/sub.c23
1 files changed, 10 insertions, 13 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index a2c40373..c56c8b79 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -330,7 +330,6 @@ sub0_recv_cb(void *arg)
uint8_t * body;
nni_list finish;
nng_aio * aio;
- bool submatch;
nni_msg * dup_msg;
if (nni_aio_result(&p->aio_recv) != 0) {
@@ -344,14 +343,14 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- body = nni_msg_body(msg);
- len = nni_msg_len(msg);
- submatch = false;
- dup_msg = NULL;
+ body = nni_msg_body(msg);
+ len = nni_msg_len(msg);
+ dup_msg = NULL;
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
NNI_LIST_FOREACH (&sock->contexts, ctx) {
+ bool queued = false;
if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
// Cannot deliver here, as receive buffer is full.
@@ -376,10 +375,6 @@ sub0_recv_cb(void *arg)
dup_msg = msg;
}
- // If we got to this point, we are capable of receiving this
- // message and it is intended for us.
- submatch = true;
-
if (!nni_list_empty(&ctx->recv_queue)) {
aio = nni_list_first(&ctx->recv_queue);
nni_list_remove(&ctx->recv_queue, aio);
@@ -394,8 +389,14 @@ sub0_recv_cb(void *arg)
nni_msg_free(old);
(void) nni_lmq_putq(&ctx->lmq, dup_msg);
+ queued = true;
+
} else {
(void) nni_lmq_putq(&ctx->lmq, dup_msg);
+ queued = true;
+ }
+ if (queued && ctx == &sock->master) {
+ nni_pollable_raise(&sock->readable);
}
}
nni_mtx_unlock(&sock->lk);
@@ -416,10 +417,6 @@ sub0_recv_cb(void *arg)
nni_aio_finish_sync(aio, 0, len);
}
- if (submatch) {
- nni_pollable_raise(&sock->readable);
- }
-
nni_pipe_recv(p->pipe, &p->aio_recv);
}