diff options
Diffstat (limited to 'src/core/message.c')
| -rw-r--r-- | src/core/message.c | 90 |
1 files changed, 84 insertions, 6 deletions
diff --git a/src/core/message.c b/src/core/message.c index af5b0160..888585ad 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -24,9 +24,10 @@ typedef struct { // Underlying message structure. struct nng_msg { - nni_chunk m_header; - nni_chunk m_body; - uint32_t m_pipe; // set on receive + nni_chunk m_header; + nni_chunk m_body; + uint32_t m_pipe; // set on receive + nni_atomic_int m_refcnt; }; #if 0 @@ -243,6 +244,15 @@ nni_chunk_append(nni_chunk *ch, const void *data, size_t len) return (0); } +// nni_chunk_room determines the extra space we have left in the chunk. +// This is useful to determine whether we will need to reallocate and +// copy in order to save space. +static size_t +nni_chunk_room(nni_chunk *ch) +{ + return (ch->ch_cap - ch->ch_len); +} + // nni_chunk_insert prepends data to the chunk, as efficiently as possible. // If the data pointer is NULL, then no data is actually copied, but the // data region will have "grown" in the beginning, with uninitialized data. @@ -387,6 +397,69 @@ nni_chunk_chop_u64(nni_chunk *ch) return (v); } +void +nni_msg_clone(nni_msg *m) +{ + nni_atomic_inc(&m->m_refcnt); +} + +// This returns either the original message or a new message on success. +// If it fails, then NULL is returned. Either way the original message +// has its reference count dropped (and freed if zero). +nni_msg * +nni_msg_unique(nni_msg *m) +{ + nni_msg *m2; + + // If we already have an exclusive copy, just keep using it. + if (nni_atomic_get(&m->m_refcnt) == 1) { + return (m); + } + // Otherwise we need to make a copy + if (nni_msg_dup(&m2, m) != 0) { + m2 = NULL; + } + nni_msg_free(m); + return (m2); +} + +// nni_msg_pull_up ensures that the message is unique, and that any header +// is merged with the message. The main purpose of doing this is to break +// up the inproc binding -- protocols send messages to inproc with a +// separate header, but they really would like receive a unified +// message so they can pick apart the header. +nni_msg * +nni_msg_pull_up(nni_msg *m) +{ + // This implementation is optimized to ensure that this function + // will not copy the message more than once, and it will not + // allocate unless there is no other option. + if (((nni_chunk_room(&m->m_body) < nni_msg_header_len(m))) || + (nni_atomic_get(&m->m_refcnt) != 1)) { + // We have to duplicate the message. + nni_msg *m2; + uint8_t *dst; + size_t len = nni_msg_len(m) + nni_msg_header_len(m); + if (nni_msg_alloc(&m2, len) != 0) { + return (NULL); + } + dst = nni_msg_body(m2); + len = nni_msg_header_len(m); + memcpy(dst, nni_msg_header(m), len); + dst += len; + memcpy(dst, nni_msg_body(m), nni_msg_len(m)); + nni_msg_free(m); + return (m2); + } + + // At this point, we have a unique instance of the message. + // We also know that we have sufficient space in the message, + // so this insert operation cannot fail. + nni_msg_insert(m, nni_msg_header(m), nni_msg_header_len(m)); + nni_msg_header_clear(m); + return (m); +} + int nni_msg_alloc(nni_msg **mp, size_t sz) { @@ -423,6 +496,9 @@ nni_msg_alloc(nni_msg **mp, size_t sz) nni_panic("chunk_append failed"); } + // We always start with a single valid reference count. + nni_atomic_init(&m->m_refcnt); + nni_atomic_set(&m->m_refcnt, 1); *mp = m; return (0); } @@ -430,8 +506,8 @@ nni_msg_alloc(nni_msg **mp, size_t sz) int nni_msg_dup(nni_msg **dup, const nni_msg *src) { - nni_msg * m; - int rv; + nni_msg *m; + int rv; if ((m = NNI_ALLOC_STRUCT(m)) == NULL) { return (NNG_ENOMEM); @@ -448,6 +524,8 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src) } m->m_pipe = src->m_pipe; + nni_atomic_init(&m->m_refcnt); + nni_atomic_set(&m->m_refcnt, 1); *dup = m; return (0); @@ -456,7 +534,7 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src) void nni_msg_free(nni_msg *m) { - if (m != NULL) { + if ((m != NULL) && (nni_atomic_dec_nv(&m->m_refcnt) == 0)) { nni_chunk_free(&m->m_header); nni_chunk_free(&m->m_body); NNI_FREE_STRUCT(m); |
