aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/sp/protocol/bus0/CMakeLists.txt4
-rw-r--r--src/sp/protocol/bus0/bug1247_test.c35
-rw-r--r--src/sp/protocol/bus0/bus.c419
-rw-r--r--src/sp/protocol/bus0/bus_test.c423
4 files changed, 682 insertions, 199 deletions
diff --git a/src/sp/protocol/bus0/CMakeLists.txt b/src/sp/protocol/bus0/CMakeLists.txt
index 01c0b05b..ca7ee9bc 100644
--- a/src/sp/protocol/bus0/CMakeLists.txt
+++ b/src/sp/protocol/bus0/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -15,4 +15,4 @@ nng_sources_if(NNG_PROTO_BUS0 bus.c)
nng_headers_if(NNG_PROTO_BUS0 nng/protocol/bus0/bus.h)
nng_defines_if(NNG_PROTO_BUS0 NNG_HAVE_BUS0)
-nng_test(bug1247_test) \ No newline at end of file
+nng_test(bus_test) \ No newline at end of file
diff --git a/src/sp/protocol/bus0/bug1247_test.c b/src/sp/protocol/bus0/bug1247_test.c
deleted file mode 100644
index bbc6958b..00000000
--- a/src/sp/protocol/bus0/bug1247_test.c
+++ /dev/null
@@ -1,35 +0,0 @@
-//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <nuts.h>
-
-#include <nng/protocol/bus0/bus.h>
-
-void
-test_bug1247(void)
-{
- nng_socket bus1, bus2;
- char * addr;
-
- NUTS_ADDR(addr, "tcp");
-
- NUTS_PASS(nng_bus0_open(&bus1));
- NUTS_PASS(nng_bus0_open(&bus2));
-
- NUTS_PASS(nng_listen(bus1, addr, NULL, 0));
- NUTS_FAIL(nng_listen(bus2, addr, NULL, 0), NNG_EADDRINUSE);
-
- NUTS_PASS(nng_close(bus2));
- NUTS_PASS(nng_close(bus1));
-}
-
-TEST_LIST = {
- { "bug1247", test_bug1247 },
- { NULL, NULL },
-};
diff --git a/src/sp/protocol/bus0/bus.c b/src/sp/protocol/bus0/bus.c
index ab857d72..faa94c13 100644
--- a/src/sp/protocol/bus0/bus.c
+++ b/src/sp/protocol/bus0/bus.c
@@ -13,6 +13,7 @@
#include "core/nng_impl.h"
#include "nng/protocol/bus0/bus.h"
+#include <stdio.h>
// Bus protocol. The BUS protocol, each peer sends a message to its peers.
// However, bus protocols do not "forward" (absent a device). So in order
@@ -26,41 +27,35 @@
typedef struct bus0_pipe bus0_pipe;
typedef struct bus0_sock bus0_sock;
-static void bus0_sock_getq(bus0_sock *);
static void bus0_sock_send(void *, nni_aio *);
static void bus0_sock_recv(void *, nni_aio *);
-static void bus0_pipe_getq(bus0_pipe *);
static void bus0_pipe_recv(bus0_pipe *);
-static void bus0_sock_getq_cb(void *);
-static void bus0_sock_getq_cb_raw(void *);
-static void bus0_pipe_getq_cb(void *);
static void bus0_pipe_send_cb(void *);
static void bus0_pipe_recv_cb(void *);
-static void bus0_pipe_putq_cb(void *);
// bus0_sock is our per-socket protocol private structure.
struct bus0_sock {
- nni_aio aio_getq;
- nni_list pipes;
- nni_mtx mtx;
- nni_msgq *uwq;
- nni_msgq *urq;
- bool raw;
+ nni_list pipes;
+ nni_mtx mtx;
+ nni_pollable can_send;
+ nni_pollable can_recv;
+ nni_lmq recv_msgs;
+ nni_list recv_wait;
+ int send_buf;
+ bool raw;
};
// bus0_pipe is our per-pipe protocol private structure.
struct bus0_pipe {
- nni_pipe *npipe;
- bus0_sock *psock;
- nni_msgq *sendq;
+ nni_pipe *pipe;
+ bus0_sock *bus;
+ nni_lmq send_queue;
nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_putq;
- nni_mtx mtx;
+ bool busy;
+ nni_aio aio_recv;
+ nni_aio aio_send;
};
static void
@@ -68,50 +63,57 @@ bus0_sock_fini(void *arg)
{
bus0_sock *s = arg;
- nni_aio_fini(&s->aio_getq);
nni_mtx_fini(&s->mtx);
+ nni_pollable_fini(&s->can_send);
+ nni_pollable_fini(&s->can_recv);
+ nni_lmq_fini(&s->recv_msgs);
}
static void
-bus0_sock_init(void *arg, nni_sock *nsock)
+bus0_sock_init(void *arg, nni_sock *ns)
{
bus0_sock *s = arg;
+ NNI_ARG_UNUSED(ns);
+
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s);
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
+ nni_aio_list_init(&s->recv_wait);
+ nni_pollable_init(&s->can_send);
+ nni_pollable_init(&s->can_recv);
+ nni_lmq_init(&s->recv_msgs, 16);
+ s->send_buf = 16;
+
s->raw = false;
}
static void
-bus0_sock_init_raw(void *arg, nni_sock *nsock)
+bus0_sock_init_raw(void *arg, nni_sock *ns)
{
bus0_sock *s = arg;
- NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
- nni_mtx_init(&s->mtx);
- nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s);
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
+ bus0_sock_init(arg, ns);
s->raw = true;
}
static void
bus0_sock_open(void *arg)
{
- bus0_sock *s = arg;
-
- bus0_sock_getq(s);
+ NNI_ARG_UNUSED(arg);
}
static void
bus0_sock_close(void *arg)
{
bus0_sock *s = arg;
+ nni_aio *aio;
- nni_aio_close(&s->aio_getq);
+ nni_mtx_lock(&s->mtx);
+ while ((aio = nni_list_first(&s->recv_wait)) != NULL) {
+ nni_list_remove(&s->recv_wait, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&s->mtx);
}
static void
@@ -119,10 +121,8 @@ bus0_pipe_stop(void *arg)
{
bus0_pipe *p = arg;
- nni_aio_stop(&p->aio_getq);
nni_aio_stop(&p->aio_send);
nni_aio_stop(&p->aio_recv);
- nni_aio_stop(&p->aio_putq);
}
static void
@@ -130,33 +130,23 @@ bus0_pipe_fini(void *arg)
{
bus0_pipe *p = arg;
- nni_aio_fini(&p->aio_getq);
nni_aio_fini(&p->aio_send);
nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_msgq_fini(p->sendq);
- nni_mtx_fini(&p->mtx);
+ nni_lmq_fini(&p->send_queue);
}
static int
-bus0_pipe_init(void *arg, nni_pipe *npipe, void *s)
+bus0_pipe_init(void *arg, nni_pipe *np, void *s)
{
bus0_pipe *p = arg;
- int rv;
+ p->pipe = np;
+ p->bus = s;
NNI_LIST_NODE_INIT(&p->node);
- nni_mtx_init(&p->mtx);
- nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p);
nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p);
nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p);
- nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p);
- if ((rv = nni_msgq_init(&p->sendq, 16)) != 0) {
- bus0_pipe_fini(p);
- return (rv);
- }
+ nni_lmq_init(&p->send_queue, p->bus->send_buf);
- p->npipe = npipe;
- p->psock = s;
return (0);
}
@@ -164,10 +154,9 @@ static int
bus0_pipe_start(void *arg)
{
bus0_pipe *p = arg;
- bus0_sock *s = p->psock;
+ bus0_sock *s = p->bus;
- if (nni_pipe_peer(p->npipe) != NNI_PROTO_BUS_V0) {
- // Peer protocol mismatch.
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_BUS_V0) {
return (NNG_EPROTO);
}
@@ -176,7 +165,6 @@ bus0_pipe_start(void *arg)
nni_mtx_unlock(&s->mtx);
bus0_pipe_recv(p);
- bus0_pipe_getq(p);
return (0);
}
@@ -185,15 +173,13 @@ static void
bus0_pipe_close(void *arg)
{
bus0_pipe *p = arg;
- bus0_sock *s = p->psock;
+ bus0_sock *s = p->bus;
- nni_aio_close(&p->aio_getq);
nni_aio_close(&p->aio_send);
nni_aio_close(&p->aio_recv);
- nni_aio_close(&p->aio_putq);
- nni_msgq_close(p->sendq);
nni_mtx_lock(&s->mtx);
+ nni_lmq_flush(&p->send_queue);
if (nni_list_active(&s->pipes, p)) {
nni_list_remove(&s->pipes, p);
}
@@ -201,187 +187,278 @@ bus0_pipe_close(void *arg)
}
static void
-bus0_pipe_getq_cb(void *arg)
-{
- bus0_pipe *p = arg;
-
- if (nni_aio_result(&p->aio_getq) != 0) {
- // closed?
- nni_pipe_close(p->npipe);
- return;
- }
- nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq));
- nni_aio_set_msg(&p->aio_getq, NULL);
-
- nni_pipe_send(p->npipe, &p->aio_send);
-}
-
-static void
bus0_pipe_send_cb(void *arg)
{
bus0_pipe *p = arg;
+ bus0_sock *s = p->bus;
+ nni_msg *msg;
if (nni_aio_result(&p->aio_send) != 0) {
// closed?
nni_msg_free(nni_aio_get_msg(&p->aio_send));
nni_aio_set_msg(&p->aio_send, NULL);
- nni_pipe_close(p->npipe);
+ nni_pipe_close(p->pipe);
return;
}
- bus0_pipe_getq(p);
+ nni_mtx_lock(&s->mtx);
+ if (nni_lmq_get(&p->send_queue, &msg) == 0) {
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
+ } else {
+ p->busy = false;
+ }
+ nni_mtx_unlock(&s->mtx);
}
static void
bus0_pipe_recv_cb(void *arg)
{
- bus0_pipe *p = arg;
- bus0_sock *s = p->psock;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->bus;
+ nni_aio *aio = NULL;
nni_msg *msg;
if (nni_aio_result(&p->aio_recv) != 0) {
- nni_pipe_close(p->npipe);
+ nni_pipe_close(p->pipe);
return;
}
+
msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
- nni_msg_header_append_u32(msg, nni_pipe_id(p->npipe));
+ nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
}
- nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
- nni_aio_set_msg(&p->aio_putq, msg);
- nni_aio_set_msg(&p->aio_recv, NULL);
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ if (!nni_list_empty(&s->recv_wait)) {
+ aio = nni_list_first(&s->recv_wait);
+ nni_aio_list_remove(aio);
+ nni_aio_set_msg(aio, msg);
+ } else if (nni_lmq_put(&s->recv_msgs, msg) == 0) {
+ nni_pollable_raise(&s->can_recv);
+ } else {
+ // dropped message due to no room
+ nni_msg_free(msg);
+ }
+ nni_mtx_unlock(&s->mtx);
+
+ if (aio != NULL) {
+ nni_aio_finish_sync(aio, 0, nni_msg_len(msg));
+ }
+ bus0_pipe_recv(p);
}
static void
-bus0_pipe_putq_cb(void *arg)
+bus0_pipe_recv(bus0_pipe *p)
{
- bus0_pipe *p = arg;
-
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(&p->aio_putq));
- nni_aio_set_msg(&p->aio_putq, NULL);
- nni_pipe_close(p->npipe);
- return;
- }
-
- // Wait for another recv.
- bus0_pipe_recv(p);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static void
-bus0_sock_getq_cb(void *arg)
+bus0_sock_send(void *arg, nni_aio *aio)
{
bus0_sock *s = arg;
- bus0_pipe *p;
- bus0_pipe *lastp;
nni_msg *msg;
- nni_msg *dup;
+ bus0_pipe *pipe;
+ uint32_t sender = 0;
+ size_t len;
- if (nni_aio_result(&s->aio_getq) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
- msg = nni_aio_get_msg(&s->aio_getq);
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
- // We ignore any headers present for cooked mode.
- nni_msg_header_clear(msg);
+ if (s->raw) {
+ // In raw mode, we look for the message header, to see if it
+ // is being resent from another pipe (e.g. via a device).
+ // We don't want to send it back to the originator.
+ if (nni_msg_header_len(msg) >= sizeof(uint32_t)) {
+ sender = nni_msg_header_trim_u32(msg);
+ }
+ } else {
+ // In cooked mode just strip the header.
+ nni_msg_header_clear(msg);
+ }
nni_mtx_lock(&s->mtx);
- lastp = nni_list_last(&s->pipes);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (p != lastp) {
- if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
- } else {
- dup = msg;
- msg = NULL;
+ NNI_LIST_FOREACH (&s->pipes, pipe) {
+
+ if (s->raw && nni_pipe_id(pipe->pipe) == sender) {
+ continue;
}
- if (nni_msgq_tryput(p->sendq, dup) != 0) {
- nni_msg_free(dup);
+
+ // if the pipe isn't busy, then send this message direct.
+ if (!pipe->busy) {
+ pipe->busy = true;
+ nni_msg_clone(msg);
+ nni_aio_set_msg(&pipe->aio_send, msg);
+ nni_pipe_send(pipe->pipe, &pipe->aio_send);
+ } else if (!nni_lmq_full(&pipe->send_queue)) {
+ nni_msg_clone(msg);
+ nni_lmq_put(&pipe->send_queue, msg);
}
}
nni_mtx_unlock(&s->mtx);
+
nni_msg_free(msg);
+ nni_aio_finish(aio, 0, len);
+}
- bus0_sock_getq(s);
+static void
+bus0_recv_cancel(nng_aio *aio, void *arg, int rv)
+{
+ bus0_sock *s = arg;
+ nni_mtx_lock(&s->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&s->mtx);
}
static void
-bus0_sock_getq_cb_raw(void *arg)
+bus0_sock_recv(void *arg, nni_aio *aio)
{
bus0_sock *s = arg;
- bus0_pipe *p;
nni_msg *msg;
- uint32_t sender;
- if (nni_aio_result(&s->aio_getq) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
- msg = nni_aio_get_msg(&s->aio_getq);
-
- // The header being present indicates that the message
- // was received locally and we are rebroadcasting. (Device
- // is doing this probably.) In this case grab the pipe
- // ID from the header, so we can exclude it.
- if (nni_msg_header_len(msg) >= 4) {
- sender = nni_msg_header_trim_u32(msg);
- } else {
- sender = 0;
- }
-
nni_mtx_lock(&s->mtx);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (nni_pipe_id(p->npipe) == sender) {
- continue;
- }
- nni_msg_clone(msg);
- if (nni_msgq_tryput(p->sendq, msg) != 0) {
- nni_msg_free(msg);
+again:
+ if (nni_lmq_empty(&s->recv_msgs)) {
+ int rv;
+ if ((rv = nni_aio_schedule(aio, bus0_recv_cancel, s)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
+ nni_list_append(&s->recv_wait, aio);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ (void) nni_lmq_get(&s->recv_msgs, &msg);
+
+ if (nni_lmq_empty(&s->recv_msgs)) {
+ nni_pollable_clear(&s->can_recv);
}
+ if ((msg = nni_msg_unique(msg)) == NULL) {
+ goto again;
+ }
+ nni_aio_set_msg(aio, msg);
nni_mtx_unlock(&s->mtx);
- nni_msg_free(msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+}
- bus0_sock_getq(s);
+static int
+bus0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ bus0_sock *sock = arg;
+ int fd;
+ int rv;
+ nni_mtx_lock(&sock->mtx);
+ // BUS sockets are *always* writable (best effort)
+ nni_pollable_raise(&sock->can_send);
+ rv = nni_pollable_getfd(&sock->can_send, &fd);
+ nni_mtx_unlock(&sock->mtx);
+
+ if (rv == 0) {
+ rv = nni_copyout_int(fd, buf, szp, t);
+ }
+ return (rv);
}
-static void
-bus0_sock_getq(bus0_sock *s)
+static int
+bus0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ bus0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->can_recv, &fd)) == 0) {
+ rv = nni_copyout_int(fd, buf, szp, t);
+ }
+ return (rv);
}
-static void
-bus0_pipe_getq(bus0_pipe *p)
+static int
+bus0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ bus0_sock *s = arg;
+ int val;
+ nni_mtx_lock(&s->mtx);
+ val = (int) nni_lmq_cap(&s->recv_msgs);
+ nni_mtx_unlock(&s->mtx);
+
+ return (nni_copyout_int(val, buf, szp, t));
}
-static void
-bus0_pipe_recv(bus0_pipe *p)
+static int
+bus0_sock_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ bus0_sock *s = arg;
+ int val;
+ nni_mtx_lock(&s->mtx);
+ val = s->send_buf;
+ nni_mtx_unlock(&s->mtx);
+ return (nni_copyout_int(val, buf, szp, t));
}
-static void
-bus0_sock_send(void *arg, nni_aio *aio)
+static int
+bus0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
bus0_sock *s = arg;
+ int val;
+ int rv;
- nni_msgq_aio_put(s->uwq, aio);
+ if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ if ((rv = nni_lmq_resize(&s->recv_msgs, (size_t) val)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+ }
+
+ nni_mtx_unlock(&s->mtx);
+ return (0);
}
-static void
-bus0_sock_recv(void *arg, nni_aio *aio)
+static int
+bus0_sock_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
bus0_sock *s = arg;
+ bus0_pipe *p;
+ int val;
+ int rv;
- nni_msgq_aio_get(s->urq, aio);
+ if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&s->mtx);
+ s->send_buf = val;
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ // If we fail part way through (should only be ENOMEM), we
+ // stop short. The others would likely fail for ENOMEM as
+ // well anyway. There is a weird effect here where the
+ // buffers may have been set for *some* of the pipes, but
+ // we have no way to correct partial failure.
+ if ((rv = nni_lmq_resize(&p->send_queue, (size_t) val)) != 0) {
+ break;
+ }
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
}
static nni_proto_pipe_ops bus0_pipe_ops = {
@@ -394,6 +471,24 @@ static nni_proto_pipe_ops bus0_pipe_ops = {
};
static nni_option bus0_sock_options[] = {
+ {
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = bus0_sock_get_send_fd,
+ },
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = bus0_sock_get_recv_fd,
+ },
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = bus0_sock_get_recv_buf_len,
+ .o_set = bus0_sock_set_recv_buf_len,
+ },
+ {
+ .o_name = NNG_OPT_SENDBUF,
+ .o_get = bus0_sock_get_send_buf_len,
+ .o_set = bus0_sock_set_send_buf_len,
+ },
// terminate list
{
.o_name = NULL,
@@ -441,13 +536,13 @@ static nni_proto bus0_proto_raw = {
};
int
-nng_bus0_open(nng_socket *sidp)
+nng_bus0_open(nng_socket *id)
{
- return (nni_proto_open(sidp, &bus0_proto));
+ return (nni_proto_open(id, &bus0_proto));
}
int
-nng_bus0_open_raw(nng_socket *sidp)
+nng_bus0_open_raw(nng_socket *id)
{
- return (nni_proto_open(sidp, &bus0_proto_raw));
+ return (nni_proto_open(id, &bus0_proto_raw));
}
diff --git a/src/sp/protocol/bus0/bus_test.c b/src/sp/protocol/bus0/bus_test.c
new file mode 100644
index 00000000..77d2b4d6
--- /dev/null
+++ b/src/sp/protocol/bus0/bus_test.c
@@ -0,0 +1,423 @@
+//
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <nuts.h>
+
+#include <nng/protocol/bus0/bus.h>
+
+#define SECOND 1000
+
+void
+test_bus_identity(void)
+{
+ nng_socket s;
+ int p;
+ char *n;
+
+ NUTS_PASS(nng_bus0_open(&s));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p));
+ NUTS_TRUE(p == NNG_BUS0_SELF);
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p));
+ NUTS_TRUE(p == NNG_BUS0_PEER); // 49
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n));
+ NUTS_MATCH(n, NNG_BUS0_SELF_NAME);
+ nng_strfree(n);
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n));
+ NUTS_MATCH(n, NNG_BUS0_PEER_NAME);
+ nng_strfree(n);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_bus_star(void)
+{
+ nng_socket s1, s2, s3;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_bus0_open(&s2));
+ NUTS_PASS(nng_bus0_open(&s3));
+
+ NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_setopt_ms(s2, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_setopt_ms(s3, NNG_OPT_RECVTIMEO, SECOND));
+
+ NUTS_MARRY(s1, s2);
+ NUTS_MARRY(s1, s3);
+
+ NUTS_SEND(s1, "one");
+ NUTS_RECV(s2, "one");
+ NUTS_RECV(s3, "one");
+
+ NUTS_SEND(s2, "two");
+ NUTS_SEND(s1, "one");
+ NUTS_RECV(s1, "two");
+ NUTS_RECV(s2, "one");
+ NUTS_RECV(s3, "one");
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+ NUTS_CLOSE(s3);
+}
+
+static void
+test_bus_device(void)
+{
+ nng_socket s1, s2, s3;
+ nng_socket none = NNG_SOCKET_INITIALIZER;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_bus0_open_raw(&s1));
+ NUTS_PASS(nng_bus0_open(&s2));
+ NUTS_PASS(nng_bus0_open(&s3));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_setopt_ms(s2, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_setopt_ms(s3, NNG_OPT_RECVTIMEO, SECOND));
+
+ NUTS_MARRY(s1, s2);
+ NUTS_MARRY(s1, s3);
+
+ nng_device_aio(aio, s1, none);
+
+ NUTS_SEND(s2, "two");
+ NUTS_SEND(s3, "three");
+ NUTS_RECV(s2, "three");
+ NUTS_RECV(s3, "two");
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+ NUTS_CLOSE(s3);
+
+ nng_aio_free(aio);
+}
+
+static void
+test_bus_validate_peer(void)
+{
+ nng_socket s1, s2;
+ nng_stat *stats;
+ nng_stat *reject;
+ char *addr;
+
+ NUTS_ADDR(addr, "inproc");
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_pair0_open(&s2));
+
+ NUTS_PASS(nng_listen(s1, addr, NULL, 0));
+ NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK));
+
+ NUTS_SLEEP(100);
+ NUTS_PASS(nng_stats_get(&stats));
+
+ NUTS_TRUE(stats != NULL);
+ NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL);
+ NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL);
+
+ NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER);
+ NUTS_TRUE(nng_stat_value(reject) > 0);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+ nng_stats_free(stats);
+}
+
+static void
+test_bus_no_context(void)
+{
+ nng_socket s;
+ nng_ctx ctx;
+
+ NUTS_PASS(nng_bus0_open(&s));
+ NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_bus_recv_cancel(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_aio_set_timeout(aio, SECOND);
+ nng_recv_aio(s1, aio);
+ nng_aio_abort(aio, NNG_ECANCELED);
+
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_CLOSE(s1);
+ nng_aio_free(aio);
+}
+
+static void
+test_bus_close_recv_abort(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_aio_set_timeout(aio, SECOND);
+ nng_recv_aio(s1, aio);
+ NUTS_CLOSE(s1);
+
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED);
+ nng_aio_free(aio);
+}
+
+static void
+test_bus_aio_stopped(void)
+{
+ nng_socket s1;
+ nng_aio *aio;
+ nng_msg *msg;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ nng_aio_stop(aio);
+
+ nng_recv_aio(s1, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+
+ nng_aio_set_msg(aio, msg);
+ nng_send_aio(s1, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+
+ nng_aio_free(aio);
+ nng_msg_free(msg);
+ NUTS_CLOSE(s1);
+}
+
+static void
+test_bus_send_no_pipes(void)
+{
+ nng_socket s1;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_SEND(s1, "DROP1");
+ NUTS_SEND(s1, "DROP2");
+ NUTS_CLOSE(s1);
+}
+
+static void
+test_bus_send_flood(void)
+{
+ nng_socket s1, s2;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_bus0_open(&s2));
+ NUTS_PASS(nng_socket_set_int(s2, NNG_OPT_SENDBUF, 1));
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(s1, s2);
+
+ // Even if we send messages.
+ for (int i = 0; i < 1000; i++) {
+ NUTS_SEND(s2, "one thousand");
+ }
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+}
+
+static void
+test_bus_poll_readable(void)
+{
+ int fd;
+ nng_socket s1, s2;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_bus0_open(&s2));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not readable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(s2, s1);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But once we send messages, it is.
+ // We have to send a request, in order to send a reply.
+ NUTS_SEND(s2, "abc");
+ NUTS_SLEEP(100);
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // and receiving makes it no longer ready
+ NUTS_RECV(s1, "abc");
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(s2);
+ NUTS_CLOSE(s1);
+}
+
+static void
+test_bus_poll_writeable(void)
+{
+ int fd;
+ nng_socket s1, s2;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_bus0_open(&s2));
+ NUTS_PASS(nng_socket_set_int(s2, NNG_OPT_SENDBUF, 1));
+ NUTS_PASS(nng_socket_get_int(s2, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Pub is *always* writeable
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(s1, s2);
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // Even if we send messages.
+ NUTS_SEND(s2, "abc");
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+}
+
+static void
+test_bus_recv_buf_option(void)
+{
+ nng_socket s;
+ int v;
+ bool b;
+ size_t sz;
+ const char *opt = NNG_OPT_RECVBUF;
+
+ NUTS_PASS(nng_bus0_open(&s));
+
+ NUTS_PASS(nng_socket_set_int(s, opt, 1));
+ NUTS_FAIL(nng_socket_set_int(s, opt, 0), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(s, opt, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(s, opt, 1000000), NNG_EINVAL);
+ NUTS_PASS(nng_socket_set_int(s, opt, 3));
+ NUTS_PASS(nng_socket_get_int(s, opt, &v));
+ NUTS_TRUE(v == 3);
+ v = 0;
+ sz = sizeof(v);
+ NUTS_PASS(nng_socket_get(s, opt, &v, &sz));
+ NUTS_TRUE(v == 3);
+ NUTS_TRUE(sz == sizeof(v));
+
+ NUTS_FAIL(nng_socket_set(s, opt, "", 1), NNG_EINVAL);
+ sz = 1;
+ NUTS_FAIL(nng_socket_get(s, opt, &v, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_bool(s, opt, true), NNG_EBADTYPE);
+ NUTS_FAIL(nng_socket_get_bool(s, opt, &b), NNG_EBADTYPE);
+
+ NUTS_CLOSE(s);
+}
+
+static void
+test_bus_send_buf_option(void)
+{
+ nng_socket s1;
+ nng_socket s2;
+ int v;
+ bool b;
+ size_t sz;
+ const char *opt = NNG_OPT_SENDBUF;
+
+ NUTS_PASS(nng_bus0_open(&s1));
+ NUTS_PASS(nng_bus0_open(&s2));
+ NUTS_MARRY(s1, s2);
+
+ NUTS_PASS(nng_socket_set_int(s1, opt, 1));
+ NUTS_FAIL(nng_socket_set_int(s1, opt, 0), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(s1, opt, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(s1, opt, 1000000), NNG_EINVAL);
+ NUTS_PASS(nng_socket_set_int(s1, opt, 3));
+ NUTS_PASS(nng_socket_get_int(s1, opt, &v));
+ NUTS_TRUE(v == 3);
+ v = 0;
+ sz = sizeof(v);
+ NUTS_PASS(nng_socket_get(s1, opt, &v, &sz));
+ NUTS_TRUE(v == 3);
+ NUTS_TRUE(sz == sizeof(v));
+
+ NUTS_FAIL(nng_socket_set(s1, opt, "", 1), NNG_EINVAL);
+ sz = 1;
+ NUTS_FAIL(nng_socket_get(s1, opt, &v, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_bool(s1, opt, true), NNG_EBADTYPE);
+ NUTS_FAIL(nng_socket_get_bool(s1, opt, &b), NNG_EBADTYPE);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+}
+
+static void
+test_bus_cooked(void)
+{
+ nng_socket s;
+ bool b;
+
+ NUTS_PASS(nng_bus0_open(&s));
+ NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b));
+ NUTS_TRUE(!b);
+ NUTS_FAIL(nng_socket_set_bool(s, NNG_OPT_RAW, true), NNG_EREADONLY);
+ NUTS_PASS(nng_close(s));
+
+ // raw pub only differs in the option setting
+ NUTS_PASS(nng_bus0_open_raw(&s));
+ NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b));
+ NUTS_TRUE(b);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_bug1247(void)
+{
+ nng_socket bus1, bus2;
+ char *addr;
+
+ NUTS_ADDR(addr, "tcp");
+
+ NUTS_PASS(nng_bus0_open(&bus1));
+ NUTS_PASS(nng_bus0_open(&bus2));
+
+ NUTS_PASS(nng_listen(bus1, addr, NULL, 0));
+ NUTS_FAIL(nng_listen(bus2, addr, NULL, 0), NNG_EADDRINUSE);
+
+ NUTS_CLOSE(bus2);
+ NUTS_CLOSE(bus1);
+}
+
+TEST_LIST = {
+ { "bus identity", test_bus_identity },
+ { "bus star", test_bus_star },
+ { "bus device", test_bus_device },
+ { "bus validate peer", test_bus_validate_peer },
+ { "bus no context", test_bus_no_context },
+ { "bus poll read", test_bus_poll_readable },
+ { "bus poll write", test_bus_poll_writeable },
+ { "bus send no pipes", test_bus_send_no_pipes },
+ { "bus send flood", test_bus_send_flood },
+ { "bus recv cancel", test_bus_recv_cancel },
+ { "bus close recv abort", test_bus_close_recv_abort },
+ { "bus aio stopped", test_bus_aio_stopped },
+ { "bus recv buf option", test_bus_recv_buf_option },
+ { "bus send buf option", test_bus_send_buf_option },
+ { "bus cooked", test_bus_cooked },
+ { "bug1247", test_bug1247 },
+ { NULL, NULL },
+};