aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-04 13:36:54 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-10 15:40:00 -0700
commit5f7289e1f8e1427c9214c8e3e96ad56b1f868d53 (patch)
tree39debf4ecde234b2a0be19c9cb15628cc32c2edb /tests
parent56f1bf30e61c53646dd2f8425da7c7fa0d97b3e1 (diff)
downloadnng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.gz
nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.bz2
nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.zip
fixes #334 Separate context for state machines from sockets
This provides context support for REQ and REP sockets. More discussion around this is in the issue itself. Optionally we would like to extend this to the surveyor pattern. Note that we specifically do not support pollable descriptors for non-default contexts, and the results of using file descriptors for polling (NNG_OPT_SENDFD and NNG_OPT_RECVFD) is undefined. In the future, it might be nice to figure out how to factor in optional use of a message queue for users who want more buffering, but we think there is little need for this with cooked mode.
Diffstat (limited to 'tests')
-rw-r--r--tests/CMakeLists.txt8
-rw-r--r--tests/reqctx.c258
-rw-r--r--tests/reqpoll.c148
-rw-r--r--tests/reqrep.c84
4 files changed, 493 insertions, 5 deletions
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index df7e5701..2177c4df 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -147,9 +147,9 @@ add_nng_test(scalability 20 ON)
add_nng_test(sha1 5 NNG_SUPP_SHA1)
add_nng_test(sock 5 ON)
add_nng_test(synch 5 ON)
-add_nng_test(tls 10 NNG_TRANSPORT_TLS)
-add_nng_test(tcp 5 NNG_TRANSPORT_TCP)
-add_nng_test(tcp6 5 NNG_TRANSPORT_TCP)
+add_nng_test(tls 60 NNG_TRANSPORT_TLS)
+add_nng_test(tcp 60 NNG_TRANSPORT_TCP)
+add_nng_test(tcp6 60 NNG_TRANSPORT_TCP)
add_nng_test(transport 5 ON)
add_nng_test(udp 5 ON)
add_nng_test(url 5 ON)
@@ -162,6 +162,8 @@ add_nng_proto_test(bus 5 NNG_PROTO_BUS0 NNG_PROTO_BUS0)
add_nng_test(pipeline 5 NNG_PROTO_PULL0 NNG_PROTO_PIPELINE0)
add_nng_proto_test(pair1 5 NNG_PROTO_PAIR1 NNG_PROTO_PAIR1)
add_nng_proto_test(pubsub 5 NNG_PROTO_PUB0 NNG_PROTO_SUB0)
+add_nng_proto_test(reqctx 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
+add_nng_proto_test(reqpoll 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_proto_test(reqrep 5 NNG_PROTO_REQ0 NNG_PROTO_REP0)
add_nng_test(survey 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0)
diff --git a/tests/reqctx.c b/tests/reqctx.c
new file mode 100644
index 00000000..4aae2e24
--- /dev/null
+++ b/tests/reqctx.c
@@ -0,0 +1,258 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+#include <string.h>
+
+static struct {
+ nng_aio *aio;
+ enum { START, SEND, RECV } state;
+ nng_socket s;
+ nng_msg * msg;
+ int cnt;
+} rep_state;
+
+void
+rep_cb(void)
+{
+ int rv;
+
+ if (rep_state.state == START) {
+ rep_state.state = RECV;
+ nng_recv_aio(rep_state.s, rep_state.aio);
+ return;
+ }
+ if ((rv = nng_aio_result(rep_state.aio)) != 0) {
+ if (rep_state.msg != NULL) {
+ nng_msg_free(rep_state.msg);
+ rep_state.msg = NULL;
+ }
+ return;
+ }
+ switch (rep_state.state) {
+ case START:
+ break;
+ case RECV:
+ rep_state.msg = nng_aio_get_msg(rep_state.aio);
+ rep_state.state = SEND;
+ nng_aio_set_msg(rep_state.aio, rep_state.msg);
+ nng_send_aio(rep_state.s, rep_state.aio);
+ break;
+ case SEND:
+ rep_state.msg = NULL;
+ rep_state.state = RECV;
+ nng_aio_set_msg(rep_state.aio, NULL);
+ nng_recv_aio(rep_state.s, rep_state.aio);
+ rep_state.cnt++;
+ break;
+ }
+}
+
+#define NCTX 1000
+
+void
+markr(void *arg)
+{
+ *(bool *) arg = true;
+}
+
+static void
+marks(void *arg)
+{
+ *(bool *) arg = true;
+}
+
+nng_ctx ctxs[NCTX];
+uint32_t recv_order[NCTX];
+nng_aio *saios[NCTX];
+nng_aio *raios[NCTX];
+bool recd[NCTX];
+bool sent[NCTX];
+
+TestMain("REQ concurrent contexts", {
+ int rv;
+ const char *addr = "inproc://test";
+ int i;
+
+ memset(recv_order, 0, NCTX * sizeof(int));
+
+ atexit(nng_fini);
+
+ Convey("We can use REQ contexts concurrently", {
+ nng_socket req;
+
+ So(nng_aio_alloc(&rep_state.aio, (void *) rep_cb, NULL) == 0);
+ So(nng_rep_open(&rep_state.s) == 0);
+ So(nng_req_open(&req) == 0);
+
+ for (i = 0; i < NCTX; i++) {
+ sent[i] = recd[i] = false;
+ recv_order[i] = (uint32_t) i;
+ if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) {
+ break;
+ }
+ nng_aio_set_timeout(raios[i], 5000);
+ if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) {
+ break;
+ }
+ nng_aio_set_timeout(saios[i], 5000);
+ }
+
+ So(nng_setopt_int(rep_state.s, NNG_OPT_SENDBUF, NCTX) == 0);
+ So(i == NCTX);
+ for (i = 0; i < NCTX; i++) {
+ uint32_t tmp;
+ int ni = rand() % NCTX; // recv index
+
+ tmp = recv_order[i];
+ recv_order[i] = recv_order[ni];
+ recv_order[ni] = tmp;
+ }
+ Reset({
+ for (i = 0; i < NCTX; i++) {
+ nng_aio_free(saios[i]);
+ nng_aio_free(raios[i]);
+ }
+ nng_close(req);
+ nng_close(rep_state.s);
+ nng_aio_free(rep_state.aio);
+ });
+
+ So(nng_listen(rep_state.s, addr, NULL, 0) == 0);
+ So(nng_dial(req, addr, NULL, 0) == 0);
+
+ nng_msleep(100); // let things establish.
+
+ // Start the rep state machine going.
+ rep_cb();
+
+ for (i = 0; i < NCTX; i++) {
+ if ((rv = nng_ctx_open(&ctxs[i], req)) != 0) {
+ break;
+ }
+ }
+ So(rv == 0);
+ So(i == NCTX);
+
+ // Send messages
+ for (i = 0; i < NCTX; i++) {
+ nng_msg *m;
+ if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) {
+ Fail("msg alloc failed: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_msg_append_u32(m, i)) != 0) {
+ Fail("append failed: %s", nng_strerror(rv));
+ }
+ nng_aio_set_msg(saios[i], m);
+ nng_ctx_send(ctxs[i], saios[i]);
+ }
+ So(rv == 0);
+ So(i == NCTX);
+
+ for (i = 0; i < NCTX; i++) {
+ nng_aio_wait(saios[i]);
+ if ((rv = nng_aio_result(saios[i])) != 0) {
+ Fail("send failed: %s", nng_strerror(rv));
+ So(false);
+ break;
+ }
+ }
+ for (i = 0; i < NCTX; i++) {
+ if (!sent[i]) {
+ Fail("Index %d (%d) not sent", i, i);
+ }
+ }
+
+ So(rv == 0);
+ So(i == NCTX);
+ // Receive answers
+ for (i = 0; i < NCTX; i++) {
+ int ri = recv_order[i];
+ nng_ctx_recv(ctxs[ri], raios[ri]);
+ }
+
+ for (i = 0; i < NCTX; i++) {
+ nng_msg *msg;
+ uint32_t x;
+
+ nng_aio_wait(raios[i]);
+ if ((rv = nng_aio_result(raios[i])) != 0) {
+ Fail("recv %d (%d) %d failed: %s", i,
+ recv_order[i], rep_state.cnt,
+ nng_strerror(rv));
+ continue;
+ }
+ msg = nng_aio_get_msg(raios[i]);
+ if ((rv = nng_msg_chop_u32(msg, &x)) != 0) {
+ Fail("recv msg trim: %s", nng_strerror(rv));
+ break;
+ }
+ if (x != (uint32_t) i) {
+ Fail("message body mismatch: %x %x\n", x,
+ (uint32_t) i);
+ break;
+ }
+
+ nng_msg_free(msg);
+ }
+ for (i = 0; i < NCTX; i++) {
+ if (!recd[i]) {
+ Fail("Index %d (%d) not received", i,
+ recv_order[i]);
+ break;
+ }
+ }
+
+ So(rv == 0);
+ So(i == NCTX);
+ });
+
+ Convey("Given a socket and a context", {
+ nng_socket req;
+ nng_ctx ctx;
+ nng_aio * aio;
+
+ So(nng_req0_open(&req) == 0);
+ So(nng_ctx_open(&ctx, req) == 0);
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ nng_aio_set_timeout(aio, 1000);
+
+ Reset({ nng_aio_free(aio); });
+
+ Convey("Closing the socket aborts a context send", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_close(req);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECLOSED);
+ nng_msg_free(msg);
+ });
+
+ Convey("Closing the context aborts a context send", {
+ nng_msg *msg;
+ So(nng_msg_alloc(&msg, 0) == 0);
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_ctx_close(ctx);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECLOSED);
+ nng_msg_free(msg);
+ nng_close(req);
+ });
+ });
+});
diff --git a/tests/reqpoll.c b/tests/reqpoll.c
new file mode 100644
index 00000000..64c8df66
--- /dev/null
+++ b/tests/reqpoll.c
@@ -0,0 +1,148 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef _WIN32
+#include <poll.h>
+#include <unistd.h>
+#define SOCKET int
+#else
+
+#define poll WSAPoll
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+
+#include <windows.h>
+#include <winsock2.h>
+
+#include <mswsock.h>
+#endif
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "stubs.h"
+#include "supplemental/util/platform.h"
+
+bool
+isready(SOCKET fd)
+{
+ struct pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = POLLRDNORM;
+ pfd.revents = 0;
+
+ switch (poll(&pfd, 1, 0)) {
+ case 0:
+ return (false);
+ case 1:
+ return (true);
+ default:
+ printf("BAD POLL RETURN!\n");
+ abort();
+ }
+}
+
+TestMain("REQ pollable", {
+
+ atexit(nng_fini);
+
+ Convey("Given a connected REQ/REP pair", {
+ nng_socket req;
+ nng_socket rep;
+ nng_ctx ctx;
+
+ So(nng_req0_open(&req) == 0);
+ So(nng_rep0_open(&rep) == 0);
+ So(nng_ctx_open(&ctx, req) == 0);
+
+ Reset({
+ nng_ctx_close(ctx);
+ nng_close(req);
+ nng_close(rep);
+ });
+ So(nng_listen(rep, "inproc://ctx1", NULL, 0) == 0);
+
+ Convey("REQ ctx not pollable", {
+ int fd;
+ So(nng_ctx_open(&ctx, req) == 0);
+ Reset({ nng_ctx_close(req); });
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) ==
+ NNG_ENOTSUP);
+ So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) ==
+ NNG_ENOTSUP);
+ });
+
+ Convey("REQ starts not writable", {
+ int fd;
+
+ So(nng_getopt_int(req, NNG_OPT_SENDFD, &fd) == 0);
+ So(isready(fd) == false);
+
+ Convey("And becomes readable on connect", {
+ So(nng_dial(req, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(isready(fd) == true);
+
+ Convey("Not writable with message pending", {
+ for (int i = 0; i < 10; i++) {
+ nng_msg *m;
+ So(nng_msg_alloc(&m, 0) == 0);
+ // Fill intermediate queues.
+ if (nng_sendmsg(req, m,
+ NNG_FLAG_NONBLOCK) !=
+ 0) {
+ nng_msg_free(m);
+ }
+ }
+ So(isready(fd) == false);
+ });
+ });
+ });
+
+ Convey("REQ starts not readable", {
+ int fd;
+
+ So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0);
+ So(isready(fd) == false);
+
+ Convey("And doesn't become readable on connect", {
+ So(nng_dial(req, "inproc://ctx1", NULL, 0) ==
+ 0);
+ nng_msleep(100);
+ So(isready(fd) == false);
+ });
+ });
+
+ Convey("REQ becomes readable", {
+ int fd;
+ nng_msg *msg;
+
+ So(nng_dial(req, "inproc://ctx1", NULL, 0) == 0);
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0);
+ So(isready(fd) == false);
+ So(nng_msg_append(msg, "xyz", 3) == 0);
+ So(nng_sendmsg(req, msg, 0) == 0);
+ So(nng_recvmsg(rep, &msg, 0) == 0); // recv on rep
+ So(nng_sendmsg(rep, msg, 0) == 0); // echo it back
+ nng_msleep(200); // give time for message to arrive
+ So(isready(fd) == true);
+ Convey("And is no longer readable after receive", {
+ So(nng_recvmsg(req, &msg, 0) == 0);
+ nng_msg_free(msg);
+ So(isready(fd) == false);
+ });
+ });
+ });
+});
diff --git a/tests/reqrep.c b/tests/reqrep.c
index 97ed371a..76cf279c 100644
--- a/tests/reqrep.c
+++ b/tests/reqrep.c
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -175,5 +175,85 @@ TestMain("REQ/REP pattern", {
nng_msg_free(cmd);
});
+ Convey("Request cancellation aborts pending recv", {
+ nng_msg * abc;
+ nng_msg * def;
+ nng_msg * cmd;
+ nng_aio * aio;
+ nng_duration retry = 100; // 100 ms
+
+ nng_socket req;
+ nng_socket rep;
+
+ So(nng_rep_open(&rep) == 0);
+
+ So(nng_req_open(&req) == 0);
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+
+ Reset({
+ nng_close(rep);
+ nng_close(req);
+ nng_aio_free(aio);
+ });
+
+ So(nng_setopt_ms(req, NNG_OPT_REQ_RESENDTIME, retry) == 0);
+ So(nng_setopt_int(req, NNG_OPT_SENDBUF, 16) == 0);
+
+ So(nng_msg_alloc(&abc, 0) == 0);
+ So(nng_msg_append(abc, "abc", 4) == 0);
+ So(nng_msg_alloc(&def, 0) == 0);
+ So(nng_msg_append(def, "def", 4) == 0);
+
+ So(nng_listen(rep, addr, NULL, 0) == 0);
+ So(nng_dial(req, addr, NULL, 0) == 0);
+
+ // Send req #1 (abc).
+ So(nng_sendmsg(req, abc, 0) == 0);
+
+ // Sleep a bit. This is so that we ensure that our
+ // request gets to the far side. (If we cancel too
+ // fast, then our outgoing send will be canceled before
+ // it gets to the wire.)
+ nng_msleep(20);
+
+ nng_aio_set_timeout(aio, 1000); // an entire second
+ nng_recv_aio(req, aio);
+
+ // Give time for this recv to post properly.
+ nng_msleep(20);
+
+ // Send the next next request ("def"). Note that
+ // the REP side server will have already buffered the receive
+ // request, and should simply be waiting for us to reply to
+ // abc.
+ So(nng_sendmsg(req, def, 0) == 0);
+
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == NNG_ECANCELED);
+
+ // Receive the first request (should be abc) on the REP server.
+ So(nng_recvmsg(rep, &cmd, 0) == 0);
+ So(nng_msg_len(cmd) == 4);
+ So(strcmp(nng_msg_body(cmd), "abc") == 0);
+
+ // REP sends the reply to first command. This will be
+ // discarded by the REQ server.
+ So(nng_sendmsg(rep, cmd, 0) == 0);
+
+ // Now get the next command from the REP; should be "def".
+ So(nng_recvmsg(rep, &cmd, 0) == 0);
+ So(nng_msg_len(cmd) == 4);
+ So(strcmp(nng_msg_body(cmd), "def") == 0);
+
+ // And send it back to REQ.
+ So(nng_sendmsg(rep, cmd, 0) == 0);
+
+ // Try a req command. This should give back "def"
+ So(nng_recvmsg(req, &cmd, 0) == 0);
+ So(nng_msg_len(cmd) == 4);
+ So(strcmp(nng_msg_body(cmd), "def") == 0);
+ nng_msg_free(cmd);
+ });
+
nng_fini();
})