aboutsummaryrefslogtreecommitdiff
path: root/src/sp/protocol/bus0/bus.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-12-27 23:13:42 -0800
committerGarrett D'Amore <garrett@damore.org>2021-12-27 23:13:42 -0800
commit92bf6fd4cccc548e14ae826c1d36851f98378da0 (patch)
treeca3d35f130e7ec6e51ede51142cc34c5f73ff6ca /src/sp/protocol/bus0/bus.c
parent2fbfd7e5c3ad245de2c905720eb8d9d5b27b6739 (diff)
downloadnng-92bf6fd4cccc548e14ae826c1d36851f98378da0.tar.gz
nng-92bf6fd4cccc548e14ae826c1d36851f98378da0.tar.bz2
nng-92bf6fd4cccc548e14ae826c1d36851f98378da0.zip
Bus socket converted to use lmq.
This should give significant performance boosts to anyone using this protocol. Buffering on both the send and receive side is supported, with a default buffer size of 16 messages. This should help provide a reasonable default case for most users. While here updated the test for bus to much more complete NUTS style test framework, with increased coverage.
Diffstat (limited to 'src/sp/protocol/bus0/bus.c')
-rw-r--r--src/sp/protocol/bus0/bus.c419
1 files changed, 257 insertions, 162 deletions
diff --git a/src/sp/protocol/bus0/bus.c b/src/sp/protocol/bus0/bus.c
index ab857d72..faa94c13 100644
--- a/src/sp/protocol/bus0/bus.c
+++ b/src/sp/protocol/bus0/bus.c
@@ -13,6 +13,7 @@
#include "core/nng_impl.h"
#include "nng/protocol/bus0/bus.h"
+#include <stdio.h>
// Bus protocol. The BUS protocol, each peer sends a message to its peers.
// However, bus protocols do not "forward" (absent a device). So in order
@@ -26,41 +27,35 @@
typedef struct bus0_pipe bus0_pipe;
typedef struct bus0_sock bus0_sock;
-static void bus0_sock_getq(bus0_sock *);
static void bus0_sock_send(void *, nni_aio *);
static void bus0_sock_recv(void *, nni_aio *);
-static void bus0_pipe_getq(bus0_pipe *);
static void bus0_pipe_recv(bus0_pipe *);
-static void bus0_sock_getq_cb(void *);
-static void bus0_sock_getq_cb_raw(void *);
-static void bus0_pipe_getq_cb(void *);
static void bus0_pipe_send_cb(void *);
static void bus0_pipe_recv_cb(void *);
-static void bus0_pipe_putq_cb(void *);
// bus0_sock is our per-socket protocol private structure.
struct bus0_sock {
- nni_aio aio_getq;
- nni_list pipes;
- nni_mtx mtx;
- nni_msgq *uwq;
- nni_msgq *urq;
- bool raw;
+ nni_list pipes;
+ nni_mtx mtx;
+ nni_pollable can_send;
+ nni_pollable can_recv;
+ nni_lmq recv_msgs;
+ nni_list recv_wait;
+ int send_buf;
+ bool raw;
};
// bus0_pipe is our per-pipe protocol private structure.
struct bus0_pipe {
- nni_pipe *npipe;
- bus0_sock *psock;
- nni_msgq *sendq;
+ nni_pipe *pipe;
+ bus0_sock *bus;
+ nni_lmq send_queue;
nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_putq;
- nni_mtx mtx;
+ bool busy;
+ nni_aio aio_recv;
+ nni_aio aio_send;
};
static void
@@ -68,50 +63,57 @@ bus0_sock_fini(void *arg)
{
bus0_sock *s = arg;
- nni_aio_fini(&s->aio_getq);
nni_mtx_fini(&s->mtx);
+ nni_pollable_fini(&s->can_send);
+ nni_pollable_fini(&s->can_recv);
+ nni_lmq_fini(&s->recv_msgs);
}
static void
-bus0_sock_init(void *arg, nni_sock *nsock)
+bus0_sock_init(void *arg, nni_sock *ns)
{
bus0_sock *s = arg;
+ NNI_ARG_UNUSED(ns);
+
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s);
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
+ nni_aio_list_init(&s->recv_wait);
+ nni_pollable_init(&s->can_send);
+ nni_pollable_init(&s->can_recv);
+ nni_lmq_init(&s->recv_msgs, 16);
+ s->send_buf = 16;
+
s->raw = false;
}
static void
-bus0_sock_init_raw(void *arg, nni_sock *nsock)
+bus0_sock_init_raw(void *arg, nni_sock *ns)
{
bus0_sock *s = arg;
- NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
- nni_mtx_init(&s->mtx);
- nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s);
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
+ bus0_sock_init(arg, ns);
s->raw = true;
}
static void
bus0_sock_open(void *arg)
{
- bus0_sock *s = arg;
-
- bus0_sock_getq(s);
+ NNI_ARG_UNUSED(arg);
}
static void
bus0_sock_close(void *arg)
{
bus0_sock *s = arg;
+ nni_aio *aio;
- nni_aio_close(&s->aio_getq);
+ nni_mtx_lock(&s->mtx);
+ while ((aio = nni_list_first(&s->recv_wait)) != NULL) {
+ nni_list_remove(&s->recv_wait, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&s->mtx);
}
static void
@@ -119,10 +121,8 @@ bus0_pipe_stop(void *arg)
{
bus0_pipe *p = arg;
- nni_aio_stop(&p->aio_getq);
nni_aio_stop(&p->aio_send);
nni_aio_stop(&p->aio_recv);
- nni_aio_stop(&p->aio_putq);
}
static void
@@ -130,33 +130,23 @@ bus0_pipe_fini(void *arg)
{
bus0_pipe *p = arg;
- nni_aio_fini(&p->aio_getq);
nni_aio_fini(&p->aio_send);
nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_msgq_fini(p->sendq);
- nni_mtx_fini(&p->mtx);
+ nni_lmq_fini(&p->send_queue);
}
static int
-bus0_pipe_init(void *arg, nni_pipe *npipe, void *s)
+bus0_pipe_init(void *arg, nni_pipe *np, void *s)
{
bus0_pipe *p = arg;
- int rv;
+ p->pipe = np;
+ p->bus = s;
NNI_LIST_NODE_INIT(&p->node);
- nni_mtx_init(&p->mtx);
- nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p);
nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p);
nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p);
- nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p);
- if ((rv = nni_msgq_init(&p->sendq, 16)) != 0) {
- bus0_pipe_fini(p);
- return (rv);
- }
+ nni_lmq_init(&p->send_queue, p->bus->send_buf);
- p->npipe = npipe;
- p->psock = s;
return (0);
}
@@ -164,10 +154,9 @@ static int
bus0_pipe_start(void *arg)
{
bus0_pipe *p = arg;
- bus0_sock *s = p->psock;
+ bus0_sock *s = p->bus;
- if (nni_pipe_peer(p->npipe) != NNI_PROTO_BUS_V0) {
- // Peer protocol mismatch.
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_BUS_V0) {
return (NNG_EPROTO);
}
@@ -176,7 +165,6 @@ bus0_pipe_start(void *arg)
nni_mtx_unlock(&s->mtx);
bus0_pipe_recv(p);
- bus0_pipe_getq(p);
return (0);
}
@@ -185,15 +173,13 @@ static void
bus0_pipe_close(void *arg)
{
bus0_pipe *p = arg;
- bus0_sock *s = p->psock;
+ bus0_sock *s = p->bus;
- nni_aio_close(&p->aio_getq);
nni_aio_close(&p->aio_send);
nni_aio_close(&p->aio_recv);
- nni_aio_close(&p->aio_putq);
- nni_msgq_close(p->sendq);
nni_mtx_lock(&s->mtx);
+ nni_lmq_flush(&p->send_queue);
if (nni_list_active(&s->pipes, p)) {
nni_list_remove(&s->pipes, p);
}
@@ -201,187 +187,278 @@ bus0_pipe_close(void *arg)
}
static void
-bus0_pipe_getq_cb(void *arg)
-{
- bus0_pipe *p = arg;
-
- if (nni_aio_result(&p->aio_getq) != 0) {
- // closed?
- nni_pipe_close(p->npipe);
- return;
- }
- nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq));
- nni_aio_set_msg(&p->aio_getq, NULL);
-
- nni_pipe_send(p->npipe, &p->aio_send);
-}
-
-static void
bus0_pipe_send_cb(void *arg)
{
bus0_pipe *p = arg;
+ bus0_sock *s = p->bus;
+ nni_msg *msg;
if (nni_aio_result(&p->aio_send) != 0) {
// closed?
nni_msg_free(nni_aio_get_msg(&p->aio_send));
nni_aio_set_msg(&p->aio_send, NULL);
- nni_pipe_close(p->npipe);
+ nni_pipe_close(p->pipe);
return;
}
- bus0_pipe_getq(p);
+ nni_mtx_lock(&s->mtx);
+ if (nni_lmq_get(&p->send_queue, &msg) == 0) {
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
+ } else {
+ p->busy = false;
+ }
+ nni_mtx_unlock(&s->mtx);
}
static void
bus0_pipe_recv_cb(void *arg)
{
- bus0_pipe *p = arg;
- bus0_sock *s = p->psock;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->bus;
+ nni_aio *aio = NULL;
nni_msg *msg;
if (nni_aio_result(&p->aio_recv) != 0) {
- nni_pipe_close(p->npipe);
+ nni_pipe_close(p->pipe);
return;
}
+
msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
- nni_msg_header_append_u32(msg, nni_pipe_id(p->npipe));
+ nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
}
- nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
- nni_aio_set_msg(&p->aio_putq, msg);
- nni_aio_set_msg(&p->aio_recv, NULL);
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ if (!nni_list_empty(&s->recv_wait)) {
+ aio = nni_list_first(&s->recv_wait);
+ nni_aio_list_remove(aio);
+ nni_aio_set_msg(aio, msg);
+ } else if (nni_lmq_put(&s->recv_msgs, msg) == 0) {
+ nni_pollable_raise(&s->can_recv);
+ } else {
+ // dropped message due to no room
+ nni_msg_free(msg);
+ }
+ nni_mtx_unlock(&s->mtx);
+
+ if (aio != NULL) {
+ nni_aio_finish_sync(aio, 0, nni_msg_len(msg));
+ }
+ bus0_pipe_recv(p);
}
static void
-bus0_pipe_putq_cb(void *arg)
+bus0_pipe_recv(bus0_pipe *p)
{
- bus0_pipe *p = arg;
-
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(&p->aio_putq));
- nni_aio_set_msg(&p->aio_putq, NULL);
- nni_pipe_close(p->npipe);
- return;
- }
-
- // Wait for another recv.
- bus0_pipe_recv(p);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static void
-bus0_sock_getq_cb(void *arg)
+bus0_sock_send(void *arg, nni_aio *aio)
{
bus0_sock *s = arg;
- bus0_pipe *p;
- bus0_pipe *lastp;
nni_msg *msg;
- nni_msg *dup;
+ bus0_pipe *pipe;
+ uint32_t sender = 0;
+ size_t len;
- if (nni_aio_result(&s->aio_getq) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
- msg = nni_aio_get_msg(&s->aio_getq);
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
- // We ignore any headers present for cooked mode.
- nni_msg_header_clear(msg);
+ if (s->raw) {
+ // In raw mode, we look for the message header, to see if it
+ // is being resent from another pipe (e.g. via a device).
+ // We don't want to send it back to the originator.
+ if (nni_msg_header_len(msg) >= sizeof(uint32_t)) {
+ sender = nni_msg_header_trim_u32(msg);
+ }
+ } else {
+ // In cooked mode just strip the header.
+ nni_msg_header_clear(msg);
+ }
nni_mtx_lock(&s->mtx);
- lastp = nni_list_last(&s->pipes);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (p != lastp) {
- if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
- } else {
- dup = msg;
- msg = NULL;
+ NNI_LIST_FOREACH (&s->pipes, pipe) {
+
+ if (s->raw && nni_pipe_id(pipe->pipe) == sender) {
+ continue;
}
- if (nni_msgq_tryput(p->sendq, dup) != 0) {
- nni_msg_free(dup);
+
+ // if the pipe isn't busy, then send this message direct.
+ if (!pipe->busy) {
+ pipe->busy = true;
+ nni_msg_clone(msg);
+ nni_aio_set_msg(&pipe->aio_send, msg);
+ nni_pipe_send(pipe->pipe, &pipe->aio_send);
+ } else if (!nni_lmq_full(&pipe->send_queue)) {
+ nni_msg_clone(msg);
+ nni_lmq_put(&pipe->send_queue, msg);
}
}
nni_mtx_unlock(&s->mtx);
+
nni_msg_free(msg);
+ nni_aio_finish(aio, 0, len);
+}
- bus0_sock_getq(s);
+static void
+bus0_recv_cancel(nng_aio *aio, void *arg, int rv)
+{
+ bus0_sock *s = arg;
+ nni_mtx_lock(&s->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&s->mtx);
}
static void
-bus0_sock_getq_cb_raw(void *arg)
+bus0_sock_recv(void *arg, nni_aio *aio)
{
bus0_sock *s = arg;
- bus0_pipe *p;
nni_msg *msg;
- uint32_t sender;
- if (nni_aio_result(&s->aio_getq) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
- msg = nni_aio_get_msg(&s->aio_getq);
-
- // The header being present indicates that the message
- // was received locally and we are rebroadcasting. (Device
- // is doing this probably.) In this case grab the pipe
- // ID from the header, so we can exclude it.
- if (nni_msg_header_len(msg) >= 4) {
- sender = nni_msg_header_trim_u32(msg);
- } else {
- sender = 0;
- }
-
nni_mtx_lock(&s->mtx);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (nni_pipe_id(p->npipe) == sender) {
- continue;
- }
- nni_msg_clone(msg);
- if (nni_msgq_tryput(p->sendq, msg) != 0) {
- nni_msg_free(msg);
+again:
+ if (nni_lmq_empty(&s->recv_msgs)) {
+ int rv;
+ if ((rv = nni_aio_schedule(aio, bus0_recv_cancel, s)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
+ nni_list_append(&s->recv_wait, aio);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ (void) nni_lmq_get(&s->recv_msgs, &msg);
+
+ if (nni_lmq_empty(&s->recv_msgs)) {
+ nni_pollable_clear(&s->can_recv);
}
+ if ((msg = nni_msg_unique(msg)) == NULL) {
+ goto again;
+ }
+ nni_aio_set_msg(aio, msg);
nni_mtx_unlock(&s->mtx);
- nni_msg_free(msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+}
- bus0_sock_getq(s);
+static int
+bus0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ bus0_sock *sock = arg;
+ int fd;
+ int rv;
+ nni_mtx_lock(&sock->mtx);
+ // BUS sockets are *always* writable (best effort)
+ nni_pollable_raise(&sock->can_send);
+ rv = nni_pollable_getfd(&sock->can_send, &fd);
+ nni_mtx_unlock(&sock->mtx);
+
+ if (rv == 0) {
+ rv = nni_copyout_int(fd, buf, szp, t);
+ }
+ return (rv);
}
-static void
-bus0_sock_getq(bus0_sock *s)
+static int
+bus0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ bus0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->can_recv, &fd)) == 0) {
+ rv = nni_copyout_int(fd, buf, szp, t);
+ }
+ return (rv);
}
-static void
-bus0_pipe_getq(bus0_pipe *p)
+static int
+bus0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ bus0_sock *s = arg;
+ int val;
+ nni_mtx_lock(&s->mtx);
+ val = (int) nni_lmq_cap(&s->recv_msgs);
+ nni_mtx_unlock(&s->mtx);
+
+ return (nni_copyout_int(val, buf, szp, t));
}
-static void
-bus0_pipe_recv(bus0_pipe *p)
+static int
+bus0_sock_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ bus0_sock *s = arg;
+ int val;
+ nni_mtx_lock(&s->mtx);
+ val = s->send_buf;
+ nni_mtx_unlock(&s->mtx);
+ return (nni_copyout_int(val, buf, szp, t));
}
-static void
-bus0_sock_send(void *arg, nni_aio *aio)
+static int
+bus0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
bus0_sock *s = arg;
+ int val;
+ int rv;
- nni_msgq_aio_put(s->uwq, aio);
+ if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ if ((rv = nni_lmq_resize(&s->recv_msgs, (size_t) val)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+ }
+
+ nni_mtx_unlock(&s->mtx);
+ return (0);
}
-static void
-bus0_sock_recv(void *arg, nni_aio *aio)
+static int
+bus0_sock_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
bus0_sock *s = arg;
+ bus0_pipe *p;
+ int val;
+ int rv;
- nni_msgq_aio_get(s->urq, aio);
+ if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&s->mtx);
+ s->send_buf = val;
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ // If we fail part way through (should only be ENOMEM), we
+ // stop short. The others would likely fail for ENOMEM as
+ // well anyway. There is a weird effect here where the
+ // buffers may have been set for *some* of the pipes, but
+ // we have no way to correct partial failure.
+ if ((rv = nni_lmq_resize(&p->send_queue, (size_t) val)) != 0) {
+ break;
+ }
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
}
static nni_proto_pipe_ops bus0_pipe_ops = {
@@ -394,6 +471,24 @@ static nni_proto_pipe_ops bus0_pipe_ops = {
};
static nni_option bus0_sock_options[] = {
+ {
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = bus0_sock_get_send_fd,
+ },
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = bus0_sock_get_recv_fd,
+ },
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = bus0_sock_get_recv_buf_len,
+ .o_set = bus0_sock_set_recv_buf_len,
+ },
+ {
+ .o_name = NNG_OPT_SENDBUF,
+ .o_get = bus0_sock_get_send_buf_len,
+ .o_set = bus0_sock_set_send_buf_len,
+ },
// terminate list
{
.o_name = NULL,
@@ -441,13 +536,13 @@ static nni_proto bus0_proto_raw = {
};
int
-nng_bus0_open(nng_socket *sidp)
+nng_bus0_open(nng_socket *id)
{
- return (nni_proto_open(sidp, &bus0_proto));
+ return (nni_proto_open(id, &bus0_proto));
}
int
-nng_bus0_open_raw(nng_socket *sidp)
+nng_bus0_open_raw(nng_socket *id)
{
- return (nni_proto_open(sidp, &bus0_proto_raw));
+ return (nni_proto_open(id, &bus0_proto_raw));
}