diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/message.c | 90 | ||||
| -rw-r--r-- | src/core/message.h | 14 | ||||
| -rw-r--r-- | src/protocol/bus0/bus.c | 21 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 27 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 30 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 16 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 24 |
7 files changed, 138 insertions, 84 deletions
diff --git a/src/core/message.c b/src/core/message.c index af5b0160..888585ad 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -24,9 +24,10 @@ typedef struct { // Underlying message structure. struct nng_msg { - nni_chunk m_header; - nni_chunk m_body; - uint32_t m_pipe; // set on receive + nni_chunk m_header; + nni_chunk m_body; + uint32_t m_pipe; // set on receive + nni_atomic_int m_refcnt; }; #if 0 @@ -243,6 +244,15 @@ nni_chunk_append(nni_chunk *ch, const void *data, size_t len) return (0); } +// nni_chunk_room determines the extra space we have left in the chunk. +// This is useful to determine whether we will need to reallocate and +// copy in order to save space. +static size_t +nni_chunk_room(nni_chunk *ch) +{ + return (ch->ch_cap - ch->ch_len); +} + // nni_chunk_insert prepends data to the chunk, as efficiently as possible. // If the data pointer is NULL, then no data is actually copied, but the // data region will have "grown" in the beginning, with uninitialized data. @@ -387,6 +397,69 @@ nni_chunk_chop_u64(nni_chunk *ch) return (v); } +void +nni_msg_clone(nni_msg *m) +{ + nni_atomic_inc(&m->m_refcnt); +} + +// This returns either the original message or a new message on success. +// If it fails, then NULL is returned. Either way the original message +// has its reference count dropped (and freed if zero). +nni_msg * +nni_msg_unique(nni_msg *m) +{ + nni_msg *m2; + + // If we already have an exclusive copy, just keep using it. + if (nni_atomic_get(&m->m_refcnt) == 1) { + return (m); + } + // Otherwise we need to make a copy + if (nni_msg_dup(&m2, m) != 0) { + m2 = NULL; + } + nni_msg_free(m); + return (m2); +} + +// nni_msg_pull_up ensures that the message is unique, and that any header +// is merged with the message. The main purpose of doing this is to break +// up the inproc binding -- protocols send messages to inproc with a +// separate header, but they really would like receive a unified +// message so they can pick apart the header. +nni_msg * +nni_msg_pull_up(nni_msg *m) +{ + // This implementation is optimized to ensure that this function + // will not copy the message more than once, and it will not + // allocate unless there is no other option. + if (((nni_chunk_room(&m->m_body) < nni_msg_header_len(m))) || + (nni_atomic_get(&m->m_refcnt) != 1)) { + // We have to duplicate the message. + nni_msg *m2; + uint8_t *dst; + size_t len = nni_msg_len(m) + nni_msg_header_len(m); + if (nni_msg_alloc(&m2, len) != 0) { + return (NULL); + } + dst = nni_msg_body(m2); + len = nni_msg_header_len(m); + memcpy(dst, nni_msg_header(m), len); + dst += len; + memcpy(dst, nni_msg_body(m), nni_msg_len(m)); + nni_msg_free(m); + return (m2); + } + + // At this point, we have a unique instance of the message. + // We also know that we have sufficient space in the message, + // so this insert operation cannot fail. + nni_msg_insert(m, nni_msg_header(m), nni_msg_header_len(m)); + nni_msg_header_clear(m); + return (m); +} + int nni_msg_alloc(nni_msg **mp, size_t sz) { @@ -423,6 +496,9 @@ nni_msg_alloc(nni_msg **mp, size_t sz) nni_panic("chunk_append failed"); } + // We always start with a single valid reference count. + nni_atomic_init(&m->m_refcnt); + nni_atomic_set(&m->m_refcnt, 1); *mp = m; return (0); } @@ -430,8 +506,8 @@ nni_msg_alloc(nni_msg **mp, size_t sz) int nni_msg_dup(nni_msg **dup, const nni_msg *src) { - nni_msg * m; - int rv; + nni_msg *m; + int rv; if ((m = NNI_ALLOC_STRUCT(m)) == NULL) { return (NNG_ENOMEM); @@ -448,6 +524,8 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src) } m->m_pipe = src->m_pipe; + nni_atomic_init(&m->m_refcnt); + nni_atomic_set(&m->m_refcnt, 1); *dup = m; return (0); @@ -456,7 +534,7 @@ nni_msg_dup(nni_msg **dup, const nni_msg *src) void nni_msg_free(nni_msg *m) { - if (m != NULL) { + if ((m != NULL) && (nni_atomic_dec_nv(&m->m_refcnt) == 0)) { nni_chunk_free(&m->m_header); nni_chunk_free(&m->m_body); NNI_FREE_STRUCT(m); diff --git a/src/core/message.h b/src/core/message.h index d09a18dc..53b644c4 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -60,6 +60,20 @@ extern uint64_t nni_msg_header_chop_u64(nni_msg *); extern void nni_msg_set_pipe(nni_msg *, uint32_t); extern uint32_t nni_msg_get_pipe(const nni_msg *); +// Reference counting messages. This allows the same message to be +// cheaply reused instead of copied over and over again. Callers of +// this functionality MUST be certain to use nni_msg_unique() before +// passing a message out of their control (e.g. to user programs.) +// Failure to do so will likely result in corruption. +extern void nni_msg_clone(nni_msg *); +extern nni_msg *nni_msg_unique(nni_msg *); +// nni_msg_pull_up ensures that the message is unique, and that any +// header present is "pulled up" into the message body. If the function +// cannot do this for any reason (out of space in the body), then NULL +// is returned. It is the responsibility of the caller to free the +// original message in that case (same semantics as realloc). +extern nni_msg *nni_msg_pull_up(nni_msg *); + // These should only be used when the transport or protocol is absolutely // certain that there is adequate room. There is about 32 bytes of // header and trailer space for a newly allocated message, and transports diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index dea228a1..c409292e 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -332,9 +332,7 @@ bus0_sock_getq_cb_raw(void *arg) { bus0_sock *s = arg; bus0_pipe *p; - bus0_pipe *lastp; nni_msg * msg; - nni_msg * dup; uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { @@ -354,26 +352,13 @@ bus0_sock_getq_cb_raw(void *arg) } nni_mtx_lock(&s->mtx); - if (((lastp = nni_list_last(&s->pipes)) != NULL) && - (nni_pipe_id(lastp->npipe) == sender)) { - // If the last pipe in the list is our sender, - // then ignore it and move to the one just previous. - lastp = nni_list_prev(&s->pipes, lastp); - } NNI_LIST_FOREACH (&s->pipes, p) { if (nni_pipe_id(p->npipe) == sender) { continue; } - if (p != lastp) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - msg = NULL; - } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); + nni_msg_clone(msg); + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); } } nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index c6959148..2bd723cc 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -8,8 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdlib.h> -#include <stdio.h> #include <string.h> #include "core/nng_impl.h" @@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg) static int pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - pub0_pipe *p = arg; + pub0_pipe *p = arg; pub0_sock *sock = s; int rv; size_t len; @@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio) pub0_sock *sock = arg; pub0_pipe *p; nng_msg * msg; - nng_msg * dup; size_t len; msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_mtx_lock(&sock->mtx); NNI_LIST_FOREACH (&sock->pipes, p) { - if (p == nni_list_last(&sock->pipes)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; - } + + nni_msg_clone(msg); if (p->busy) { if (nni_lmq_full(&p->sendq)) { // Make space for the new message. - nni_msg * old; + nni_msg *old; (void) nni_lmq_getq(&p->sendq, &old); nni_msg_free(old); } - nni_lmq_putq(&p->sendq, dup); + nni_lmq_putq(&p->sendq, msg); } else { p->busy = true; - nni_aio_set_msg(p->aio_send, dup); + nni_aio_set_msg(p->aio_send, msg); nni_pipe_send(p->pipe, p->aio_send); } } nni_mtx_unlock(&sock->mtx); - if (msg != NULL) { - nng_msg_free(msg); - } + nng_msg_free(msg); nni_aio_finish(aio, 0, len); } @@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t) } static nni_proto_pipe_ops pub0_pipe_ops = { - .pipe_size = sizeof (pub0_pipe), + .pipe_size = sizeof(pub0_pipe), .pipe_init = pub0_pipe_init, .pipe_fini = pub0_pipe_fini, .pipe_start = pub0_pipe_start, @@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = { }; static nni_proto_sock_ops pub0_sock_ops = { - .sock_size = sizeof (pub0_sock), + .sock_size = sizeof(pub0_sock), .sock_init = pub0_sock_init, .sock_fini = pub0_sock_fini, .sock_open = pub0_sock_open, diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index b5dd7834..e9f029a6 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_lock(&sock->lk); +again: if (nni_lmq_empty(&ctx->lmq)) { int rv; if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { @@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio) if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { nni_pollable_clear(&sock->readable); } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } nni_aio_set_msg(aio, msg); nni_mtx_unlock(&sock->lk); nni_aio_finish(aio, 0, nni_msg_len(msg)); @@ -343,7 +347,6 @@ sub0_recv_cb(void *arg) nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { - nni_msg *dup; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. @@ -354,14 +357,7 @@ sub0_recv_cb(void *arg) continue; } - // Special optimization (for the case where only one context), - // including when no contexts are in use, we avoid duplication. - if (ctx == nni_list_last(&sock->contexts)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; // TODO: Bump a stat! - } + nni_msg_clone(msg); // If we got to this point, we are capable of receiving this // message and it is intended for us. @@ -370,7 +366,7 @@ sub0_recv_cb(void *arg) if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); - nni_aio_set_msg(aio, dup); + nni_aio_set_msg(aio, msg); // Save for synchronous completion nni_list_append(&finish, aio); @@ -380,24 +376,22 @@ sub0_recv_cb(void *arg) (void) nni_lmq_getq(&ctx->lmq, &old); nni_msg_free(old); - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } else { - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } } nni_mtx_unlock(&sock->lk); + // Drop the first reference we inherited. Any we passed are + // accounted for in the clones we made. + nni_msg_free(msg); + while ((aio = nni_list_first(&finish)) != NULL) { nni_list_remove(&finish, aio); nni_aio_finish_synch(aio, 0, len); } - // We will toss the message if we didn't use it when delivering to - // the very last context. - if (msg != NULL) { - nni_msg_free(msg); - } - if (submatch) { nni_pollable_raise(&sock->readable); } diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 1de93929..b8ca498d 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -7,6 +7,7 @@ // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" @@ -36,7 +37,7 @@ struct req0_ctx { uint32_t request_id; // request ID, without high bit set nni_aio * recv_aio; // user aio waiting to recv - only one! nni_aio * send_aio; // user aio waiting to send - nng_msg * req_msg; // request message + nng_msg * req_msg; // request message (owned by protocol) size_t req_len; // length of request message (for stats) nng_msg * rep_msg; // reply message nni_timer_node timer; @@ -435,7 +436,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) // Note: This routine should be called with the socket lock held. while ((ctx = nni_list_first(&s->send_queue)) != NULL) { - nni_msg * msg; req0_pipe *p; if ((p = nni_list_first(&s->ready_pipes)) == NULL) { @@ -458,12 +458,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) &ctx->timer, nni_clock() + ctx->retry); } - if (nni_msg_dup(&msg, ctx->req_msg) != 0) { - // Oops. Well, keep trying each context; maybe - // one of them will get lucky. - continue; - } - // Put us on the pipe list of active contexts. // This gives the pipe a chance to kick a resubmit // if the pipe is removed. @@ -488,7 +482,11 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) } } - nni_aio_set_msg(&p->aio_send, msg); + // At this point, we will never give this message back to + // to the user, so we don't have to worry about making it + // unique. We can freely clone it. + nni_msg_clone(ctx->req_msg); + nni_aio_set_msg(&p->aio_send, ctx->req_msg); nni_pipe_send(p->pipe, &p->aio_send); } } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index a8466e78..518871f4 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -157,8 +157,7 @@ inproc_queue_run(inproc_queue *queue) nni_aio *rd; nni_aio *wr; nni_msg *msg; - size_t header_len; - uint8_t *header; + nni_msg *pu; if (((rd = nni_list_first(&queue->readers)) == NULL) || ((wr = nni_list_first(&queue->writers)) == NULL)) { @@ -168,30 +167,25 @@ inproc_queue_run(inproc_queue *queue) msg = nni_aio_get_msg(wr); NNI_ASSERT(msg != NULL); - header_len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - // At this point, we pass success back to the caller. If // we drop the message for any reason, its accounted on the // receiver side. nni_aio_list_remove(wr); nni_aio_set_msg(wr, NULL); - nni_aio_finish(wr, 0, nni_msg_len(msg) + header_len); + nni_aio_finish( + wr, 0, nni_msg_len(msg) + nni_msg_header_len(msg)); // TODO: We could check the max receive size here. - // Now the receive side. First lets make sure we ensure that - // the message headers are inserted into the body, because - // that is what the protocols expect. - // TODO: This would also be the place to do the work to make - // sure we aren't sharing the message once #1156 integrates. - - if (nni_msg_insert(msg, header, header_len) != 0) { - // TODO: bump a dropped statistic + // Now the receive side. We need to ensure that we have + // an exclusive copy of the message, and pull the header + // up into the body to match protocol expectations. + if ((pu = nni_msg_pull_up(msg)) == NULL) { nni_msg_free(msg); continue; } - nni_msg_header_clear(msg); + msg = pu; + nni_aio_list_remove(rd); nni_aio_set_msg(rd, msg); nni_aio_finish(rd, 0, nni_msg_len(msg)); |
