aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/protocol/pipeline/pull.c1
-rw-r--r--src/protocol/pubsub/sub.c5
-rw-r--r--src/protocol/survey/respond.c1
-rw-r--r--src/protocol/survey/survey.c1
4 files changed, 7 insertions, 1 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index ba828763..80db2425 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -125,6 +125,7 @@ pull_recv_cb(void *arg)
// Got a message... start the put to send it up to the application.
msg = nni_aio_get_msg(aio);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
nni_aio_set_msg(aio, NULL);
pull_putq(p, msg);
}
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 7b6f4904..8b2ed209 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -149,14 +149,17 @@ sub_recv_cb(void *arg)
sub_pipe *p = arg;
sub_sock *s = p->sub;
nni_msgq *urq = s->urq;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
return;
}
- nni_aio_set_msg(p->aio_putq, nni_aio_get_msg(p->aio_recv));
+ msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+ nni_aio_set_msg(p->aio_putq, msg);
nni_msgq_aio_put(urq, p->aio_putq);
}
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 70dbd704..fcb067b0 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -285,6 +285,7 @@ resp_recv_cb(void *arg)
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, p->id);
// Store the pipe id in the header, first thing.
if (nni_msg_header_append_u32(msg, p->id) != 0) {
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 361b9d37..de9df7a5 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -244,6 +244,7 @@ surv_recv_cb(void *arg)
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
// We yank 4 bytes of body, and move them to the header.
if (nni_msg_len(msg) < 4) {