diff options
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/bus0/bus.c | 21 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 27 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 30 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 16 |
4 files changed, 31 insertions, 63 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index dea228a1..c409292e 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -332,9 +332,7 @@ bus0_sock_getq_cb_raw(void *arg) { bus0_sock *s = arg; bus0_pipe *p; - bus0_pipe *lastp; nni_msg * msg; - nni_msg * dup; uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { @@ -354,26 +352,13 @@ bus0_sock_getq_cb_raw(void *arg) } nni_mtx_lock(&s->mtx); - if (((lastp = nni_list_last(&s->pipes)) != NULL) && - (nni_pipe_id(lastp->npipe) == sender)) { - // If the last pipe in the list is our sender, - // then ignore it and move to the one just previous. - lastp = nni_list_prev(&s->pipes, lastp); - } NNI_LIST_FOREACH (&s->pipes, p) { if (nni_pipe_id(p->npipe) == sender) { continue; } - if (p != lastp) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - msg = NULL; - } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); + nni_msg_clone(msg); + if (nni_msgq_tryput(p->sendq, msg) != 0) { + nni_msg_free(msg); } } nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index c6959148..2bd723cc 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -8,8 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdlib.h> -#include <stdio.h> #include <string.h> #include "core/nng_impl.h" @@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg) static int pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - pub0_pipe *p = arg; + pub0_pipe *p = arg; pub0_sock *sock = s; int rv; size_t len; @@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio) pub0_sock *sock = arg; pub0_pipe *p; nng_msg * msg; - nng_msg * dup; size_t len; msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_mtx_lock(&sock->mtx); NNI_LIST_FOREACH (&sock->pipes, p) { - if (p == nni_list_last(&sock->pipes)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; - } + + nni_msg_clone(msg); if (p->busy) { if (nni_lmq_full(&p->sendq)) { // Make space for the new message. - nni_msg * old; + nni_msg *old; (void) nni_lmq_getq(&p->sendq, &old); nni_msg_free(old); } - nni_lmq_putq(&p->sendq, dup); + nni_lmq_putq(&p->sendq, msg); } else { p->busy = true; - nni_aio_set_msg(p->aio_send, dup); + nni_aio_set_msg(p->aio_send, msg); nni_pipe_send(p->pipe, p->aio_send); } } nni_mtx_unlock(&sock->mtx); - if (msg != NULL) { - nng_msg_free(msg); - } + nng_msg_free(msg); nni_aio_finish(aio, 0, len); } @@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t) } static nni_proto_pipe_ops pub0_pipe_ops = { - .pipe_size = sizeof (pub0_pipe), + .pipe_size = sizeof(pub0_pipe), .pipe_init = pub0_pipe_init, .pipe_fini = pub0_pipe_fini, .pipe_start = pub0_pipe_start, @@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = { }; static nni_proto_sock_ops pub0_sock_ops = { - .sock_size = sizeof (pub0_sock), + .sock_size = sizeof(pub0_sock), .sock_init = pub0_sock_init, .sock_fini = pub0_sock_fini, .sock_open = pub0_sock_open, diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index b5dd7834..e9f029a6 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_lock(&sock->lk); +again: if (nni_lmq_empty(&ctx->lmq)) { int rv; if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { @@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio) if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { nni_pollable_clear(&sock->readable); } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } nni_aio_set_msg(aio, msg); nni_mtx_unlock(&sock->lk); nni_aio_finish(aio, 0, nni_msg_len(msg)); @@ -343,7 +347,6 @@ sub0_recv_cb(void *arg) nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { - nni_msg *dup; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. @@ -354,14 +357,7 @@ sub0_recv_cb(void *arg) continue; } - // Special optimization (for the case where only one context), - // including when no contexts are in use, we avoid duplication. - if (ctx == nni_list_last(&sock->contexts)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; // TODO: Bump a stat! - } + nni_msg_clone(msg); // If we got to this point, we are capable of receiving this // message and it is intended for us. @@ -370,7 +366,7 @@ sub0_recv_cb(void *arg) if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); - nni_aio_set_msg(aio, dup); + nni_aio_set_msg(aio, msg); // Save for synchronous completion nni_list_append(&finish, aio); @@ -380,24 +376,22 @@ sub0_recv_cb(void *arg) (void) nni_lmq_getq(&ctx->lmq, &old); nni_msg_free(old); - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } else { - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } } nni_mtx_unlock(&sock->lk); + // Drop the first reference we inherited. Any we passed are + // accounted for in the clones we made. + nni_msg_free(msg); + while ((aio = nni_list_first(&finish)) != NULL) { nni_list_remove(&finish, aio); nni_aio_finish_synch(aio, 0, len); } - // We will toss the message if we didn't use it when delivering to - // the very last context. - if (msg != NULL) { - nni_msg_free(msg); - } - if (submatch) { nni_pollable_raise(&sock->readable); } diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 1de93929..b8ca498d 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -7,6 +7,7 @@ // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" @@ -36,7 +37,7 @@ struct req0_ctx { uint32_t request_id; // request ID, without high bit set nni_aio * recv_aio; // user aio waiting to recv - only one! nni_aio * send_aio; // user aio waiting to send - nng_msg * req_msg; // request message + nng_msg * req_msg; // request message (owned by protocol) size_t req_len; // length of request message (for stats) nng_msg * rep_msg; // reply message nni_timer_node timer; @@ -435,7 +436,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) // Note: This routine should be called with the socket lock held. while ((ctx = nni_list_first(&s->send_queue)) != NULL) { - nni_msg * msg; req0_pipe *p; if ((p = nni_list_first(&s->ready_pipes)) == NULL) { @@ -458,12 +458,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) &ctx->timer, nni_clock() + ctx->retry); } - if (nni_msg_dup(&msg, ctx->req_msg) != 0) { - // Oops. Well, keep trying each context; maybe - // one of them will get lucky. - continue; - } - // Put us on the pipe list of active contexts. // This gives the pipe a chance to kick a resubmit // if the pipe is removed. @@ -488,7 +482,11 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list) } } - nni_aio_set_msg(&p->aio_send, msg); + // At this point, we will never give this message back to + // to the user, so we don't have to worry about making it + // unique. We can freely clone it. + nni_msg_clone(ctx->req_msg); + nni_aio_set_msg(&p->aio_send, ctx->req_msg); nni_pipe_send(p->pipe, &p->aio_send); } } |
