aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub0')
-rw-r--r--src/protocol/pubsub0/pub.c27
-rw-r--r--src/protocol/pubsub0/sub.c30
2 files changed, 21 insertions, 36 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index c6959148..2bd723cc 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -8,8 +8,6 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <stdlib.h>
-#include <stdio.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg)
static int
pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- pub0_pipe *p = arg;
+ pub0_pipe *p = arg;
pub0_sock *sock = s;
int rv;
size_t len;
@@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio)
pub0_sock *sock = arg;
pub0_pipe *p;
nng_msg * msg;
- nng_msg * dup;
size_t len;
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_mtx_lock(&sock->mtx);
NNI_LIST_FOREACH (&sock->pipes, p) {
- if (p == nni_list_last(&sock->pipes)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
+
+ nni_msg_clone(msg);
if (p->busy) {
if (nni_lmq_full(&p->sendq)) {
// Make space for the new message.
- nni_msg * old;
+ nni_msg *old;
(void) nni_lmq_getq(&p->sendq, &old);
nni_msg_free(old);
}
- nni_lmq_putq(&p->sendq, dup);
+ nni_lmq_putq(&p->sendq, msg);
} else {
p->busy = true;
- nni_aio_set_msg(p->aio_send, dup);
+ nni_aio_set_msg(p->aio_send, msg);
nni_pipe_send(p->pipe, p->aio_send);
}
}
nni_mtx_unlock(&sock->mtx);
- if (msg != NULL) {
- nng_msg_free(msg);
- }
+ nng_msg_free(msg);
nni_aio_finish(aio, 0, len);
}
@@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
}
static nni_proto_pipe_ops pub0_pipe_ops = {
- .pipe_size = sizeof (pub0_pipe),
+ .pipe_size = sizeof(pub0_pipe),
.pipe_init = pub0_pipe_init,
.pipe_fini = pub0_pipe_fini,
.pipe_start = pub0_pipe_start,
@@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = {
};
static nni_proto_sock_ops pub0_sock_ops = {
- .sock_size = sizeof (pub0_sock),
+ .sock_size = sizeof(pub0_sock),
.sock_init = pub0_sock_init,
.sock_fini = pub0_sock_fini,
.sock_open = pub0_sock_open,
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b5dd7834..e9f029a6 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_lock(&sock->lk);
+again:
if (nni_lmq_empty(&ctx->lmq)) {
int rv;
if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
@@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) {
nni_pollable_clear(&sock->readable);
}
+ if ((msg = nni_msg_unique(msg)) == NULL) {
+ goto again;
+ }
nni_aio_set_msg(aio, msg);
nni_mtx_unlock(&sock->lk);
nni_aio_finish(aio, 0, nni_msg_len(msg));
@@ -343,7 +347,6 @@ sub0_recv_cb(void *arg)
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
NNI_LIST_FOREACH (&sock->contexts, ctx) {
- nni_msg *dup;
if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
// Cannot deliver here, as receive buffer is full.
@@ -354,14 +357,7 @@ sub0_recv_cb(void *arg)
continue;
}
- // Special optimization (for the case where only one context),
- // including when no contexts are in use, we avoid duplication.
- if (ctx == nni_list_last(&sock->contexts)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue; // TODO: Bump a stat!
- }
+ nni_msg_clone(msg);
// If we got to this point, we are capable of receiving this
// message and it is intended for us.
@@ -370,7 +366,7 @@ sub0_recv_cb(void *arg)
if (!nni_list_empty(&ctx->recv_queue)) {
aio = nni_list_first(&ctx->recv_queue);
nni_list_remove(&ctx->recv_queue, aio);
- nni_aio_set_msg(aio, dup);
+ nni_aio_set_msg(aio, msg);
// Save for synchronous completion
nni_list_append(&finish, aio);
@@ -380,24 +376,22 @@ sub0_recv_cb(void *arg)
(void) nni_lmq_getq(&ctx->lmq, &old);
nni_msg_free(old);
- (void) nni_lmq_putq(&ctx->lmq, dup);
+ (void) nni_lmq_putq(&ctx->lmq, msg);
} else {
- (void) nni_lmq_putq(&ctx->lmq, dup);
+ (void) nni_lmq_putq(&ctx->lmq, msg);
}
}
nni_mtx_unlock(&sock->lk);
+ // Drop the first reference we inherited. Any we passed are
+ // accounted for in the clones we made.
+ nni_msg_free(msg);
+
while ((aio = nni_list_first(&finish)) != NULL) {
nni_list_remove(&finish, aio);
nni_aio_finish_synch(aio, 0, len);
}
- // We will toss the message if we didn't use it when delivering to
- // the very last context.
- if (msg != NULL) {
- nni_msg_free(msg);
- }
-
if (submatch) {
nni_pollable_raise(&sock->readable);
}