summaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorBehrooze Sirang <behrooze.sirang@postmates.com>2019-04-09 17:37:46 -0700
committerGarrett D'Amore <garrett@damore.org>2019-04-11 21:31:07 -0700
commit90467583b7544b68483334070518e29b00ec6d81 (patch)
treeca31612c9c7e8373e9f3afbbeb432c44297c7a2c /src/protocol
parentc3e062661388f70386d6766e3ce648030af340ee (diff)
downloadnng-90467583b7544b68483334070518e29b00ec6d81.tar.gz
nng-90467583b7544b68483334070518e29b00ec6d81.tar.bz2
nng-90467583b7544b68483334070518e29b00ec6d81.zip
fixes #919 Polling on subscriber socket recvfd seems broken
sub0_recv_cb was not calling nni_pollable_raise on sock->recvable.
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pubsub0/sub.c15
1 files changed, 13 insertions, 2 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index fefd79a9..9c71fb04 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -10,6 +10,7 @@
#include <stdlib.h>
#include <string.h>
+#include <stdbool.h>
#include "core/nng_impl.h"
#include "nng/protocol/pubsub0/sub.h"
@@ -353,6 +354,7 @@ sub0_recv_cb(void *arg)
uint8_t * body;
nni_list finish;
nng_aio * aio;
+ bool submatch;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
@@ -365,8 +367,9 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- body = nni_msg_body(msg);
- len = nni_msg_len(msg);
+ body = nni_msg_body(msg);
+ len = nni_msg_len(msg);
+ submatch = false;
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
@@ -391,6 +394,10 @@ sub0_recv_cb(void *arg)
continue; // TODO: Bump a stat!
}
+ // If we got to this point, we are capable of receiving this message
+ // and it is intended for us.
+ submatch = true;
+
if (!nni_list_empty(&ctx->raios)) {
nni_aio *aio = nni_list_first(&ctx->raios);
nni_list_remove(&ctx->raios, aio);
@@ -415,6 +422,10 @@ sub0_recv_cb(void *arg)
nni_msg_free(msg);
}
+ if (submatch) {
+ nni_pollable_raise(sock->recvable);
+ }
+
nni_pipe_recv(p->pipe, p->aio_recv);
}