diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-09-27 18:44:07 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-27 18:44:07 -0700 |
| commit | ba5b984528b6c500aed063af7f43ea24fa394f4e (patch) | |
| tree | 34583464c833b07bc4179c9f4b1cff8b6c67ad05 | |
| parent | 0736a958673683a9bfe0bf577b696f49c7bd8302 (diff) | |
| download | nng-ba5b984528b6c500aed063af7f43ea24fa394f4e.tar.gz nng-ba5b984528b6c500aed063af7f43ea24fa394f4e.tar.bz2 nng-ba5b984528b6c500aed063af7f43ea24fa394f4e.zip | |
fixes #85 Protocols need to set msg pipe
We looked at other options, but this is the least intrusive, even though
it means that the protocols have to set it up. The reason is that transports
have different methods of receiving messages, and there is no framework code
between the transport and the protocol.
| -rw-r--r-- | src/protocol/pipeline/pull.c | 1 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 5 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 1 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 1 |
4 files changed, 7 insertions, 1 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index ba828763..80db2425 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -125,6 +125,7 @@ pull_recv_cb(void *arg) // Got a message... start the put to send it up to the application. msg = nni_aio_get_msg(aio); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); nni_aio_set_msg(aio, NULL); pull_putq(p, msg); } diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 7b6f4904..8b2ed209 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -149,14 +149,17 @@ sub_recv_cb(void *arg) sub_pipe *p = arg; sub_sock *s = p->sub; nni_msgq *urq = s->urq; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); return; } - nni_aio_set_msg(p->aio_putq, nni_aio_get_msg(p->aio_recv)); + msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + nni_aio_set_msg(p->aio_putq, msg); nni_msgq_aio_put(urq, p->aio_putq); } diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 70dbd704..fcb067b0 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -285,6 +285,7 @@ resp_recv_cb(void *arg) msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, p->id); // Store the pipe id in the header, first thing. if (nni_msg_header_append_u32(msg, p->id) != 0) { diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 361b9d37..de9df7a5 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -244,6 +244,7 @@ surv_recv_cb(void *arg) msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); // We yank 4 bytes of body, and move them to the header. if (nni_msg_len(msg) < 4) { |
