diff options
Diffstat (limited to 'src/protocol/pubsub0')
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 27 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 30 |
2 files changed, 21 insertions, 36 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index c6959148..2bd723cc 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -8,8 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdlib.h> -#include <stdio.h> #include <string.h> #include "core/nng_impl.h" @@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg) static int pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - pub0_pipe *p = arg; + pub0_pipe *p = arg; pub0_sock *sock = s; int rv; size_t len; @@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio) pub0_sock *sock = arg; pub0_pipe *p; nng_msg * msg; - nng_msg * dup; size_t len; msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_mtx_lock(&sock->mtx); NNI_LIST_FOREACH (&sock->pipes, p) { - if (p == nni_list_last(&sock->pipes)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; - } + + nni_msg_clone(msg); if (p->busy) { if (nni_lmq_full(&p->sendq)) { // Make space for the new message. - nni_msg * old; + nni_msg *old; (void) nni_lmq_getq(&p->sendq, &old); nni_msg_free(old); } - nni_lmq_putq(&p->sendq, dup); + nni_lmq_putq(&p->sendq, msg); } else { p->busy = true; - nni_aio_set_msg(p->aio_send, dup); + nni_aio_set_msg(p->aio_send, msg); nni_pipe_send(p->pipe, p->aio_send); } } nni_mtx_unlock(&sock->mtx); - if (msg != NULL) { - nng_msg_free(msg); - } + nng_msg_free(msg); nni_aio_finish(aio, 0, len); } @@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t) } static nni_proto_pipe_ops pub0_pipe_ops = { - .pipe_size = sizeof (pub0_pipe), + .pipe_size = sizeof(pub0_pipe), .pipe_init = pub0_pipe_init, .pipe_fini = pub0_pipe_fini, .pipe_start = pub0_pipe_start, @@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = { }; static nni_proto_sock_ops pub0_sock_ops = { - .sock_size = sizeof (pub0_sock), + .sock_size = sizeof(pub0_sock), .sock_init = pub0_sock_init, .sock_fini = pub0_sock_fini, .sock_open = pub0_sock_open, diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index b5dd7834..e9f029a6 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_lock(&sock->lk); +again: if (nni_lmq_empty(&ctx->lmq)) { int rv; if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { @@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio) if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { nni_pollable_clear(&sock->readable); } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } nni_aio_set_msg(aio, msg); nni_mtx_unlock(&sock->lk); nni_aio_finish(aio, 0, nni_msg_len(msg)); @@ -343,7 +347,6 @@ sub0_recv_cb(void *arg) nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { - nni_msg *dup; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. @@ -354,14 +357,7 @@ sub0_recv_cb(void *arg) continue; } - // Special optimization (for the case where only one context), - // including when no contexts are in use, we avoid duplication. - if (ctx == nni_list_last(&sock->contexts)) { - dup = msg; - msg = NULL; - } else if (nni_msg_dup(&dup, msg) != 0) { - continue; // TODO: Bump a stat! - } + nni_msg_clone(msg); // If we got to this point, we are capable of receiving this // message and it is intended for us. @@ -370,7 +366,7 @@ sub0_recv_cb(void *arg) if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); - nni_aio_set_msg(aio, dup); + nni_aio_set_msg(aio, msg); // Save for synchronous completion nni_list_append(&finish, aio); @@ -380,24 +376,22 @@ sub0_recv_cb(void *arg) (void) nni_lmq_getq(&ctx->lmq, &old); nni_msg_free(old); - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } else { - (void) nni_lmq_putq(&ctx->lmq, dup); + (void) nni_lmq_putq(&ctx->lmq, msg); } } nni_mtx_unlock(&sock->lk); + // Drop the first reference we inherited. Any we passed are + // accounted for in the clones we made. + nni_msg_free(msg); + while ((aio = nni_list_first(&finish)) != NULL) { nni_list_remove(&finish, aio); nni_aio_finish_synch(aio, 0, len); } - // We will toss the message if we didn't use it when delivering to - // the very last context. - if (msg != NULL) { - nni_msg_free(msg); - } - if (submatch) { nni_pollable_raise(&sock->readable); } |
