diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-19 11:06:55 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-20 12:59:45 -0800 |
| commit | 8abf75857e8993a25e50d07bdd6d9628f028d7cc (patch) | |
| tree | 15f89948cfa97a44130db224e9e27e51a00e5f76 /src/transport/inproc | |
| parent | b2ba35251986d2754de5f0f274ee7cbf577223e1 (diff) | |
| download | nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.gz nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.bz2 nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.zip | |
fixes #1156 Message cloning could help reduce copies a lot
This introduces reference counting on messages to reduce the data copies.
This should have a marked improvement when moving large messages through
the system, or when publishing to many subscribers. For some transports,
when using large messages, the copy time can be the dominant factor.
Note that when a message is actually shared, inproc will still perform
an extra copy in order to ensure that it can modify the headers.
This will unfortunately always be the case with REQ, as the REQ protocol
keeps a copy of the original message so it can retry.
Diffstat (limited to 'src/transport/inproc')
| -rw-r--r-- | src/transport/inproc/inproc.c | 24 |
1 files changed, 9 insertions, 15 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index a8466e78..518871f4 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -157,8 +157,7 @@ inproc_queue_run(inproc_queue *queue) nni_aio *rd; nni_aio *wr; nni_msg *msg; - size_t header_len; - uint8_t *header; + nni_msg *pu; if (((rd = nni_list_first(&queue->readers)) == NULL) || ((wr = nni_list_first(&queue->writers)) == NULL)) { @@ -168,30 +167,25 @@ inproc_queue_run(inproc_queue *queue) msg = nni_aio_get_msg(wr); NNI_ASSERT(msg != NULL); - header_len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - // At this point, we pass success back to the caller. If // we drop the message for any reason, its accounted on the // receiver side. nni_aio_list_remove(wr); nni_aio_set_msg(wr, NULL); - nni_aio_finish(wr, 0, nni_msg_len(msg) + header_len); + nni_aio_finish( + wr, 0, nni_msg_len(msg) + nni_msg_header_len(msg)); // TODO: We could check the max receive size here. - // Now the receive side. First lets make sure we ensure that - // the message headers are inserted into the body, because - // that is what the protocols expect. - // TODO: This would also be the place to do the work to make - // sure we aren't sharing the message once #1156 integrates. - - if (nni_msg_insert(msg, header, header_len) != 0) { - // TODO: bump a dropped statistic + // Now the receive side. We need to ensure that we have + // an exclusive copy of the message, and pull the header + // up into the body to match protocol expectations. + if ((pu = nni_msg_pull_up(msg)) == NULL) { nni_msg_free(msg); continue; } - nni_msg_header_clear(msg); + msg = pu; + nni_aio_list_remove(rd); nni_aio_set_msg(rd, msg); nni_aio_finish(rd, 0, nni_msg_len(msg)); |
