diff options
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/CMakeLists.txt | 8 | ||||
| -rw-r--r-- | tests/reqctx.c | 258 | ||||
| -rw-r--r-- | tests/reqpoll.c | 148 | ||||
| -rw-r--r-- | tests/reqrep.c | 84 |
4 files changed, 493 insertions, 5 deletions
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index df7e5701..2177c4df 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -147,9 +147,9 @@ add_nng_test(scalability 20 ON) add_nng_test(sha1 5 NNG_SUPP_SHA1) add_nng_test(sock 5 ON) add_nng_test(synch 5 ON) -add_nng_test(tls 10 NNG_TRANSPORT_TLS) -add_nng_test(tcp 5 NNG_TRANSPORT_TCP) -add_nng_test(tcp6 5 NNG_TRANSPORT_TCP) +add_nng_test(tls 60 NNG_TRANSPORT_TLS) +add_nng_test(tcp 60 NNG_TRANSPORT_TCP) +add_nng_test(tcp6 60 NNG_TRANSPORT_TCP) add_nng_test(transport 5 ON) add_nng_test(udp 5 ON) add_nng_test(url 5 ON) @@ -162,6 +162,8 @@ add_nng_proto_test(bus 5 NNG_PROTO_BUS0 NNG_PROTO_BUS0) add_nng_test(pipeline 5 NNG_PROTO_PULL0 NNG_PROTO_PIPELINE0) add_nng_proto_test(pair1 5 NNG_PROTO_PAIR1 NNG_PROTO_PAIR1) add_nng_proto_test(pubsub 5 NNG_PROTO_PUB0 NNG_PROTO_SUB0) +add_nng_proto_test(reqctx 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) +add_nng_proto_test(reqpoll 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) add_nng_proto_test(reqrep 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) add_nng_test(survey 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0) diff --git a/tests/reqctx.c b/tests/reqctx.c new file mode 100644 index 00000000..4aae2e24 --- /dev/null +++ b/tests/reqctx.c @@ -0,0 +1,258 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +#include <string.h> + +static struct { + nng_aio *aio; + enum { START, SEND, RECV } state; + nng_socket s; + nng_msg * msg; + int cnt; +} rep_state; + +void +rep_cb(void) +{ + int rv; + + if (rep_state.state == START) { + rep_state.state = RECV; + nng_recv_aio(rep_state.s, rep_state.aio); + return; + } + if ((rv = nng_aio_result(rep_state.aio)) != 0) { + if (rep_state.msg != NULL) { + nng_msg_free(rep_state.msg); + rep_state.msg = NULL; + } + return; + } + switch (rep_state.state) { + case START: + break; + case RECV: + rep_state.msg = nng_aio_get_msg(rep_state.aio); + rep_state.state = SEND; + nng_aio_set_msg(rep_state.aio, rep_state.msg); + nng_send_aio(rep_state.s, rep_state.aio); + break; + case SEND: + rep_state.msg = NULL; + rep_state.state = RECV; + nng_aio_set_msg(rep_state.aio, NULL); + nng_recv_aio(rep_state.s, rep_state.aio); + rep_state.cnt++; + break; + } +} + +#define NCTX 1000 + +void +markr(void *arg) +{ + *(bool *) arg = true; +} + +static void +marks(void *arg) +{ + *(bool *) arg = true; +} + +nng_ctx ctxs[NCTX]; +uint32_t recv_order[NCTX]; +nng_aio *saios[NCTX]; +nng_aio *raios[NCTX]; +bool recd[NCTX]; +bool sent[NCTX]; + +TestMain("REQ concurrent contexts", { + int rv; + const char *addr = "inproc://test"; + int i; + + memset(recv_order, 0, NCTX * sizeof(int)); + + atexit(nng_fini); + + Convey("We can use REQ contexts concurrently", { + nng_socket req; + + So(nng_aio_alloc(&rep_state.aio, (void *) rep_cb, NULL) == 0); + So(nng_rep_open(&rep_state.s) == 0); + So(nng_req_open(&req) == 0); + + for (i = 0; i < NCTX; i++) { + sent[i] = recd[i] = false; + recv_order[i] = (uint32_t) i; + if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) { + break; + } + nng_aio_set_timeout(raios[i], 5000); + if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) { + break; + } + nng_aio_set_timeout(saios[i], 5000); + } + + So(nng_setopt_int(rep_state.s, NNG_OPT_SENDBUF, NCTX) == 0); + So(i == NCTX); + for (i = 0; i < NCTX; i++) { + uint32_t tmp; + int ni = rand() % NCTX; // recv index + + tmp = recv_order[i]; + recv_order[i] = recv_order[ni]; + recv_order[ni] = tmp; + } + Reset({ + for (i = 0; i < NCTX; i++) { + nng_aio_free(saios[i]); + nng_aio_free(raios[i]); + } + nng_close(req); + nng_close(rep_state.s); + nng_aio_free(rep_state.aio); + }); + + So(nng_listen(rep_state.s, addr, NULL, 0) == 0); + So(nng_dial(req, addr, NULL, 0) == 0); + + nng_msleep(100); // let things establish. + + // Start the rep state machine going. + rep_cb(); + + for (i = 0; i < NCTX; i++) { + if ((rv = nng_ctx_open(&ctxs[i], req)) != 0) { + break; + } + } + So(rv == 0); + So(i == NCTX); + + // Send messages + for (i = 0; i < NCTX; i++) { + nng_msg *m; + if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) { + Fail("msg alloc failed: %s", nng_strerror(rv)); + } + if ((rv = nng_msg_append_u32(m, i)) != 0) { + Fail("append failed: %s", nng_strerror(rv)); + } + nng_aio_set_msg(saios[i], m); + nng_ctx_send(ctxs[i], saios[i]); + } + So(rv == 0); + So(i == NCTX); + + for (i = 0; i < NCTX; i++) { + nng_aio_wait(saios[i]); + if ((rv = nng_aio_result(saios[i])) != 0) { + Fail("send failed: %s", nng_strerror(rv)); + So(false); + break; + } + } + for (i = 0; i < NCTX; i++) { + if (!sent[i]) { + Fail("Index %d (%d) not sent", i, i); + } + } + + So(rv == 0); + So(i == NCTX); + // Receive answers + for (i = 0; i < NCTX; i++) { + int ri = recv_order[i]; + nng_ctx_recv(ctxs[ri], raios[ri]); + } + + for (i = 0; i < NCTX; i++) { + nng_msg *msg; + uint32_t x; + + nng_aio_wait(raios[i]); + if ((rv = nng_aio_result(raios[i])) != 0) { + Fail("recv %d (%d) %d failed: %s", i, + recv_order[i], rep_state.cnt, + nng_strerror(rv)); + continue; + } + msg = nng_aio_get_msg(raios[i]); + if ((rv = nng_msg_chop_u32(msg, &x)) != 0) { + Fail("recv msg trim: %s", nng_strerror(rv)); + break; + } + if (x != (uint32_t) i) { + Fail("message body mismatch: %x %x\n", x, + (uint32_t) i); + break; + } + + nng_msg_free(msg); + } + for (i = 0; i < NCTX; i++) { + if (!recd[i]) { + Fail("Index %d (%d) not received", i, + recv_order[i]); + break; + } + } + + So(rv == 0); + So(i == NCTX); + }); + + Convey("Given a socket and a context", { + nng_socket req; + nng_ctx ctx; + nng_aio * aio; + + So(nng_req0_open(&req) == 0); + So(nng_ctx_open(&ctx, req) == 0); + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + nng_aio_set_timeout(aio, 1000); + + Reset({ nng_aio_free(aio); }); + + Convey("Closing the socket aborts a context send", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_close(req); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + nng_msg_free(msg); + }); + + Convey("Closing the context aborts a context send", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_ctx_close(ctx); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + nng_msg_free(msg); + nng_close(req); + }); + }); +}); diff --git a/tests/reqpoll.c b/tests/reqpoll.c new file mode 100644 index 00000000..64c8df66 --- /dev/null +++ b/tests/reqpoll.c @@ -0,0 +1,148 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef _WIN32 +#include <poll.h> +#include <unistd.h> +#define SOCKET int +#else + +#define poll WSAPoll +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif + +#include <windows.h> +#include <winsock2.h> + +#include <mswsock.h> +#endif + +#include "convey.h" +#include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +bool +isready(SOCKET fd) +{ + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLRDNORM; + pfd.revents = 0; + + switch (poll(&pfd, 1, 0)) { + case 0: + return (false); + case 1: + return (true); + default: + printf("BAD POLL RETURN!\n"); + abort(); + } +} + +TestMain("REQ pollable", { + + atexit(nng_fini); + + Convey("Given a connected REQ/REP pair", { + nng_socket req; + nng_socket rep; + nng_ctx ctx; + + So(nng_req0_open(&req) == 0); + So(nng_rep0_open(&rep) == 0); + So(nng_ctx_open(&ctx, req) == 0); + + Reset({ + nng_ctx_close(ctx); + nng_close(req); + nng_close(rep); + }); + So(nng_listen(rep, "inproc://ctx1", NULL, 0) == 0); + + Convey("REQ ctx not pollable", { + int fd; + So(nng_ctx_open(&ctx, req) == 0); + Reset({ nng_ctx_close(req); }); + So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) == + NNG_ENOTSUP); + So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) == + NNG_ENOTSUP); + }); + + Convey("REQ starts not writable", { + int fd; + + So(nng_getopt_int(req, NNG_OPT_SENDFD, &fd) == 0); + So(isready(fd) == false); + + Convey("And becomes readable on connect", { + So(nng_dial(req, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(isready(fd) == true); + + Convey("Not writable with message pending", { + for (int i = 0; i < 10; i++) { + nng_msg *m; + So(nng_msg_alloc(&m, 0) == 0); + // Fill intermediate queues. + if (nng_sendmsg(req, m, + NNG_FLAG_NONBLOCK) != + 0) { + nng_msg_free(m); + } + } + So(isready(fd) == false); + }); + }); + }); + + Convey("REQ starts not readable", { + int fd; + + So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0); + So(isready(fd) == false); + + Convey("And doesn't become readable on connect", { + So(nng_dial(req, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(isready(fd) == false); + }); + }); + + Convey("REQ becomes readable", { + int fd; + nng_msg *msg; + + So(nng_dial(req, "inproc://ctx1", NULL, 0) == 0); + + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0); + So(isready(fd) == false); + So(nng_msg_append(msg, "xyz", 3) == 0); + So(nng_sendmsg(req, msg, 0) == 0); + So(nng_recvmsg(rep, &msg, 0) == 0); // recv on rep + So(nng_sendmsg(rep, msg, 0) == 0); // echo it back + nng_msleep(200); // give time for message to arrive + So(isready(fd) == true); + Convey("And is no longer readable after receive", { + So(nng_recvmsg(req, &msg, 0) == 0); + nng_msg_free(msg); + So(isready(fd) == false); + }); + }); + }); +}); diff --git a/tests/reqrep.c b/tests/reqrep.c index 97ed371a..76cf279c 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -175,5 +175,85 @@ TestMain("REQ/REP pattern", { nng_msg_free(cmd); }); + Convey("Request cancellation aborts pending recv", { + nng_msg * abc; + nng_msg * def; + nng_msg * cmd; + nng_aio * aio; + nng_duration retry = 100; // 100 ms + + nng_socket req; + nng_socket rep; + + So(nng_rep_open(&rep) == 0); + + So(nng_req_open(&req) == 0); + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + + Reset({ + nng_close(rep); + nng_close(req); + nng_aio_free(aio); + }); + + So(nng_setopt_ms(req, NNG_OPT_REQ_RESENDTIME, retry) == 0); + So(nng_setopt_int(req, NNG_OPT_SENDBUF, 16) == 0); + + So(nng_msg_alloc(&abc, 0) == 0); + So(nng_msg_append(abc, "abc", 4) == 0); + So(nng_msg_alloc(&def, 0) == 0); + So(nng_msg_append(def, "def", 4) == 0); + + So(nng_listen(rep, addr, NULL, 0) == 0); + So(nng_dial(req, addr, NULL, 0) == 0); + + // Send req #1 (abc). + So(nng_sendmsg(req, abc, 0) == 0); + + // Sleep a bit. This is so that we ensure that our + // request gets to the far side. (If we cancel too + // fast, then our outgoing send will be canceled before + // it gets to the wire.) + nng_msleep(20); + + nng_aio_set_timeout(aio, 1000); // an entire second + nng_recv_aio(req, aio); + + // Give time for this recv to post properly. + nng_msleep(20); + + // Send the next next request ("def"). Note that + // the REP side server will have already buffered the receive + // request, and should simply be waiting for us to reply to + // abc. + So(nng_sendmsg(req, def, 0) == 0); + + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECANCELED); + + // Receive the first request (should be abc) on the REP server. + So(nng_recvmsg(rep, &cmd, 0) == 0); + So(nng_msg_len(cmd) == 4); + So(strcmp(nng_msg_body(cmd), "abc") == 0); + + // REP sends the reply to first command. This will be + // discarded by the REQ server. + So(nng_sendmsg(rep, cmd, 0) == 0); + + // Now get the next command from the REP; should be "def". + So(nng_recvmsg(rep, &cmd, 0) == 0); + So(nng_msg_len(cmd) == 4); + So(strcmp(nng_msg_body(cmd), "def") == 0); + + // And send it back to REQ. + So(nng_sendmsg(rep, cmd, 0) == 0); + + // Try a req command. This should give back "def" + So(nng_recvmsg(req, &cmd, 0) == 0); + So(nng_msg_len(cmd) == 4); + So(strcmp(nng_msg_body(cmd), "def") == 0); + nng_msg_free(cmd); + }); + nng_fini(); }) |
