aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-19 11:06:55 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-20 12:59:45 -0800
commit8abf75857e8993a25e50d07bdd6d9628f028d7cc (patch)
tree15f89948cfa97a44130db224e9e27e51a00e5f76 /src/core
parentb2ba35251986d2754de5f0f274ee7cbf577223e1 (diff)
downloadnng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.gz
nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.bz2
nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.zip
fixes #1156 Message cloning could help reduce copies a lot
This introduces reference counting on messages to reduce the data copies. This should yield a marked improvement when moving large messages through the system, or when publishing to many subscribers. For some transports, when using large messages, the copy time can be the dominant factor. Note that when a message is actually shared, inproc will still perform an extra copy in order to ensure that it can modify the headers. This will unfortunately always be the case with REQ, as the REQ protocol keeps a copy of the original message so it can retry.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/message.c90
-rw-r--r--src/core/message.h14
2 files changed, 98 insertions, 6 deletions
diff --git a/src/core/message.c b/src/core/message.c
index af5b0160..888585ad 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -24,9 +24,10 @@ typedef struct {
// Underlying message structure.
struct nng_msg {
- nni_chunk m_header;
- nni_chunk m_body;
- uint32_t m_pipe; // set on receive
+ nni_chunk m_header;
+ nni_chunk m_body;
+ uint32_t m_pipe; // set on receive
+ nni_atomic_int m_refcnt;
};
#if 0
@@ -243,6 +244,15 @@ nni_chunk_append(nni_chunk *ch, const void *data, size_t len)
return (0);
}
+// nni_chunk_room determines the extra space we have left in the chunk.
+// This is useful to determine whether we will need to reallocate and
+// copy in order to save space.
+static size_t
+nni_chunk_room(nni_chunk *ch)
+{
+ return (ch->ch_cap - ch->ch_len);
+}
+
// nni_chunk_insert prepends data to the chunk, as efficiently as possible.
// If the data pointer is NULL, then no data is actually copied, but the
// data region will have "grown" in the beginning, with uninitialized data.
@@ -387,6 +397,69 @@ nni_chunk_chop_u64(nni_chunk *ch)
return (v);
}
+void
+nni_msg_clone(nni_msg *m)
+{
+ nni_atomic_inc(&m->m_refcnt);
+}
+
+// Returns m itself if it is not shared; otherwise returns a duplicate (or
+// NULL if duplication fails). Whenever a duplicate is attempted, the
+// caller's reference to the original is dropped (and it is freed if zero).
+nni_msg *
+nni_msg_unique(nni_msg *m)
+{
+ nni_msg *m2;
+
+ // If we already have an exclusive copy, just keep using it.
+ if (nni_atomic_get(&m->m_refcnt) == 1) {
+ return (m);
+ }
+ // Otherwise we need to make a copy
+ if (nni_msg_dup(&m2, m) != 0) {
+ m2 = NULL;
+ }
+ nni_msg_free(m);
+ return (m2);
+}
+
+// nni_msg_pull_up ensures that the message is unique, and that any header
+// is merged with the message. The main purpose of doing this is to break
+// up the inproc binding -- protocols send messages to inproc with a
+// separate header, but they really would like to receive a unified
+// message so they can pick apart the header.
+nni_msg *
+nni_msg_pull_up(nni_msg *m)
+{
+ // This implementation is optimized to ensure that this function
+ // will not copy the message more than once, and it will not
+ // allocate unless there is no other option.
+ if (((nni_chunk_room(&m->m_body) < nni_msg_header_len(m))) ||
+ (nni_atomic_get(&m->m_refcnt) != 1)) {
+ // We have to duplicate the message.
+ nni_msg *m2;
+ uint8_t *dst;
+ size_t len = nni_msg_len(m) + nni_msg_header_len(m);
+ if (nni_msg_alloc(&m2, len) != 0) {
+ return (NULL);
+ }
+ dst = nni_msg_body(m2);
+ len = nni_msg_header_len(m);
+ memcpy(dst, nni_msg_header(m), len);
+ dst += len;
+ memcpy(dst, nni_msg_body(m), nni_msg_len(m));
+ nni_msg_free(m);
+ return (m2);
+ }
+
+ // At this point, we have a unique instance of the message.
+ // We also know that we have sufficient space in the message,
+ // so this insert operation cannot fail.
+ nni_msg_insert(m, nni_msg_header(m), nni_msg_header_len(m));
+ nni_msg_header_clear(m);
+ return (m);
+}
+
int
nni_msg_alloc(nni_msg **mp, size_t sz)
{
@@ -423,6 +496,9 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
nni_panic("chunk_append failed");
}
+ // We always start with a single valid reference count.
+ nni_atomic_init(&m->m_refcnt);
+ nni_atomic_set(&m->m_refcnt, 1);
*mp = m;
return (0);
}
@@ -430,8 +506,8 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
int
nni_msg_dup(nni_msg **dup, const nni_msg *src)
{
- nni_msg * m;
- int rv;
+ nni_msg *m;
+ int rv;
if ((m = NNI_ALLOC_STRUCT(m)) == NULL) {
return (NNG_ENOMEM);
@@ -448,6 +524,8 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src)
}
m->m_pipe = src->m_pipe;
+ nni_atomic_init(&m->m_refcnt);
+ nni_atomic_set(&m->m_refcnt, 1);
*dup = m;
return (0);
@@ -456,7 +534,7 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src)
void
nni_msg_free(nni_msg *m)
{
- if (m != NULL) {
+ if ((m != NULL) && (nni_atomic_dec_nv(&m->m_refcnt) == 0)) {
nni_chunk_free(&m->m_header);
nni_chunk_free(&m->m_body);
NNI_FREE_STRUCT(m);
diff --git a/src/core/message.h b/src/core/message.h
index d09a18dc..53b644c4 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -60,6 +60,20 @@ extern uint64_t nni_msg_header_chop_u64(nni_msg *);
extern void nni_msg_set_pipe(nni_msg *, uint32_t);
extern uint32_t nni_msg_get_pipe(const nni_msg *);
+// Reference counting messages. This allows the same message to be
+// cheaply reused instead of copied over and over again. Callers of
+// this functionality MUST be certain to use nni_msg_unique() before
+// passing a message out of their control (e.g. to user programs.)
+// Failure to do so will likely result in corruption.
+extern void nni_msg_clone(nni_msg *);
+extern nni_msg *nni_msg_unique(nni_msg *);
+// nni_msg_pull_up ensures that the message is unique, and that any
+// header present is "pulled up" into the message body. If the function
+// cannot do this (a new message had to be allocated and the allocation
+// failed), then NULL is returned. It is the responsibility of the caller
+// to free the original message in that case (same semantics as realloc).
+extern nni_msg *nni_msg_pull_up(nni_msg *);
+
// These should only be used when the transport or protocol is absolutely
// certain that there is adequate room. There is about 32 bytes of
// header and trailer space for a newly allocated message, and transports