aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-10-29 07:36:20 -0700
committerGarrett D'Amore <garrett@damore.org>2020-10-29 21:52:57 -0700
commit5b529c298eb8b56eb2df5a86d9274de06d9ce796 (patch)
tree220e887436bcedcb613d9cf5d40cbb7344202e96 /src/protocol
parentcaefd5cb745e2d1e8454dcda6753262f95812de4 (diff)
downloadnng-5b529c298eb8b56eb2df5a86d9274de06d9ce796.tar.gz
nng-5b529c298eb8b56eb2df5a86d9274de06d9ce796.tar.bz2
nng-5b529c298eb8b56eb2df5a86d9274de06d9ce796.zip
fixes #1308 Sub contexts/AIOs can share a single nng_msg
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pubsub0/sub.c38
1 files changed, 31 insertions, 7 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index e9f029a6..2cd05947 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -63,6 +63,7 @@ struct sub0_sock {
nni_pollable readable;
sub0_ctx master; // default context
nni_list contexts; // all contexts
+ int num_contexts;
size_t recv_buf_len;
bool prefer_new;
nni_mtx lk;
@@ -162,6 +163,7 @@ sub0_ctx_fini(void *arg)
nni_mtx_lock(&sock->lk);
nni_list_remove(&sock->contexts, ctx);
+ sock->num_contexts--;
nni_mtx_unlock(&sock->lk);
while ((topic = nni_list_first(&ctx->topics)) != 0) {
@@ -197,6 +199,7 @@ sub0_ctx_init(void *ctx_arg, void *sock_arg)
ctx->sock = sock;
nni_list_append(&sock->contexts, ctx);
+ sock->num_contexts++;
nni_mtx_unlock(&sock->lk);
return (0);
@@ -328,6 +331,7 @@ sub0_recv_cb(void *arg)
nni_list finish;
nng_aio * aio;
bool submatch;
+ nni_msg * dup_msg;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
@@ -343,6 +347,7 @@ sub0_recv_cb(void *arg)
body = nni_msg_body(msg);
len = nni_msg_len(msg);
submatch = false;
+ dup_msg = NULL;
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
@@ -357,7 +362,19 @@ sub0_recv_cb(void *arg)
continue;
}
- nni_msg_clone(msg);
+ // This is a performance optimization, that ensures we
+ // do not duplicate a message in the common case, where there
+ // is only a single context.
+ if (sock->num_contexts > 1) {
+ if (nni_msg_dup(&dup_msg, msg) != 0) {
+ // if we cannot dup it, continue on
+ continue;
+ }
+ } else {
+ // We only have one context, so it's the only
+ // possible message.
+ dup_msg = msg;
+ }
// If we got to this point, we are capable of receiving this
// message and it is intended for us.
@@ -366,7 +383,7 @@ sub0_recv_cb(void *arg)
if (!nni_list_empty(&ctx->recv_queue)) {
aio = nni_list_first(&ctx->recv_queue);
nni_list_remove(&ctx->recv_queue, aio);
- nni_aio_set_msg(aio, msg);
+ nni_aio_set_msg(aio, dup_msg);
// Save for synchronous completion
nni_list_append(&finish, aio);
@@ -376,16 +393,23 @@ sub0_recv_cb(void *arg)
(void) nni_lmq_getq(&ctx->lmq, &old);
nni_msg_free(old);
- (void) nni_lmq_putq(&ctx->lmq, msg);
+ (void) nni_lmq_putq(&ctx->lmq, dup_msg);
} else {
- (void) nni_lmq_putq(&ctx->lmq, msg);
+ (void) nni_lmq_putq(&ctx->lmq, dup_msg);
}
}
nni_mtx_unlock(&sock->lk);
- // Drop the first reference we inherited. Any we passed are
- // accounted for in the clones we made.
- nni_msg_free(msg);
+ // NB: This is slightly less efficient in that we may have
+ // created an extra copy in the face of e.g. two subscriptions,
+ // but optimizing this further would require checking the subscription
+ // list twice, adding complexity. If this turns out to be a problem
+ // we could probably add some other sophistication with a counter
+ // and flags on the contexts themselves.
+ if (msg != dup_msg) {
+ // If we didn't just use the message, then free our copy.
+ nni_msg_free(msg);
+ }
while ((aio = nni_list_first(&finish)) != NULL) {
nni_list_remove(&finish, aio);