diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-19 11:06:55 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-20 12:59:45 -0800 |
| commit | 8abf75857e8993a25e50d07bdd6d9628f028d7cc (patch) | |
| tree | 15f89948cfa97a44130db224e9e27e51a00e5f76 | |
| parent | b2ba35251986d2754de5f0f274ee7cbf577223e1 (diff) | |
| download | nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.gz nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.bz2 nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.zip | |
fixes #1156 Message cloning could help reduce copies a lot
This introduces reference counting on messages to reduce the data copies.
This should have a marked improvement when moving large messages through
the system, or when publishing to many subscribers. For some transports,
when using large messages, the copy time can be the dominant factor.
Note that when a message is actually shared, inproc will still perform
an extra copy in order to ensure that it can modify the headers.
This will unfortunately always be the case with REQ, as the REQ protocol
keeps a copy of the original message so it can retry.
| -rw-r--r-- | src/core/message.c | 90 | ||||
| -rw-r--r-- | src/core/message.h | 14 | ||||
| -rw-r--r-- | src/protocol/bus0/bus.c | 21 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 27 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 30 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 16 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 24 |
7 files changed, 138 insertions, 84 deletions
diff --git a/src/core/message.c b/src/core/message.c index af5b0160..888585ad 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -24,9 +24,10 @@ typedef struct { // Underlying message structure. struct nng_msg { - nni_chunk m_header; - nni_chunk m_body; - uint32_t m_pipe; // set on receive + nni_chunk m_header; + nni_chunk m_body; + uint32_t m_pipe; // set on receive + nni_atomic_int m_refcnt; }; #if 0 @@ -243,6 +244,15 @@ nni_chunk_append(nni_chunk *ch, const void *data, size_t len) return (0); } +// nni_chunk_room determines the extra space we have left in the chunk. +// This is useful to determine whether we will need to reallocate and +// copy in order to save space. +static size_t +nni_chunk_room(nni_chunk *ch) +{ + return (ch->ch_cap - ch->ch_len); +} + // nni_chunk_insert prepends data to the chunk, as efficiently as possible. // If the data pointer is NULL, then no data is actually copied, but the // data region will have "grown" in the beginning, with uninitialized data. @@ -387,6 +397,69 @@ nni_chunk_chop_u64(nni_chunk *ch) return (v); } +void +nni_msg_clone(nni_msg *m) +{ + nni_atomic_inc(&m->m_refcnt); +} + +// This returns either the original message or a new message on success. +// If it fails, then NULL is returned. Either way the original message +// has its reference count dropped (and freed if zero). +nni_msg * +nni_msg_unique(nni_msg *m) +{ + nni_msg *m2; + + // If we already have an exclusive copy, just keep using it. + if (nni_atomic_get(&m->m_refcnt) == 1) { + return (m); + } + // Otherwise we need to make a copy + if (nni_msg_dup(&m2, m) != 0) { + m2 = NULL; + } + nni_msg_free(m); + return (m2); +} + +// nni_msg_pull_up ensures that the message is unique, and that any header +// is merged with the message. The main purpose of doing this is to break +// up the inproc binding -- protocols send messages to inproc with a +// separate header, but they really would like receive a unified +// message so they can pick apart the header. +nni_msg * +nni_msg_pull_up(nni_msg *m) +{ + // This implementation is optimized to ensure that this function + // will not copy the message more than once, and it will not + // allocate unless there is no other option. + if (((nni_chunk_room(&m->m_body) < nni_msg_header_len(m))) || + (nni_atomic_get(&m->m_refcnt) != 1)) { + // We have to duplicate the message. + nni_msg *m2; + uint8_t *dst; + size_t len = nni_msg_len(m) + nni_msg_header_len(m); + if (nni_msg_alloc(&m2, len) != 0) { + return (NULL); + } + dst = nni_msg_body(m2); + len = nni_msg_header_len(m); + memcpy(dst, nni_msg_header(m), len); + dst += len; + memcpy(dst, nni_msg_body(m), nni_msg_len(m)); + nni_msg_free(m); + return (m2); + } + + // At this point, we have a unique instance of the message. + // We also know that we have sufficient space in the message, + // so this insert operation cannot fail. + nni_msg_insert(m, nni_msg_header(m), nni_msg_header_len(m)); + nni_msg_header_clear(m); + return (m); +} + int nni_msg_alloc(nni_msg **mp, size_t sz) { @@ -423,6 +496,9 @@ nni_msg_alloc(nni_msg **mp, size_t sz) nni_panic("chunk_append failed"); } + // We always start with a single valid reference count. + nni_atomic_init(&m->m_refcnt); + nni_atomic_set(&m->m_refcnt, 1); *mp = m; return (0); } @@ -430,8 +506,8 @@ nni_msg_alloc(nni_msg **mp, size_t sz) int nni_msg_dup(nni_msg **dup, const nni_msg *src) { - nni_msg * m; - int rv; + nni_msg *m; + int rv; if ((m = NNI_ALLOC_STRUCT(m)) == NULL) { return (NNG_ENOMEM); @@ -448,6 +524,8 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src) } m->m_pipe = src->m_pipe; + nni_atomic_init(&m->m_refcnt); + nni_atomic_set(&m->m_refcnt, 1); *dup = m; return (0); @@ -456,7 +534,7 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src) void nni_msg_free(nni_msg *m) { - if (m != NULL) { + if ((m != NULL) && (nni_atomic_dec_nv(&m->m_refcnt) == 0)) { nni_chunk_free(&m->m_header); nni_chunk_free(&m->m_body); NNI_FREE_STRUCT(m); diff --git a/src/core/message.h b/src/core/message.h index d09a18dc..53b644c4 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -60,6 +60,20 @@ extern uint64_t nni_msg_header_chop_u64(nni_msg *); extern void nni_msg_set_pipe(nni_msg *, uint32_t); extern uint32_t nni_msg_get_pipe(const nni_msg *); +// Reference counting messages. This allows the same message to be +// cheaply reused instead of copied over and over again. Callers of +// this functionality MUST be certain to use nni_msg_unique() before +// passing a message out of their control (e.g. to user programs.) +// Failure to do so will likely result in corruption. +extern void nni_msg_clone(nni_msg *); +extern nni_msg *nni_msg_unique(nni_msg *); +// nni_msg_pull_up ensures that the message is unique, and that any +// header present is "pulled up" into the message body. If the function +// cannot do this for any reason (out of space in the body), then NULL +// is returned. It is the responsibility of the caller to free the +// original message in that case (same semantics as realloc). +extern nni_msg *nni_msg_pull_up(nni_msg *); + // These should only be used when the transport or protocol is absolutely // certain that there is adequate room. There is about 32 bytes of // header and trailer space for a newly allocated message, and transports diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index dea228a1..c409292e 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -332,9 +332,7 @@ bus0_sock_getq_cb_raw(void *arg) { bus0_sock *s = arg; bus0_pipe *p; - bus0_pipe *lastp; nni_msg * msg; - nni_msg * dup; uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { @@ -354,26 +352,13 @@ bus0_sock_getq_cb_raw(void *arg) } nni_mtx_lock(&s->mtx); - if (((lastp = nni_list_last(&s->pipes)) != NULL) && - (nni_pipe_id(lastp->npipe) == sender)) { - // If the last pipe in the list is our sender, - // then ignore it and move to the one just previous. - lastp = nni_list_prev(&s->pipes, lastp); - } NNI_LIST_FOREACH (&s->pipes, p) { if (nni_pipe_id(p->npipe) == sender) { continue; } - if (p != lastp) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - msg = NULL; - } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); + nni_msg_clone(msg); + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); } } nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index c6959148..2bd723cc 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -8,8 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdlib.h> -#include <stdio.h> #include <string.h> #include "core/nng_impl.h" @@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg) static int pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - pub0_pipe *p = arg; + pub0_pipe *p = arg; pub0_sock *sock = s; int rv; size_t len; @@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio) pub0_sock *sock = arg; pub0_pipe *p; nng_msg * msg; - nng_msg * dup; size_t len; msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_mtx_lock(&sock->mtx); NNI_LIST_FOREACH (&sock->pipes, p) { - if (p == nni_list_last(&sock->pipes)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; - } + + nni_msg_clone(msg); if (p->busy) { if (nni_lmq_full(&p->sendq)) { // Make space for the new message. - nni_msg * old; + nni_msg *old; (void) nni_lmq_getq(&p->sendq, &old); nni_msg_free(old); } - nni_lmq_putq(&p->sendq, dup); + nni_lmq_putq(&p->sendq, msg); } else { p->busy = true; - nni_aio_set_msg(p->aio_send, dup); + nni_aio_set_msg(p->aio_send, msg); nni_pipe_send(p->pipe, p->aio_send); } } nni_mtx_unlock(&sock->mtx); - if (msg != NULL) { - nng_msg_free(msg); - } + nng_msg_free(msg); nni_aio_finish(aio, 0, len); } @@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t) } static nni_proto_pipe_ops pub0_pipe_ops = { - .pipe_size = sizeof (pub0_pipe), + .pipe_size = sizeof(pub0_pipe), .pipe_init = pub0_pipe_init, .pipe_fini = pub0_pipe_fini, .pipe_start = pub0_pipe_start, @@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = { }; static nni_proto_sock_ops pub0_sock_ops = { - .sock_size = sizeof (pub0_sock), + .sock_size = sizeof(pub0_sock), .sock_init = pub0_sock_init, .sock_fini = pub0_sock_fini, .sock_open = pub0_sock_open, diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index b5dd7834..e9f029a6 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_lock(&sock->lk); +again: if (nni_lmq_empty(&ctx->lmq)) { int rv; if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { @@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio) if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { nni_pollable_clear(&sock->readable); } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } nni_aio_set_msg(aio, msg); nni_mtx_unlock(&sock->lk); nni_aio_finish(aio, 0, nni_msg_len(msg)); @@ -343,7 +347,6 @@ sub0_recv_cb(void *arg) nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { - nni_msg *dup; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. @@ -354,14 +357,7 @@ sub0_recv_cb(void *arg) continue; } - // Special optimization (for the case where only one context), - // including when no contexts are in use, we avoid duplication. - if (ctx == nni_list_last(&sock->contexts)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; // TODO: Bump a stat! - } + nni_msg_clone(msg); // If we got to this point, we are capable of receiving this // message and it is intended for us. @@ -370,7 +366,7 @@ sub0_recv_cb(void *arg) if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); - nni_aio_set_msg(aio, dup); + nni_aio_set_msg(aio, msg); // Save for synchronous completion nni_list_append(&finish, aio); @@ -380,24 +376,22 @@ sub0_recv_cb(void *arg) (void) nni_lmq_getq(&ctx->lmq, &old); nni_msg_free(old); - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } else { - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } } nni_mtx_unlock(&sock->lk); + // Drop the first reference we inherited. Any we passed are + // accounted for in the clones we made. + nni_msg_free(msg); + while ((aio = nni_list_first(&finish)) != NULL) { nni_list_remove(&finish, aio); nni_aio_finish_synch(aio, 0, len); } - // We will toss the message if we didn't use it when delivering to - // the very last context. - if (msg != NULL) { - nni_msg_free(msg); - } - if (submatch) { nni_pollable_raise(&sock->readable); } diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 1de93929..b8ca498d 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -7,6 +7,7 @@ // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" @@ -36,7 +37,7 @@ struct req0_ctx { uint32_t request_id; // request ID, without high bit set nni_aio * recv_aio; // user aio waiting to recv - only one! nni_aio * send_aio; // user aio waiting to send - nng_msg * req_msg; // request message + nng_msg * req_msg; // request message (owned by protocol) size_t req_len; // length of request message (for stats) nng_msg * rep_msg; // reply message nni_timer_node timer; @@ -435,7 +436,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) // Note: This routine should be called with the socket lock held. while ((ctx = nni_list_first(&s->send_queue)) != NULL) { - nni_msg * msg; req0_pipe *p; if ((p = nni_list_first(&s->ready_pipes)) == NULL) { @@ -458,12 +458,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) &ctx->timer, nni_clock() + ctx->retry); } - if (nni_msg_dup(&msg, ctx->req_msg) != 0) { - // Oops. Well, keep trying each context; maybe - // one of them will get lucky. - continue; - } - // Put us on the pipe list of active contexts. // This gives the pipe a chance to kick a resubmit // if the pipe is removed. @@ -488,7 +482,11 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) } } - nni_aio_set_msg(&p->aio_send, msg); + // At this point, we will never give this message back to + // to the user, so we don't have to worry about making it + // unique. We can freely clone it. + nni_msg_clone(ctx->req_msg); + nni_aio_set_msg(&p->aio_send, ctx->req_msg); nni_pipe_send(p->pipe, &p->aio_send); } } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index a8466e78..518871f4 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -157,8 +157,7 @@ inproc_queue_run(inproc_queue *queue) nni_aio *rd; nni_aio *wr; nni_msg *msg; - size_t header_len; - uint8_t *header; + nni_msg *pu; if (((rd = nni_list_first(&queue->readers)) == NULL) || ((wr = nni_list_first(&queue->writers)) == NULL)) { @@ -168,30 +167,25 @@ inproc_queue_run(inproc_queue *queue) msg = nni_aio_get_msg(wr); NNI_ASSERT(msg != NULL); - header_len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - // At this point, we pass success back to the caller. If // we drop the message for any reason, its accounted on the // receiver side. nni_aio_list_remove(wr); nni_aio_set_msg(wr, NULL); - nni_aio_finish(wr, 0, nni_msg_len(msg) + header_len); + nni_aio_finish( + wr, 0, nni_msg_len(msg) + nni_msg_header_len(msg)); // TODO: We could check the max receive size here. - // Now the receive side. First lets make sure we ensure that - // the message headers are inserted into the body, because - // that is what the protocols expect. - // TODO: This would also be the place to do the work to make - // sure we aren't sharing the message once #1156 integrates. - - if (nni_msg_insert(msg, header, header_len) != 0) { - // TODO: bump a dropped statistic + // Now the receive side. We need to ensure that we have + // an exclusive copy of the message, and pull the header + // up into the body to match protocol expectations. + if ((pu = nni_msg_pull_up(msg)) == NULL) { nni_msg_free(msg); continue; } - nni_msg_header_clear(msg); + msg = pu; + nni_aio_list_remove(rd); nni_aio_set_msg(rd, msg); nni_aio_finish(rd, 0, nni_msg_len(msg)); |
