aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c24
1 files changed, 9 insertions, 15 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index a8466e78..518871f4 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -157,8 +157,7 @@ inproc_queue_run(inproc_queue *queue)
nni_aio *rd;
nni_aio *wr;
nni_msg *msg;
- size_t header_len;
- uint8_t *header;
+ nni_msg *pu;
if (((rd = nni_list_first(&queue->readers)) == NULL) ||
((wr = nni_list_first(&queue->writers)) == NULL)) {
@@ -168,30 +167,25 @@ inproc_queue_run(inproc_queue *queue)
msg = nni_aio_get_msg(wr);
NNI_ASSERT(msg != NULL);
- header_len = nni_msg_header_len(msg);
- header = nni_msg_header(msg);
-
// At this point, we pass success back to the caller. If
// we drop the message for any reason, its accounted on the
// receiver side.
nni_aio_list_remove(wr);
nni_aio_set_msg(wr, NULL);
- nni_aio_finish(wr, 0, nni_msg_len(msg) + header_len);
+ nni_aio_finish(
+ wr, 0, nni_msg_len(msg) + nni_msg_header_len(msg));
// TODO: We could check the max receive size here.
- // Now the receive side. First lets make sure we ensure that
- // the message headers are inserted into the body, because
- // that is what the protocols expect.
- // TODO: This would also be the place to do the work to make
- // sure we aren't sharing the message once #1156 integrates.
-
- if (nni_msg_insert(msg, header, header_len) != 0) {
- // TODO: bump a dropped statistic
+ // Now the receive side. We need to ensure that we have
+ // an exclusive copy of the message, and pull the header
+ // up into the body to match protocol expectations.
+ if ((pu = nni_msg_pull_up(msg)) == NULL) {
nni_msg_free(msg);
continue;
}
- nni_msg_header_clear(msg);
+ msg = pu;
+
nni_aio_list_remove(rd);
nni_aio_set_msg(rd, msg);
nni_aio_finish(rd, 0, nni_msg_len(msg));