aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus0/bus.c21
-rw-r--r--src/protocol/pubsub0/pub.c27
-rw-r--r--src/protocol/pubsub0/sub.c30
-rw-r--r--src/protocol/reqrep0/req.c16
4 files changed, 31 insertions, 63 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index dea228a1..c409292e 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -332,9 +332,7 @@ bus0_sock_getq_cb_raw(void *arg)
{
bus0_sock *s = arg;
bus0_pipe *p;
- bus0_pipe *lastp;
nni_msg * msg;
- nni_msg * dup;
uint32_t sender;
if (nni_aio_result(s->aio_getq) != 0) {
@@ -354,26 +352,13 @@ bus0_sock_getq_cb_raw(void *arg)
}
nni_mtx_lock(&s->mtx);
- if (((lastp = nni_list_last(&s->pipes)) != NULL) &&
- (nni_pipe_id(lastp->npipe) == sender)) {
- // If the last pipe in the list is our sender,
- // then ignore it and move to the one just previous.
- lastp = nni_list_prev(&s->pipes, lastp);
- }
NNI_LIST_FOREACH (&s->pipes, p) {
if (nni_pipe_id(p->npipe) == sender) {
continue;
}
- if (p != lastp) {
- if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
- } else {
- dup = msg;
- msg = NULL;
- }
- if (nni_msgq_tryput(p->sendq, dup) != 0) {
- nni_msg_free(dup);
+ nni_msg_clone(msg);
+ if (nni_msgq_tryput(p->sendq, msg) != 0) {
+ nni_msg_free(msg);
}
}
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index c6959148..2bd723cc 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -8,8 +8,6 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <stdlib.h>
-#include <stdio.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -116,7 +114,7 @@ pub0_pipe_fini(void *arg)
static int
pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
- pub0_pipe *p = arg;
+ pub0_pipe *p = arg;
pub0_sock *sock = s;
int rv;
size_t len;
@@ -234,37 +232,30 @@ pub0_sock_send(void *arg, nni_aio *aio)
pub0_sock *sock = arg;
pub0_pipe *p;
nng_msg * msg;
- nng_msg * dup;
size_t len;
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_mtx_lock(&sock->mtx);
NNI_LIST_FOREACH (&sock->pipes, p) {
- if (p == nni_list_last(&sock->pipes)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
+
+ nni_msg_clone(msg);
if (p->busy) {
if (nni_lmq_full(&p->sendq)) {
// Make space for the new message.
- nni_msg * old;
+ nni_msg *old;
(void) nni_lmq_getq(&p->sendq, &old);
nni_msg_free(old);
}
- nni_lmq_putq(&p->sendq, dup);
+ nni_lmq_putq(&p->sendq, msg);
} else {
p->busy = true;
- nni_aio_set_msg(p->aio_send, dup);
+ nni_aio_set_msg(p->aio_send, msg);
nni_pipe_send(p->pipe, p->aio_send);
}
}
nni_mtx_unlock(&sock->mtx);
- if (msg != NULL) {
- nng_msg_free(msg);
- }
+ nng_msg_free(msg);
nni_aio_finish(aio, 0, len);
}
@@ -326,7 +317,7 @@ pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
}
static nni_proto_pipe_ops pub0_pipe_ops = {
- .pipe_size = sizeof (pub0_pipe),
+ .pipe_size = sizeof(pub0_pipe),
.pipe_init = pub0_pipe_init,
.pipe_fini = pub0_pipe_fini,
.pipe_start = pub0_pipe_start,
@@ -351,7 +342,7 @@ static nni_option pub0_sock_options[] = {
};
static nni_proto_sock_ops pub0_sock_ops = {
- .sock_size = sizeof (pub0_sock),
+ .sock_size = sizeof(pub0_sock),
.sock_init = pub0_sock_init,
.sock_fini = pub0_sock_fini,
.sock_open = pub0_sock_open,
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b5dd7834..e9f029a6 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -101,6 +101,7 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_lock(&sock->lk);
+again:
if (nni_lmq_empty(&ctx->lmq)) {
int rv;
if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
@@ -118,6 +119,9 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) {
nni_pollable_clear(&sock->readable);
}
+ if ((msg = nni_msg_unique(msg)) == NULL) {
+ goto again;
+ }
nni_aio_set_msg(aio, msg);
nni_mtx_unlock(&sock->lk);
nni_aio_finish(aio, 0, nni_msg_len(msg));
@@ -343,7 +347,6 @@ sub0_recv_cb(void *arg)
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
NNI_LIST_FOREACH (&sock->contexts, ctx) {
- nni_msg *dup;
if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
// Cannot deliver here, as receive buffer is full.
@@ -354,14 +357,7 @@ sub0_recv_cb(void *arg)
continue;
}
- // Special optimization (for the case where only one context),
- // including when no contexts are in use, we avoid duplication.
- if (ctx == nni_list_last(&sock->contexts)) {
- dup = msg;
- msg = NULL;
- } else if (nni_msg_dup(&dup, msg) != 0) {
- continue; // TODO: Bump a stat!
- }
+ nni_msg_clone(msg);
// If we got to this point, we are capable of receiving this
// message and it is intended for us.
@@ -370,7 +366,7 @@ sub0_recv_cb(void *arg)
if (!nni_list_empty(&ctx->recv_queue)) {
aio = nni_list_first(&ctx->recv_queue);
nni_list_remove(&ctx->recv_queue, aio);
- nni_aio_set_msg(aio, dup);
+ nni_aio_set_msg(aio, msg);
// Save for synchronous completion
nni_list_append(&finish, aio);
@@ -380,24 +376,22 @@ sub0_recv_cb(void *arg)
(void) nni_lmq_getq(&ctx->lmq, &old);
nni_msg_free(old);
- (void) nni_lmq_putq(&ctx->lmq, dup);
+ (void) nni_lmq_putq(&ctx->lmq, msg);
} else {
- (void) nni_lmq_putq(&ctx->lmq, dup);
+ (void) nni_lmq_putq(&ctx->lmq, msg);
}
}
nni_mtx_unlock(&sock->lk);
+ // Drop the first reference we inherited. Any we passed are
+ // accounted for in the clones we made.
+ nni_msg_free(msg);
+
while ((aio = nni_list_first(&finish)) != NULL) {
nni_list_remove(&finish, aio);
nni_aio_finish_synch(aio, 0, len);
}
- // We will toss the message if we didn't use it when delivering to
- // the very last context.
- if (msg != NULL) {
- nni_msg_free(msg);
- }
-
if (submatch) {
nni_pollable_raise(&sock->readable);
}
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 1de93929..b8ca498d 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -7,6 +7,7 @@
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//
+#include <stdio.h>
#include "core/nng_impl.h"
#include "nng/protocol/reqrep0/req.h"
@@ -36,7 +37,7 @@ struct req0_ctx {
uint32_t request_id; // request ID, without high bit set
nni_aio * recv_aio; // user aio waiting to recv - only one!
nni_aio * send_aio; // user aio waiting to send
- nng_msg * req_msg; // request message
+ nng_msg * req_msg; // request message (owned by protocol)
size_t req_len; // length of request message (for stats)
nng_msg * rep_msg; // reply message
nni_timer_node timer;
@@ -435,7 +436,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list)
// Note: This routine should be called with the socket lock held.
while ((ctx = nni_list_first(&s->send_queue)) != NULL) {
- nni_msg * msg;
req0_pipe *p;
if ((p = nni_list_first(&s->ready_pipes)) == NULL) {
@@ -458,12 +458,6 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list)
&ctx->timer, nni_clock() + ctx->retry);
}
- if (nni_msg_dup(&msg, ctx->req_msg) != 0) {
- // Oops. Well, keep trying each context; maybe
- // one of them will get lucky.
- continue;
- }
-
// Put us on the pipe list of active contexts.
// This gives the pipe a chance to kick a resubmit
// if the pipe is removed.
@@ -488,7 +482,11 @@ req0_run_send_queue(req0_sock *s, nni_list *sent_list)
}
}
- nni_aio_set_msg(&p->aio_send, msg);
+ // At this point, we will never give this message back to
+ // to the user, so we don't have to worry about making it
+ // unique. We can freely clone it.
+ nni_msg_clone(ctx->req_msg);
+ nni_aio_set_msg(&p->aio_send, ctx->req_msg);
nni_pipe_send(p->pipe, &p->aio_send);
}
}