diff options
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | tests/compat_surveyttl.c | 144 | ||||
| -rw-r--r-- | tests/reqpoll.c | 6 | ||||
| -rw-r--r-- | tests/respondpoll.c | 109 | ||||
| -rw-r--r-- | tests/stubs.h | 31 | ||||
| -rw-r--r-- | tests/survey.c | 217 | ||||
| -rw-r--r-- | tests/surveyctx.c | 298 | ||||
| -rw-r--r-- | tests/surveypoll.c | 126 |
8 files changed, 932 insertions, 3 deletions
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 08806679..af570daa 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -166,7 +166,10 @@ add_nng_proto_test(reqctx 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) add_nng_proto_test(reqpoll 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) add_nng_proto_test(reqrep 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) add_nng_proto_test(reqstress 60 NNG_PROTO_REQ0 NNG_PROTO_REP0) +add_nng_proto_test(respondpoll 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0) add_nng_test(survey 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0) +add_nng_proto_test(surveyctx 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0) +add_nng_proto_test(surveypoll 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0) # compatbility tests # We only support these if ALL the legacy protocols are supported. This @@ -186,6 +189,7 @@ add_nng_compat_test(compat_reqrep 5) add_nng_compat_test(compat_survey 5) add_nng_compat_test(compat_reqttl 5) add_nng_compat_test(compat_shutdown 5) +add_nng_compat_test(compat_surveyttl 5) add_nng_compat_test(compat_poll 5) # These are special tests for compat mode, not inherited from the diff --git a/tests/compat_surveyttl.c b/tests/compat_surveyttl.c new file mode 100644 index 00000000..1c6f66be --- /dev/null +++ b/tests/compat_surveyttl.c @@ -0,0 +1,144 @@ +/* + Copyright (c) 2012 Martin Sustrik All rights reserved. + Copyright (c) 2013 GoPivotal, Inc. All rights reserved. + Copyright 2016 Garrett D'Amore <garrett@damore.org> + Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com> + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include <nanomsg/nn.h> + +#include "compat_testutil.h" + +static char socket_address_a[128]; +static char socket_address_b[128]; +int dev0; +int dev1; + +void device (NN_UNUSED void *arg) +{ + int rc; + + /* Run the device. */ + rc = nn_device (dev0, dev1); + nn_assert (rc < 0 && nn_errno () == EBADF); + + /* Clean up. */ + test_close (dev0); + test_close (dev1); +} + +int main (int argc, const char *argv[]) +{ + int end0; + int end1; + struct nn_thread thread1; + int timeo; + int maxttl; + size_t sz; + int rc; + + int port = get_test_port(argc, argv); + + test_addr_from(socket_address_a, "tcp", "127.0.0.1", port); + test_addr_from(socket_address_b, "tcp", "127.0.0.1", port + 1); + + /* Intialise the device sockets. */ + dev0 = test_socket (AF_SP_RAW, NN_RESPONDENT); + dev1 = test_socket (AF_SP_RAW, NN_SURVEYOR); + + test_bind (dev0, socket_address_a); + test_bind (dev1, socket_address_b); + + /* Start the device. */ + nn_thread_init (&thread1, device, NULL); + + end0 = test_socket (AF_SP, NN_SURVEYOR); + end1 = test_socket (AF_SP, NN_RESPONDENT); + + /* Test the bi-directional device TTL */ + test_connect (end0, socket_address_a); + test_connect (end1, socket_address_b); + + /* Wait for TCP to establish. */ + nn_sleep (100); + + /* Set up max receive timeout. */ + timeo = 100; + test_setsockopt (end0, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo)); + timeo = 100; + test_setsockopt (end1, NN_SOL_SOCKET, NN_RCVTIMEO, &timeo, sizeof (timeo)); + + /* Test default TTL is 8. */ + sz = sizeof (maxttl); + maxttl = -1; + rc = nn_getsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, &sz); + nn_assert (rc == 0); + nn_assert (sz == sizeof (maxttl)); + nn_assert (maxttl == 8); + + /* Test to make sure option TTL cannot be set below 1. */ + maxttl = -1; + rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + nn_assert (rc < 0 && nn_errno () == EINVAL); + nn_assert (maxttl == -1); + maxttl = 0; + rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + nn_assert (rc < 0 && nn_errno () == EINVAL); + nn_assert (maxttl == 0); + + /* Test to set non-integer size */ + maxttl = 8; + rc = nn_setsockopt(end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, 1); + nn_assert (rc < 0 && nn_errno () == EINVAL); + nn_assert (maxttl == 8); + + /* Pass a message between endpoints. */ + test_send (end0, "SURVEY"); + test_recv (end1, "SURVEY"); + + /* Now send a reply. */ + test_send (end1, "REPLYXYZ"); + test_recv (end0, "REPLYXYZ"); + + /* Now set the max TTL. */ + maxttl = 1; + test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + + test_send (end0, "DROPTHIS"); + test_drop (end1, ETIMEDOUT); + + maxttl = 2; + test_setsockopt (end0, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + test_setsockopt (end1, NN_SOL_SOCKET, NN_MAXTTL, &maxttl, sizeof (maxttl)); + test_send (end0, "DONTDROP"); + test_recv (end1, "DONTDROP"); + + /* Clean up. */ + test_close (end0); + test_close (end1); + + /* Shut down the devices. */ + nn_term (); + nn_thread_term (&thread1); + + return 0; +} diff --git a/tests/reqpoll.c b/tests/reqpoll.c index 64c8df66..aeee7d0b 100644 --- a/tests/reqpoll.c +++ b/tests/reqpoll.c @@ -55,7 +55,7 @@ TestMain("REQ pollable", { atexit(nng_fini); - Convey("Given a connected REQ/REP pair", { + Convey("Given a REQ/REP pair", { nng_socket req; nng_socket rep; nng_ctx ctx; @@ -74,7 +74,7 @@ TestMain("REQ pollable", { Convey("REQ ctx not pollable", { int fd; So(nng_ctx_open(&ctx, req) == 0); - Reset({ nng_ctx_close(req); }); + Reset({ nng_ctx_close(ctx); }); So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) == NNG_ENOTSUP); So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) == @@ -87,7 +87,7 @@ TestMain("REQ pollable", { So(nng_getopt_int(req, NNG_OPT_SENDFD, &fd) == 0); So(isready(fd) == false); - Convey("And becomes readable on connect", { + Convey("And becomes writable on connect", { So(nng_dial(req, "inproc://ctx1", NULL, 0) == 0); nng_msleep(100); diff --git a/tests/respondpoll.c b/tests/respondpoll.c new file mode 100644 index 00000000..2e24b5b2 --- /dev/null +++ b/tests/respondpoll.c @@ -0,0 +1,109 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +TestMain("Respondent pollable", { + + atexit(nng_fini); + + Convey("Given a connected survey pair", { + nng_socket surv; + nng_socket resp; + nng_ctx ctx; + + So(nng_surveyor0_open(&surv) == 0); + So(nng_respondent0_open(&resp) == 0); + So(nng_ctx_open(&ctx, resp) == 0); + + So(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 2000) == 0); + So(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 2000) == 0); + So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 2000) == 0); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 2000) == 0); + + Reset({ + nng_ctx_close(ctx); + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://ctx1", NULL, 0) == 0); + + Convey("Respondent ctx not pollable", { + int fd; + + So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) == + NNG_ENOTSUP); + So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) == + NNG_ENOTSUP); + }); + + Convey("Respondent starts not writable", { + int fd; + + So(nng_getopt_int(resp, NNG_OPT_SENDFD, &fd) == 0); + So(fdready(fd) == false); + + Convey("And remains unwritable on connect", { + So(nng_dial(surv, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(fdready(fd) == false); + + Convey("Becomes writable after recv", { + nng_msg *m; + So(nng_msg_alloc(&m, 0) == 0); + So(nng_sendmsg(surv, m, 0) == 0); + So(nng_recvmsg(resp, &m, 0) == 0); + nng_msg_free(m); + So(fdready(fd) == true); + }); + }); + }); + + Convey("Respondent starts not readable", { + int fd; + + So(nng_getopt_int(resp, NNG_OPT_RECVFD, &fd) == 0); + So(fdready(fd) == false); + + Convey("And doesn't become readable on connect", { + So(nng_dial(surv, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(fdready(fd) == false); + }); + + Convey("And becomes readable on data", { + nng_msg *msg; + + So(nng_dial(surv, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(200); + + So(nng_msg_alloc(&msg, 0) == 0); + So(fdready(fd) == false); + So(nng_msg_append(msg, "xyz", 3) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + nng_msleep(300); // give time for msg to arrive + So(fdready(fd) == true); + Convey("Is no longer readable after recv", { + So(nng_recvmsg(resp, &msg, 0) == 0); + nng_msg_free(msg); + So(fdready(fd) == false); + }); + }); + }); + }); +}); diff --git a/tests/stubs.h b/tests/stubs.h index da3a6d5f..c34bed7a 100644 --- a/tests/stubs.h +++ b/tests/stubs.h @@ -12,11 +12,23 @@ #define STUBS_H #ifdef _WIN32 + +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif + #include <windows.h> +#include <winsock2.h> +// order counts +#include <mswsock.h> +#define PLATFD SOCKET +#define poll WSAPoll #else +#include <poll.h> #include <stdint.h> #include <sys/time.h> #include <time.h> +#define PLATFD int #endif // Stub handlers for some common things. @@ -46,6 +58,25 @@ getms(void) #endif } +bool +fdready(int fd) +{ + struct pollfd pfd; + pfd.fd = (PLATFD) fd; + pfd.events = POLLRDNORM; + pfd.revents = 0; + + switch (poll(&pfd, 1, 0)) { + case 0: + return (false); + case 1: + return (true); + default: + ConveyError("BAD POLL RETURN!"); + return (false); + } +} + int nosocket(nng_socket *s) { diff --git a/tests/survey.c b/tests/survey.c index a3a7ba1d..ed73b3dd 100644 --- a/tests/survey.c +++ b/tests/survey.c @@ -14,6 +14,7 @@ #include "protocol/survey0/respond.h" #include "protocol/survey0/survey.h" #include "stubs.h" +#include "supplemental/util/platform.h" #include <string.h> @@ -114,5 +115,221 @@ TestMain("SURVEY pattern", { So(nng_recvmsg(surv, &msg, 0) == NNG_ESTATE); }); }); + + Convey("Second send cancels pending recv", { + nng_msg *msg; + nng_aio *aio; + + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "one"); + So(nng_sendmsg(surv, msg, 0) == 0); + msg = NULL; + nng_recv_aio(surv, aio); + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "two"); + So(nng_sendmsg(surv, msg, 0) == 0); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECANCELED); + }); + + Convey("Sending a NULL message does not panic", { + nng_aio *aio; + + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + Reset({ nng_aio_free(aio); }); + So(nng_sendmsg(surv, NULL, 0) == NNG_EINVAL); + nng_send_aio(surv, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_EINVAL); + }); + + Convey("Disconnecting before getting response", { + nng_msg *msg; + + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == 0); + nng_close(surv); + nng_msleep(100); + So(nng_sendmsg(resp, msg, 0) == 0); + }); + }); + + Convey("Bad backtrace survey is ignored", { + nng_socket surv; + nng_socket resp; + nng_msg * msg; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_respondent0_open(&resp) == 0); + Reset({ + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0); + So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0); + nng_msleep(100); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_msg_header_append_u32(msg, 1) == + 0); // high order bit not set! + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT); + }); + + Convey("Bad backtrace survey is ignored (raw)", { + nng_socket surv; + nng_socket resp; + nng_msg * msg; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_respondent0_open_raw(&resp) == 0); + Reset({ + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0); + So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0); + nng_msleep(100); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_msg_header_append_u32(msg, 1) == + 0); // high order bit not set! + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT); + }); + + Convey("Missing backtrace survey is ignored", { + nng_socket surv; + nng_socket resp; + nng_msg * msg; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_respondent0_open(&resp) == 0); + Reset({ + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0); + So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0); + nng_msleep(100); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT); + }); + + Convey("Missing backtrace survey is ignored (raw)", { + nng_socket surv; + nng_socket resp; + nng_msg * msg; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_respondent0_open_raw(&resp) == 0); + Reset({ + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0); + So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0); + nng_msleep(100); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == NNG_ETIMEDOUT); + }); + + Convey("Bad backtrace response is ignored", { + nng_socket surv; + nng_socket resp; + nng_msg * msg; + So(nng_surveyor0_open(&surv) == 0); + So(nng_respondent0_open_raw(&resp) == 0); + Reset({ + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0); + So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0); + So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0); + nng_msleep(100); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == 0); + nng_msg_header_clear(msg); + nng_msg_header_append_u32(msg, 1); + So(nng_sendmsg(resp, msg, 0) == 0); + So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); }); + + Convey("Bad backtrace response is ignored (raw)", { + nng_socket surv; + nng_socket resp; + nng_msg * msg; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_respondent0_open_raw(&resp) == 0); + Reset({ + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0); + So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0); + So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0); + nng_msleep(100); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_msg_header_append_u32(msg, 0x80000000) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == 0); + nng_msg_header_clear(msg); + nng_msg_header_append_u32(msg, 1); + So(nng_sendmsg(resp, msg, 0) == 0); + So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); + }); + + Convey("Missing backtrace response is ignored", { + nng_socket surv; + nng_socket resp; + nng_msg * msg; + So(nng_surveyor0_open(&surv) == 0); + So(nng_respondent0_open_raw(&resp) == 0); + Reset({ + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0); + So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0); + So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0); + nng_msleep(100); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == 0); + nng_msg_header_clear(msg); + So(nng_sendmsg(resp, msg, 0) == 0); + So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); + }); + + Convey("Missing backtrace response is ignored (raw)", { + nng_socket surv; + nng_socket resp; + nng_msg * msg; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_respondent0_open_raw(&resp) == 0); + Reset({ + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://badsurvback", NULL, 0) == 0); + So(nng_dial(surv, "inproc://badsurvback", NULL, 0) == 0); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200) == 0); + So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 200) == 0); + nng_msleep(100); + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_msg_header_append_u32(msg, 0x80000000) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == 0); + nng_msg_header_clear(msg); + So(nng_sendmsg(resp, msg, 0) == 0); + So(nng_recvmsg(surv, &msg, 0) == NNG_ETIMEDOUT); + }); + }); diff --git a/tests/surveyctx.c b/tests/surveyctx.c new file mode 100644 index 00000000..9ab2de40 --- /dev/null +++ b/tests/surveyctx.c @@ -0,0 +1,298 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +#include <string.h> + +static struct { + nng_aio *aio; + enum { START, SEND, RECV } state; + nng_socket s; + nng_msg * msg; + int cnt; +} resp_state; + +void +resp_cb(void) +{ + int rv; + + if (resp_state.state == START) { + resp_state.state = RECV; + nng_recv_aio(resp_state.s, resp_state.aio); + return; + } + if ((rv = nng_aio_result(resp_state.aio)) != 0) { + if (resp_state.msg != NULL) { + nng_msg_free(resp_state.msg); + resp_state.msg = NULL; + } + return; + } + switch (resp_state.state) { + case START: + break; + case RECV: + resp_state.msg = nng_aio_get_msg(resp_state.aio); + resp_state.state = SEND; + nng_aio_set_msg(resp_state.aio, resp_state.msg); + nng_send_aio(resp_state.s, resp_state.aio); + break; + case SEND: + resp_state.msg = NULL; + resp_state.state = RECV; + nng_aio_set_msg(resp_state.aio, NULL); + nng_recv_aio(resp_state.s, resp_state.aio); + resp_state.cnt++; + break; + } +} + +#define NCTX 10 + +void +markr(void *arg) +{ + *(bool *) arg = true; +} + +static void +marks(void *arg) +{ + *(bool *) arg = true; +} + +nng_ctx ctxs[NCTX]; +uint32_t recv_order[NCTX]; +nng_aio *saios[NCTX]; +nng_aio *raios[NCTX]; +bool recd[NCTX]; +bool sent[NCTX]; + +TestMain("Surveyor concurrent contexts", { + int rv; + const char *addr = "inproc://test"; + int i; + + memset(recv_order, 0, NCTX * sizeof(int)); + + atexit(nng_fini); + + Convey("We can use Surveyor contexts concurrently", { + nng_socket surv; + + So(nng_aio_alloc(&resp_state.aio, (void *) resp_cb, NULL) == + 0); + So(nng_respondent0_open(&resp_state.s) == 0); + So(nng_surveyor0_open(&surv) == 0); + + for (i = 0; i < NCTX; i++) { + sent[i] = recd[i] = false; + recv_order[i] = (uint32_t) i; + if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) { + break; + } + nng_aio_set_timeout(raios[i], 5000); + if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) { + break; + } + nng_aio_set_timeout(saios[i], 5000); + } + + // So(nng_setopt_int(resp_state.s, NNG_OPT_SENDBUF, NCTX) == + // 0); + So(i == NCTX); + for (i = 0; i < NCTX; i++) { + uint32_t tmp; + int ni = rand() % NCTX; // recv index + + tmp = recv_order[i]; + recv_order[i] = recv_order[ni]; + recv_order[ni] = tmp; + } + Reset({ + for (i = 0; i < NCTX; i++) { + nng_aio_free(saios[i]); + nng_aio_free(raios[i]); + } + nng_close(surv); + nng_close(resp_state.s); + nng_aio_free(resp_state.aio); + }); + + So(nng_listen(resp_state.s, addr, NULL, 0) == 0); + So(nng_dial(surv, addr, NULL, 0) == 0); + + nng_msleep(100); // let things establish. + + // Start the rep state machine going. + resp_cb(); + + for (i = 0; i < NCTX; i++) { + if ((rv = nng_ctx_open(&ctxs[i], surv)) != 0) { + break; + } + } + So(rv == 0); + So(i == NCTX); + + // Send messages + for (i = 0; i < NCTX; i++) { + nng_msg *m; + if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) { + Fail("msg alloc failed: %s", nng_strerror(rv)); + } + if ((rv = nng_msg_append_u32(m, i)) != 0) { + Fail("append failed: %s", nng_strerror(rv)); + } + nng_aio_set_msg(saios[i], m); + nng_ctx_send(ctxs[i], saios[i]); + } + So(rv == 0); + So(i == NCTX); + + for (i = 0; i < NCTX; i++) { + nng_aio_wait(saios[i]); + if ((rv = nng_aio_result(saios[i])) != 0) { + Fail("send failed: %s", nng_strerror(rv)); + So(false); + break; + } + } + for (i = 0; i < NCTX; i++) { + if (!sent[i]) { + Fail("Index %d (%d) not sent", i, i); + } + } + + So(rv == 0); + So(i == NCTX); + // Receive answers + for (i = 0; i < NCTX; i++) { + int ri = recv_order[i]; + nng_ctx_recv(ctxs[ri], raios[ri]); + } + + for (i = 0; i < NCTX; i++) { + nng_msg *msg; + uint32_t x; + + nng_aio_wait(raios[i]); + if ((rv = nng_aio_result(raios[i])) != 0) { + Fail("recv %d (%d) %d failed: %s", i, + recv_order[i], resp_state.cnt, + nng_strerror(rv)); + continue; + } + msg = nng_aio_get_msg(raios[i]); + if ((rv = nng_msg_chop_u32(msg, &x)) != 0) { + Fail("recv msg trim: %s", nng_strerror(rv)); + break; + } + if (x != (uint32_t) i) { + Fail("message body mismatch: %x %x\n", x, + (uint32_t) i); + break; + } + + nng_msg_free(msg); + } + for (i = 0; i < NCTX; i++) { + if (!recd[i]) { + Fail("Index %d (%d) not received", i, + recv_order[i]); + break; + } + } + + So(rv == 0); + So(i == NCTX); + }); + + Convey("Given a socket and a context", { + nng_socket surv; + nng_ctx ctx; + nng_aio * aio; + + So(nng_surveyor0_open(&surv) == 0); + So(nng_ctx_open(&ctx, surv) == 0); + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + nng_aio_set_timeout(aio, 1000); + + Reset({ nng_aio_free(aio); }); + + Convey("Recv on the context is ESTATE", { + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ESTATE); + }); + + Convey("Closing the socket aborts a context recv", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + nng_ctx_recv(ctx, aio); + nng_close(surv); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + }); + + Convey("Sending a null message fails", { + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_EINVAL); + }); + + Convey("Closing the context aborts a context send", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + nng_recv_aio(ctx, aio); + nng_ctx_close(ctx); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + nng_close(surv); + }); + + Convey("We can set separate survey times", { + nng_duration ms; + So(nng_setopt_ms( + surv, NNG_OPT_SURVEYOR_SURVEYTIME, 100) == 0); + So(nng_ctx_setopt_ms( + ctx, NNG_OPT_SURVEYOR_SURVEYTIME, 200) == 0); + So(nng_getopt_ms( + surv, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0); + So(ms == 100); + So(nng_ctx_getopt_ms( + ctx, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0); + So(ms == 200); + }); + }); + + Convey("Raw mode does not support contexts", { + nng_socket surv; + nng_ctx ctx; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_ctx_open(&ctx, surv) == NNG_ENOTSUP); + nng_close(surv); + }); +}); diff --git a/tests/surveypoll.c b/tests/surveypoll.c new file mode 100644 index 00000000..f97e1c22 --- /dev/null +++ b/tests/surveypoll.c @@ -0,0 +1,126 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +TestMain("Survey pollable", { + + atexit(nng_fini); + + Convey("Given a connected survey pair", { + nng_socket surv; + nng_socket resp; + nng_ctx ctx; + + So(nng_surveyor0_open(&surv) == 0); + So(nng_respondent0_open(&resp) == 0); + So(nng_ctx_open(&ctx, surv) == 0); + + So(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 2000) == 0); + So(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 2000) == 0); + So(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 2000) == 0); + So(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 2000) == 0); + + Reset({ + nng_ctx_close(ctx); + nng_close(surv); + nng_close(resp); + }); + So(nng_listen(resp, "inproc://ctx1", NULL, 0) == 0); + + Convey("Surveyor ctx not pollable", { + int fd; + + So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) == + NNG_ENOTSUP); + So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) == + NNG_ENOTSUP); + }); + + Convey("Suveyor starts writable", { + int fd; + + So(nng_getopt_int(surv, NNG_OPT_SENDFD, &fd) == 0); + So(fdready(fd) == true); + + Convey("And becomes readable on connect", { + So(nng_dial(surv, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(fdready(fd) == true); + + Convey("And stays writable", { + // 500 messages should force all + // the way to send depth. + int i; + for (i = 0; i < 500; i++) { + nng_msg *m; + if (nng_msg_alloc(&m, 0) != + 0) { + break; + } + // Fill intermediate queues. + if (nng_sendmsg(surv, m, + NNG_FLAG_NONBLOCK) != + 0) { + nng_msg_free(m); + } + } + So(i == 500); + So(fdready(fd) == true); + }); + }); + }); + + Convey("Surveyor starts not readable", { + int fd; + + So(nng_getopt_int(surv, NNG_OPT_RECVFD, &fd) == 0); + So(fdready(fd) == false); + + Convey("And doesn't become readable on connect", { + So(nng_dial(surv, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(fdready(fd) == false); + }); + + Convey("And becomes readable on data", { + nng_msg *msg; + + So(nng_dial(surv, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(200); + + So(nng_msg_alloc(&msg, 0) == 0); + So(fdready(fd) == false); + So(nng_msg_append(msg, "xyz", 3) == 0); + So(nng_sendmsg(surv, msg, 0) == 0); + So(nng_recvmsg(resp, &msg, 0) == + 0); // recv on rep + So(nng_sendmsg(resp, msg, 0) == + 0); // echo it back + nng_msleep( + 300); // give time for message to arrive + So(fdready(fd) == true); + Convey("Is no longer readable after recv", { + So(nng_recvmsg(surv, &msg, 0) == 0); + nng_msg_free(msg); + So(fdready(fd) == false); + }); + }); + }); + }); +}); |
