From 4d2661e325c0d0b2fa93642470b66a0c746a72e7 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 11 Jan 2017 00:01:56 -0800 Subject: Bus working, and added bus test. --- src/protocol/bus/bus.c | 1 - tests/CMakeLists.txt | 1 + tests/bus.c | 104 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 tests/bus.c diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index a9e402ef..3c9d0674 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -51,7 +51,6 @@ nni_bus_sock_init(void **sp, nni_sock *nsock) psock->raw = 0; *sp = psock; - nni_sock_recverr(nsock, NNG_ESTATE); return (0); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c5bd75dd..16389601 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -52,6 +52,7 @@ else () endmacro (add_nng_perf) endif () +add_nng_test(bus 5) add_nng_test(idhash 5) add_nng_test(inproc 5) add_nng_test(list 5) diff --git a/tests/bus.c b/tests/bus.c new file mode 100644 index 00000000..ec059bb0 --- /dev/null +++ b/tests/bus.c @@ -0,0 +1,104 @@ +// +// Copyright 2017 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" + +#include + +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) + +Main({ + int rv; + const char *addr = "inproc://test"; + + Test("BUS pattern", { + Convey("We can create a BUS socket", { + nng_socket *bus; + + So(nng_open(&bus, NNG_PROTO_BUS) == 0); + So(bus != NULL); + + Reset({ + nng_close(bus); + }) + + Convey("Protocols match", { + So(nng_protocol(bus) == NNG_PROTO_BUS); + So(nng_peer(bus) == NNG_PROTO_BUS); + }) + }) + + Convey("We can create a linked BUS topology", { + nng_socket *bus1; + nng_socket *bus2; + nng_socket *bus3; + uint64_t rtimeo; + + So((rv = nng_open(&bus1, NNG_PROTO_BUS)) == 0); + So(bus1 != NULL); + + So((rv = nng_open(&bus2, NNG_PROTO_BUS)) == 0); + So(bus2 != NULL); + + So((rv = nng_open(&bus3, NNG_PROTO_BUS)) == 0); + So(bus3 != NULL); + + Reset({ + nng_close(bus1); + nng_close(bus2); + nng_close(bus3); + }) + + So(nng_listen(bus1, addr, NULL, NNG_FLAG_SYNCH) == 0); + So(nng_dial(bus2, addr, NULL, NNG_FLAG_SYNCH) == 0); + So(nng_dial(bus3, addr, NULL, NNG_FLAG_SYNCH) == 0); + + rtimeo = 50000; + So(nng_setopt(bus1, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + rtimeo = 50000; + So(nng_setopt(bus2, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + rtimeo = 50000; + So(nng_setopt(bus3, NNG_OPT_RCVTIMEO, &rtimeo, sizeof (rtimeo)) == 0); + + Convey("Messages delivered", { + nng_msg *msg; + + + // This is just a poor man's sleep. + So(nng_recvmsg(bus1, &msg, 0) == NNG_ETIMEDOUT); + So(nng_recvmsg(bus2, &msg, 0) == NNG_ETIMEDOUT); + So(nng_recvmsg(bus3, &msg, 0) == NNG_ETIMEDOUT); + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "99bits"); + So(nng_sendmsg(bus2, msg, 0) == 0); + + So(nng_recvmsg(bus1, &msg, 0) == 0); + CHECKSTR(msg, "99bits"); + nng_msg_free(msg); + So(nng_recvmsg(bus3, &msg, 0) == NNG_ETIMEDOUT); + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "onthe"); + So(nng_sendmsg(bus1, msg, 0) == 0); + + So(nng_recvmsg(bus2, &msg, 0) == 0); + CHECKSTR(msg, "onthe"); + nng_msg_free(msg); + + So(nng_recvmsg(bus3, &msg, 0) == 0); + CHECKSTR(msg, "onthe"); + nng_msg_free(msg); + }) + }) + }) +}) -- cgit v1.2.3-70-g09d2