aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/pub.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-19 11:06:55 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-20 12:59:45 -0800
commit8abf75857e8993a25e50d07bdd6d9628f028d7cc (patch)
tree15f89948cfa97a44130db224e9e27e51a00e5f76 /src/protocol/pubsub0/pub.c
parentb2ba35251986d2754de5f0f274ee7cbf577223e1 (diff)
downloadnng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.gz
nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.tar.bz2
nng-8abf75857e8993a25e50d07bdd6d9628f028d7cc.zip
fixes #1156 Message cloning could help reduce copies a lot
This introduces reference counting on messages to reduce the data copies. This should have a marked improvement when moving large messages through the system, or when publishing to many subscribers. For some transports, when using large messages, the copy time can be the dominant factor. Note that when a message is actually shared, inproc will still perform an extra copy in order to ensure that it can modify the headers. This will unfortunately always be the case with REQ, as the REQ protocol keeps a copy of the original message so it can retry.
Diffstat (limited to 'src/protocol/pubsub0/pub.c')
-rw-r--r--src/protocol/pubsub0/pub.c27
1 files changed, 9 insertions, 18 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index c6959148..2bd723cc 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -8,8 +8,6 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <stdlib.h>
-#include <stdio.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg)
static int
pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- pub0_pipe *p = arg;
+ pub0_pipe *p = arg;
pub0_sock *sock = s;
int rv;
size_t len;
@@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio)
pub0_sock *sock = arg;
pub0_pipe *p;
nng_msg * msg;
- nng_msg * dup;
size_t len;
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_mtx_lock(&sock->mtx);
NNI_LIST_FOREACH (&sock->pipes, p) {
- if (p == nni_list_last(&sock->pipes)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
+
+ nni_msg_clone(msg);
if (p->busy) {
if (nni_lmq_full(&p->sendq)) {
// Make space for the new message.
- nni_msg * old;
+ nni_msg *old;
(void) nni_lmq_getq(&p->sendq, &old);
nni_msg_free(old);
}
- nni_lmq_putq(&p->sendq, dup);
+ nni_lmq_putq(&p->sendq, msg);
} else {
p->busy = true;
- nni_aio_set_msg(p->aio_send, dup);
+ nni_aio_set_msg(p->aio_send, msg);
nni_pipe_send(p->pipe, p->aio_send);
}
}
nni_mtx_unlock(&sock->mtx);
- if (msg != NULL) {
- nng_msg_free(msg);
- }
+ nng_msg_free(msg);
nni_aio_finish(aio, 0, len);
}
@@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
}
static nni_proto_pipe_ops pub0_pipe_ops = {
- .pipe_size = sizeof (pub0_pipe),
+ .pipe_size = sizeof(pub0_pipe),
.pipe_init = pub0_pipe_init,
.pipe_fini = pub0_pipe_fini,
.pipe_start = pub0_pipe_start,
@@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = {
};
static nni_proto_sock_ops pub0_sock_ops = {
- .sock_size = sizeof (pub0_sock),
+ .sock_size = sizeof(pub0_sock),
.sock_init = pub0_sock_init,
.sock_fini = pub0_sock_fini,
.sock_open = pub0_sock_open,