aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/sub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub0/sub.c')
-rw-r--r--src/protocol/pubsub0/sub.c30
1 files changed, 12 insertions, 18 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b5dd7834..e9f029a6 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_lock(&sock->lk);
+again:
if (nni_lmq_empty(&ctx->lmq)) {
int rv;
if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
@@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) {
nni_pollable_clear(&sock->readable);
}
+ if ((msg = nni_msg_unique(msg)) == NULL) {
+ goto again;
+ }
nni_aio_set_msg(aio, msg);
nni_mtx_unlock(&sock->lk);
nni_aio_finish(aio, 0, nni_msg_len(msg));
@@ -343,7 +347,6 @@ sub0_recv_cb(void *arg)
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
NNI_LIST_FOREACH (&sock->contexts, ctx) {
- nni_msg *dup;
if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
// Cannot deliver here, as receive buffer is full.
@@ -354,14 +357,7 @@ sub0_recv_cb(void *arg)
continue;
}
- // Special optimization (for the case where only one context),
- // including when no contexts are in use, we avoid duplication.
- if (ctx == nni_list_last(&sock->contexts)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue; // TODO: Bump a stat!
- }
+ nni_msg_clone(msg);
// If we got to this point, we are capable of receiving this
// message and it is intended for us.
@@ -370,7 +366,7 @@ sub0_recv_cb(void *arg)
if (!nni_list_empty(&ctx->recv_queue)) {
aio = nni_list_first(&ctx->recv_queue);
nni_list_remove(&ctx->recv_queue, aio);
- nni_aio_set_msg(aio, dup);
+ nni_aio_set_msg(aio, msg);
// Save for synchronous completion
nni_list_append(&finish, aio);
@@ -380,24 +376,22 @@ sub0_recv_cb(void *arg)
(void) nni_lmq_getq(&ctx->lmq, &old);
nni_msg_free(old);
- (void) nni_lmq_putq(&ctx->lmq, dup);
+ (void) nni_lmq_putq(&ctx->lmq, msg);
} else {
- (void) nni_lmq_putq(&ctx->lmq, dup);
+ (void) nni_lmq_putq(&ctx->lmq, msg);
}
}
nni_mtx_unlock(&sock->lk);
+ // Drop the first reference we inherited. Any we passed are
+ // accounted for in the clones we made.
+ nni_msg_free(msg);
+
while ((aio = nni_list_first(&finish)) != NULL) {
nni_list_remove(&finish, aio);
nni_aio_finish_synch(aio, 0, len);
}
- // We will toss the message if we didn't use it when delivering to
- // the very last context.
- if (msg != NULL) {
- nni_msg_free(msg);
- }
-
if (submatch) {
nni_pollable_raise(&sock->readable);
}