diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-10-29 07:36:20 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-10-29 21:52:57 -0700 |
| commit | 5b529c298eb8b56eb2df5a86d9274de06d9ce796 (patch) | |
| tree | 220e887436bcedcb613d9cf5d40cbb7344202e96 /src/protocol/pubsub0 | |
| parent | caefd5cb745e2d1e8454dcda6753262f95812de4 (diff) | |
| download | nng-5b529c298eb8b56eb2df5a86d9274de06d9ce796.tar.gz nng-5b529c298eb8b56eb2df5a86d9274de06d9ce796.tar.bz2 nng-5b529c298eb8b56eb2df5a86d9274de06d9ce796.zip | |
fixes #1308 Sub contexts/AIOs can share a single nng_msg
Diffstat (limited to 'src/protocol/pubsub0')
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 38 |
1 files changed, 31 insertions, 7 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index e9f029a6..2cd05947 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -63,6 +63,7 @@ struct sub0_sock { nni_pollable readable; sub0_ctx master; // default context nni_list contexts; // all contexts + int num_contexts; size_t recv_buf_len; bool prefer_new; nni_mtx lk; @@ -162,6 +163,7 @@ sub0_ctx_fini(void *arg) nni_mtx_lock(&sock->lk); nni_list_remove(&sock->contexts, ctx); + sock->num_contexts--; nni_mtx_unlock(&sock->lk); while ((topic = nni_list_first(&ctx->topics)) != 0) { @@ -197,6 +199,7 @@ sub0_ctx_init(void *ctx_arg, void *sock_arg) ctx->sock = sock; nni_list_append(&sock->contexts, ctx); + sock->num_contexts++; nni_mtx_unlock(&sock->lk); return (0); @@ -328,6 +331,7 @@ sub0_recv_cb(void *arg) nni_list finish; nng_aio * aio; bool submatch; + nni_msg * dup_msg; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); @@ -343,6 +347,7 @@ sub0_recv_cb(void *arg) body = nni_msg_body(msg); len = nni_msg_len(msg); submatch = false; + dup_msg = NULL; nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. @@ -357,7 +362,19 @@ sub0_recv_cb(void *arg) continue; } - nni_msg_clone(msg); + // This is a performance optimization, that ensures we + // do not duplicate a message in the common case, where there + // is only a single context. + if (sock->num_contexts > 1) { + if (nni_msg_dup(&dup_msg, msg) != 0) { + // if we cannot dup it, continue on + continue; + } + } else { + // We only have one context, so it's the only + // possible message. + dup_msg = msg; + } // If we got to this point, we are capable of receiving this // message and it is intended for us. @@ -366,7 +383,7 @@ sub0_recv_cb(void *arg) if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); - nni_aio_set_msg(aio, msg); + nni_aio_set_msg(aio, dup_msg); // Save for synchronous completion nni_list_append(&finish, aio); @@ -376,16 +393,23 @@ sub0_recv_cb(void *arg) (void) nni_lmq_getq(&ctx->lmq, &old); nni_msg_free(old); - (void) nni_lmq_putq(&ctx->lmq, msg); + (void) nni_lmq_putq(&ctx->lmq, dup_msg); } else { - (void) nni_lmq_putq(&ctx->lmq, msg); + (void) nni_lmq_putq(&ctx->lmq, dup_msg); } } nni_mtx_unlock(&sock->lk); - // Drop the first reference we inherited. Any we passed are - // accounted for in the clones we made. - nni_msg_free(msg); + // NB: This is slightly less efficient in that we may have + // created an extra copy in the face of e.g. two subscriptions, + // but optimizing this further would require checking the subscription + // list twice, adding complexity. If this turns out to be a problem + // we could probably add some other sophistication with a counter + // and flags on the contexts themselves. + if (msg != dup_msg) { + // If we didn't just use the message, then free our copy. + nni_msg_free(msg); + } while ((aio = nni_list_first(&finish)) != NULL) { nni_list_remove(&finish, aio); |
