author     Garrett D'Amore <garrett@damore.org>  2021-12-25 17:38:14 -0800
committer  Garrett D'Amore <garrett@damore.org>  2021-12-25 17:38:14 -0800
commit     6237d268514e1f8aec562052954db22c4540eec3 (patch)
tree       32213ea0016ae10faee2817f414308c91d881c42
parent     7a54bcd6fe345f35dd51eede6c5d66e8516c16ab (diff)
Provide a tiny built-in buffer for the lmq by default.
This allows us to make nni_lmq_init() non-failing. (Although the buffer size requested at initialization might not be granted.)
-rw-r--r--  src/core/lmq.c                    | 69
-rw-r--r--  src/core/lmq.h                    |  9
-rw-r--r--  src/sp/protocol/pair0/pair.c      | 16
-rw-r--r--  src/sp/protocol/pair1/pair.c      | 16
-rw-r--r--  src/sp/protocol/pipeline0/push.c  |  6
-rw-r--r--  src/sp/protocol/pubsub0/pub.c     | 86
-rw-r--r--  src/sp/protocol/pubsub0/sub.c     | 19
-rw-r--r--  src/sp/protocol/survey0/survey.c  | 22
8 files changed, 113 insertions(+), 130 deletions(-)
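
With this change, nni_lmq_init() returns void and callers may treat the requested
capacity as a hint: if the allocation for a larger queue fails, the lmq silently
falls back to its embedded two-slot buffer. A minimal sketch of the new calling
pattern (illustrative only, not code from this commit; assumes a message m was
obtained elsewhere):

    nni_lmq q;

    nni_lmq_init(&q, 128);          // cannot fail; capacity may end up as only 2
    if (nni_lmq_put(&q, m) != 0) {  // still returns NNG_EAGAIN when the queue is full
            nni_msg_free(m);        // caller decides what to do on overflow
    }
    nni_lmq_fini(&q);               // frees heap storage only if any was allocated
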
diff --git a/src/core/lmq.c b/src/core/lmq.c
index 10ca9958..d01afe50 100644
--- a/src/core/lmq.c
+++ b/src/core/lmq.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -13,30 +13,27 @@
// message queues, but are less "featured", but more useful for
// performance sensitive contexts. Locking must be done by the caller.
-int
+
+// Note that initialization of a queue is guaranteed to succeed.
+// However, if the requested capacity is larger than 2, and memory
+// cannot be allocated, then the capacity will only be 2.
+void
nni_lmq_init(nni_lmq *lmq, size_t cap)
{
- size_t alloc;
-
- // We prefer alloc to a power of 2, this allows us to do modulo
- // operations as a power of two, for efficiency. It does possibly
- // waste some space, but never more than 2x. Consumers should try
- // for powers of two if they are concerned about efficiency.
- alloc = 2;
- while (alloc < cap) {
- alloc *= 2;
- }
- if ((lmq->lmq_msgs = nni_zalloc(sizeof(nng_msg *) * alloc)) == NULL) {
- return (NNG_ENOMEM);
+ lmq->lmq_len = 0;
+ lmq->lmq_get = 0;
+ lmq->lmq_put = 0;
+ lmq->lmq_alloc = 0;
+ lmq->lmq_mask = 0;
+ lmq->lmq_msgs = NULL;
+ lmq->lmq_msgs = lmq->lmq_buf;
+ lmq->lmq_cap = 2;
+ lmq->lmq_mask = 0x1; // only index 0 and 1
+ if (cap > 2) {
+ (void) nni_lmq_resize(lmq, cap);
+ } else {
+ lmq->lmq_cap = cap;
}
- lmq->lmq_cap = cap;
- lmq->lmq_alloc = alloc;
- lmq->lmq_mask = (alloc - 1);
- lmq->lmq_len = 0;
- lmq->lmq_get = 0;
- lmq->lmq_put = 0;
-
- return (0);
}
void
@@ -53,8 +50,9 @@ nni_lmq_fini(nni_lmq *lmq)
lmq->lmq_len--;
nni_msg_free(msg);
}
-
- nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+ if (lmq->lmq_alloc > 0) {
+ nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+ }
}
void
@@ -93,7 +91,7 @@ nni_lmq_empty(nni_lmq *lmq)
}
int
-nni_lmq_putq(nni_lmq *lmq, nng_msg *msg)
+nni_lmq_put(nni_lmq *lmq, nng_msg *msg)
{
if (lmq->lmq_len >= lmq->lmq_cap) {
return (NNG_EAGAIN);
@@ -105,7 +103,7 @@ nni_lmq_putq(nni_lmq *lmq, nng_msg *msg)
}
int
-nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp)
+nni_lmq_get(nni_lmq *lmq, nng_msg **mp)
{
nng_msg *msg;
if (lmq->lmq_len == 0) {
@@ -114,15 +112,15 @@ nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp)
msg = lmq->lmq_msgs[lmq->lmq_get++];
lmq->lmq_get &= lmq->lmq_mask;
lmq->lmq_len--;
- *msgp = msg;
+ *mp = msg;
return (0);
}
int
nni_lmq_resize(nni_lmq *lmq, size_t cap)
{
- nng_msg * msg;
- nng_msg **newq;
+ nng_msg *msg;
+ nng_msg **new_q;
size_t alloc;
size_t len;
@@ -131,21 +129,22 @@ nni_lmq_resize(nni_lmq *lmq, size_t cap)
alloc *= 2;
}
- newq = nni_alloc(sizeof(nng_msg *) * alloc);
- if (newq == NULL) {
+ if ((new_q = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) {
return (NNG_ENOMEM);
}
len = 0;
- while ((len < cap) && (nni_lmq_getq(lmq, &msg) == 0)) {
- newq[len++] = msg;
+ while ((len < cap) && (nni_lmq_get(lmq, &msg) == 0)) {
+ new_q[len++] = msg;
}
// Flush anything left over.
nni_lmq_flush(lmq);
- nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
- lmq->lmq_msgs = newq;
+ if (lmq->lmq_alloc > 0) {
+ nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+ }
+ lmq->lmq_msgs = new_q;
lmq->lmq_cap = cap;
lmq->lmq_alloc = alloc;
lmq->lmq_mask = alloc - 1;
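
nni_lmq_resize() still rounds the allocation up to a power of two so that
nni_lmq_put() and nni_lmq_get() can wrap their ring indices with a bitwise AND
rather than a modulo. In outline (paraphrasing the put/get paths above, not new
code from this commit):

    lmq->lmq_msgs[lmq->lmq_put++] = msg;
    lmq->lmq_put &= lmq->lmq_mask;   // lmq_mask == alloc - 1, so this wraps to 0
    lmq->lmq_len++;
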
diff --git a/src/core/lmq.h b/src/core/lmq.h
index 0a64c984..e38a7441 100644
--- a/src/core/lmq.h
+++ b/src/core/lmq.h
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -23,15 +23,16 @@ typedef struct nni_lmq {
size_t lmq_get;
size_t lmq_put;
nng_msg **lmq_msgs;
+ nng_msg *lmq_buf[2]; // default minimal buffer
} nni_lmq;
-extern int nni_lmq_init(nni_lmq *, size_t);
+extern void nni_lmq_init(nni_lmq *, size_t);
extern void nni_lmq_fini(nni_lmq *);
extern void nni_lmq_flush(nni_lmq *);
extern size_t nni_lmq_len(nni_lmq *);
extern size_t nni_lmq_cap(nni_lmq *);
-extern int nni_lmq_putq(nni_lmq *, nng_msg *);
-extern int nni_lmq_getq(nni_lmq *, nng_msg **);
+extern int nni_lmq_put(nni_lmq *lmq, nng_msg *msg);
+extern int nni_lmq_get(nni_lmq *lmq, nng_msg **mp);
extern int nni_lmq_resize(nni_lmq *, size_t);
extern bool nni_lmq_full(nni_lmq *);
extern bool nni_lmq_empty(nni_lmq *);
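
The new lmq_buf[2] member is what backs the no-allocation fallback: lmq_msgs
either aliases that embedded array (lmq_alloc == 0) or points at heap storage
(lmq_alloc > 0), which is why nni_lmq_fini() and nni_lmq_resize() in lmq.c now
guard nni_free() on lmq_alloc. Condensed as an invariant (a restatement of the
code above, not new code):

    if (lmq->lmq_alloc == 0) {
            // lmq_msgs points at lmq->lmq_buf; nothing to free
    } else {
            // lmq_msgs points at nni_alloc()'d storage holding lmq_alloc entries
            nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof (nng_msg *));
    }
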
diff --git a/src/sp/protocol/pair0/pair.c b/src/sp/protocol/pair0/pair.c
index c6470b7b..24c88d36 100644
--- a/src/sp/protocol/pair0/pair.c
+++ b/src/sp/protocol/pair0/pair.c
@@ -217,7 +217,7 @@ pair0_pipe_recv_cb(void *arg)
// maybe we have room in the rmq?
if (!nni_lmq_full(&s->rmq)) {
- nni_lmq_putq(&s->rmq, msg);
+ nni_lmq_put(&s->rmq, msg);
nni_aio_set_msg(&p->aio_recv, NULL);
nni_pipe_recv(p->pipe, &p->aio_recv);
} else {
@@ -245,14 +245,14 @@ pair0_send_sched(pair0_sock *s)
s->wr_ready = true;
// if message waiting in buffered queue, then we prefer that.
- if (nni_lmq_getq(&s->wmq, &m) == 0) {
+ if (nni_lmq_get(&s->wmq, &m) == 0) {
pair0_pipe_send(p, m);
if ((a = nni_list_first(&s->waq)) != NULL) {
nni_aio_list_remove(a);
m = nni_aio_get_msg(a);
l = nni_msg_len(m);
- nni_lmq_putq(&s->wmq, m);
+ nni_lmq_put(&s->wmq, m);
}
} else if ((a = nni_list_first(&s->waq)) != NULL) {
@@ -311,8 +311,8 @@ pair0_sock_close(void *arg)
nni_aio_list_remove(a);
nni_aio_finish_error(a, NNG_ECLOSED);
}
- while ((nni_lmq_getq(&s->rmq, &m) == 0) ||
- (nni_lmq_getq(&s->wmq, &m) == 0)) {
+ while ((nni_lmq_get(&s->rmq, &m) == 0) ||
+ (nni_lmq_get(&s->wmq, &m) == 0)) {
nni_msg_free(m);
}
nni_mtx_unlock(&s->mtx);
@@ -359,7 +359,7 @@ pair0_sock_send(void *arg, nni_aio *aio)
}
// Can we maybe queue it.
- if (nni_lmq_putq(&s->wmq, m) == 0) {
+ if (nni_lmq_put(&s->wmq, m) == 0) {
// Yay, we can. So we're done.
nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, len);
@@ -396,14 +396,14 @@ pair0_sock_recv(void *arg, nni_aio *aio)
// Buffered read. If there is a message waiting for us, pick
// it up. We might need to post another read request as well.
- if (nni_lmq_getq(&s->rmq, &m) == 0) {
+ if (nni_lmq_get(&s->rmq, &m) == 0) {
nni_aio_set_msg(aio, m);
nni_aio_finish(aio, 0, nni_msg_len(m));
if (s->rd_ready) {
s->rd_ready = false;
m = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
- nni_lmq_putq(&s->rmq, m);
+ nni_lmq_put(&s->rmq, m);
nni_pipe_recv(p->pipe, &p->aio_recv);
}
if (nni_lmq_empty(&s->rmq)) {
diff --git a/src/sp/protocol/pair1/pair.c b/src/sp/protocol/pair1/pair.c
index e6be4628..636e51e9 100644
--- a/src/sp/protocol/pair1/pair.c
+++ b/src/sp/protocol/pair1/pair.c
@@ -358,7 +358,7 @@ pair1_pipe_recv_cb(void *arg)
// maybe we have room in the rmq?
if (!nni_lmq_full(&s->rmq)) {
- nni_lmq_putq(&s->rmq, msg);
+ nni_lmq_put(&s->rmq, msg);
nni_aio_set_msg(&p->aio_recv, NULL);
nni_pipe_recv(pipe, &p->aio_recv);
} else {
@@ -386,14 +386,14 @@ pair1_send_sched(pair1_sock *s)
s->wr_ready = true;
// if message waiting in buffered queue, then we prefer that.
- if (nni_lmq_getq(&s->wmq, &m) == 0) {
+ if (nni_lmq_get(&s->wmq, &m) == 0) {
pair1_pipe_send(p, m);
if ((a = nni_list_first(&s->waq)) != NULL) {
nni_aio_list_remove(a);
m = nni_aio_get_msg(a);
l = nni_msg_len(m);
- nni_lmq_putq(&s->wmq, m);
+ nni_lmq_put(&s->wmq, m);
}
} else if ((a = nni_list_first(&s->waq)) != NULL) {
@@ -452,8 +452,8 @@ pair1_sock_close(void *arg)
nni_aio_list_remove(a);
nni_aio_finish_error(a, NNG_ECLOSED);
}
- while ((nni_lmq_getq(&s->rmq, &m) == 0) ||
- (nni_lmq_getq(&s->wmq, &m) == 0)) {
+ while ((nni_lmq_get(&s->rmq, &m) == 0) ||
+ (nni_lmq_get(&s->wmq, &m) == 0)) {
nni_msg_free(m);
}
nni_mtx_unlock(&s->mtx);
@@ -574,7 +574,7 @@ inject:
}
// Can we maybe queue it.
- if (nni_lmq_putq(&s->wmq, m) == 0) {
+ if (nni_lmq_put(&s->wmq, m) == 0) {
// Yay, we can. So we're done.
nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, len);
@@ -611,14 +611,14 @@ pair1_sock_recv(void *arg, nni_aio *aio)
// Buffered read. If there is a message waiting for us, pick
// it up. We might need to post another read request as well.
- if (nni_lmq_getq(&s->rmq, &m) == 0) {
+ if (nni_lmq_get(&s->rmq, &m) == 0) {
nni_aio_set_msg(aio, m);
nni_aio_finish(aio, 0, nni_msg_len(m));
if (s->rd_ready) {
s->rd_ready = false;
m = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
- nni_lmq_putq(&s->rmq, m);
+ nni_lmq_put(&s->rmq, m);
nni_pipe_recv(p->pipe, &p->aio_recv);
}
if (nni_lmq_empty(&s->rmq)) {
diff --git a/src/sp/protocol/pipeline0/push.c b/src/sp/protocol/pipeline0/push.c
index 028104cd..99cb2da4 100644
--- a/src/sp/protocol/pipeline0/push.c
+++ b/src/sp/protocol/pipeline0/push.c
@@ -193,7 +193,7 @@ push0_pipe_ready(push0_pipe *p)
// if message is waiting in the buffered queue
// then we prefer that.
- if (nni_lmq_getq(&s->wq, &m) == 0) {
+ if (nni_lmq_get(&s->wq, &m) == 0) {
nni_aio_set_msg(&p->aio_send, m);
nni_pipe_send(p->pipe, &p->aio_send);
@@ -201,7 +201,7 @@ push0_pipe_ready(push0_pipe *p)
nni_aio_list_remove(a);
m = nni_aio_get_msg(a);
l = nni_msg_len(m);
- nni_lmq_putq(&s->wq, m);
+ nni_lmq_put(&s->wq, m);
}
} else if ((a = nni_list_first(&s->aq)) != NULL) {
@@ -300,7 +300,7 @@ push0_sock_send(void *arg, nni_aio *aio)
}
// Can we maybe queue it.
- if (nni_lmq_putq(&s->wq, m) == 0) {
+ if (nni_lmq_put(&s->wq, m) == 0) {
// Yay, we can. So we're done.
nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, l);
diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c
index cfc3ed6d..3911127a 100644
--- a/src/sp/protocol/pubsub0/pub.c
+++ b/src/sp/protocol/pubsub0/pub.c
@@ -36,22 +36,22 @@ static void pub0_pipe_fini(void *);
// pub0_sock is our per-socket protocol private structure.
struct pub0_sock {
- nni_list pipes;
- nni_mtx mtx;
- bool closed;
- size_t sendbuf;
- nni_pollable sendable;
+ nni_list pipes;
+ nni_mtx mtx;
+ bool closed;
+ size_t sendbuf;
+ nni_pollable sendable;
};
// pub0_pipe is our per-pipe protocol private structure.
struct pub0_pipe {
- nni_pipe * pipe;
- pub0_sock * pub;
+ nni_pipe *pipe;
+ pub0_sock *pub;
nni_lmq sendq;
bool closed;
bool busy;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
nni_list_node node;
};
@@ -65,10 +65,10 @@ pub0_sock_fini(void *arg)
}
static int
-pub0_sock_init(void *arg, nni_sock *nsock)
+pub0_sock_init(void *arg, nni_sock *ns)
{
pub0_sock *sock = arg;
- NNI_ARG_UNUSED(nsock);
+ NNI_ARG_UNUSED(ns);
nni_pollable_init(&sock->sendable);
nni_mtx_init(&sock->mtx);
@@ -94,8 +94,8 @@ pub0_pipe_stop(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -103,8 +103,8 @@ pub0_pipe_fini(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_free(p->aio_send);
- nni_aio_free(p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
nni_lmq_fini(&p->sendq);
}
@@ -113,21 +113,15 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
pub0_pipe *p = arg;
pub0_sock *sock = s;
- int rv;
size_t len;
nni_mtx_lock(&sock->mtx);
len = sock->sendbuf;
nni_mtx_unlock(&sock->mtx);
- // XXX: consider making this depth tunable
- if (((rv = nni_lmq_init(&p->sendq, len)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
-
- pub0_pipe_fini(p);
- return (rv);
- }
+ nni_lmq_init(&p->sendq, len);
+ nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p);
p->busy = false;
p->pipe = pipe;
@@ -149,7 +143,7 @@ pub0_pipe_start(void *arg)
nni_mtx_unlock(&sock->mtx);
// Start the receiver.
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -160,8 +154,8 @@ pub0_pipe_close(void *arg)
pub0_pipe *p = arg;
pub0_sock *sock = p->pub;
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
nni_mtx_lock(&sock->mtx);
p->closed = true;
@@ -180,8 +174,8 @@ pub0_pipe_recv_cb(void *arg)
// We should never receive a message -- the only valid reason for us to
// be here is on pipe close.
- if (nni_aio_result(p->aio_recv) == 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_recv));
+ if (nni_aio_result(&p->aio_recv) == 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_recv));
}
nni_pipe_close(p->pipe);
}
@@ -191,11 +185,11 @@ pub0_pipe_send_cb(void *arg)
{
pub0_pipe *p = arg;
pub0_sock *sock = p->pub;
- nni_msg * msg;
+ nni_msg *msg;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -205,9 +199,9 @@ pub0_pipe_send_cb(void *arg)
nni_mtx_unlock(&sock->mtx);
return;
}
- if (nni_lmq_getq(&p->sendq, &msg) == 0) {
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ if (nni_lmq_get(&p->sendq, &msg) == 0) {
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
} else {
p->busy = false;
}
@@ -228,7 +222,7 @@ pub0_sock_send(void *arg, nni_aio *aio)
{
pub0_sock *sock = arg;
pub0_pipe *p;
- nng_msg * msg;
+ nng_msg *msg;
size_t len;
msg = nni_aio_get_msg(aio);
@@ -241,14 +235,14 @@ pub0_sock_send(void *arg, nni_aio *aio)
if (nni_lmq_full(&p->sendq)) {
// Make space for the new message.
nni_msg *old;
- (void) nni_lmq_getq(&p->sendq, &old);
+ (void) nni_lmq_get(&p->sendq, &old);
nni_msg_free(old);
}
- nni_lmq_putq(&p->sendq, msg);
+ nni_lmq_put(&p->sendq, msg);
} else {
p->busy = true;
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
}
}
nni_mtx_unlock(&sock->mtx);
@@ -289,7 +283,7 @@ pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t)
nni_mtx_lock(&sock->mtx);
sock->sendbuf = (size_t) val;
NNI_LIST_FOREACH (&sock->pipes, p) {
- // If we fail part way thru (should only be ENOMEM), we
+ // If we fail part way through (should only be ENOMEM), we
// stop short. The others would likely fail for ENOMEM as
// well anyway. There is a weird effect here where the
// buffers may have been set for *some* of the pipes, but
@@ -368,13 +362,13 @@ static nni_proto pub0_proto_raw = {
};
int
-nng_pub0_open(nng_socket *sidp)
+nng_pub0_open(nng_socket *id)
{
- return (nni_proto_open(sidp, &pub0_proto));
+ return (nni_proto_open(id, &pub0_proto));
}
int
-nng_pub0_open_raw(nng_socket *sidp)
+nng_pub0_open_raw(nng_socket *id)
{
- return (nni_proto_open(sidp, &pub0_proto_raw));
+ return (nni_proto_open(id, &pub0_proto_raw));
}
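
The pub0 changes follow the same theme: the pipe's aio objects become inline
members initialized with nni_aio_init(), so pub0_pipe_init() loses its only
failure paths and no longer needs to unwind through pub0_pipe_fini(). The shape
of the change, in outline (names as in the diff above):

    // before: heap-allocated aios, so init could fail and had to clean up
    //   if ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ...

    // after: inline aios, initialization cannot fail
    nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p);
    nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p);

    // and in pub0_pipe_fini():
    nni_aio_fini(&p->aio_send);
    nni_aio_fini(&p->aio_recv);
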
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c
index a40ee073..35b32af4 100644
--- a/src/sp/protocol/pubsub0/sub.c
+++ b/src/sp/protocol/pubsub0/sub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Nathan Kent <nate@nkent.net>
//
@@ -115,7 +115,7 @@ again:
return;
}
- (void) nni_lmq_getq(&ctx->lmq, &msg);
+ (void) nni_lmq_get(&ctx->lmq, &msg);
if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) {
nni_pollable_clear(&sock->readable);
@@ -182,15 +182,12 @@ sub0_ctx_init(void *ctx_arg, void *sock_arg)
sub0_ctx * ctx = ctx_arg;
size_t len;
bool prefer_new;
- int rv;
nni_mtx_lock(&sock->lk);
len = sock->recv_buf_len;
prefer_new = sock->prefer_new;
- if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
- return (rv);
- }
+ nni_lmq_init(&ctx->lmq, len);
ctx->prefer_new = prefer_new;
nni_aio_list_init(&ctx->recv_queue);
@@ -385,14 +382,14 @@ sub0_recv_cb(void *arg)
} else if (nni_lmq_full(&ctx->lmq)) {
// Make space for the new message.
nni_msg *old;
- (void) nni_lmq_getq(&ctx->lmq, &old);
+ (void) nni_lmq_get(&ctx->lmq, &old);
nni_msg_free(old);
- (void) nni_lmq_putq(&ctx->lmq, dup_msg);
+ (void) nni_lmq_put(&ctx->lmq, dup_msg);
queued = true;
} else {
- (void) nni_lmq_putq(&ctx->lmq, dup_msg);
+ (void) nni_lmq_put(&ctx->lmq, dup_msg);
queued = true;
}
if (queued && ctx == &sock->master) {
@@ -534,9 +531,9 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
for (size_t i = 0; i < len; i++) {
nni_msg *msg;
- (void) nni_lmq_getq(&ctx->lmq, &msg);
+ (void) nni_lmq_get(&ctx->lmq, &msg);
if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) {
- (void) nni_lmq_putq(&ctx->lmq, msg);
+ (void) nni_lmq_put(&ctx->lmq, msg);
} else {
nni_msg_free(msg);
}
diff --git a/src/sp/protocol/survey0/survey.c b/src/sp/protocol/survey0/survey.c
index ce1ed601..3287138d 100644
--- a/src/sp/protocol/survey0/survey.c
+++ b/src/sp/protocol/survey0/survey.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -108,7 +108,6 @@ surv0_ctx_init(void *c, void *s)
{
surv0_ctx * ctx = c;
surv0_sock * sock = s;
- int rv;
int len;
nng_duration tmo;
@@ -129,10 +128,7 @@ surv0_ctx_init(void *c, void *s)
ctx->sock = sock;
- if ((rv = nni_lmq_init(&ctx->recv_lmq, len)) != 0) {
- surv0_ctx_fini(ctx);
- return (rv);
- }
+ nni_lmq_init(&ctx->recv_lmq, len);
nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx);
return (0);
}
@@ -172,7 +168,7 @@ surv0_ctx_recv(void *arg, nni_aio *aio)
return;
}
again:
- if (nni_lmq_getq(&ctx->recv_lmq, &msg) != 0) {
+ if (nni_lmq_get(&ctx->recv_lmq, &msg) != 0) {
int rv;
if ((rv = nni_aio_schedule(aio, &surv0_ctx_cancel, ctx)) !=
0) {
@@ -259,7 +255,7 @@ surv0_ctx_send(void *arg, nni_aio *aio)
nni_pipe_send(pipe->pipe, &pipe->aio_send);
} else if (!nni_lmq_full(&pipe->send_queue)) {
nni_msg_clone(msg);
- nni_lmq_putq(&pipe->send_queue, msg);
+ nni_lmq_put(&pipe->send_queue, msg);
}
}
@@ -359,7 +355,6 @@ surv0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
surv0_pipe *p = arg;
surv0_sock *sock = s;
- int rv;
int len;
len = nni_atomic_get(&sock->send_buf);
@@ -369,10 +364,7 @@ surv0_pipe_init(void *arg, nni_pipe *pipe, void *s)
// This depth could be tunable. The deeper the queue, the more
// concurrent surveys that can be delivered (multiple contexts).
// Note that surveys can be *outstanding*, but not yet put on the wire.
- if ((rv = nni_lmq_init(&p->send_queue, len)) != 0) {
- surv0_pipe_fini(p);
- return (rv);
- }
+ nni_lmq_init(&p->send_queue, len);
p->pipe = pipe;
p->sock = sock;
@@ -434,7 +426,7 @@ surv0_pipe_send_cb(void *arg)
nni_mtx_unlock(&sock->mtx);
return;
}
- if (nni_lmq_getq(&p->send_queue, &msg) == 0) {
+ if (nni_lmq_get(&p->send_queue, &msg) == 0) {
nni_aio_set_msg(&p->aio_send, msg);
nni_pipe_send(p->pipe, &p->aio_send);
} else {
@@ -482,7 +474,7 @@ surv0_pipe_recv_cb(void *arg)
nni_list_remove(&ctx->recv_queue, aio);
nni_aio_finish_msg(aio, msg);
} else {
- nni_lmq_putq(&ctx->recv_lmq, msg);
+ nni_lmq_put(&ctx->recv_lmq, msg);
if (ctx == &sock->ctx) {
nni_pollable_raise(&sock->readable);
}