aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pubsub0/sub.c15
1 files changed, 13 insertions, 2 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index fefd79a9..9c71fb04 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -10,6 +10,7 @@
#include <stdlib.h>
#include <string.h>
+#include <stdbool.h>
#include "core/nng_impl.h"
#include "nng/protocol/pubsub0/sub.h"
@@ -353,6 +354,7 @@ sub0_recv_cb(void *arg)
uint8_t * body;
nni_list finish;
nng_aio * aio;
+ bool submatch;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
@@ -365,8 +367,9 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- body = nni_msg_body(msg);
- len = nni_msg_len(msg);
+ body = nni_msg_body(msg);
+ len = nni_msg_len(msg);
+ submatch = false;
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
@@ -391,6 +394,10 @@ sub0_recv_cb(void *arg)
continue; // TODO: Bump a stat!
}
+ // If we got to this point, we are capable of receiving this message
+ // and it is intended for us.
+ submatch = true;
+
if (!nni_list_empty(&ctx->raios)) {
nni_aio *aio = nni_list_first(&ctx->raios);
nni_list_remove(&ctx->raios, aio);
@@ -415,6 +422,10 @@ sub0_recv_cb(void *arg)
nni_msg_free(msg);
}
+ if (submatch) {
+ nni_pollable_raise(sock->recvable);
+ }
+
nni_pipe_recv(p->pipe, p->aio_recv);
}