diff options
| -rw-r--r-- | include/nng/protocol/bus0/bus.h | 7 | ||||
| -rw-r--r-- | src/sp/protocol/bus0/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/sp/protocol/bus0/bug1247_test.c | 35 | ||||
| -rw-r--r-- | src/sp/protocol/bus0/bus.c | 419 | ||||
| -rw-r--r-- | src/sp/protocol/bus0/bus_test.c | 423 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | tests/bus.c | 90 |
7 files changed, 688 insertions, 291 deletions
diff --git a/include/nng/protocol/bus0/bus.h b/include/nng/protocol/bus0/bus.h index c8c23d84..31167d75 100644 --- a/include/nng/protocol/bus0/bus.h +++ b/include/nng/protocol/bus0/bus.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -27,6 +27,11 @@ NNG_DECL int nng_bus0_open_raw(nng_socket *); #define nng_bus_open_raw nng_bus0_open_raw #endif +#define NNG_BUS0_SELF 0x70 +#define NNG_BUS0_PEER 0x70 +#define NNG_BUS0_SELF_NAME "bus" +#define NNG_BUS0_PEER_NAME "bus" + #ifdef __cplusplus } #endif diff --git a/src/sp/protocol/bus0/CMakeLists.txt b/src/sp/protocol/bus0/CMakeLists.txt index 01c0b05b..ca7ee9bc 100644 --- a/src/sp/protocol/bus0/CMakeLists.txt +++ b/src/sp/protocol/bus0/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> # Copyright 2018 Capitar IT Group BV <info@capitar.com> # # This software is supplied under the terms of the MIT License, a @@ -15,4 +15,4 @@ nng_sources_if(NNG_PROTO_BUS0 bus.c) nng_headers_if(NNG_PROTO_BUS0 nng/protocol/bus0/bus.h) nng_defines_if(NNG_PROTO_BUS0 NNG_HAVE_BUS0) -nng_test(bug1247_test)
\ No newline at end of file +nng_test(bus_test)
\ No newline at end of file diff --git a/src/sp/protocol/bus0/bug1247_test.c b/src/sp/protocol/bus0/bug1247_test.c deleted file mode 100644 index bbc6958b..00000000 --- a/src/sp/protocol/bus0/bug1247_test.c +++ /dev/null @@ -1,35 +0,0 @@ -// -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <nuts.h> - -#include <nng/protocol/bus0/bus.h> - -void -test_bug1247(void) -{ - nng_socket bus1, bus2; - char * addr; - - NUTS_ADDR(addr, "tcp"); - - NUTS_PASS(nng_bus0_open(&bus1)); - NUTS_PASS(nng_bus0_open(&bus2)); - - NUTS_PASS(nng_listen(bus1, addr, NULL, 0)); - NUTS_FAIL(nng_listen(bus2, addr, NULL, 0), NNG_EADDRINUSE); - - NUTS_PASS(nng_close(bus2)); - NUTS_PASS(nng_close(bus1)); -} - -TEST_LIST = { - { "bug1247", test_bug1247 }, - { NULL, NULL }, -}; diff --git a/src/sp/protocol/bus0/bus.c b/src/sp/protocol/bus0/bus.c index ab857d72..faa94c13 100644 --- a/src/sp/protocol/bus0/bus.c +++ b/src/sp/protocol/bus0/bus.c @@ -13,6 +13,7 @@ #include "core/nng_impl.h" #include "nng/protocol/bus0/bus.h" +#include <stdio.h> // Bus protocol. The BUS protocol, each peer sends a message to its peers. // However, bus protocols do not "forward" (absent a device). So in order @@ -26,41 +27,35 @@ typedef struct bus0_pipe bus0_pipe; typedef struct bus0_sock bus0_sock; -static void bus0_sock_getq(bus0_sock *); static void bus0_sock_send(void *, nni_aio *); static void bus0_sock_recv(void *, nni_aio *); -static void bus0_pipe_getq(bus0_pipe *); static void bus0_pipe_recv(bus0_pipe *); -static void bus0_sock_getq_cb(void *); -static void bus0_sock_getq_cb_raw(void *); -static void bus0_pipe_getq_cb(void *); static void bus0_pipe_send_cb(void *); static void bus0_pipe_recv_cb(void *); -static void bus0_pipe_putq_cb(void *); // bus0_sock is our per-socket protocol private structure. struct bus0_sock { - nni_aio aio_getq; - nni_list pipes; - nni_mtx mtx; - nni_msgq *uwq; - nni_msgq *urq; - bool raw; + nni_list pipes; + nni_mtx mtx; + nni_pollable can_send; + nni_pollable can_recv; + nni_lmq recv_msgs; + nni_list recv_wait; + int send_buf; + bool raw; }; // bus0_pipe is our per-pipe protocol private structure. struct bus0_pipe { - nni_pipe *npipe; - bus0_sock *psock; - nni_msgq *sendq; + nni_pipe *pipe; + bus0_sock *bus; + nni_lmq send_queue; nni_list_node node; - nni_aio aio_getq; - nni_aio aio_recv; - nni_aio aio_send; - nni_aio aio_putq; - nni_mtx mtx; + bool busy; + nni_aio aio_recv; + nni_aio aio_send; }; static void @@ -68,50 +63,57 @@ bus0_sock_fini(void *arg) { bus0_sock *s = arg; - nni_aio_fini(&s->aio_getq); nni_mtx_fini(&s->mtx); + nni_pollable_fini(&s->can_send); + nni_pollable_fini(&s->can_recv); + nni_lmq_fini(&s->recv_msgs); } static void -bus0_sock_init(void *arg, nni_sock *nsock) +bus0_sock_init(void *arg, nni_sock *ns) { bus0_sock *s = arg; + NNI_ARG_UNUSED(ns); + NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s); - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + nni_aio_list_init(&s->recv_wait); + nni_pollable_init(&s->can_send); + nni_pollable_init(&s->can_recv); + nni_lmq_init(&s->recv_msgs, 16); + s->send_buf = 16; + s->raw = false; } static void -bus0_sock_init_raw(void *arg, nni_sock *nsock) +bus0_sock_init_raw(void *arg, nni_sock *ns) { bus0_sock *s = arg; - NNI_LIST_INIT(&s->pipes, bus0_pipe, node); - nni_mtx_init(&s->mtx); - nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s); - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + bus0_sock_init(arg, ns); s->raw = true; } static void bus0_sock_open(void *arg) { - bus0_sock *s = arg; - - bus0_sock_getq(s); + NNI_ARG_UNUSED(arg); } static void bus0_sock_close(void *arg) { bus0_sock *s = arg; + nni_aio *aio; - nni_aio_close(&s->aio_getq); + nni_mtx_lock(&s->mtx); + while ((aio = nni_list_first(&s->recv_wait)) != NULL) { + nni_list_remove(&s->recv_wait, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->mtx); } static void @@ -119,10 +121,8 @@ bus0_pipe_stop(void *arg) { bus0_pipe *p = arg; - nni_aio_stop(&p->aio_getq); nni_aio_stop(&p->aio_send); nni_aio_stop(&p->aio_recv); - nni_aio_stop(&p->aio_putq); } static void @@ -130,33 +130,23 @@ bus0_pipe_fini(void *arg) { bus0_pipe *p = arg; - nni_aio_fini(&p->aio_getq); nni_aio_fini(&p->aio_send); nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_msgq_fini(p->sendq); - nni_mtx_fini(&p->mtx); + nni_lmq_fini(&p->send_queue); } static int -bus0_pipe_init(void *arg, nni_pipe *npipe, void *s) +bus0_pipe_init(void *arg, nni_pipe *np, void *s) { bus0_pipe *p = arg; - int rv; + p->pipe = np; + p->bus = s; NNI_LIST_NODE_INIT(&p->node); - nni_mtx_init(&p->mtx); - nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p); nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p); nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p); - nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p); - if ((rv = nni_msgq_init(&p->sendq, 16)) != 0) { - bus0_pipe_fini(p); - return (rv); - } + nni_lmq_init(&p->send_queue, p->bus->send_buf); - p->npipe = npipe; - p->psock = s; return (0); } @@ -164,10 +154,9 @@ static int bus0_pipe_start(void *arg) { bus0_pipe *p = arg; - bus0_sock *s = p->psock; + bus0_sock *s = p->bus; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_BUS_V0) { - // Peer protocol mismatch. + if (nni_pipe_peer(p->pipe) != NNI_PROTO_BUS_V0) { return (NNG_EPROTO); } @@ -176,7 +165,6 @@ bus0_pipe_start(void *arg) nni_mtx_unlock(&s->mtx); bus0_pipe_recv(p); - bus0_pipe_getq(p); return (0); } @@ -185,15 +173,13 @@ static void bus0_pipe_close(void *arg) { bus0_pipe *p = arg; - bus0_sock *s = p->psock; + bus0_sock *s = p->bus; - nni_aio_close(&p->aio_getq); nni_aio_close(&p->aio_send); nni_aio_close(&p->aio_recv); - nni_aio_close(&p->aio_putq); - nni_msgq_close(p->sendq); nni_mtx_lock(&s->mtx); + nni_lmq_flush(&p->send_queue); if (nni_list_active(&s->pipes, p)) { nni_list_remove(&s->pipes, p); } @@ -201,187 +187,278 @@ bus0_pipe_close(void *arg) } static void -bus0_pipe_getq_cb(void *arg) -{ - bus0_pipe *p = arg; - - if (nni_aio_result(&p->aio_getq) != 0) { - // closed? - nni_pipe_close(p->npipe); - return; - } - nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); - nni_aio_set_msg(&p->aio_getq, NULL); - - nni_pipe_send(p->npipe, &p->aio_send); -} - -static void bus0_pipe_send_cb(void *arg) { bus0_pipe *p = arg; + bus0_sock *s = p->bus; + nni_msg *msg; if (nni_aio_result(&p->aio_send) != 0) { // closed? nni_msg_free(nni_aio_get_msg(&p->aio_send)); nni_aio_set_msg(&p->aio_send, NULL); - nni_pipe_close(p->npipe); + nni_pipe_close(p->pipe); return; } - bus0_pipe_getq(p); + nni_mtx_lock(&s->mtx); + if (nni_lmq_get(&p->send_queue, &msg) == 0) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); + } else { + p->busy = false; + } + nni_mtx_unlock(&s->mtx); } static void bus0_pipe_recv_cb(void *arg) { - bus0_pipe *p = arg; - bus0_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->bus; + nni_aio *aio = NULL; nni_msg *msg; if (nni_aio_result(&p->aio_recv) != 0) { - nni_pipe_close(p->npipe); + nni_pipe_close(p->pipe); return; } + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + nni_mtx_lock(&s->mtx); if (s->raw) { - nni_msg_header_append_u32(msg, nni_pipe_id(p->npipe)); + nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); } - nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_aio_set_msg(&p->aio_putq, msg); - nni_aio_set_msg(&p->aio_recv, NULL); - nni_msgq_aio_put(s->urq, &p->aio_putq); + if (!nni_list_empty(&s->recv_wait)) { + aio = nni_list_first(&s->recv_wait); + nni_aio_list_remove(aio); + nni_aio_set_msg(aio, msg); + } else if (nni_lmq_put(&s->recv_msgs, msg) == 0) { + nni_pollable_raise(&s->can_recv); + } else { + // dropped message due to no room + nni_msg_free(msg); + } + nni_mtx_unlock(&s->mtx); + + if (aio != NULL) { + nni_aio_finish_sync(aio, 0, nni_msg_len(msg)); + } + bus0_pipe_recv(p); } static void -bus0_pipe_putq_cb(void *arg) +bus0_pipe_recv(bus0_pipe *p) { - bus0_pipe *p = arg; - - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(&p->aio_putq)); - nni_aio_set_msg(&p->aio_putq, NULL); - nni_pipe_close(p->npipe); - return; - } - - // Wait for another recv. - bus0_pipe_recv(p); + nni_pipe_recv(p->pipe, &p->aio_recv); } static void -bus0_sock_getq_cb(void *arg) +bus0_sock_send(void *arg, nni_aio *aio) { bus0_sock *s = arg; - bus0_pipe *p; - bus0_pipe *lastp; nni_msg *msg; - nni_msg *dup; + bus0_pipe *pipe; + uint32_t sender = 0; + size_t len; - if (nni_aio_result(&s->aio_getq) != 0) { + if (nni_aio_begin(aio) != 0) { return; } - msg = nni_aio_get_msg(&s->aio_getq); + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); - // We ignore any headers present for cooked mode. - nni_msg_header_clear(msg); + if (s->raw) { + // In raw mode, we look for the message header, to see if it + // is being resent from another pipe (e.g. via a device). + // We don't want to send it back to the originator. + if (nni_msg_header_len(msg) >= sizeof(uint32_t)) { + sender = nni_msg_header_trim_u32(msg); + } + } else { + // In cooked mode just strip the header. + nni_msg_header_clear(msg); + } nni_mtx_lock(&s->mtx); - lastp = nni_list_last(&s->pipes); - NNI_LIST_FOREACH (&s->pipes, p) { - if (p != lastp) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - msg = NULL; + NNI_LIST_FOREACH (&s->pipes, pipe) { + + if (s->raw && nni_pipe_id(pipe->pipe) == sender) { + continue; } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); + + // if the pipe isn't busy, then send this message direct. + if (!pipe->busy) { + pipe->busy = true; + nni_msg_clone(msg); + nni_aio_set_msg(&pipe->aio_send, msg); + nni_pipe_send(pipe->pipe, &pipe->aio_send); + } else if (!nni_lmq_full(&pipe->send_queue)) { + nni_msg_clone(msg); + nni_lmq_put(&pipe->send_queue, msg); } } nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + nni_aio_finish(aio, 0, len); +} - bus0_sock_getq(s); +static void +bus0_recv_cancel(nng_aio *aio, void *arg, int rv) +{ + bus0_sock *s = arg; + nni_mtx_lock(&s->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); } static void -bus0_sock_getq_cb_raw(void *arg) +bus0_sock_recv(void *arg, nni_aio *aio) { bus0_sock *s = arg; - bus0_pipe *p; nni_msg *msg; - uint32_t sender; - if (nni_aio_result(&s->aio_getq) != 0) { + if (nni_aio_begin(aio) != 0) { return; } - msg = nni_aio_get_msg(&s->aio_getq); - - // The header being present indicates that the message - // was received locally and we are rebroadcasting. (Device - // is doing this probably.) In this case grab the pipe - // ID from the header, so we can exclude it. - if (nni_msg_header_len(msg) >= 4) { - sender = nni_msg_header_trim_u32(msg); - } else { - sender = 0; - } - nni_mtx_lock(&s->mtx); - NNI_LIST_FOREACH (&s->pipes, p) { - if (nni_pipe_id(p->npipe) == sender) { - continue; - } - nni_msg_clone(msg); - if (nni_msgq_tryput(p->sendq, msg) != 0) { - nni_msg_free(msg); +again: + if (nni_lmq_empty(&s->recv_msgs)) { + int rv; + if ((rv = nni_aio_schedule(aio, bus0_recv_cancel, s)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; } + nni_list_append(&s->recv_wait, aio); + nni_mtx_unlock(&s->mtx); + return; + } + + (void) nni_lmq_get(&s->recv_msgs, &msg); + + if (nni_lmq_empty(&s->recv_msgs)) { + nni_pollable_clear(&s->can_recv); } + if ((msg = nni_msg_unique(msg)) == NULL) { + goto again; + } + nni_aio_set_msg(aio, msg); nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} - bus0_sock_getq(s); +static int +bus0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_type t) +{ + bus0_sock *sock = arg; + int fd; + int rv; + nni_mtx_lock(&sock->mtx); + // BUS sockets are *always* writable (best effort) + nni_pollable_raise(&sock->can_send); + rv = nni_pollable_getfd(&sock->can_send, &fd); + nni_mtx_unlock(&sock->mtx); + + if (rv == 0) { + rv = nni_copyout_int(fd, buf, szp, t); + } + return (rv); } -static void -bus0_sock_getq(bus0_sock *s) +static int +bus0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { - nni_msgq_aio_get(s->uwq, &s->aio_getq); + bus0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->can_recv, &fd)) == 0) { + rv = nni_copyout_int(fd, buf, szp, t); + } + return (rv); } -static void -bus0_pipe_getq(bus0_pipe *p) +static int +bus0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { - nni_msgq_aio_get(p->sendq, &p->aio_getq); + bus0_sock *s = arg; + int val; + nni_mtx_lock(&s->mtx); + val = (int) nni_lmq_cap(&s->recv_msgs); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); } -static void -bus0_pipe_recv(bus0_pipe *p) +static int +bus0_sock_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { - nni_pipe_recv(p->npipe, &p->aio_recv); + bus0_sock *s = arg; + int val; + nni_mtx_lock(&s->mtx); + val = s->send_buf; + nni_mtx_unlock(&s->mtx); + return (nni_copyout_int(val, buf, szp, t)); } -static void -bus0_sock_send(void *arg, nni_aio *aio) +static int +bus0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { bus0_sock *s = arg; + int val; + int rv; - nni_msgq_aio_put(s->uwq, aio); + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + if ((rv = nni_lmq_resize(&s->recv_msgs, (size_t) val)) != 0) { + nni_mtx_unlock(&s->mtx); + return (rv); + } + + nni_mtx_unlock(&s->mtx); + return (0); } -static void -bus0_sock_recv(void *arg, nni_aio *aio) +static int +bus0_sock_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { bus0_sock *s = arg; + bus0_pipe *p; + int val; + int rv; - nni_msgq_aio_get(s->urq, aio); + if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { + return (rv); + } + + nni_mtx_lock(&s->mtx); + s->send_buf = val; + NNI_LIST_FOREACH (&s->pipes, p) { + // If we fail part way through (should only be ENOMEM), we + // stop short. The others would likely fail for ENOMEM as + // well anyway. There is a weird effect here where the + // buffers may have been set for *some* of the pipes, but + // we have no way to correct partial failure. + if ((rv = nni_lmq_resize(&p->send_queue, (size_t) val)) != 0) { + break; + } + } + nni_mtx_unlock(&s->mtx); + return (rv); } static nni_proto_pipe_ops bus0_pipe_ops = { @@ -394,6 +471,24 @@ static nni_proto_pipe_ops bus0_pipe_ops = { }; static nni_option bus0_sock_options[] = { + { + .o_name = NNG_OPT_SENDFD, + .o_get = bus0_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = bus0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = bus0_sock_get_recv_buf_len, + .o_set = bus0_sock_set_recv_buf_len, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = bus0_sock_get_send_buf_len, + .o_set = bus0_sock_set_send_buf_len, + }, // terminate list { .o_name = NULL, @@ -441,13 +536,13 @@ static nni_proto bus0_proto_raw = { }; int -nng_bus0_open(nng_socket *sidp) +nng_bus0_open(nng_socket *id) { - return (nni_proto_open(sidp, &bus0_proto)); + return (nni_proto_open(id, &bus0_proto)); } int -nng_bus0_open_raw(nng_socket *sidp) +nng_bus0_open_raw(nng_socket *id) { - return (nni_proto_open(sidp, &bus0_proto_raw)); + return (nni_proto_open(id, &bus0_proto_raw)); } diff --git a/src/sp/protocol/bus0/bus_test.c b/src/sp/protocol/bus0/bus_test.c new file mode 100644 index 00000000..77d2b4d6 --- /dev/null +++ b/src/sp/protocol/bus0/bus_test.c @@ -0,0 +1,423 @@ +// +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +#include <nng/protocol/bus0/bus.h> + +#define SECOND 1000 + +void +test_bus_identity(void) +{ + nng_socket s; + int p; + char *n; + + NUTS_PASS(nng_bus0_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NNG_BUS0_SELF); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NNG_BUS0_PEER); // 49 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, NNG_BUS0_SELF_NAME); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, NNG_BUS0_PEER_NAME); + nng_strfree(n); + NUTS_CLOSE(s); +} + +static void +test_bus_star(void) +{ + nng_socket s1, s2, s3; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_bus0_open(&s2)); + NUTS_PASS(nng_bus0_open(&s3)); + + NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_setopt_ms(s2, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_setopt_ms(s3, NNG_OPT_RECVTIMEO, SECOND)); + + NUTS_MARRY(s1, s2); + NUTS_MARRY(s1, s3); + + NUTS_SEND(s1, "one"); + NUTS_RECV(s2, "one"); + NUTS_RECV(s3, "one"); + + NUTS_SEND(s2, "two"); + NUTS_SEND(s1, "one"); + NUTS_RECV(s1, "two"); + NUTS_RECV(s2, "one"); + NUTS_RECV(s3, "one"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + NUTS_CLOSE(s3); +} + +static void +test_bus_device(void) +{ + nng_socket s1, s2, s3; + nng_socket none = NNG_SOCKET_INITIALIZER; + nng_aio *aio; + + NUTS_PASS(nng_bus0_open_raw(&s1)); + NUTS_PASS(nng_bus0_open(&s2)); + NUTS_PASS(nng_bus0_open(&s3)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_setopt_ms(s2, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_setopt_ms(s3, NNG_OPT_RECVTIMEO, SECOND)); + + NUTS_MARRY(s1, s2); + NUTS_MARRY(s1, s3); + + nng_device_aio(aio, s1, none); + + NUTS_SEND(s2, "two"); + NUTS_SEND(s3, "three"); + NUTS_RECV(s2, "three"); + NUTS_RECV(s3, "two"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + NUTS_CLOSE(s3); + + nng_aio_free(aio); +} + +static void +test_bus_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat *stats; + nng_stat *reject; + char *addr; + + NUTS_ADDR(addr, "inproc"); + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_pair0_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_bus_no_context(void) +{ + nng_socket s; + nng_ctx ctx; + + NUTS_PASS(nng_bus0_open(&s)); + NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +static void +test_bus_recv_cancel(void) +{ + nng_socket s1; + nng_aio *aio; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, SECOND); + nng_recv_aio(s1, aio); + nng_aio_abort(aio, NNG_ECANCELED); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + NUTS_CLOSE(s1); + nng_aio_free(aio); +} + +static void +test_bus_close_recv_abort(void) +{ + nng_socket s1; + nng_aio *aio; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, SECOND); + nng_recv_aio(s1, aio); + NUTS_CLOSE(s1); + + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); + nng_aio_free(aio); +} + +static void +test_bus_aio_stopped(void) +{ + nng_socket s1; + nng_aio *aio; + nng_msg *msg; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + nng_aio_stop(aio); + + nng_recv_aio(s1, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + + nng_aio_set_msg(aio, msg); + nng_send_aio(s1, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + + nng_aio_free(aio); + nng_msg_free(msg); + NUTS_CLOSE(s1); +} + +static void +test_bus_send_no_pipes(void) +{ + nng_socket s1; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_SEND(s1, "DROP1"); + NUTS_SEND(s1, "DROP2"); + NUTS_CLOSE(s1); +} + +static void +test_bus_send_flood(void) +{ + nng_socket s1, s2; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_bus0_open(&s2)); + NUTS_PASS(nng_socket_set_int(s2, NNG_OPT_SENDBUF, 1)); + + // Even after connect (no message yet) + NUTS_MARRY(s1, s2); + + // Even if we send messages. + for (int i = 0; i < 1000; i++) { + NUTS_SEND(s2, "one thousand"); + } + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); +} + +static void +test_bus_poll_readable(void) +{ + int fd; + nng_socket s1, s2; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_bus0_open(&s2)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(s2, s1); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(s2, "abc"); + NUTS_SLEEP(100); + NUTS_TRUE(nuts_poll_fd(fd)); + + // and receiving makes it no longer ready + NUTS_RECV(s1, "abc"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +static void +test_bus_poll_writeable(void) +{ + int fd; + nng_socket s1, s2; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_bus0_open(&s2)); + NUTS_PASS(nng_socket_set_int(s2, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_socket_get_int(s2, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Pub is *always* writeable + NUTS_TRUE(nuts_poll_fd(fd)); + + // Even after connect (no message yet) + NUTS_MARRY(s1, s2); + NUTS_TRUE(nuts_poll_fd(fd)); + + // Even if we send messages. + NUTS_SEND(s2, "abc"); + NUTS_TRUE(nuts_poll_fd(fd)); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); +} + +static void +test_bus_recv_buf_option(void) +{ + nng_socket s; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_RECVBUF; + + NUTS_PASS(nng_bus0_open(&s)); + + NUTS_PASS(nng_socket_set_int(s, opt, 1)); + NUTS_FAIL(nng_socket_set_int(s, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s, opt, 1000000), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(s, opt, 3)); + NUTS_PASS(nng_socket_get_int(s, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(s, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(s, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(s, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(s, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(s, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(s); +} + +static void +test_bus_send_buf_option(void) +{ + nng_socket s1; + nng_socket s2; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_SENDBUF; + + NUTS_PASS(nng_bus0_open(&s1)); + NUTS_PASS(nng_bus0_open(&s2)); + NUTS_MARRY(s1, s2); + + NUTS_PASS(nng_socket_set_int(s1, opt, 1)); + NUTS_FAIL(nng_socket_set_int(s1, opt, 0), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s1, opt, -1), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_int(s1, opt, 1000000), NNG_EINVAL); + NUTS_PASS(nng_socket_set_int(s1, opt, 3)); + NUTS_PASS(nng_socket_get_int(s1, opt, &v)); + NUTS_TRUE(v == 3); + v = 0; + sz = sizeof(v); + NUTS_PASS(nng_socket_get(s1, opt, &v, &sz)); + NUTS_TRUE(v == 3); + NUTS_TRUE(sz == sizeof(v)); + + NUTS_FAIL(nng_socket_set(s1, opt, "", 1), NNG_EINVAL); + sz = 1; + NUTS_FAIL(nng_socket_get(s1, opt, &v, &sz), NNG_EINVAL); + NUTS_FAIL(nng_socket_set_bool(s1, opt, true), NNG_EBADTYPE); + NUTS_FAIL(nng_socket_get_bool(s1, opt, &b), NNG_EBADTYPE); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); +} + +static void +test_bus_cooked(void) +{ + nng_socket s; + bool b; + + NUTS_PASS(nng_bus0_open(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(!b); + NUTS_FAIL(nng_socket_set_bool(s, NNG_OPT_RAW, true), NNG_EREADONLY); + NUTS_PASS(nng_close(s)); + + // raw pub only differs in the option setting + NUTS_PASS(nng_bus0_open_raw(&s)); + NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); + NUTS_TRUE(b); + NUTS_CLOSE(s); +} + +static void +test_bug1247(void) +{ + nng_socket bus1, bus2; + char *addr; + + NUTS_ADDR(addr, "tcp"); + + NUTS_PASS(nng_bus0_open(&bus1)); + NUTS_PASS(nng_bus0_open(&bus2)); + + NUTS_PASS(nng_listen(bus1, addr, NULL, 0)); + NUTS_FAIL(nng_listen(bus2, addr, NULL, 0), NNG_EADDRINUSE); + + NUTS_CLOSE(bus2); + NUTS_CLOSE(bus1); +} + +TEST_LIST = { + { "bus identity", test_bus_identity }, + { "bus star", test_bus_star }, + { "bus device", test_bus_device }, + { "bus validate peer", test_bus_validate_peer }, + { "bus no context", test_bus_no_context }, + { "bus poll read", test_bus_poll_readable }, + { "bus poll write", test_bus_poll_writeable }, + { "bus send no pipes", test_bus_send_no_pipes }, + { "bus send flood", test_bus_send_flood }, + { "bus recv cancel", test_bus_recv_cancel }, + { "bus close recv abort", test_bus_close_recv_abort }, + { "bus aio stopped", test_bus_aio_stopped }, + { "bus recv buf option", test_bus_recv_buf_option }, + { "bus send buf option", test_bus_send_buf_option }, + { "bus cooked", test_bus_cooked }, + { "bug1247", test_bug1247 }, + { NULL, NULL }, +}; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f9f9ee6e..bc212c0f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -148,7 +148,6 @@ add_nng_test(ws 30) add_nng_test(wss 30) add_nng_test1(zt 60 NNG_TRANSPORT_ZEROTIER) -add_nng_test(bus 5) add_nng_test(reqctx 5) add_nng_test(reqstress 60) diff --git a/tests/bus.c b/tests/bus.c deleted file mode 100644 index ccd48167..00000000 --- a/tests/bus.c +++ /dev/null @@ -1,90 +0,0 @@ -// -// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <string.h> - -#include <nng/nng.h> -#include <nng/protocol/bus0/bus.h> - -#include "convey.h" -#include "stubs.h" - -#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) -#define CHECKSTR(m, s) \ - So(nng_msg_len(m) == strlen(s)); \ - So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) - -TestMain("BUS pattern", { - const char *addr = "inproc://test"; - - Convey("We can create a BUS socket", { - nng_socket bus; - - So(nng_bus_open(&bus) == 0); - - Reset({ nng_close(bus); }); - }); - - Convey("We can create a linked BUS topology", { - nng_socket bus1; - nng_socket bus2; - nng_socket bus3; - nng_duration rtimeo; - - So(nng_bus_open(&bus1) == 0); - So(nng_bus_open(&bus2) == 0); - So(nng_bus_open(&bus3) == 0); - - Reset({ - nng_close(bus1); - nng_close(bus2); - nng_close(bus3); - }); - - So(nng_listen(bus1, addr, NULL, 0) == 0); - So(nng_dial(bus2, addr, NULL, 0) == 0); - So(nng_dial(bus3, addr, NULL, 0) == 0); - - rtimeo = 50; - So(nng_setopt_ms(bus1, NNG_OPT_RECVTIMEO, rtimeo) == 0); - So(nng_setopt_ms(bus2, NNG_OPT_RECVTIMEO, rtimeo) == 0); - So(nng_setopt_ms(bus3, NNG_OPT_RECVTIMEO, rtimeo) == 0); - - Convey("Messages delivered", { - nng_msg *msg; - - // This is just a poor man's sleep. - So(nng_recvmsg(bus1, &msg, 0) == NNG_ETIMEDOUT); - So(nng_recvmsg(bus2, &msg, 0) == NNG_ETIMEDOUT); - So(nng_recvmsg(bus3, &msg, 0) == NNG_ETIMEDOUT); - - So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "99bits"); - So(nng_sendmsg(bus2, msg, 0) == 0); - - So(nng_recvmsg(bus1, &msg, 0) == 0); - CHECKSTR(msg, "99bits"); - nng_msg_free(msg); - So(nng_recvmsg(bus3, &msg, 0) == NNG_ETIMEDOUT); - - So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "onthe"); - So(nng_sendmsg(bus1, msg, 0) == 0); - - So(nng_recvmsg(bus2, &msg, 0) == 0); - CHECKSTR(msg, "onthe"); - nng_msg_free(msg); - - So(nng_recvmsg(bus3, &msg, 0) == 0); - CHECKSTR(msg, "onthe"); - nng_msg_free(msg); - }); - }); -}) |
