From 8abf75857e8993a25e50d07bdd6d9628f028d7cc Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 19 Jan 2020 11:06:55 -0800 Subject: fixes #1156 Message cloning could help reduce copies a lot This introduces reference counting on messages to reduce the data copies. This should have a marked improvement when moving large messages through the system, or when publishing to many subscribers. For some transports, when using large messages, the copy time can be the dominant factor. Note that when a message is actually shared, inproc will still perform an extra copy in order to ensure that it can modify the headers. This will unfortunately always be the case with REQ, as the REQ protocol keeps a copy of the original message so it can retry. --- src/protocol/bus0/bus.c | 21 +++------------------ src/protocol/pubsub0/pub.c | 27 +++++++++------------------ src/protocol/pubsub0/sub.c | 30 ++++++++++++------------------ src/protocol/reqrep0/req.c | 16 +++++++--------- 4 files changed, 31 insertions(+), 63 deletions(-) (limited to 'src/protocol') diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index dea228a1..c409292e 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -332,9 +332,7 @@ bus0_sock_getq_cb_raw(void *arg) { bus0_sock *s = arg; bus0_pipe *p; - bus0_pipe *lastp; nni_msg * msg; - nni_msg * dup; uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { @@ -354,26 +352,13 @@ bus0_sock_getq_cb_raw(void *arg) } nni_mtx_lock(&s->mtx); - if (((lastp = nni_list_last(&s->pipes)) != NULL) && - (nni_pipe_id(lastp->npipe) == sender)) { - // If the last pipe in the list is our sender, - // then ignore it and move to the one just previous. 
- lastp = nni_list_prev(&s->pipes, lastp); - } NNI_LIST_FOREACH (&s->pipes, p) { if (nni_pipe_id(p->npipe) == sender) { continue; } - if (p != lastp) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - msg = NULL; - } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); + nni_msg_clone(msg); + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); } } nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index c6959148..2bd723cc 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -8,8 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include -#include #include #include "core/nng_impl.h" @@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg) static int pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - pub0_pipe *p = arg; + pub0_pipe *p = arg; pub0_sock *sock = s; int rv; size_t len; @@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio) pub0_sock *sock = arg; pub0_pipe *p; nng_msg * msg; - nng_msg * dup; size_t len; msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_mtx_lock(&sock->mtx); NNI_LIST_FOREACH (&sock->pipes, p) { - if (p == nni_list_last(&sock->pipes)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; - } + + nni_msg_clone(msg); if (p->busy) { if (nni_lmq_full(&p->sendq)) { // Make space for the new message. 
- nni_msg * old; + nni_msg *old; (void) nni_lmq_getq(&p->sendq, &old); nni_msg_free(old); } - nni_lmq_putq(&p->sendq, dup); + nni_lmq_putq(&p->sendq, msg); } else { p->busy = true; - nni_aio_set_msg(p->aio_send, dup); + nni_aio_set_msg(p->aio_send, msg); nni_pipe_send(p->pipe, p->aio_send); } } nni_mtx_unlock(&sock->mtx); - if (msg != NULL) { - nng_msg_free(msg); - } + nng_msg_free(msg); nni_aio_finish(aio, 0, len); } @@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t) } static nni_proto_pipe_ops pub0_pipe_ops = { - .pipe_size = sizeof (pub0_pipe), + .pipe_size = sizeof(pub0_pipe), .pipe_init = pub0_pipe_init, .pipe_fini = pub0_pipe_fini, .pipe_start = pub0_pipe_start, @@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = { }; static nni_proto_sock_ops pub0_sock_ops = { - .sock_size = sizeof (pub0_sock), + .sock_size = sizeof(pub0_sock), .sock_init = pub0_sock_init, .sock_fini = pub0_sock_fini, .sock_open = pub0_sock_open, diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index b5dd7834..e9f029a6 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_lock(&sock->lk); +again: if (nni_lmq_empty(&ctx->lmq)) { int rv; if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { @@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio) if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { nni_pollable_clear(&sock->readable); } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } nni_aio_set_msg(aio, msg); nni_mtx_unlock(&sock->lk); nni_aio_finish(aio, 0, nni_msg_len(msg)); @@ -343,7 +347,6 @@ sub0_recv_cb(void *arg) nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { - nni_msg *dup; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. 
@@ -354,14 +357,7 @@ sub0_recv_cb(void *arg) continue; } - // Special optimization (for the case where only one context), - // including when no contexts are in use, we avoid duplication. - if (ctx == nni_list_last(&sock->contexts)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; // TODO: Bump a stat! - } + nni_msg_clone(msg); // If we got to this point, we are capable of receiving this // message and it is intended for us. @@ -370,7 +366,7 @@ sub0_recv_cb(void *arg) if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); - nni_aio_set_msg(aio, dup); + nni_aio_set_msg(aio, msg); // Save for synchronous completion nni_list_append(&finish, aio); @@ -380,24 +376,22 @@ sub0_recv_cb(void *arg) (void) nni_lmq_getq(&ctx->lmq, &old); nni_msg_free(old); - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } else { - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } } nni_mtx_unlock(&sock->lk); + // Drop the first reference we inherited. Any we passed are + // accounted for in the clones we made. + nni_msg_free(msg); + while ((aio = nni_list_first(&finish)) != NULL) { nni_list_remove(&finish, aio); nni_aio_finish_synch(aio, 0, len); } - // We will toss the message if we didn't use it when delivering to - // the very last context. - if (msg != NULL) { - nni_msg_free(msg); - } - if (submatch) { nni_pollable_raise(&sock->readable); } diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 1de93929..b8ca498d 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -7,6 +7,7 @@ // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. 
// +#include #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" @@ -36,7 +37,7 @@ struct req0_ctx { uint32_t request_id; // request ID, without high bit set nni_aio * recv_aio; // user aio waiting to recv - only one! nni_aio * send_aio; // user aio waiting to send - nng_msg * req_msg; // request message + nng_msg * req_msg; // request message (owned by protocol) size_t req_len; // length of request message (for stats) nng_msg * rep_msg; // reply message nni_timer_node timer; @@ -435,7 +436,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) // Note: This routine should be called with the socket lock held. while ((ctx = nni_list_first(&s->send_queue)) != NULL) { - nni_msg * msg; req0_pipe *p; if ((p = nni_list_first(&s->ready_pipes)) == NULL) { @@ -458,12 +458,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) &ctx->timer, nni_clock() + ctx->retry); } - if (nni_msg_dup(&msg, ctx->req_msg) != 0) { - // Oops. Well, keep trying each context; maybe - // one of them will get lucky. - continue; - } - // Put us on the pipe list of active contexts. // This gives the pipe a chance to kick a resubmit // if the pipe is removed. @@ -488,7 +482,11 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) } } - nni_aio_set_msg(&p->aio_send, msg); + // At this point, we will never give this message back + // to the user, so we don't have to worry about making it + // unique. We can freely clone it. + nni_msg_clone(ctx->req_msg); + nni_aio_set_msg(&p->aio_send, ctx->req_msg); nni_pipe_send(p->pipe, &p->aio_send); } } -- cgit v1.2.3-70-g09d2