aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/CMakeLists.txt8
-rw-r--r--tests/reqctx.c258
-rw-r--r--tests/reqpoll.c148
-rw-r--r--tests/reqrep.c84
4 files changed, 493 insertions, 5 deletions
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index df7e5701..2177c4df 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -147,9 +147,9 @@ add_nng_test(scalability 20 ON)
add_nng_test(sha1 5 NNG_SUPP_SHA1)
add_nng_test(sock 5 ON)
add_nng_test(synch 5 ON)
-add_nng_test(tls 10 NNG_TRANSPORT_TLS)
-add_nng_test(tcp 5 NNG_TRANSPORT_TCP)
-add_nng_test(tcp6 5 NNG_TRANSPORT_TCP)
+add_nng_test(tls 60 NNG_TRANSPORT_TLS)
+add_nng_test(tcp 60 NNG_TRANSPORT_TCP)
+add_nng_test(tcp6 60 NNG_TRANSPORT_TCP)
add_nng_test(transport 5 ON)
add_nng_test(udp 5 ON)
add_nng_test(url 5 ON)
@@ -162,6 +162,8 @@ add_nng_proto_test(bus 5 NNG_PROTO_BUS0 NNG_PROTO_BUS0)
add_nng_test(pipeline 5 NNG_PROTO_PULL0 NNG_PROTO_PIPELINE0)
add_nng_proto_test(pair1 5 NNG_PROTO_PAIR1 NNG_PROTO_PAIR1)
add_nng_proto_test(pubsub 5 NNG_PROTO_PUB0 NNG_PROTO_SUB0)
+add_nng_proto_test(reqctx 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
+add_nng_proto_test(reqpoll 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_proto_test(reqrep 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_test(survey 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
diff --git a/tests/reqctx.c b/tests/reqctx.c
new file mode 100644
index 00000000..4aae2e24
--- /dev/null
+++ b/tests/reqctx.c
@@ -0,0 +1,258 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+#include <string.h>
+
+static struct {
+ nng_aio *aio;
+ enum { START, SEND, RECV } state;
+ nng_socket s;
+ nng_msg * msg;
+ int cnt;
+} rep_state;
+
+void
+rep_cb(void)
+{
+ int rv;
+
+ if (rep_state.state == START) {
+ rep_state.state = RECV;
+ nng_recv_aio(rep_state.s, rep_state.aio);
+ return;
+ }
+ if ((rv = nng_aio_result(rep_state.aio)) != 0) {
+ if (rep_state.msg != NULL) {
+ nng_msg_free(rep_state.msg);
+ rep_state.msg = NULL;
+ }
+ return;
+ }
+ switch (rep_state.state) {
+ case START:
+ break;
+ case RECV:
+ rep_state.msg = nng_aio_get_msg(rep_state.aio);
+ rep_state.state = SEND;
+ nng_aio_set_msg(rep_state.aio, rep_state.msg);
+ nng_send_aio(rep_state.s, rep_state.aio);
+ break;
+ case SEND:
+ rep_state.msg = NULL;
+ rep_state.state = RECV;
+ nng_aio_set_msg(rep_state.aio, NULL);
+ nng_recv_aio(rep_state.s, rep_state.aio);
+ rep_state.cnt++;
+ break;
+ }
+}
+
+#define NCTX 1000
+
+void
+markr(void *arg)
+{
+ *(bool *) arg = true;
+}
+
+static void
+marks(void *arg)
+{
+ *(bool *) arg = true;
+}
+
+nng_ctx ctxs[NCTX];
+uint32_t recv_order[NCTX];
+nng_aio *saios[NCTX];
+nng_aio *raios[NCTX];
+bool recd[NCTX];
+bool sent[NCTX];
+
+TestMain("REQ concurrent contexts", {
+ int rv;
+ const char *addr = "inproc://test";
+ int i;
+
+ memset(recv_order, 0, NCTX * sizeof(int));
+
+ atexit(nng_fini);
+
+ Convey("We can use REQ contexts concurrently", {
+ nng_socket req;
+
+ So(nng_aio_alloc(&rep_state.aio, (void *) rep_cb, NULL) == 0);
+ So(nng_rep_open(&rep_state.s) == 0);
+ So(nng_req_open(&req) == 0);
+
+ for (i = 0; i < NCTX; i++) {
+ sent[i] = recd[i] = false;
+ recv_order[i] = (uint32_t) i;
+ if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) {
+ break;
+ }
+ nng_aio_set_timeout(raios[i], 5000);
+ if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) {
+ break;
+ }
+ nng_aio_set_timeout(saios[i], 5000);
+ }
+
+ So(nng_setopt_int(rep_state.s, NNG_OPT_SENDBUF, NCTX) == 0);
+ So(i == NCTX);
+ for (i = 0; i < NCTX; i++) {
+ uint32_t tmp;
+ int ni = rand() % NCTX; // recv index
+
+ tmp = recv_order[i];
+ recv_order[i] = recv_order[ni];
+ recv_order[ni] = tmp;
+ }
+ Reset({
+ for (i = 0; i < NCTX; i++) {
+ nng_aio_free(saios[i]);
+ nng_aio_free(raios[i]);
+ }
+ nng_close(req);
+ nng_close(rep_state.s);
+ nng_aio_free(rep_state.aio);
+ });
+
+ So(nng_listen(rep_state.s, addr, NULL, 0) == 0);
+ So(nng_dial(req, addr, NULL, 0) == 0);
+
+ nng_msleep(100); // let things establish.
+
+ // Start the rep state machine going.
+ rep_cb();
+
+ for (i = 0; i < NCTX; i++) {
+ if ((rv = nng_ctx_open(&ctxs[i], req)) != 0) {
+ break;
+ }
+ }
+ So(rv == 0);
+ So(i == NCTX);
+
+ // Send messages
+ for (i = 0; i < NCTX; i++) {
+ nng_msg *m;
+ if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) {
+ Fail("msg alloc failed: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_msg_append_u32(m, i)) != 0) {
+ Fail("append failed: %s", nng_strerror(rv));
+ }
+ nng_aio_set_msg(saios[i], m);
+ nng_ctx_send(ctxs[i], saios[i]);
+ }
+ So(rv == 0);
+ So(i == NCTX);
+
+ for (i = 0; i < NCTX; i++) {
+ nng_aio_wait(saios[i]);
+ if ((rv = nng_aio_result(saios[i])) != 0) {
+ Fail("send failed: %s", nng_strerror(rv));
+ So(false);
+ break;
+ }
+ }
+ for (i = 0; i < NCTX; i++) {
+ if (!sent[i]) {
+ Fail("Index %d (%d) not sent", i, i);
+ }
+ }
+
+ So(rv == 0);
+ So(i == NCTX);
+ // Receive answers
+ for (i = 0; i < NCTX; i++) {
+ int ri = recv_order[i];
+ nng_ctx_recv(ctxs[ri], raios[ri]);
+ }
+
+ for (i = 0; i < NCTX; i++) {
+ nng_msg *msg;
+ uint32_t x;
+
+ nng_aio_wait(raios[i]);
+ if ((rv = nng_aio_result(raios[i])) != 0) {
+ Fail("recv %d (%d) %d failed: %s", i,
+ recv_order[i], rep_state.cnt,
+ nng_strerror(rv));
+ continue;
+ }
+ msg = nng_aio_get_msg(raios[i]);
+ if ((rv = nng_msg_chop_u32(msg, &x)) != 0) {
+ Fail("recv msg trim: %s", nng_strerror(rv));
+ break;
+ }
+ if (x != (uint32_t) i) {
+ Fail("message body mismatch: %x %x\n", x,
+ (uint32_t) i);
+ break;
+ }
+
+ nng_msg_free(msg);
+ }
+ for (i = 0; i < NCTX; i++) {
+ if (!recd[i]) {
+ Fail("Index %d (%d) not received", i,
+ recv_order[i]);
+ break;
+ }
+ }
+
+ So(rv == 0);
+ So(i == NCTX);
+ });
+
+ Convey("Given a socket and a context", {
+ nng_socket req;
+ nng_ctx ctx;
+ nng_aio * aio;
+
+ So(nng_req0_open(&req) == 0);
+ So(nng_ctx_open(&ctx, req) == 0);
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ nng_aio_set_timeout(aio, 1000);
+
+ Reset({ nng_aio_free(aio); });
+
+ Convey("Closing the socket aborts a context send", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_close(req);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECLOSED);
+ nng_msg_free(msg);
+ });
+
+ Convey("Closing the context aborts a context send", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_ctx_close(ctx);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECLOSED);
+ nng_msg_free(msg);
+ nng_close(req);
+ });
+ });
+});
diff --git a/tests/reqpoll.c b/tests/reqpoll.c
new file mode 100644
index 00000000..64c8df66
--- /dev/null
+++ b/tests/reqpoll.c
@@ -0,0 +1,148 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef _WIN32
+#include <poll.h>
+#include <unistd.h>
+#define SOCKET int
+#else
+
+#define poll WSAPoll
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+
+#include <windows.h>
+#include <winsock2.h>
+
+#include <mswsock.h>
+#endif
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+bool
+isready(SOCKET fd)
+{
+ struct pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = POLLRDNORM;
+ pfd.revents = 0;
+
+ switch (poll(&pfd, 1, 0)) {
+ case 0:
+ return (false);
+ case 1:
+ return (true);
+ default:
+ printf("BAD POLL RETURN!\n");
+ abort();
+ }
+}
+
+TestMain("REQ pollable", {
+
+ atexit(nng_fini);
+
+ Convey("Given a connected REQ/REP pair", {
+ nng_socket req;
+ nng_socket rep;
+ nng_ctx ctx;
+
+ So(nng_req0_open(&req) == 0);
+ So(nng_rep0_open(&rep) == 0);
+ So(nng_ctx_open(&ctx, req) == 0);
+
+ Reset({
+ nng_ctx_close(ctx);
+ nng_close(req);
+ nng_close(rep);
+ });
+ So(nng_listen(rep, "inproc://ctx1", NULL, 0) == 0);
+
+ Convey("REQ ctx not pollable", {
+ int fd;
+ So(nng_ctx_open(&ctx, req) == 0);
+ Reset({ nng_ctx_close(req); });
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) ==
+ NNG_ENOTSUP);
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) ==
+ NNG_ENOTSUP);
+ });
+
+ Convey("REQ starts not writable", {
+ int fd;
+
+ So(nng_getopt_int(req, NNG_OPT_SENDFD, &fd) == 0);
+ So(isready(fd) == false);
+
+ Convey("And becomes readable on connect", {
+ So(nng_dial(req, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(isready(fd) == true);
+
+ Convey("Not writable with message pending", {
+ for (int i = 0; i < 10; i++) {
+ nng_msg *m;
+ So(nng_msg_alloc(&m, 0) == 0);
+ // Fill intermediate queues.
+ if (nng_sendmsg(req, m,
+ NNG_FLAG_NONBLOCK) !=
+ 0) {
+ nng_msg_free(m);
+ }
+ }
+ So(isready(fd) == false);
+ });
+ });
+ });
+
+ Convey("REQ starts not readable", {
+ int fd;
+
+ So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0);
+ So(isready(fd) == false);
+
+ Convey("And doesn't become readable on connect", {
+ So(nng_dial(req, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(isready(fd) == false);
+ });
+ });
+
+ Convey("REQ becomes readable", {
+ int fd;
+ nng_msg *msg;
+
+ So(nng_dial(req, "inproc://ctx1", NULL, 0) == 0);
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0);
+ So(isready(fd) == false);
+ So(nng_msg_append(msg, "xyz", 3) == 0);
+ So(nng_sendmsg(req, msg, 0) == 0);
+ So(nng_recvmsg(rep, &msg, 0) == 0); // recv on rep
+ So(nng_sendmsg(rep, msg, 0) == 0); // echo it back
+ nng_msleep(200); // give time for message to arrive
+ So(isready(fd) == true);
+ Convey("And is no longer readable after receive", {
+ So(nng_recvmsg(req, &msg, 0) == 0);
+ nng_msg_free(msg);
+ So(isready(fd) == false);
+ });
+ });
+ });
+});
diff --git a/tests/reqrep.c b/tests/reqrep.c
index 97ed371a..76cf279c 100644
--- a/tests/reqrep.c
+++ b/tests/reqrep.c
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -175,5 +175,85 @@ TestMain("REQ/REP pattern", {
nng_msg_free(cmd);
});
+ Convey("Request cancellation aborts pending recv", {
+ nng_msg * abc;
+ nng_msg * def;
+ nng_msg * cmd;
+ nng_aio * aio;
+ nng_duration retry = 100; // 100 ms
+
+ nng_socket req;
+ nng_socket rep;
+
+ So(nng_rep_open(&rep) == 0);
+
+ So(nng_req_open(&req) == 0);
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+
+ Reset({
+ nng_close(rep);
+ nng_close(req);
+ nng_aio_free(aio);
+ });
+
+ So(nng_setopt_ms(req, NNG_OPT_REQ_RESENDTIME, retry) == 0);
+ So(nng_setopt_int(req, NNG_OPT_SENDBUF, 16) == 0);
+
+ So(nng_msg_alloc(&abc, 0) == 0);
+ So(nng_msg_append(abc, "abc", 4) == 0);
+ So(nng_msg_alloc(&def, 0) == 0);
+ So(nng_msg_append(def, "def", 4) == 0);
+
+ So(nng_listen(rep, addr, NULL, 0) == 0);
+ So(nng_dial(req, addr, NULL, 0) == 0);
+
+ // Send req #1 (abc).
+ So(nng_sendmsg(req, abc, 0) == 0);
+
+ // Sleep a bit. This is so that we ensure that our
+ // request gets to the far side. (If we cancel too
+ // fast, then our outgoing send will be canceled before
+ // it gets to the wire.)
+ nng_msleep(20);
+
+ nng_aio_set_timeout(aio, 1000); // an entire second
+ nng_recv_aio(req, aio);
+
+ // Give time for this recv to post properly.
+ nng_msleep(20);
+
+ // Send the next next request ("def"). Note that
+ // the REP side server will have already buffered the receive
+ // request, and should simply be waiting for us to reply to
+ // abc.
+ So(nng_sendmsg(req, def, 0) == 0);
+
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECANCELED);
+
+ // Receive the first request (should be abc) on the REP server.
+ So(nng_recvmsg(rep, &cmd, 0) == 0);
+ So(nng_msg_len(cmd) == 4);
+ So(strcmp(nng_msg_body(cmd), "abc") == 0);
+
+ // REP sends the reply to first command. This will be
+ // discarded by the REQ server.
+ So(nng_sendmsg(rep, cmd, 0) == 0);
+
+ // Now get the next command from the REP; should be "def".
+ So(nng_recvmsg(rep, &cmd, 0) == 0);
+ So(nng_msg_len(cmd) == 4);
+ So(strcmp(nng_msg_body(cmd), "def") == 0);
+
+ // And send it back to REQ.
+ So(nng_sendmsg(rep, cmd, 0) == 0);
+
+ // Try a req command. This should give back "def"
+ So(nng_recvmsg(req, &cmd, 0) == 0);
+ So(nng_msg_len(cmd) == 4);
+ So(strcmp(nng_msg_body(cmd), "def") == 0);
+ nng_msg_free(cmd);
+ });
+
nng_fini();
})