aboutsummaryrefslogtreecommitdiff
path: root/src/core/message.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/message.c')
-rw-r--r--src/core/message.c90
1 files changed, 84 insertions, 6 deletions
diff --git a/src/core/message.c b/src/core/message.c
index af5b0160..888585ad 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -24,9 +24,10 @@ typedef struct {
// Underlying message structure.
struct nng_msg {
- nni_chunk m_header;
- nni_chunk m_body;
- uint32_t m_pipe; // set on receive
+ nni_chunk m_header;
+ nni_chunk m_body;
+ uint32_t m_pipe; // set on receive
+ nni_atomic_int m_refcnt;
};
#if 0
@@ -243,6 +244,15 @@ nni_chunk_append(nni_chunk *ch, const void *data, size_t len)
return (0);
}
+// nni_chunk_room determines the extra space we have left in the chunk.
+// This is useful to determine whether we will need to reallocate and
+// copy in order to save space.
+static size_t
+nni_chunk_room(nni_chunk *ch)
+{
+ return (ch->ch_cap - ch->ch_len);
+}
+
// nni_chunk_insert prepends data to the chunk, as efficiently as possible.
// If the data pointer is NULL, then no data is actually copied, but the
// data region will have "grown" in the beginning, with uninitialized data.
@@ -387,6 +397,69 @@ nni_chunk_chop_u64(nni_chunk *ch)
return (v);
}
+void
+nni_msg_clone(nni_msg *m)
+{
+ nni_atomic_inc(&m->m_refcnt);
+}
+
+// This returns either the original message or a new message on success.
+// If it fails, then NULL is returned. Either way the original message
+// has its reference count dropped (and freed if zero).
+nni_msg *
+nni_msg_unique(nni_msg *m)
+{
+ nni_msg *m2;
+
+ // If we already have an exclusive copy, just keep using it.
+ if (nni_atomic_get(&m->m_refcnt) == 1) {
+ return (m);
+ }
+ // Otherwise we need to make a copy
+ if (nni_msg_dup(&m2, m) != 0) {
+ m2 = NULL;
+ }
+ nni_msg_free(m);
+ return (m2);
+}
+
+// nni_msg_pull_up ensures that the message is unique, and that any header
+// is merged with the message. The main purpose of doing this is to break
+// up the inproc binding -- protocols send messages to inproc with a
+// separate header, but they really would like receive a unified
+// message so they can pick apart the header.
+nni_msg *
+nni_msg_pull_up(nni_msg *m)
+{
+ // This implementation is optimized to ensure that this function
+ // will not copy the message more than once, and it will not
+ // allocate unless there is no other option.
+ if (((nni_chunk_room(&m->m_body) < nni_msg_header_len(m))) ||
+ (nni_atomic_get(&m->m_refcnt) != 1)) {
+ // We have to duplicate the message.
+ nni_msg *m2;
+ uint8_t *dst;
+ size_t len = nni_msg_len(m) + nni_msg_header_len(m);
+ if (nni_msg_alloc(&m2, len) != 0) {
+ return (NULL);
+ }
+ dst = nni_msg_body(m2);
+ len = nni_msg_header_len(m);
+ memcpy(dst, nni_msg_header(m), len);
+ dst += len;
+ memcpy(dst, nni_msg_body(m), nni_msg_len(m));
+ nni_msg_free(m);
+ return (m2);
+ }
+
+ // At this point, we have a unique instance of the message.
+ // We also know that we have sufficient space in the message,
+ // so this insert operation cannot fail.
+ nni_msg_insert(m, nni_msg_header(m), nni_msg_header_len(m));
+ nni_msg_header_clear(m);
+ return (m);
+}
+
int
nni_msg_alloc(nni_msg **mp, size_t sz)
{
@@ -423,6 +496,9 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
nni_panic("chunk_append failed");
}
+ // We always start with a single valid reference count.
+ nni_atomic_init(&m->m_refcnt);
+ nni_atomic_set(&m->m_refcnt, 1);
*mp = m;
return (0);
}
@@ -430,8 +506,8 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
int
nni_msg_dup(nni_msg **dup, const nni_msg *src)
{
- nni_msg * m;
- int rv;
+ nni_msg *m;
+ int rv;
if ((m = NNI_ALLOC_STRUCT(m)) == NULL) {
return (NNG_ENOMEM);
@@ -448,6 +524,8 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src)
}
m->m_pipe = src->m_pipe;
+ nni_atomic_init(&m->m_refcnt);
+ nni_atomic_set(&m->m_refcnt, 1);
*dup = m;
return (0);
@@ -456,7 +534,7 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src)
void
nni_msg_free(nni_msg *m)
{
- if (m != NULL) {
+ if ((m != NULL) && (nni_atomic_dec_nv(&m->m_refcnt) == 0)) {
nni_chunk_free(&m->m_header);
nni_chunk_free(&m->m_body);
NNI_FREE_STRUCT(m);