aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/protocol/pubsub0/sub_test.c10
-rw-r--r--tests/CMakeLists.txt2
-rw-r--r--tests/pubsub.c198
-rw-r--r--tests/pubsubpollfd.c125
4 files changed, 7 insertions, 328 deletions
diff --git a/src/protocol/pubsub0/sub_test.c b/src/protocol/pubsub0/sub_test.c
index 4690ad23..3edf1d16 100644
--- a/src/protocol/pubsub0/sub_test.c
+++ b/src/protocol/pubsub0/sub_test.c
@@ -94,7 +94,7 @@ test_sub_poll_readable(void)
TEST_NNG_PASS(nng_sub0_open(&sub));
TEST_NNG_PASS(nng_pub0_open(&pub));
- TEST_NNG_PASS(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0));
+ TEST_NNG_PASS(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "a", 1));
TEST_NNG_PASS(nng_setopt_ms(sub, NNG_OPT_RECVTIMEO, 1000));
TEST_NNG_PASS(nng_setopt_ms(pub, NNG_OPT_SENDTIMEO, 1000));
TEST_NNG_PASS(nng_getopt_int(sub, NNG_OPT_RECVFD, &fd));
@@ -107,11 +107,15 @@ test_sub_poll_readable(void)
TEST_NNG_PASS(testutil_marry(pub, sub));
TEST_CHECK(testutil_pollfd(fd) == false);
+ // If we send a message we didn't subscribe to, that doesn't matter.
+ TEST_NNG_SEND_STR(pub, "def");
+ testutil_sleep(100);
+ TEST_CHECK(testutil_pollfd(fd) == false);
+
// But once we send messages, it is.
// We have to send a request, in order to send a reply.
TEST_NNG_SEND_STR(pub, "abc");
- testutil_sleep(200);
-
+ testutil_sleep(100);
TEST_CHECK(testutil_pollfd(fd) == true);
// and receiving makes it no longer ready
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 80ee0a45..f64d6286 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -148,7 +148,6 @@ add_nng_test(nonblock 60)
add_nng_test(options 5)
add_nng_test(pipe 5)
add_nng_test(pollfd 5)
-add_nng_test(pubsubpollfd 5)
add_nng_test1(resolv 10 NNG_STATIC_LIB)
add_nng_test(scalability 20 ON)
add_nng_test(set_recvmaxsize 2)
@@ -167,7 +166,6 @@ add_nng_test1(zt 60 NNG_TRANSPORT_ZEROTIER)
add_nng_test(bus 5)
add_nng_test(pipeline 5)
-add_nng_test(pubsub 5)
add_nng_test(reqctx 5)
add_nng_test(reqstress 60)
add_nng_test(respondpoll 5)
diff --git a/tests/pubsub.c b/tests/pubsub.c
deleted file mode 100644
index 14bd20fc..00000000
--- a/tests/pubsub.c
+++ /dev/null
@@ -1,198 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <string.h>
-
-#include <nng/nng.h>
-#include <nng/protocol/pubsub0/pub.h>
-#include <nng/protocol/pubsub0/sub.h>
-#include <nng/supplemental/util/platform.h>
-
-#include "convey.h"
-#include "stubs.h"
-
-#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
-#define CHECKSTR(m, s) \
- So(nng_msg_len(m) == strlen(s)); \
- So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
-
-TestMain("PUB/SUB pattern", {
- const char *addr = "inproc://test";
-
- Reset({ nng_fini(); });
-
- Convey("We can create a PUB socket", {
- nng_socket pub;
-
- So(nng_pub_open(&pub) == 0);
-
- Reset({ nng_close(pub); });
-
- Convey("Recv fails", {
- nng_msg *msg;
- So(nng_recvmsg(pub, &msg, 0) == NNG_ENOTSUP);
- });
-
- Convey("It cannot subscribe", {
- So(nng_setopt(pub, NNG_OPT_SUB_SUBSCRIBE, "", 0) ==
- NNG_ENOTSUP);
- });
-
- Convey("It cannot unsubscribe", {
- So(nng_setopt(pub, NNG_OPT_SUB_UNSUBSCRIBE, "", 0) ==
- NNG_ENOTSUP);
- });
- });
-
- Convey("We can create a SUB socket", {
- nng_socket sub;
- So(nng_sub_open(&sub) == 0);
-
- Reset({ nng_close(sub); });
-
- Convey("It can subscribe", {
- So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "ABC", 3) ==
- 0);
- So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == 0);
- Convey("And it can unsubscribe", {
- So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE,
- "ABC", 3) == 0);
- So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "",
- 0) == 0);
-
- So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "",
- 0) == NNG_ENOENT);
- So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE,
- "HELLO", 0) == NNG_ENOENT);
- });
- });
- });
-
- Convey("We can create a linked PUB/SUB pair", {
- nng_socket pub;
- nng_socket sub;
-
- So(nng_pub_open(&pub) == 0);
-
- So(nng_sub_open(&sub) == 0);
-
- Reset({
- nng_close(pub);
- nng_close(sub);
- });
-
- // Most applications will usually have the pub listen,
- // and the sub dial. However, this creates a problem
- // for our tests, since we can wind up trying to push
- // data before the pipe is fully registered (the accept
- // runs asynchronously.)
- So(nng_listen(sub, addr, NULL, 0) == 0);
- So(nng_dial(pub, addr, NULL, 0) == 0);
-
- nng_msleep(200); // give time for connecting threads
-
- Convey("Subs can receive from pubs", {
- nng_msg *msg;
-
- So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "/some/",
- strlen("/some/")) == 0);
- So(nng_setopt_ms(sub, NNG_OPT_RECVTIMEO, 90) == 0);
-
- So(nng_msg_alloc(&msg, 0) == 0);
- APPENDSTR(msg, "/some/like/it/hot");
- So(nng_sendmsg(pub, msg, 0) == 0);
- So(nng_recvmsg(sub, &msg, 0) == 0);
- CHECKSTR(msg, "/some/like/it/hot");
- nng_msg_free(msg);
-
- So(nng_msg_alloc(&msg, 0) == 0);
- APPENDSTR(msg, "/somewhere/over/the/rainbow");
- CHECKSTR(msg, "/somewhere/over/the/rainbow");
-
- So(nng_sendmsg(pub, msg, 0) == 0);
- So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT);
-
- So(nng_msg_alloc(&msg, 0) == 0);
- APPENDSTR(msg, "/some/day/some/how");
- CHECKSTR(msg, "/some/day/some/how");
-
- So(nng_sendmsg(pub, msg, 0) == 0);
- So(nng_recvmsg(sub, &msg, 0) == 0);
- CHECKSTR(msg, "/some/day/some/how");
- nng_msg_free(msg);
- });
-
- Convey("Subs using NNG_FLAG_NONBLOCK and no sub", {
- nng_msg *msg;
-
- So(nng_msg_alloc(&msg, 0) == 0);
- APPENDSTR(msg, "/some/don't/like/it");
- So(nng_sendmsg(pub, msg, 0) == 0);
- So(nng_recvmsg(sub, &msg, NNG_FLAG_NONBLOCK) == NNG_EAGAIN);
- });
- Convey("Subs using NNG_FLAG_NONBLOCK and empty sub", {
- nng_msg *msg;
- char *buf;
- size_t size;
-
- So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == 0);
- So(nng_recvmsg(sub, &msg, NNG_FLAG_NONBLOCK) == NNG_EAGAIN);
- So(nng_recv(sub, &buf, &size, NNG_FLAG_NONBLOCK | NNG_FLAG_ALLOC) == NNG_EAGAIN);
- });
-
- Convey("Subs without subscriptions don't receive", {
- nng_msg *msg;
- So(nng_setopt_ms(sub, NNG_OPT_RECVTIMEO, 90) == 0);
-
- So(nng_msg_alloc(&msg, 0) == 0);
- APPENDSTR(msg, "/some/don't/like/it");
- So(nng_sendmsg(pub, msg, 0) == 0);
- So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT);
- });
-
- });
-
- Convey("Subs in raw receive", {
- nng_msg * msg;
- nng_socket pub;
- nng_socket sub;
- bool raw;
-
- So(nng_pub_open(&pub) == 0);
-
- So(nng_sub_open_raw(&sub) == 0);
-
- Reset({
- nng_close(pub);
- nng_close(sub);
- });
-
- // Most applications will usually have the pub listen,
- // and the sub dial. However, this creates a problem
- // for our tests, since we can wind up trying to push
- // data before the pipe is fully registered (the accept
- // runs asynchronously.)
- So(nng_listen(sub, addr, NULL, 0) == 0);
- So(nng_dial(pub, addr, NULL, 0) == 0);
-
- nng_msleep(200); // give time for connecting threads
-
- So(nng_setopt_ms(sub, NNG_OPT_RECVTIMEO, 90) == 0);
- So(nng_getopt_bool(sub, NNG_OPT_RAW, &raw) == 0);
- So(raw == true);
-
- So(nng_msg_alloc(&msg, 0) == 0);
- APPENDSTR(msg, "/some/like/it/raw");
- So(nng_sendmsg(pub, msg, 0) == 0);
- So(nng_recvmsg(sub, &msg, 0) == 0);
- CHECKSTR(msg, "/some/like/it/raw");
- nng_msg_free(msg);
- });
-})
diff --git a/tests/pubsubpollfd.c b/tests/pubsubpollfd.c
deleted file mode 100644
index 6a41d5f9..00000000
--- a/tests/pubsubpollfd.c
+++ /dev/null
@@ -1,125 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2019 Behrooze Sirang <behrooze@gmail.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <string.h>
-
-#ifndef _WIN32
-#include <poll.h>
-#include <unistd.h>
-#define INVALID_SOCKET -1
-#else
-
-#define poll WSAPoll
-#ifndef WIN32_LEAN_AND_MEAN
-#define WIN32_LEAN_AND_MEAN
-#endif
-
-#include <windows.h>
-#include <winsock2.h>
-
-#include <mswsock.h>
-#include <ws2tcpip.h>
-
-#endif
-
-#include <nng/nng.h>
-#include <nng/protocol/pubsub0/sub.h>
-#include <nng/protocol/pubsub0/pub.h>
-#include <nng/supplemental/util/platform.h>
-
-#include "convey.h"
-#include "stubs.h"
-
-TestMain("PUBSUB pollable", {
- Convey("Given a connected pair of sockets", {
- nng_socket s1;
- nng_socket s2;
-
- So(nng_pub0_open(&s1) == 0);
- So(nng_sub0_open(&s2) == 0);
- Reset({
- nng_close(s1);
- nng_close(s2);
- });
- So(nng_listen(s1, "inproc://yeahbaby", NULL, 0) == 0);
- So(nng_dial(s2, "inproc://yeahbaby", NULL, 0) == 0);
- So(nng_setopt(s2, NNG_OPT_SUB_SUBSCRIBE, "foo", 3) == 0);
- nng_msleep(50);
-
- Convey("We can get a recv FD", {
- int fd;
- size_t sz;
-
- sz = sizeof(fd);
- So(nng_getopt(s2, NNG_OPT_RECVFD, &fd, &sz) == 0);
- So(fd != (int) INVALID_SOCKET);
-
- Convey("And it is always the same fd", {
- int fd2;
- sz = sizeof(fd2);
- So(nng_getopt(s2, NNG_OPT_RECVFD, &fd2, &sz) ==
- 0);
- So(fd2 == fd);
- });
-
- Convey("And they start non pollable", {
- struct pollfd pfd;
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
-
- So(poll(&pfd, 1, 0) == 0);
- So(pfd.revents == 0);
- });
-
- Convey("But if we write to subscribed topic they are pollable", {
- struct pollfd pfd;
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
-
- So(nng_send(s1, "foo:kick", 8, 0) == 0);
- So(poll(&pfd, 1, 100) == 1);
- So((pfd.revents & POLLIN) != 0);
- });
- Convey("And if the topic doesn't match, it is not pollable", {
- struct pollfd pfd;
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
-
- So(nng_send(s1, "bar:kick", 8, 100) == 0);
- So(poll(&pfd, 1, 0) == 0);
- });
- });
-
- Convey("We can get a send FD", {
- int fd;
- size_t sz;
-
- sz = sizeof(fd);
- So(nng_getopt(s1, NNG_OPT_SENDFD, &fd, &sz) == 0);
- So(fd != (int) INVALID_SOCKET);
- So(nng_send(s1, "oops", 4, 0) == 0);
- });
-
- Convey("Must have a big enough size", {
- int fd;
- size_t sz;
- sz = 1;
- So(nng_getopt(s2, NNG_OPT_RECVFD, &fd, &sz) ==
- NNG_EINVAL);
- sz = 128;
- So(nng_getopt(s2, NNG_OPT_RECVFD, &fd, &sz) == 0);
- So(sz == sizeof(fd));
- });
- });
-})