| field | value | date |
|---|---|---|
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-19 11:06:55 -0800 |
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-20 12:59:45 -0800 |
| commit | 8abf75857e8993a25e50d07bdd6d9628f028d7cc (patch) | |
| tree | 15f89948cfa97a44130db224e9e27e51a00e5f76 /src/protocol/pubsub0/sub.c | |
| parent | b2ba35251986d2754de5f0f274ee7cbf577223e1 (diff) | |
| download | nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.gz nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.bz2 nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.zip | |
fixes #1156 Message cloning could help reduce copies a lot
This introduces reference counting on messages to reduce data copies.
This should yield a marked improvement when moving large messages through
the system, or when publishing to many subscribers. For some transports,
when using large messages, the copy time can be the dominant factor.
Note that when a message is actually shared, inproc will still perform
an extra copy in order to ensure that it can modify the headers.
This will unfortunately always be the case with REQ, as the REQ protocol
keeps a copy of the original message so it can retry.
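
A minimal, self-contained sketch of the reference-counting idea, using hypothetical `my_msg_*` names rather than nng's real `nni_msg` internals (which also handle headers, chunked body storage, and atomic reference counts for thread safety): cloning records another owner instead of copying the payload, and only the last free releases the storage.

```c
#include <stdlib.h>

// Hypothetical message type, for illustration only.
typedef struct my_msg {
	int    refcnt; // number of owners; storage is released when it hits 0
	size_t len;
	char  *data;
} my_msg;

// "Cloning" no longer copies the payload; it just records one more owner.
// (A real implementation would bump the counter atomically.)
static void my_msg_clone(my_msg *m)
{
	m->refcnt++;
}

// Each owner eventually frees its reference; only the last free
// releases the actual storage.
static void my_msg_free(my_msg *m)
{
	if (--m->refcnt == 0) {
		free(m->data);
		free(m);
	}
}

// Fan-out in the spirit of sub0_recv_cb: hand the same message to every
// subscriber by taking one reference per recipient, then drop the
// reference we inherited from the pipe.
static void fan_out(my_msg *m, my_msg **subscribers, size_t n)
{
	for (size_t i = 0; i < n; i++) {
		my_msg_clone(m);
		subscribers[i] = m; // each slot now owns one reference
	}
	my_msg_free(m); // our own reference; recipients free theirs later
}
```

The `fan_out` helper mirrors the pattern adopted in `sub0_recv_cb` below: one clone per recipient, then free the reference inherited from the transport, instead of one full data copy per subscriber.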
Diffstat (limited to 'src/protocol/pubsub0/sub.c')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 30 |

1 file changed, 12 insertions, 18 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b5dd7834..e9f029a6 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
     nni_mtx_lock(&sock->lk);
+again:
     if (nni_lmq_empty(&ctx->lmq)) {
         int rv;
         if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
@@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
     if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) {
         nni_pollable_clear(&sock->readable);
     }
+    if ((msg = nni_msg_unique(msg)) == NULL) {
+        goto again;
+    }
     nni_aio_set_msg(aio, msg);
     nni_mtx_unlock(&sock->lk);
     nni_aio_finish(aio, 0, nni_msg_len(msg));
@@ -343,7 +347,6 @@ sub0_recv_cb(void *arg)
     nni_mtx_lock(&sock->lk);
     // Go through all contexts. We will try to send up.
     NNI_LIST_FOREACH (&sock->contexts, ctx) {
-        nni_msg *dup;

         if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
             // Cannot deliver here, as receive buffer is full.
@@ -354,14 +357,7 @@ sub0_recv_cb(void *arg)
             continue;
         }

-        // Special optimization (for the case where only one context),
-        // including when no contexts are in use, we avoid duplication.
-        if (ctx == nni_list_last(&sock->contexts)) {
-            dup = msg;
-            msg = NULL;
-        } else if (nni_msg_dup(&dup, msg) != 0) {
-            continue; // TODO: Bump a stat!
-        }
+        nni_msg_clone(msg);

         // If we got to this point, we are capable of receiving this
         // message and it is intended for us.
@@ -370,7 +366,7 @@ sub0_recv_cb(void *arg)
         if (!nni_list_empty(&ctx->recv_queue)) {
             aio = nni_list_first(&ctx->recv_queue);
             nni_list_remove(&ctx->recv_queue, aio);
-            nni_aio_set_msg(aio, dup);
+            nni_aio_set_msg(aio, msg);

             // Save for synchronous completion
             nni_list_append(&finish, aio);
@@ -380,24 +376,22 @@ sub0_recv_cb(void *arg)
                 (void) nni_lmq_getq(&ctx->lmq, &old);
                 nni_msg_free(old);

-                (void) nni_lmq_putq(&ctx->lmq, dup);
+                (void) nni_lmq_putq(&ctx->lmq, msg);
             } else {
-                (void) nni_lmq_putq(&ctx->lmq, dup);
+                (void) nni_lmq_putq(&ctx->lmq, msg);
             }
         }
     }
     nni_mtx_unlock(&sock->lk);

+    // Drop the first reference we inherited. Any we passed are
+    // accounted for in the clones we made.
+    nni_msg_free(msg);
+
     while ((aio = nni_list_first(&finish)) != NULL) {
         nni_list_remove(&finish, aio);
         nni_aio_finish_synch(aio, 0, len);
     }

-    // We will toss the message if we didn't use it when delivering to
-    // the very last context.
-    if (msg != NULL) {
-        nni_msg_free(msg);
-    }
-
     if (submatch) {
         nni_pollable_raise(&sock->readable);
     }
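
The new `goto again` path in `sub0_ctx_recv` relies on `nni_msg_unique()` handing back a message the caller exclusively owns. That routine is not shown in this diff, so the sketch below is only a plausible copy-on-write shape for such a helper, reusing the hypothetical `my_msg` type from the earlier sketch (repeated so the fragment stands on its own); the real `nni_msg_unique` may differ, in particular in how its error path treats the original message.

```c
#include <stdlib.h>
#include <string.h>

// Same hypothetical my_msg type as in the sketch above.
typedef struct my_msg {
	int    refcnt;
	size_t len;
	char  *data;
} my_msg;

static void my_msg_free(my_msg *m)
{
	if (--m->refcnt == 0) {
		free(m->data);
		free(m);
	}
}

// Copy-on-write: return a message the caller may modify freely. A sole
// owner keeps the original; a shared message is copied and the shared
// reference dropped. NULL signals allocation failure; in that case the
// caller here still owns m and decides how to recover.
static my_msg *my_msg_unique(my_msg *m)
{
	my_msg *dup;

	if (m->refcnt == 1) {
		return m; // nobody else holds it, safe to modify in place
	}
	if ((dup = malloc(sizeof(*dup))) == NULL) {
		return NULL;
	}
	if ((dup->data = malloc(m->len)) == NULL) {
		free(dup);
		return NULL;
	}
	memcpy(dup->data, m->data, m->len);
	dup->len    = m->len;
	dup->refcnt = 1;
	my_msg_free(m); // release the shared reference we held
	return dup;
}
```

The design point is that sharing stays cheap on the hot path (publishing and queueing), while a consumer that needs to mutate a shared message, such as inproc rewriting headers, pays for a private copy only at that moment. In the diff above, `sub0_ctx_recv` reacts to a NULL result by looping back and trying again from the queue.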
