aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/message.c90
-rw-r--r--src/core/message.h14
-rw-r--r--src/protocol/bus0/bus.c21
-rw-r--r--src/protocol/pubsub0/pub.c27
-rw-r--r--src/protocol/pubsub0/sub.c30
-rw-r--r--src/protocol/reqrep0/req.c16
-rw-r--r--src/transport/inproc/inproc.c24
7 files changed, 138 insertions, 84 deletions
diff --git a/src/core/message.c b/src/core/message.c
index af5b0160..888585ad 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -24,9 +24,10 @@ typedef struct {
// Underlying message structure.
struct nng_msg {
- nni_chunk m_header;
- nni_chunk m_body;
- uint32_t m_pipe; // set on receive
+ nni_chunk m_header;
+ nni_chunk m_body;
+ uint32_t m_pipe; // set on receive
+ nni_atomic_int m_refcnt;
};
#if 0
@@ -243,6 +244,15 @@ nni_chunk_append(nni_chunk *ch, const void *data, size_t len)
return (0);
}
+// nni_chunk_room determines the extra space we have left in the chunk.
+// This is useful to determine whether we will need to reallocate and
+// copy in order to save space.
+static size_t
+nni_chunk_room(nni_chunk *ch)
+{
+ return (ch->ch_cap - ch->ch_len);
+}
+
// nni_chunk_insert prepends data to the chunk, as efficiently as possible.
// If the data pointer is NULL, then no data is actually copied, but the
// data region will have "grown" in the beginning, with uninitialized data.
@@ -387,6 +397,69 @@ nni_chunk_chop_u64(nni_chunk *ch)
return (v);
}
+void
+nni_msg_clone(nni_msg *m)
+{
+ nni_atomic_inc(&m->m_refcnt);
+}
+
+// This returns either the original message or a new message on success.
+// If it fails, then NULL is returned. Either way the original message
+// has its reference count dropped (and freed if zero).
+nni_msg *
+nni_msg_unique(nni_msg *m)
+{
+ nni_msg *m2;
+
+ // If we already have an exclusive copy, just keep using it.
+ if (nni_atomic_get(&m->m_refcnt) == 1) {
+ return (m);
+ }
+ // Otherwise we need to make a copy
+ if (nni_msg_dup(&m2, m) != 0) {
+ m2 = NULL;
+ }
+ nni_msg_free(m);
+ return (m2);
+}
+
+// nni_msg_pull_up ensures that the message is unique, and that any header
+// is merged with the message. The main purpose of doing this is to break
+// up the inproc binding -- protocols send messages to inproc with a
+// separate header, but they really would like receive a unified
+// message so they can pick apart the header.
+nni_msg *
+nni_msg_pull_up(nni_msg *m)
+{
+ // This implementation is optimized to ensure that this function
+ // will not copy the message more than once, and it will not
+ // allocate unless there is no other option.
+ if (((nni_chunk_room(&m->m_body) < nni_msg_header_len(m))) ||
+ (nni_atomic_get(&m->m_refcnt) != 1)) {
+ // We have to duplicate the message.
+ nni_msg *m2;
+ uint8_t *dst;
+ size_t len = nni_msg_len(m) + nni_msg_header_len(m);
+ if (nni_msg_alloc(&m2, len) != 0) {
+ return (NULL);
+ }
+ dst = nni_msg_body(m2);
+ len = nni_msg_header_len(m);
+ memcpy(dst, nni_msg_header(m), len);
+ dst += len;
+ memcpy(dst, nni_msg_body(m), nni_msg_len(m));
+ nni_msg_free(m);
+ return (m2);
+ }
+
+ // At this point, we have a unique instance of the message.
+ // We also know that we have sufficient space in the message,
+ // so this insert operation cannot fail.
+ nni_msg_insert(m, nni_msg_header(m), nni_msg_header_len(m));
+ nni_msg_header_clear(m);
+ return (m);
+}
+
int
nni_msg_alloc(nni_msg **mp, size_t sz)
{
@@ -423,6 +496,9 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
nni_panic("chunk_append failed");
}
+ // We always start with a single valid reference count.
+ nni_atomic_init(&m->m_refcnt);
+ nni_atomic_set(&m->m_refcnt, 1);
*mp = m;
return (0);
}
@@ -430,8 +506,8 @@ nni_msg_alloc(nni_msg **mp, size_t sz)
int
nni_msg_dup(nni_msg **dup, const nni_msg *src)
{
- nni_msg * m;
- int rv;
+ nni_msg *m;
+ int rv;
if ((m = NNI_ALLOC_STRUCT(m)) == NULL) {
return (NNG_ENOMEM);
@@ -448,6 +524,8 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src)
}
m->m_pipe = src->m_pipe;
+ nni_atomic_init(&m->m_refcnt);
+ nni_atomic_set(&m->m_refcnt, 1);
*dup = m;
return (0);
@@ -456,7 +534,7 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src)
void
nni_msg_free(nni_msg *m)
{
- if (m != NULL) {
+ if ((m != NULL) && (nni_atomic_dec_nv(&m->m_refcnt) == 0)) {
nni_chunk_free(&m->m_header);
nni_chunk_free(&m->m_body);
NNI_FREE_STRUCT(m);
diff --git a/src/core/message.h b/src/core/message.h
index d09a18dc..53b644c4 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -60,6 +60,20 @@ extern uint64_t nni_msg_header_chop_u64(nni_msg *);
extern void nni_msg_set_pipe(nni_msg *, uint32_t);
extern uint32_t nni_msg_get_pipe(const nni_msg *);
+// Reference counting messages. This allows the same message to be
+// cheaply reused instead of copied over and over again. Callers of
+// this functionality MUST be certain to use nni_msg_unique() before
+// passing a message out of their control (e.g. to user programs.)
+// Failure to do so will likely result in corruption.
+extern void nni_msg_clone(nni_msg *);
+extern nni_msg *nni_msg_unique(nni_msg *);
+// nni_msg_pull_up ensures that the message is unique, and that any
+// header present is "pulled up" into the message body. If the function
+// cannot do this for any reason (out of space in the body), then NULL
+// is returned. It is the responsibility of the caller to free the
+// original message in that case (same semantics as realloc).
+extern nni_msg *nni_msg_pull_up(nni_msg *);
+
// These should only be used when the transport or protocol is absolutely
// certain that there is adequate room. There is about 32 bytes of
// header and trailer space for a newly allocated message, and transports
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index dea228a1..c409292e 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -332,9 +332,7 @@ bus0_sock_getq_cb_raw(void *arg)
{
bus0_sock *s = arg;
bus0_pipe *p;
- bus0_pipe *lastp;
nni_msg * msg;
- nni_msg * dup;
uint32_t sender;
if (nni_aio_result(s->aio_getq) != 0) {
@@ -354,26 +352,13 @@ bus0_sock_getq_cb_raw(void *arg)
}
nni_mtx_lock(&s->mtx);
- if (((lastp = nni_list_last(&s->pipes)) != NULL) &&
- (nni_pipe_id(lastp->npipe) == sender)) {
- // If the last pipe in the list is our sender,
- // then ignore it and move to the one just previous.
- lastp = nni_list_prev(&s->pipes, lastp);
- }
NNI_LIST_FOREACH (&s->pipes, p) {
if (nni_pipe_id(p->npipe) == sender) {
continue;
}
- if (p != lastp) {
- if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
- } else {
- dup = msg;
- msg = NULL;
- }
- if (nni_msgq_tryput(p->sendq, dup) != 0) {
- nni_msg_free(dup);
+ nni_msg_clone(msg);
+ if (nni_msgq_tryput(p->sendq, msg) != 0) {
+ nni_msg_free(msg);
}
}
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index c6959148..2bd723cc 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -8,8 +8,6 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <stdlib.h>
-#include <stdio.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg)
static int
pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- pub0_pipe *p = arg;
+ pub0_pipe *p = arg;
pub0_sock *sock = s;
int rv;
size_t len;
@@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio)
pub0_sock *sock = arg;
pub0_pipe *p;
nng_msg * msg;
- nng_msg * dup;
size_t len;
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_mtx_lock(&sock->mtx);
NNI_LIST_FOREACH (&sock->pipes, p) {
- if (p == nni_list_last(&sock->pipes)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
+
+ nni_msg_clone(msg);
if (p->busy) {
if (nni_lmq_full(&p->sendq)) {
// Make space for the new message.
- nni_msg * old;
+ nni_msg *old;
(void) nni_lmq_getq(&p->sendq, &old);
nni_msg_free(old);
}
- nni_lmq_putq(&p->sendq, dup);
+ nni_lmq_putq(&p->sendq, msg);
} else {
p->busy = true;
- nni_aio_set_msg(p->aio_send, dup);
+ nni_aio_set_msg(p->aio_send, msg);
nni_pipe_send(p->pipe, p->aio_send);
}
}
nni_mtx_unlock(&sock->mtx);
- if (msg != NULL) {
- nng_msg_free(msg);
- }
+ nng_msg_free(msg);
nni_aio_finish(aio, 0, len);
}
@@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
}
static nni_proto_pipe_ops pub0_pipe_ops = {
- .pipe_size = sizeof (pub0_pipe),
+ .pipe_size = sizeof(pub0_pipe),
.pipe_init = pub0_pipe_init,
.pipe_fini = pub0_pipe_fini,
.pipe_start = pub0_pipe_start,
@@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = {
};
static nni_proto_sock_ops pub0_sock_ops = {
- .sock_size = sizeof (pub0_sock),
+ .sock_size = sizeof(pub0_sock),
.sock_init = pub0_sock_init,
.sock_fini = pub0_sock_fini,
.sock_open = pub0_sock_open,
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b5dd7834..e9f029a6 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_lock(&sock->lk);
+again:
if (nni_lmq_empty(&ctx->lmq)) {
int rv;
if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
@@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) {
nni_pollable_clear(&sock->readable);
}
+ if ((msg = nni_msg_unique(msg)) == NULL) {
+ goto again;
+ }
nni_aio_set_msg(aio, msg);
nni_mtx_unlock(&sock->lk);
nni_aio_finish(aio, 0, nni_msg_len(msg));
@@ -343,7 +347,6 @@ sub0_recv_cb(void *arg)
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
NNI_LIST_FOREACH (&sock->contexts, ctx) {
- nni_msg *dup;
if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
// Cannot deliver here, as receive buffer is full.
@@ -354,14 +357,7 @@ sub0_recv_cb(void *arg)
continue;
}
- // Special optimization (for the case where only one context),
- // including when no contexts are in use, we avoid duplication.
- if (ctx == nni_list_last(&sock->contexts)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue; // TODO: Bump a stat!
- }
+ nni_msg_clone(msg);
// If we got to this point, we are capable of receiving this
// message and it is intended for us.
@@ -370,7 +366,7 @@ sub0_recv_cb(void *arg)
if (!nni_list_empty(&ctx->recv_queue)) {
aio = nni_list_first(&ctx->recv_queue);
nni_list_remove(&ctx->recv_queue, aio);
- nni_aio_set_msg(aio, dup);
+ nni_aio_set_msg(aio, msg);
// Save for synchronous completion
nni_list_append(&finish, aio);
@@ -380,24 +376,22 @@ sub0_recv_cb(void *arg)
(void) nni_lmq_getq(&ctx->lmq, &old);
nni_msg_free(old);
- (void) nni_lmq_putq(&ctx->lmq, dup);
+ (void) nni_lmq_putq(&ctx->lmq, msg);
} else {
- (void) nni_lmq_putq(&ctx->lmq, dup);
+ (void) nni_lmq_putq(&ctx->lmq, msg);
}
}
nni_mtx_unlock(&sock->lk);
+ // Drop the first reference we inherited. Any we passed are
+ // accounted for in the clones we made.
+ nni_msg_free(msg);
+
while ((aio = nni_list_first(&finish)) != NULL) {
nni_list_remove(&finish, aio);
nni_aio_finish_synch(aio, 0, len);
}
- // We will toss the message if we didn't use it when delivering to
- // the very last context.
- if (msg != NULL) {
- nni_msg_free(msg);
- }
-
if (submatch) {
nni_pollable_raise(&sock->readable);
}
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 1de93929..b8ca498d 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -7,6 +7,7 @@
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//
+#include <stdio.h>
#include "core/nng_impl.h"
#include "nng/protocol/reqrep0/req.h"
@@ -36,7 +37,7 @@ struct req0_ctx {
uint32_t request_id; // request ID, without high bit set
nni_aio * recv_aio; // user aio waiting to recv - only one!
nni_aio * send_aio; // user aio waiting to send
- nng_msg * req_msg; // request message
+ nng_msg * req_msg; // request message (owned by protocol)
size_t req_len; // length of request message (for stats)
nng_msg * rep_msg; // reply message
nni_timer_node timer;
@@ -435,7 +436,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list)
// Note: This routine should be called with the socket lock held.
while ((ctx = nni_list_first(&s->send_queue)) != NULL) {
- nni_msg * msg;
req0_pipe *p;
if ((p = nni_list_first(&s->ready_pipes)) == NULL) {
@@ -458,12 +458,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list)
&ctx->timer, nni_clock() + ctx->retry);
}
- if (nni_msg_dup(&msg, ctx->req_msg) != 0) {
- // Oops. Well, keep trying each context; maybe
- // one of them will get lucky.
- continue;
- }
-
// Put us on the pipe list of active contexts.
// This gives the pipe a chance to kick a resubmit
// if the pipe is removed.
@@ -488,7 +482,11 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list)
}
}
- nni_aio_set_msg(&p->aio_send, msg);
+ // At this point, we will never give this message back to
+ // to the user, so we don't have to worry about making it
+ // unique. We can freely clone it.
+ nni_msg_clone(ctx->req_msg);
+ nni_aio_set_msg(&p->aio_send, ctx->req_msg);
nni_pipe_send(p->pipe, &p->aio_send);
}
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index a8466e78..518871f4 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -157,8 +157,7 @@ inproc_queue_run(inproc_queue *queue)
nni_aio *rd;
nni_aio *wr;
nni_msg *msg;
- size_t header_len;
- uint8_t *header;
+ nni_msg *pu;
if (((rd = nni_list_first(&queue->readers)) == NULL) ||
((wr = nni_list_first(&queue->writers)) == NULL)) {
@@ -168,30 +167,25 @@ inproc_queue_run(inproc_queue *queue)
msg = nni_aio_get_msg(wr);
NNI_ASSERT(msg != NULL);
- header_len = nni_msg_header_len(msg);
- header = nni_msg_header(msg);
-
// At this point, we pass success back to the caller. If
// we drop the message for any reason, its accounted on the
// receiver side.
nni_aio_list_remove(wr);
nni_aio_set_msg(wr, NULL);
- nni_aio_finish(wr, 0, nni_msg_len(msg) + header_len);
+ nni_aio_finish(
+ wr, 0, nni_msg_len(msg) + nni_msg_header_len(msg));
// TODO: We could check the max receive size here.
- // Now the receive side. First lets make sure we ensure that
- // the message headers are inserted into the body, because
- // that is what the protocols expect.
- // TODO: This would also be the place to do the work to make
- // sure we aren't sharing the message once #1156 integrates.
-
- if (nni_msg_insert(msg, header, header_len) != 0) {
- // TODO: bump a dropped statistic
+ // Now the receive side. We need to ensure that we have
+ // an exclusive copy of the message, and pull the header
+ // up into the body to match protocol expectations.
+ if ((pu = nni_msg_pull_up(msg)) == NULL) {
nni_msg_free(msg);
continue;
}
- nni_msg_header_clear(msg);
+ msg = pu;
+
nni_aio_list_remove(rd);
nni_aio_set_msg(rd, msg);
nni_aio_finish(rd, 0, nni_msg_len(msg));