aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/CMakeLists.txt4
-rw-r--r--tests/compat_surveyttl.c144
-rw-r--r--tests/reqpoll.c6
-rw-r--r--tests/respondpoll.c109
-rw-r--r--tests/stubs.h31
-rw-r--r--tests/survey.c217
-rw-r--r--tests/surveyctx.c298
-rw-r--r--tests/surveypoll.c126
8 files changed, 932 insertions, 3 deletions
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 08806679..af570daa 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -166,7 +166,10 @@ add_nng_proto_test(reqctx 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_proto_test(reqpoll 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_proto_test(reqrep 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_proto_test(reqstress 60 NNG_PROTO_REQ0 NNG_PROTO_REP0)
+add_nng_proto_test(respondpoll 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
add_nng_test(survey 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
+add_nng_proto_test(surveyctx 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
+add_nng_proto_test(surveypoll 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
# compatbility tests
# We only support these if ALL the legacy protocols are supported. This
@@ -186,6 +189,7 @@ add_nng_compat_test(compat_reqrep 5)
add_nng_compat_test(compat_survey 5)
add_nng_compat_test(compat_reqttl 5)
add_nng_compat_test(compat_shutdown 5)
+add_nng_compat_test(compat_surveyttl 5)
add_nng_compat_test(compat_poll 5)
# These are special tests for compat mode, not inherited from the
diff --git a/tests/compat_surveyttl.c b/tests/compat_surveyttl.c
new file mode 100644
index 00000000..1c6f66be
--- /dev/null
+++ b/tests/compat_surveyttl.c
@@ -0,0 +1,144 @@
+/*
+ Copyright (c) 2012 Martin Sustrik All rights reserved.
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+ Copyright 2016 Garrett D'Amore <garrett@damore.org>
+ Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com>
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include <nanomsg/nn.h>
+
+#include "compat_testutil.h"
+
+static char socket_address_a[128];
+static char socket_address_b[128];
+int dev0;
+int dev1;
+
+void device (NN_UNUSED void *arg)
+{
+ int rc;
+
+ /* Run the device. */
+ rc = nn_device (dev0, dev1);
+ nn_assert (rc < 0 && nn_errno () == EBADF);
+
+ /* Clean up. */
+ test_close (dev0);
+ test_close (dev1);
+}
+
+int main (int argc, const char *argv[])
+{
+ int end0;
+ int end1;
+ struct nn_thread thread1;
+ int timeo;
+ int maxttl;
+ size_t sz;
+ int rc;
+
+ int port = get_test_port(argc, argv);
+
+ test_addr_from(socket_address_a, "tcp", "127.0.0.1", port);
+ test_addr_from(socket_address_b, "tcp", "127.0.0.1", port + 1);
+
+ /* Intialise the device sockets. */
+ dev0 = test_socket (AF_SP_RAW, NN_RESPONDENT);
+ dev1 = test_socket (AF_SP_RAW, NN_SURVEYOR);
+
+ test_bind (dev0, socket_address_a);
+ test_bind (dev1, socket_address_b);
+
+ /* Start the device. */
+ nn_thread_init (&thread1, device, NULL);
+
+ end0 = test_socket (AF_SP, NN_SURVEYOR);
+ end1 = test_socket (AF_SP, NN_RESPONDENT);
+
+ /* Test the bi-directional device TTL */
+ test_connect (end0, socket_address_a);
+ test_connect (end1, socket_address_b);
+
+ /* Wait for TCP to establish. */
+ nn_sleep (100);
+
+ /* Set up max receive timeout. */
+ timeo = 100;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+ timeo = 100;
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo));
+
+ /* Test default TTL is 8. */
+ sz = sizeof (maxttl);
+ maxttl = -1;
+ rc = nn_getsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, &sz);
+ nn_assert (rc == 0);
+ nn_assert (sz == sizeof (maxttl));
+ nn_assert (maxttl == 8);
+
+ /* Test to make sure option TTL cannot be set below 1. */
+ maxttl = -1;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == -1);
+ maxttl = 0;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == 0);
+
+ /* Test to set non-integer size */
+ maxttl = 8;
+ rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, 1);
+ nn_assert (rc < 0 && nn_errno () == EINVAL);
+ nn_assert (maxttl == 8);
+
+ /* Pass a message between endpoints. */
+ test_send (end0, "SURVEY");
+ test_recv (end1, "SURVEY");
+
+ /* Now send a reply. */
+ test_send (end1, "REPLYXYZ");
+ test_recv (end0, "REPLYXYZ");
+
+ /* Now set the max TTL. */
+ maxttl = 1;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+
+ test_send (end0, "DROPTHIS");
+ test_drop (end1, ETIMEDOUT);
+
+ maxttl = 2;
+ test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl));
+ test_send (end0, "DONTDROP");
+ test_recv (end1, "DONTDROP");
+
+ /* Clean up. */
+ test_close (end0);
+ test_close (end1);
+
+ /* Shut down the devices. */
+ nn_term ();
+ nn_thread_term (&thread1);
+
+ return 0;
+}
diff --git a/tests/reqpoll.c b/tests/reqpoll.c
index 64c8df66..aeee7d0b 100644
--- a/tests/reqpoll.c
+++ b/tests/reqpoll.c
@@ -55,7 +55,7 @@ TestMain("REQ pollable", {
atexit(nng_fini);
- Convey("Given a connected REQ/REP pair", {
+ Convey("Given a REQ/REP pair", {
nng_socket req;
nng_socket rep;
nng_ctx ctx;
@@ -74,7 +74,7 @@ TestMain("REQ pollable", {
Convey("REQ ctx not pollable", {
int fd;
So(nng_ctx_open(&ctx, req) == 0);
- Reset({ nng_ctx_close(req); });
+ Reset({ nng_ctx_close(ctx); });
So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) ==
NNG_ENOTSUP);
So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) ==
@@ -87,7 +87,7 @@ TestMain("REQ pollable", {
So(nng_getopt_int(req, NNG_OPT_SENDFD, &fd) == 0);
So(isready(fd) == false);
- Convey("And becomes readable on connect", {
+ Convey("And becomes writable on connect", {
So(nng_dial(req, "inproc://ctx1", NULL, 0) ==
0);
nng_msleep(100);
diff --git a/tests/respondpoll.c b/tests/respondpoll.c
new file mode 100644
index 00000000..2e24b5b2
--- /dev/null
+++ b/tests/respondpoll.c
@@ -0,0 +1,109 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+TestMain("Respondent pollable", {
+
+ atexit(nng_fini);
+
+ Convey("Given a connected survey pair", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_ctx ctx;
+
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_respondent0_open(&resp) == 0);
+ So(nng_ctx_open(&ctx, resp) == 0);
+
+ So(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 2000) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 2000) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 2000) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 2000) == 0);
+
+ Reset({
+ nng_ctx_close(ctx);
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://ctx1", NULL, 0) == 0);
+
+ Convey("Respondent ctx not pollable", {
+ int fd;
+
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) ==
+ NNG_ENOTSUP);
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) ==
+ NNG_ENOTSUP);
+ });
+
+ Convey("Respondent starts not writable", {
+ int fd;
+
+ So(nng_getopt_int(resp, NNG_OPT_SENDFD, &fd) == 0);
+ So(fdready(fd) == false);
+
+ Convey("And remains unwritable on connect", {
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(fdready(fd) == false);
+
+ Convey("Becomes writable after recv", {
+ nng_msg *m;
+ So(nng_msg_alloc(&m, 0) == 0);
+ So(nng_sendmsg(surv, m, 0) == 0);
+ So(nng_recvmsg(resp, &m, 0) == 0);
+ nng_msg_free(m);
+ So(fdready(fd) == true);
+ });
+ });
+ });
+
+ Convey("Respondent starts not readable", {
+ int fd;
+
+ So(nng_getopt_int(resp, NNG_OPT_RECVFD, &fd) == 0);
+ So(fdready(fd) == false);
+
+ Convey("And doesn't become readable on connect", {
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(fdready(fd) == false);
+ });
+
+ Convey("And becomes readable on data", {
+ nng_msg *msg;
+
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(200);
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(fdready(fd) == false);
+ So(nng_msg_append(msg, "xyz", 3) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ nng_msleep(300); // give time for msg to arrive
+ So(fdready(fd) == true);
+ Convey("Is no longer readable after recv", {
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_free(msg);
+ So(fdready(fd) == false);
+ });
+ });
+ });
+ });
+});
diff --git a/tests/stubs.h b/tests/stubs.h
index da3a6d5f..c34bed7a 100644
--- a/tests/stubs.h
+++ b/tests/stubs.h
@@ -12,11 +12,23 @@
#define STUBS_H
#ifdef _WIN32
+
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+
#include <windows.h>
+#include <winsock2.h>
+// order counts
+#include <mswsock.h>
+#define PLATFD SOCKET
+#define poll WSAPoll
#else
+#include <poll.h>
#include <stdint.h>
#include <sys/time.h>
#include <time.h>
+#define PLATFD int
#endif
// Stub handlers for some common things.
@@ -46,6 +58,25 @@ getms(void)
#endif
}
+bool
+fdready(int fd)
+{
+ struct pollfd pfd;
+ pfd.fd = (PLATFD) fd;
+ pfd.events = POLLRDNORM;
+ pfd.revents = 0;
+
+ switch (poll(&pfd, 1, 0)) {
+ case 0:
+ return (false);
+ case 1:
+ return (true);
+ default:
+ ConveyError("BAD POLL RETURN!");
+ return (false);
+ }
+}
+
int
nosocket(nng_socket *s)
{
diff --git a/tests/survey.c b/tests/survey.c
index a3a7ba1d..ed73b3dd 100644
--- a/tests/survey.c
+++ b/tests/survey.c
@@ -14,6 +14,7 @@
#include "protocol/survey0/respond.h"
#include "protocol/survey0/survey.h"
#include "stubs.h"
+#include "supplemental/util/platform.h"
#include <string.h>
@@ -114,5 +115,221 @@ TestMain("SURVEY pattern", {
So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE);
});
});
+
+ Convey("Second send cancels pending recv", {
+ nng_msg *msg;
+ nng_aio *aio;
+
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "one");
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ msg = NULL;
+ nng_recv_aio(surv, aio);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "two");
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECANCELED);
+ });
+
+ Convey("Sending a NULL message does not panic", {
+ nng_aio *aio;
+
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ Reset({ nng_aio_free(aio); });
+ So(nng_sendmsg(surv, NULL, 0) == NNG_EINVAL);
+ nng_send_aio(surv, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_EINVAL);
+ });
+
+ Convey("Disconnecting before getting response", {
+ nng_msg *msg;
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_close(surv);
+ nng_msleep(100);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ });
+ });
+
+ Convey("Bad backtrace survey is ignored", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_msg_header_append_u32(msg, 1) ==
+ 0); // high order bit not set!
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Bad backtrace survey is ignored (raw)", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ nng_msleep(100);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_msg_header_append_u32(msg, 1) ==
+ 0); // high order bit not set!
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Missing backtrace survey is ignored", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ nng_msleep(100);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Missing backtrace survey is ignored (raw)", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ nng_msleep(100);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Bad backtrace response is ignored", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_header_clear(msg);
+ nng_msg_header_append_u32(msg, 1);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
});
+
+ Convey("Bad backtrace response is ignored (raw)", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_msg_header_append_u32(msg, 0x80000000) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_header_clear(msg);
+ nng_msg_header_append_u32(msg, 1);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Missing backtrace response is ignored", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_header_clear(msg);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
+ Convey("Missing backtrace response is ignored (raw)", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_msg * msg;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_respondent0_open_raw(&resp) == 0);
+ Reset({
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0);
+ nng_msleep(100);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_msg_header_append_u32(msg, 0x80000000) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) == 0);
+ nng_msg_header_clear(msg);
+ So(nng_sendmsg(resp, msg, 0) == 0);
+ So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT);
+ });
+
});
diff --git a/tests/surveyctx.c b/tests/surveyctx.c
new file mode 100644
index 00000000..9ab2de40
--- /dev/null
+++ b/tests/surveyctx.c
@@ -0,0 +1,298 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+#include <string.h>
+
+static struct {
+ nng_aio *aio;
+ enum { START, SEND, RECV } state;
+ nng_socket s;
+ nng_msg * msg;
+ int cnt;
+} resp_state;
+
+void
+resp_cb(void)
+{
+ int rv;
+
+ if (resp_state.state == START) {
+ resp_state.state = RECV;
+ nng_recv_aio(resp_state.s, resp_state.aio);
+ return;
+ }
+ if ((rv = nng_aio_result(resp_state.aio)) != 0) {
+ if (resp_state.msg != NULL) {
+ nng_msg_free(resp_state.msg);
+ resp_state.msg = NULL;
+ }
+ return;
+ }
+ switch (resp_state.state) {
+ case START:
+ break;
+ case RECV:
+ resp_state.msg = nng_aio_get_msg(resp_state.aio);
+ resp_state.state = SEND;
+ nng_aio_set_msg(resp_state.aio, resp_state.msg);
+ nng_send_aio(resp_state.s, resp_state.aio);
+ break;
+ case SEND:
+ resp_state.msg = NULL;
+ resp_state.state = RECV;
+ nng_aio_set_msg(resp_state.aio, NULL);
+ nng_recv_aio(resp_state.s, resp_state.aio);
+ resp_state.cnt++;
+ break;
+ }
+}
+
+#define NCTX 10
+
+void
+markr(void *arg)
+{
+ *(bool *) arg = true;
+}
+
+static void
+marks(void *arg)
+{
+ *(bool *) arg = true;
+}
+
+nng_ctx ctxs[NCTX];
+uint32_t recv_order[NCTX];
+nng_aio *saios[NCTX];
+nng_aio *raios[NCTX];
+bool recd[NCTX];
+bool sent[NCTX];
+
+TestMain("Surveyor concurrent contexts", {
+ int rv;
+ const char *addr = "inproc://test";
+ int i;
+
+ memset(recv_order, 0, NCTX * sizeof(int));
+
+ atexit(nng_fini);
+
+ Convey("We can use Surveyor contexts concurrently", {
+ nng_socket surv;
+
+ So(nng_aio_alloc(&resp_state.aio, (void *) resp_cb, NULL) ==
+ 0);
+ So(nng_respondent0_open(&resp_state.s) == 0);
+ So(nng_surveyor0_open(&surv) == 0);
+
+ for (i = 0; i < NCTX; i++) {
+ sent[i] = recd[i] = false;
+ recv_order[i] = (uint32_t) i;
+ if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) {
+ break;
+ }
+ nng_aio_set_timeout(raios[i], 5000);
+ if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) {
+ break;
+ }
+ nng_aio_set_timeout(saios[i], 5000);
+ }
+
+ // So(nng_setopt_int(resp_state.s, NNG_OPT_SENDBUF, NCTX) ==
+ // 0);
+ So(i == NCTX);
+ for (i = 0; i < NCTX; i++) {
+ uint32_t tmp;
+ int ni = rand() % NCTX; // recv index
+
+ tmp = recv_order[i];
+ recv_order[i] = recv_order[ni];
+ recv_order[ni] = tmp;
+ }
+ Reset({
+ for (i = 0; i < NCTX; i++) {
+ nng_aio_free(saios[i]);
+ nng_aio_free(raios[i]);
+ }
+ nng_close(surv);
+ nng_close(resp_state.s);
+ nng_aio_free(resp_state.aio);
+ });
+
+ So(nng_listen(resp_state.s, addr, NULL, 0) == 0);
+ So(nng_dial(surv, addr, NULL, 0) == 0);
+
+ nng_msleep(100); // let things establish.
+
+ // Start the rep state machine going.
+ resp_cb();
+
+ for (i = 0; i < NCTX; i++) {
+ if ((rv = nng_ctx_open(&ctxs[i], surv)) != 0) {
+ break;
+ }
+ }
+ So(rv == 0);
+ So(i == NCTX);
+
+ // Send messages
+ for (i = 0; i < NCTX; i++) {
+ nng_msg *m;
+ if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) {
+ Fail("msg alloc failed: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_msg_append_u32(m, i)) != 0) {
+ Fail("append failed: %s", nng_strerror(rv));
+ }
+ nng_aio_set_msg(saios[i], m);
+ nng_ctx_send(ctxs[i], saios[i]);
+ }
+ So(rv == 0);
+ So(i == NCTX);
+
+ for (i = 0; i < NCTX; i++) {
+ nng_aio_wait(saios[i]);
+ if ((rv = nng_aio_result(saios[i])) != 0) {
+ Fail("send failed: %s", nng_strerror(rv));
+ So(false);
+ break;
+ }
+ }
+ for (i = 0; i < NCTX; i++) {
+ if (!sent[i]) {
+ Fail("Index %d (%d) not sent", i, i);
+ }
+ }
+
+ So(rv == 0);
+ So(i == NCTX);
+ // Receive answers
+ for (i = 0; i < NCTX; i++) {
+ int ri = recv_order[i];
+ nng_ctx_recv(ctxs[ri], raios[ri]);
+ }
+
+ for (i = 0; i < NCTX; i++) {
+ nng_msg *msg;
+ uint32_t x;
+
+ nng_aio_wait(raios[i]);
+ if ((rv = nng_aio_result(raios[i])) != 0) {
+ Fail("recv %d (%d) %d failed: %s", i,
+ recv_order[i], resp_state.cnt,
+ nng_strerror(rv));
+ continue;
+ }
+ msg = nng_aio_get_msg(raios[i]);
+ if ((rv = nng_msg_chop_u32(msg, &x)) != 0) {
+ Fail("recv msg trim: %s", nng_strerror(rv));
+ break;
+ }
+ if (x != (uint32_t) i) {
+ Fail("message body mismatch: %x %x\n", x,
+ (uint32_t) i);
+ break;
+ }
+
+ nng_msg_free(msg);
+ }
+ for (i = 0; i < NCTX; i++) {
+ if (!recd[i]) {
+ Fail("Index %d (%d) not received", i,
+ recv_order[i]);
+ break;
+ }
+ }
+
+ So(rv == 0);
+ So(i == NCTX);
+ });
+
+ Convey("Given a socket and a context", {
+ nng_socket surv;
+ nng_ctx ctx;
+ nng_aio * aio;
+
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_ctx_open(&ctx, surv) == 0);
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ nng_aio_set_timeout(aio, 1000);
+
+ Reset({ nng_aio_free(aio); });
+
+ Convey("Recv on the context is ESTATE", {
+ nng_ctx_recv(ctx, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ESTATE);
+ });
+
+ Convey("Closing the socket aborts a context recv", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ nng_ctx_recv(ctx, aio);
+ nng_close(surv);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECLOSED);
+ });
+
+ Convey("Sending a null message fails", {
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_EINVAL);
+ });
+
+ Convey("Closing the context aborts a context send", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ nng_recv_aio(ctx, aio);
+ nng_ctx_close(ctx);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECLOSED);
+ nng_close(surv);
+ });
+
+ Convey("We can set separate survey times", {
+ nng_duration ms;
+ So(nng_setopt_ms(
+ surv, NNG_OPT_SURVEYOR_SURVEYTIME, 100) == 0);
+ So(nng_ctx_setopt_ms(
+ ctx, NNG_OPT_SURVEYOR_SURVEYTIME, 200) == 0);
+ So(nng_getopt_ms(
+ surv, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0);
+ So(ms == 100);
+ So(nng_ctx_getopt_ms(
+ ctx, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0);
+ So(ms == 200);
+ });
+ });
+
+ Convey("Raw mode does not support contexts", {
+ nng_socket surv;
+ nng_ctx ctx;
+ So(nng_surveyor0_open_raw(&surv) == 0);
+ So(nng_ctx_open(&ctx, surv) == NNG_ENOTSUP);
+ nng_close(surv);
+ });
+});
diff --git a/tests/surveypoll.c b/tests/surveypoll.c
new file mode 100644
index 00000000..f97e1c22
--- /dev/null
+++ b/tests/surveypoll.c
@@ -0,0 +1,126 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+TestMain("Survey pollable", {
+
+ atexit(nng_fini);
+
+ Convey("Given a connected survey pair", {
+ nng_socket surv;
+ nng_socket resp;
+ nng_ctx ctx;
+
+ So(nng_surveyor0_open(&surv) == 0);
+ So(nng_respondent0_open(&resp) == 0);
+ So(nng_ctx_open(&ctx, surv) == 0);
+
+ So(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 2000) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 2000) == 0);
+ So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 2000) == 0);
+ So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 2000) == 0);
+
+ Reset({
+ nng_ctx_close(ctx);
+ nng_close(surv);
+ nng_close(resp);
+ });
+ So(nng_listen(resp, "inproc://ctx1", NULL, 0) == 0);
+
+ Convey("Surveyor ctx not pollable", {
+ int fd;
+
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) ==
+ NNG_ENOTSUP);
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) ==
+ NNG_ENOTSUP);
+ });
+
+ Convey("Suveyor starts writable", {
+ int fd;
+
+ So(nng_getopt_int(surv, NNG_OPT_SENDFD, &fd) == 0);
+ So(fdready(fd) == true);
+
+ Convey("And becomes readable on connect", {
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(fdready(fd) == true);
+
+ Convey("And stays writable", {
+ // 500 messages should force all
+ // the way to send depth.
+ int i;
+ for (i = 0; i < 500; i++) {
+ nng_msg *m;
+ if (nng_msg_alloc(&m, 0) !=
+ 0) {
+ break;
+ }
+ // Fill intermediate queues.
+ if (nng_sendmsg(surv, m,
+ NNG_FLAG_NONBLOCK) !=
+ 0) {
+ nng_msg_free(m);
+ }
+ }
+ So(i == 500);
+ So(fdready(fd) == true);
+ });
+ });
+ });
+
+ Convey("Surveyor starts not readable", {
+ int fd;
+
+ So(nng_getopt_int(surv, NNG_OPT_RECVFD, &fd) == 0);
+ So(fdready(fd) == false);
+
+ Convey("And doesn't become readable on connect", {
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(fdready(fd) == false);
+ });
+
+ Convey("And becomes readable on data", {
+ nng_msg *msg;
+
+ So(nng_dial(surv, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(200);
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(fdready(fd) == false);
+ So(nng_msg_append(msg, "xyz", 3) == 0);
+ So(nng_sendmsg(surv, msg, 0) == 0);
+ So(nng_recvmsg(resp, &msg, 0) ==
+ 0); // recv on rep
+ So(nng_sendmsg(resp, msg, 0) ==
+ 0); // echo it back
+ nng_msleep(
+ 300); // give time for message to arrive
+ So(fdready(fd) == true);
+ Convey("Is no longer readable after recv", {
+ So(nng_recvmsg(surv, &msg, 0) == 0);
+ nng_msg_free(msg);
+ So(fdready(fd) == false);
+ });
+ });
+ });
+ });
+});