aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/pub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub0/pub.c')
-rw-r--r--src/protocol/pubsub0/pub.c27
1 files changed, 9 insertions, 18 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index c6959148..2bd723cc 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -8,8 +8,6 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <stdlib.h>
-#include <stdio.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg)
static int
pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- pub0_pipe *p = arg;
+ pub0_pipe *p = arg;
pub0_sock *sock = s;
int rv;
size_t len;
@@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio)
pub0_sock *sock = arg;
pub0_pipe *p;
nng_msg * msg;
- nng_msg * dup;
size_t len;
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_mtx_lock(&sock->mtx);
NNI_LIST_FOREACH (&sock->pipes, p) {
- if (p == nni_list_last(&sock->pipes)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
+
+ nni_msg_clone(msg);
if (p->busy) {
if (nni_lmq_full(&p->sendq)) {
// Make space for the new message.
- nni_msg * old;
+ nni_msg *old;
(void) nni_lmq_getq(&p->sendq, &old);
nni_msg_free(old);
}
- nni_lmq_putq(&p->sendq, dup);
+ nni_lmq_putq(&p->sendq, msg);
} else {
p->busy = true;
- nni_aio_set_msg(p->aio_send, dup);
+ nni_aio_set_msg(p->aio_send, msg);
nni_pipe_send(p->pipe, p->aio_send);
}
}
nni_mtx_unlock(&sock->mtx);
- if (msg != NULL) {
- nng_msg_free(msg);
- }
+ nng_msg_free(msg);
nni_aio_finish(aio, 0, len);
}
@@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
}
static nni_proto_pipe_ops pub0_pipe_ops = {
- .pipe_size = sizeof (pub0_pipe),
+ .pipe_size = sizeof(pub0_pipe),
.pipe_init = pub0_pipe_init,
.pipe_fini = pub0_pipe_fini,
.pipe_start = pub0_pipe_start,
@@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = {
};
static nni_proto_sock_ops pub0_sock_ops = {
- .sock_size = sizeof (pub0_sock),
+ .sock_size = sizeof(pub0_sock),
.sock_init = pub0_sock_init,
.sock_fini = pub0_sock_fini,
.sock_open = pub0_sock_open,