diff options
Diffstat (limited to 'src/protocol/pubsub0/sub.c')
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index fefd79a9..9c71fb04 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -10,6 +10,7 @@ #include <stdlib.h> #include <string.h> +#include <stdbool.h> #include "core/nng_impl.h" #include "nng/protocol/pubsub0/sub.h" @@ -353,6 +354,7 @@ sub0_recv_cb(void *arg) uint8_t * body; nni_list finish; nng_aio * aio; + bool submatch; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_close(p->pipe); @@ -365,8 +367,9 @@ sub0_recv_cb(void *arg) nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - body = nni_msg_body(msg); - len = nni_msg_len(msg); + body = nni_msg_body(msg); + len = nni_msg_len(msg); + submatch = false; nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. @@ -391,6 +394,10 @@ sub0_recv_cb(void *arg) continue; // TODO: Bump a stat! } + // If we got to this point, we are capable of receiving this message + // and it is intended for us. + submatch = true; + if (!nni_list_empty(&ctx->raios)) { nni_aio *aio = nni_list_first(&ctx->raios); nni_list_remove(&ctx->raios, aio); @@ -415,6 +422,10 @@ sub0_recv_cb(void *arg) nni_msg_free(msg); } + if (submatch) { + nni_pollable_raise(sock->recvable); + } + nni_pipe_recv(p->pipe, p->aio_recv); } |
