summaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-12-21 10:20:55 -0800
committerGarrett D'Amore <garrett@damore.org>2019-12-24 00:34:29 -0800
commit3f7561417bec08226bcfeb107d94be0dbf71b09e (patch)
tree409901d7929df5eeb7295ab971b34c2e1040f507 /src/protocol
parent9e7a4aff25139703bbc375b6dda263d6d42341a8 (diff)
downloadnng-3f7561417bec08226bcfeb107d94be0dbf71b09e.tar.gz
nng-3f7561417bec08226bcfeb107d94be0dbf71b09e.tar.bz2
nng-3f7561417bec08226bcfeb107d94be0dbf71b09e.zip
fixes #1032 Figure out Darwin bustedness
fixes #1035 Convey is awkward -- consider acutest.h This represents a rather large effort towards cleaning up our testing and optional configuration infrastructure. A separate test library is built by default, which is static, and includes some useful utilities design to make it easier to write shorter and more robust (not timing dependent) tests. This also means that we can cover pretty nearly all the tests (protocols etc.) in every case, even if the shipped image will be minimized. Subsystems which are optional can now use a few new macros to configure what they need see nng_sources_if, nng_headers_if, and nng_defines_if. This goes a long way to making the distributed CMakefiles a lot simpler. Additionally, tests for different parts of the tree can now be located outside of the tests/ tree, so that they can be placed next to the code that they are testing. Beyond the enabling work, the work has only begun, but these changes have resolved the most often failing tests for Darwin in the cloud.
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pair1/CMakeLists.txt13
-rw-r--r--src/protocol/pair1/pair.c1
-rw-r--r--src/protocol/pair1/pair1_test.c607
-rw-r--r--src/protocol/reqrep0/CMakeLists.txt32
-rw-r--r--src/protocol/reqrep0/reqrep_test.c300
5 files changed, 922 insertions, 31 deletions
diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt
index cbcb2bc2..3f92c585 100644
--- a/src/protocol/pair1/CMakeLists.txt
+++ b/src/protocol/pair1/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2018 Staysail Systems, Inc. <info@staystail.tech>
+# Copyright 2019 Staysail Systems, Inc. <info@staystail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -12,10 +12,7 @@
option (NNG_PROTO_PAIR1 "Enable PAIRv1 protocol." ON)
mark_as_advanced(NNG_PROTO_PAIR1)
-if (NNG_PROTO_PAIR1)
- set(_DEFS -DNNG_HAVE_PAIR1)
- set(_SRCS protocol/pair1/pair.c ${PROJECT_SOURCE_DIR}/include/nng/protocol/pair1/pair.h)
-
- set(NNG_DEFS ${NNG_DEFS} ${_DEFS} PARENT_SCOPE)
- set(NNG_SRCS ${NNG_SRCS} ${_SRCS} PARENT_SCOPE)
-endif()
+nng_sources_if(NNG_PROTO_PAIR1 pair.c)
+nng_headers_if(NNG_PROTO_PAIR1 nng/protocol/pair1/pair.h)
+nng_defines_if(NNG_PROTO_PAIR1 NNG_HAVE_PAIR1)
+nng_test(pair1_test) \ No newline at end of file
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index 451122ea..7adb8bd8 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -12,7 +12,6 @@
#include <string.h>
#include "core/nng_impl.h"
-
#include "nng/protocol/pair1/pair.h"
// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern,
diff --git a/src/protocol/pair1/pair1_test.c b/src/protocol/pair1/pair1_test.c
new file mode 100644
index 00000000..3a78bb18
--- /dev/null
+++ b/src/protocol/pair1/pair1_test.c
@@ -0,0 +1,607 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/protocol/pair1/pair.h>
+
+#include <testutil.h>
+
+#include <acutest.h>
+
+#define SECOND 1000
+
+#define APPEND_STR(m, s) TEST_CHECK(nng_msg_append(m, s, strlen(s)) == 0)
+#define CHECK_STR(m, s) \
+ TEST_CHECK(nng_msg_len(m) == strlen(s)); \
+ TEST_CHECK(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+
+void
+test_mono_cooked(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_msg * msg;
+
+ TEST_CHECK(nng_pair1_open(&s1) == 0);
+ TEST_CHECK(nng_pair1_open(&c1) == 0);
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_append(msg, "ALPHA", strlen("ALPHA") + 1) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ TEST_CHECK(nng_msg_len(msg) == strlen("ALPHA") + 1);
+ TEST_CHECK(strcmp(nng_msg_body(msg), "ALPHA") == 0);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_append(msg, "BETA", strlen("BETA") + 1) == 0);
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c1, &msg, 0) == 0);
+ TEST_CHECK(nng_msg_len(msg) == strlen("BETA") + 1);
+ TEST_CHECK(strcmp(nng_msg_body(msg), "BETA") == 0);
+
+ nng_msg_free(msg);
+ TEST_CHECK(nng_close(c1) == 0);
+ TEST_CHECK(nng_close(s1) == 0);
+}
+
+void
+test_mono_faithful(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_socket c2;
+ nng_msg * msg;
+ const char *addr = "inproc://pair1_mono_faithful";
+
+ TEST_CHECK(nng_pair1_open(&s1) == 0);
+ TEST_CHECK(nng_pair1_open(&c1) == 0);
+ TEST_CHECK(nng_pair1_open(&c2) == 0);
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND / 4) == 0);
+ TEST_CHECK(nng_setopt_ms(c1, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(c2, NNG_OPT_SENDTIMEO, SECOND) == 0);
+
+ TEST_CHECK(nng_listen(s1, addr, NULL, 0) == 0);
+ TEST_CHECK(nng_dial(c1, addr, NULL, 0) == 0);
+ testutil_sleep(100);
+ TEST_CHECK(nng_dial(c2, addr, NULL, 0) == 0);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "ONE");
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ CHECK_STR(msg, "ONE");
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "TWO");
+ TEST_CHECK(nng_sendmsg(c2, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT);
+
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c1) == 0);
+ TEST_CHECK(nng_close(c2) == 0);
+}
+
+void
+test_mono_back_pressure(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ int i;
+ int rv;
+ nng_msg * msg;
+ nng_duration to = 100;
+
+ TEST_CHECK(nng_pair1_open(&s1) == 0);
+ TEST_CHECK(nng_pair1_open(&c1) == 0);
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_RECVBUF, 1) == 0);
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_SENDBUF, 1) == 0);
+ TEST_CHECK(nng_setopt_int(c1, NNG_OPT_RECVBUF, 1) == 0);
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, to) == 0);
+
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+
+ // We choose to allow some buffering. In reality the
+ // buffer size is just 1, and we will fail after 2.
+ for (i = 0, rv = 0; i < 10; i++) {
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ if ((rv = nng_sendmsg(s1, msg, 0)) != 0) {
+ nng_msg_free(msg);
+ break;
+ }
+ }
+ TEST_CHECK(rv == NNG_ETIMEDOUT);
+ TEST_CHECK(i < 10);
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c1) == 0);
+}
+
+void
+test_mono_raw_exchange(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+
+ nng_msg *msg;
+ uint32_t hops;
+
+ TEST_CHECK(nng_pair1_open_raw(&s1) == 0);
+ TEST_CHECK(nng_pair1_open_raw(&c1) == 0);
+
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, SECOND) == 0);
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+
+ nng_pipe p = NNG_PIPE_INITIALIZER;
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "GAMMA");
+ TEST_CHECK(nng_msg_header_append_u32(msg, 1) == 0);
+ TEST_CHECK(nng_msg_header_len(msg) == sizeof(uint32_t));
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ p = nng_msg_get_pipe(msg);
+ TEST_CHECK(nng_pipe_id(p) > 0);
+
+ CHECK_STR(msg, "GAMMA");
+ TEST_CHECK(nng_msg_header_len(msg) == sizeof(uint32_t));
+ TEST_CHECK(nng_msg_header_trim_u32(msg, &hops) == 0);
+ TEST_CHECK(hops == 2);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "EPSILON");
+ TEST_CHECK(nng_msg_header_append_u32(msg, 1) == 0);
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c1, &msg, 0) == 0);
+ CHECK_STR(msg, "EPSILON");
+ TEST_CHECK(nng_msg_header_len(msg) == sizeof(uint32_t));
+ TEST_CHECK(nng_msg_header_trim_u32(msg, &hops) == 0);
+ p = nng_msg_get_pipe(msg);
+ TEST_CHECK(nng_pipe_id(p) > 0);
+
+ TEST_CHECK(hops == 2);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c1) == 0);
+}
+
+void
+test_mono_raw_header(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_msg * msg;
+ uint32_t v;
+
+ TEST_CHECK(nng_pair1_open_raw(&s1) == 0);
+ TEST_CHECK(nng_pair1_open_raw(&c1) == 0);
+
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND / 5) == 0);
+ TEST_CHECK(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, SECOND / 5) == 0);
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+
+ // Missing bits in the header
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT);
+
+ // Valid header works
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_append_u32(msg, 0xFEEDFACE) == 0);
+ TEST_CHECK(nng_msg_header_append_u32(msg, 1) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ TEST_CHECK(nng_msg_trim_u32(msg, &v) == 0);
+ TEST_CHECK(v == 0xFEEDFACE);
+ nng_msg_free(msg);
+
+ // Header with reserved bits set dropped
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_header_append_u32(msg, 0xDEAD0000) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT);
+
+ // With the same bits clear it works
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_append_u32(msg, 0xFEEDFACE) == 0);
+ TEST_CHECK(nng_msg_header_append_u32(msg, 1) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ TEST_CHECK(nng_msg_trim_u32(msg, &v) == 0);
+ TEST_CHECK(v == 0xFEEDFACE);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c1) == 0);
+}
+
+void
+test_poly_best_effort(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_msg * msg;
+
+ TEST_CHECK(nng_pair1_open(&s1) == 0);
+ TEST_CHECK(nng_pair1_open(&c1) == 0);
+
+ TEST_CHECK(nng_setopt_bool(s1, NNG_OPT_PAIR1_POLY, true) == 0);
+
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_RECVBUF, 1) == 0);
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_SENDBUF, 1) == 0);
+ TEST_CHECK(nng_setopt_int(c1, NNG_OPT_RECVBUF, 1) == 0);
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, SECOND) == 0);
+
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+
+ for (int i = 0; i < 10; i++) {
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ }
+
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c1) == 0);
+}
+
+void
+test_poly_cooked(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_socket c2;
+ nng_msg * msg;
+ bool v;
+ nng_pipe p1;
+ nng_pipe p2;
+
+ TEST_CHECK(nng_pair1_open(&s1) == 0);
+ TEST_CHECK(nng_pair1_open(&c1) == 0);
+ TEST_CHECK(nng_pair1_open(&c2) == 0);
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(c1, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(c2, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND / 10) == 0);
+ TEST_CHECK(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, SECOND / 10) == 0);
+ TEST_CHECK(nng_setopt_ms(c2, NNG_OPT_RECVTIMEO, SECOND / 10) == 0);
+
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0);
+ TEST_CHECK(v == false);
+
+ TEST_CHECK(nng_setopt_bool(s1, NNG_OPT_PAIR1_POLY, true) == 0);
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0);
+ TEST_CHECK(v == true);
+
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+ TEST_CHECK(testutil_marry(s1, c2) == 0);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "ONE");
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ CHECK_STR(msg, "ONE");
+ p1 = nng_msg_get_pipe(msg);
+ TEST_CHECK(nng_pipe_id(p1) > 0);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "TWO");
+ TEST_CHECK(nng_sendmsg(c2, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ CHECK_STR(msg, "TWO");
+ p2 = nng_msg_get_pipe(msg);
+ TEST_CHECK(nng_pipe_id(p2) > 0);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_pipe_id(p1) != nng_pipe_id(p2));
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+
+ nng_msg_set_pipe(msg, p1);
+ APPEND_STR(msg, "UNO");
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c1, &msg, 0) == 0);
+ CHECK_STR(msg, "UNO");
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ nng_msg_set_pipe(msg, p2);
+ APPEND_STR(msg, "DOS");
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c2, &msg, 0) == 0);
+ CHECK_STR(msg, "DOS");
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_close(c1) == 0);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ nng_msg_set_pipe(msg, p1);
+ APPEND_STR(msg, "EIN");
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c2, &msg, 0) == NNG_ETIMEDOUT);
+
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c2) == 0);
+}
+
+void
+test_poly_late(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ bool v;
+
+ TEST_CHECK(nng_pair1_open(&s1) == 0);
+ TEST_CHECK(nng_pair1_open(&c1) == 0);
+
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0);
+ TEST_CHECK(v == false);
+
+ TEST_CHECK(nng_setopt_bool(s1, NNG_OPT_PAIR1_POLY, true) == 0);
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0);
+ TEST_CHECK(v == true);
+
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+
+ TEST_CHECK(
+ nng_setopt_bool(s1, NNG_OPT_PAIR1_POLY, true) == NNG_ESTATE);
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c1) == 0);
+}
+
+void
+test_poly_default(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_socket c2;
+ nng_msg * msg;
+
+ TEST_CHECK(nng_pair1_open(&s1) == 0);
+ TEST_CHECK(nng_pair1_open(&c1) == 0);
+ TEST_CHECK(nng_pair1_open(&c2) == 0);
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(c1, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(c2, NNG_OPT_SENDTIMEO, SECOND) == 0);
+
+ TEST_CHECK(nng_setopt_bool(s1, NNG_OPT_PAIR1_POLY, true) == 0);
+
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+ TEST_CHECK(testutil_marry(s1, c2) == 0);
+
+ // This assumes poly picks the first suitor. Applications
+ // should not make the same assumption.
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "YES");
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c1, &msg, 0) == 0);
+ CHECK_STR(msg, "YES");
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_close(c1) == 0);
+ testutil_sleep(10);
+
+ // Verify that the other pipe is chosen as the next suitor.
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "AGAIN");
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c2, &msg, 0) == 0);
+ CHECK_STR(msg, "AGAIN");
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c2) == 0);
+}
+
+void
+test_poly_raw(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_socket c2;
+ nng_msg * msg;
+ bool v;
+ uint32_t hops;
+ nng_pipe p1;
+ nng_pipe p2;
+
+ TEST_CHECK(nng_pair1_open_raw(&s1) == 0);
+ TEST_CHECK(nng_pair1_open(&c1) == 0);
+ TEST_CHECK(nng_pair1_open(&c2) == 0);
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND / 5) == 0);
+ TEST_CHECK(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, SECOND / 5) == 0);
+ TEST_CHECK(nng_setopt_ms(c2, NNG_OPT_RECVTIMEO, SECOND / 5) == 0);
+
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0);
+ TEST_CHECK(v == 0);
+
+ TEST_CHECK(nng_setopt_bool(s1, NNG_OPT_PAIR1_POLY, true) == 0);
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0);
+ TEST_CHECK(v == true);
+
+ v = false;
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_RAW, &v) == 0);
+ TEST_CHECK(v == true);
+
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+ TEST_CHECK(testutil_marry(s1, c2) == 0);
+
+ // send/recv works
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "ONE");
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ CHECK_STR(msg, "ONE");
+ p1 = nng_msg_get_pipe(msg);
+ TEST_CHECK(nng_pipe_id(p1) > 0);
+ TEST_CHECK(nng_msg_header_trim_u32(msg, &hops) == 0);
+ TEST_CHECK(hops == 1);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "TWO");
+ TEST_CHECK(nng_sendmsg(c2, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ CHECK_STR(msg, "TWO");
+ p2 = nng_msg_get_pipe(msg);
+ TEST_CHECK(nng_pipe_id(p2) > 0);
+ TEST_CHECK(nng_msg_header_trim_u32(msg, &hops) == 0);
+ TEST_CHECK(hops == 1);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_pipe_id(p1) != nng_pipe_id(p2));
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ nng_msg_set_pipe(msg, p1);
+ APPEND_STR(msg, "UNO");
+ TEST_CHECK(nng_msg_header_append_u32(msg, 1) == 0);
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c1, &msg, 0) == 0);
+ CHECK_STR(msg, "UNO");
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ nng_msg_set_pipe(msg, p2);
+ APPEND_STR(msg, "DOS");
+ TEST_CHECK(nng_msg_header_append_u32(msg, 1) == 0);
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c2, &msg, 0) == 0);
+ CHECK_STR(msg, "DOS");
+ nng_msg_free(msg);
+
+ // Verify closing the pipe stops any of its traffic
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ APPEND_STR(msg, "ONE");
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ CHECK_STR(msg, "ONE");
+ p1 = nng_msg_get_pipe(msg);
+ TEST_CHECK(nng_pipe_id(p1) > 0);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_close(c1) == 0);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ nng_msg_set_pipe(msg, p1);
+ APPEND_STR(msg, "EIN");
+ TEST_CHECK(nng_msg_header_append_u32(msg, 1) == 0);
+ TEST_CHECK(nng_sendmsg(s1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(c2, &msg, 0) == NNG_ETIMEDOUT);
+}
+
+void
+test_raw(void)
+{
+ nng_socket s1;
+ bool raw;
+
+ TEST_CHECK(nng_pair1_open(&s1) == 0);
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_RAW, &raw) == 0);
+ TEST_CHECK(raw == false);
+ TEST_CHECK(nng_setopt_bool(s1, NNG_OPT_RAW, true) == NNG_EREADONLY);
+ TEST_CHECK(nng_close(s1) == 0);
+
+ TEST_CHECK(nng_pair1_open_raw(&s1) == 0);
+ TEST_CHECK(nng_getopt_bool(s1, NNG_OPT_RAW, &raw) == 0);
+ TEST_CHECK(raw == true);
+ TEST_CHECK(nng_setopt_bool(s1, NNG_OPT_RAW, false) == NNG_EREADONLY);
+ TEST_CHECK(nng_close(s1) == 0);
+}
+
+void
+test_ttl(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_msg * msg;
+ uint32_t val;
+ int ttl;
+
+ TEST_CHECK(nng_pair1_open_raw(&s1) == 0);
+ TEST_CHECK(nng_pair1_open_raw(&c1) == 0);
+ TEST_CHECK(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND / 5) == 0);
+ TEST_CHECK(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, SECOND / 5) == 0);
+
+ // cannot set insane TTLs
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_MAXTTL, 0) == NNG_EINVAL);
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_MAXTTL, 1000) == NNG_EINVAL);
+ ttl = 8;
+ TEST_CHECK(nng_setopt(s1, NNG_OPT_MAXTTL, &ttl, 1) == NNG_EINVAL);
+ TEST_CHECK(nng_setopt_bool(s1, NNG_OPT_MAXTTL, true) == NNG_EBADTYPE);
+
+ TEST_CHECK(testutil_marry(s1, c1) == 0);
+
+ // Let's check enforcement of TTL
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_MAXTTL, 4) == 0);
+ TEST_CHECK(nng_getopt_int(s1, NNG_OPT_MAXTTL, &ttl) == 0);
+ TEST_CHECK(ttl == 4);
+
+ // Bad TTL bounces
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_header_append_u32(msg, 4) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT);
+
+ // Good TTL passes
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_append_u32(msg, 0xFEEDFACE) == 0);
+ TEST_CHECK(nng_msg_header_append_u32(msg, 3) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ TEST_CHECK(nng_msg_trim_u32(msg, &val) == 0);
+ TEST_CHECK(val == 0xFEEDFACE);
+ TEST_CHECK(nng_msg_header_trim_u32(msg, &val) == 0);
+ TEST_CHECK(val == 4);
+ nng_msg_free(msg);
+
+ // Large TTL passes
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_MAXTTL, 0xff) == 0);
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_append_u32(msg, 1234) == 0);
+ TEST_CHECK(nng_msg_header_append_u32(msg, 0xfe) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == 0);
+ TEST_CHECK(nng_msg_trim_u32(msg, &val) == 0);
+ TEST_CHECK(val == 1234);
+ TEST_CHECK(nng_msg_header_trim_u32(msg, &val) == 0);
+ TEST_CHECK(val == 0xff);
+ nng_msg_free(msg);
+
+ // Max TTL fails
+ TEST_CHECK(nng_setopt_int(s1, NNG_OPT_MAXTTL, 0xff) == 0);
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_header_append_u32(msg, 0xff) == 0);
+ TEST_CHECK(nng_sendmsg(c1, msg, 0) == 0);
+ TEST_CHECK(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT);
+
+ TEST_CHECK(nng_close(s1) == 0);
+ TEST_CHECK(nng_close(c1) == 0);
+}
+
+TEST_LIST = {
+ { "pair1 monogamous cooked", test_mono_cooked },
+ { "pair1 monogamous faithful", test_mono_faithful },
+ { "pair1 monogamous back pressure", test_mono_back_pressure },
+ { "pair1 monogamous raw exchange", test_mono_raw_exchange },
+ { "pair1 monogamous raw header", test_mono_raw_header },
+ { "pair1 polyamorous best effort", test_poly_best_effort },
+ { "pair1 polyamorous cooked", test_poly_cooked },
+ { "pair1 polyamorous late", test_poly_late },
+ { "pair1 polyamorous default", test_poly_default },
+ { "pair1 polyamorous raw", test_poly_raw },
+ { "pair1 raw", test_raw },
+ { "pair1 ttl", test_ttl },
+
+ { NULL, NULL },
+};
diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt
index bae31433..4778f4ea 100644
--- a/src/protocol/reqrep0/CMakeLists.txt
+++ b/src/protocol/reqrep0/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -9,30 +9,18 @@
#
# Req/Rep protocol
-option (NNG_PROTO_REQ0 "Enable REQv0 protocol." ON)
+option(NNG_PROTO_REQ0 "Enable REQv0 protocol." ON)
mark_as_advanced(NNG_PROTO_REQ0)
-option (NNG_PROTO_REP0 "Enable REPv0 protocol." ON)
+option(NNG_PROTO_REP0 "Enable REPv0 protocol." ON)
mark_as_advanced(NNG_PROTO_REP0)
-set(_DEFS)
-set(_SRCS)
+nng_sources_if(NNG_PROTO_REQ0 req.c xreq.c)
+nng_headers_if(NNG_PROTO_REQ0 nng/protocol/reqrep0/req.h)
+nng_defines_if(NNG_PROTO_REQ0 NNG_HAVE_REQ0)
-if (NNG_PROTO_REQ0)
- list(APPEND _DEFS -DNNG_HAVE_REQ0)
- list(APPEND _SRCS
- protocol/reqrep0/req.c protocol/reqrep0/xreq.c
- ${PROJECT_SOURCE_DIR}/include/nng/protocol/reqrep0/req.h)
-
-endif()
+nng_sources_if(NNG_PROTO_REP0 rep.c xrep.c)
+nng_headers_if(NNG_PROTO_REP0 nng/protocol/reqrep0/rep.h)
+nng_defines_if(NNG_PROTO_REP0 NNG_HAVE_REP0)
-if (NNG_PROTO_REP0)
- list(APPEND _DEFS -DNNG_HAVE_REP0)
- list(APPEND _SRCS
- protocol/reqrep0/rep.c protocol/reqrep0/xrep.c
- ${PROJECT_SOURCE_DIR}/include/nng/protocol/reqrep0/rep.h)
-
-endif()
-
-set(NNG_DEFS ${NNG_DEFS} ${_DEFS} PARENT_SCOPE)
-set(NNG_SRCS ${NNG_SRCS} ${_SRCS} PARENT_SCOPE)
+nng_test(reqrep_test)
diff --git a/src/protocol/reqrep0/reqrep_test.c b/src/protocol/reqrep0/reqrep_test.c
new file mode 100644
index 00000000..564cf158
--- /dev/null
+++ b/src/protocol/reqrep0/reqrep_test.c
@@ -0,0 +1,300 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/reqrep0/req.h>
+
+#include <acutest.h>
+#include <testutil.h>
+
+#ifndef NNI_PROTO
+#define NNI_PROTO(x, y) (((x) << 4u) | (y))
+#endif
+
+void
+test_req_rep_identity(void)
+{
+ nng_socket s;
+ int p;
+ char * n;
+
+ TEST_CHECK(nng_req0_open(&s) == 0);
+ TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0);
+ TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48
+ TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0);
+ TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49
+ TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0);
+ TEST_CHECK(strcmp(n, "req") == 0);
+ TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0);
+ TEST_CHECK(strcmp(n, "rep") == 0);
+ TEST_CHECK(nng_close(s) == 0);
+
+ TEST_CHECK(nng_rep0_open(&s) == 0);
+ TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0);
+ TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49
+ TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0);
+ TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48
+ TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0);
+ TEST_CHECK(strcmp(n, "rep") == 0);
+ TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0);
+ TEST_CHECK(strcmp(n, "req") == 0);
+ TEST_CHECK(nng_close(s) == 0);
+}
+
+void
+test_resend_option(void)
+{
+ nng_socket req;
+ bool b;
+ size_t sz = sizeof(b);
+ const char *opt = NNG_OPT_REQ_RESENDTIME;
+
+ TEST_CHECK(nng_req0_open(&req) == 0);
+
+ TEST_CHECK(nng_setopt_ms(req, opt, 10) == 0);
+ TEST_CHECK(nng_setopt(req, opt, "", 1) == NNG_EINVAL);
+ TEST_CHECK(nng_getopt(req, opt, &b, &sz) == NNG_EINVAL);
+ TEST_CHECK(nng_setopt_bool(req, opt, true) == NNG_EBADTYPE);
+ TEST_CHECK(nng_getopt_bool(req, opt, &b) == NNG_EBADTYPE);
+
+ TEST_CHECK(nng_close(req) == 0);
+}
+
+void
+test_req_recv_bad_state(void)
+{
+ nng_socket req;
+ nng_msg * msg = NULL;
+
+ TEST_CHECK(nng_req0_open(&req) == 0);
+ TEST_CHECK(nng_recvmsg(req, &msg, 0) == NNG_ESTATE);
+ TEST_CHECK(msg == NULL);
+ TEST_CHECK(nng_close(req) == 0);
+}
+
+void
+test_rep_send_bad_state(void)
+{
+ nng_socket rep;
+ nng_msg * msg = NULL;
+
+ TEST_CHECK(nng_rep0_open(&rep) == 0);
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_sendmsg(rep, msg, 0) == NNG_ESTATE);
+ nng_msg_free(msg);
+ TEST_CHECK(nng_close(rep) == 0);
+}
+
+#define SECOND 1000
+
+void
+test_req_rep_exchange(void)
+{
+ nng_socket req;
+ nng_socket rep;
+ nng_msg * msg = NULL;
+
+ TEST_CHECK(nng_req0_open(&req) == 0);
+ TEST_CHECK(nng_rep0_open(&rep) == 0);
+
+ TEST_CHECK(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, SECOND) == 0);
+
+ TEST_CHECK(testutil_marry(rep, req) == 0);
+
+ TEST_CHECK(nng_msg_alloc(&msg, 0) == 0);
+ TEST_CHECK(nng_msg_append(msg, "ping", 5) == 0);
+ TEST_CHECK(nng_msg_len(msg) == 5);
+ TEST_CHECK(strcmp(nng_msg_body(msg), "ping") == 0);
+ TEST_CHECK(nng_sendmsg(req, msg, 0) == 0);
+ msg = NULL;
+ TEST_CHECK(nng_recvmsg(rep, &msg, 0) == 0);
+ TEST_CHECK(msg != NULL);
+ TEST_CHECK(nng_msg_len(msg) == 5);
+ TEST_CHECK(strcmp(nng_msg_body(msg), "ping") == 0);
+ nng_msg_trim(msg, 5);
+ TEST_CHECK(nng_msg_append(msg, "pong", 5) == 0);
+ TEST_CHECK(nng_sendmsg(rep, msg, 0) == 0);
+ msg = NULL;
+ TEST_CHECK(nng_recvmsg(req, &msg, 0) == 0);
+ TEST_CHECK(msg != NULL);
+ TEST_CHECK(nng_msg_len(msg) == 5);
+ TEST_CHECK(strcmp(nng_msg_body(msg), "pong") == 0);
+ nng_msg_free(msg);
+
+ TEST_CHECK(nng_close(req) == 0);
+ TEST_CHECK(nng_close(rep) == 0);
+}
+
+void
+test_req_cancel(void)
+{
+ nng_msg * abc;
+ nng_msg * def;
+ nng_msg * cmd;
+ nng_duration retry = SECOND;
+ nng_socket req;
+ nng_socket rep;
+
+ TEST_CHECK(nng_rep_open(&rep) == 0);
+ TEST_CHECK(nng_req_open(&req) == 0);
+
+ TEST_CHECK(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(req, NNG_OPT_REQ_RESENDTIME, retry) == 0);
+ TEST_CHECK(nng_setopt_int(req, NNG_OPT_SENDBUF, 16) == 0);
+
+ TEST_CHECK(nng_msg_alloc(&abc, 0) == 0);
+ TEST_CHECK(nng_msg_append(abc, "abc", 4) == 0);
+ TEST_CHECK(nng_msg_alloc(&def, 0) == 0);
+ TEST_CHECK(nng_msg_append(def, "def", 4) == 0);
+
+ TEST_CHECK(testutil_marry(rep, req) == 0);
+
+ // Send req #1 (abc).
+ TEST_CHECK(nng_sendmsg(req, abc, 0) == 0);
+
+ // Sleep a bit. This is so that we ensure that our request gets
+ // to the far side. (If we cancel too fast, then our outgoing send
+ // will be canceled before it gets to the peer.)
+ testutil_sleep(100);
+
+ // Send the next next request ("def"). Note that
+ // the REP side server will have already buffered the receive
+ // request, and should simply be waiting for us to reply to abc.
+ TEST_CHECK(nng_sendmsg(req, def, 0) == 0);
+
+ // Receive the first request (should be abc) on the REP server.
+ TEST_CHECK(nng_recvmsg(rep, &cmd, 0) == 0);
+ TEST_ASSERT(cmd != NULL);
+ TEST_CHECK(nng_msg_len(cmd) == 4);
+ TEST_CHECK(strcmp(nng_msg_body(cmd), "abc") == 0);
+
+ // REP sends the reply to first command. This will be discarded
+ // by the REQ socket.
+ TEST_CHECK(nng_sendmsg(rep, cmd, 0) == 0);
+
+ // Now get the next command from the REP; should be "def".
+ TEST_CHECK(nng_recvmsg(rep, &cmd, 0) == 0);
+ TEST_ASSERT(cmd != NULL);
+ TEST_CHECK(nng_msg_len(cmd) == 4);
+ TEST_CHECK(strcmp(nng_msg_body(cmd), "def") == 0);
+ TEST_MSG("Received body was %s", nng_msg_body(cmd));
+
+ // And send it back to REQ.
+ TEST_CHECK(nng_sendmsg(rep, cmd, 0) == 0);
+
+ // Try a req command. This should give back "def"
+ TEST_CHECK(nng_recvmsg(req, &cmd, 0) == 0);
+ TEST_CHECK(nng_msg_len(cmd) == 4);
+ TEST_CHECK(strcmp(nng_msg_body(cmd), "def") == 0);
+ nng_msg_free(cmd);
+
+ TEST_CHECK(nng_close(req) == 0);
+ TEST_CHECK(nng_close(rep) == 0);
+}
+
+void
+test_req_cancel_abort_recv(void)
+{
+
+ nng_msg * abc;
+ nng_msg * def;
+ nng_msg * cmd;
+ nng_aio * aio;
+ nng_duration retry = SECOND * 10; // 10s (kind of never)
+ nng_socket req;
+ nng_socket rep;
+
+ TEST_CHECK(nng_rep_open(&rep) == 0);
+ TEST_CHECK(nng_req_open(&req) == 0);
+ TEST_CHECK(nng_aio_alloc(&aio, NULL, NULL) == 0);
+
+ TEST_CHECK(nng_setopt_ms(req, NNG_OPT_REQ_RESENDTIME, retry) == 0);
+ TEST_CHECK(nng_setopt_int(req, NNG_OPT_SENDBUF, 16) == 0);
+ TEST_CHECK(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, 5 * SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 5 * SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 5 * SECOND) == 0);
+ TEST_CHECK(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 5 * SECOND) == 0);
+
+ TEST_CHECK(nng_msg_alloc(&abc, 0) == 0);
+ TEST_CHECK(nng_msg_append(abc, "abc", 4) == 0);
+ TEST_CHECK(nng_msg_alloc(&def, 0) == 0);
+ TEST_CHECK(nng_msg_append(def, "def", 4) == 0);
+
+ TEST_CHECK(testutil_marry(rep, req) == 0);
+
+ // Send req #1 (abc).
+ TEST_CHECK(nng_sendmsg(req, abc, 0) == 0);
+
+ // Wait for it to get ot the other side.
+ testutil_sleep(100);
+
+ nng_aio_set_timeout(aio, 5 * SECOND);
+ nng_recv_aio(req, aio);
+
+ // Give time for this recv to post properly.
+ testutil_sleep(100);
+
+ // Send the next next request ("def"). Note that
+ // the REP side server will have already buffered the receive
+ // request, and should simply be waiting for us to reply to
+ // abc.
+ TEST_CHECK(nng_sendmsg(req, def, 0) == 0);
+
+ // Our pending I/O should have been canceled.
+ nng_aio_wait(aio);
+ TEST_CHECK(nng_aio_result(aio) == NNG_ECANCELED);
+
+ // Receive the first request (should be abc) on the REP server.
+ TEST_CHECK(nng_recvmsg(rep, &cmd, 0) == 0);
+ TEST_CHECK(nng_msg_len(cmd) == 4);
+ TEST_CHECK(strcmp(nng_msg_body(cmd), "abc") == 0);
+
+ // REP sends the reply to first command. This will be
+ // discarded by the REQ socket.
+ TEST_CHECK(nng_sendmsg(rep, cmd, 0) == 0);
+
+ // Now get the next command from the REP; should be "def".
+ TEST_CHECK(nng_recvmsg(rep, &cmd, 0) == 0);
+ TEST_CHECK(nng_msg_len(cmd) == 4);
+ TEST_CHECK(strcmp(nng_msg_body(cmd), "def") == 0);
+
+ // And send it back to REQ.
+ TEST_CHECK(nng_sendmsg(rep, cmd, 0) == 0);
+
+ // Try a req command. This should give back "def"
+ TEST_CHECK(nng_recvmsg(req, &cmd, 0) == 0);
+ TEST_CHECK(nng_msg_len(cmd) == 4);
+ TEST_CHECK(strcmp(nng_msg_body(cmd), "def") == 0);
+ nng_msg_free(cmd);
+
+ nng_aio_free(aio);
+ TEST_CHECK(nng_close(req) == 0);
+ TEST_CHECK(nng_close(rep) == 0);
+}
+
+TEST_LIST = {
+ { "req rep identity", test_req_rep_identity },
+ { "resend option", test_resend_option },
+ { "req recv bad state", test_req_recv_bad_state },
+ { "rep send bad state", test_rep_send_bad_state },
+ { "req rep exchange", test_req_rep_exchange },
+ { "req cancel", test_req_cancel },
+ { "req cancel abort recv", test_req_cancel_abort_recv },
+ { NULL, NULL },
+};