From 5f7289e1f8e1427c9214c8e3e96ad56b1f868d53 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 4 Apr 2018 13:36:54 -0700 Subject: fixes #334 Separate context for state machines from sockets This provides context support for REQ and REP sockets. More discussion around this is in the issue itself. Optionally we would like to extend this to the surveyor pattern. Note that we specifically do not support pollable descriptors for non-default contexts, and the results of using file descriptors for polling (NNG_OPT_SENDFD and NNG_OPT_RECVFD) is undefined. In the future, it might be nice to figure out how to factor in optional use of a message queue for users who want more buffering, but we think there is little need for this with cooked mode. --- tests/reqpoll.c | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 tests/reqpoll.c (limited to 'tests/reqpoll.c') diff --git a/tests/reqpoll.c b/tests/reqpoll.c new file mode 100644 index 00000000..64c8df66 --- /dev/null +++ b/tests/reqpoll.c @@ -0,0 +1,148 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef _WIN32 +#include +#include +#define SOCKET int +#else + +#define poll WSAPoll +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif + +#include +#include + +#include +#endif + +#include "convey.h" +#include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +bool +isready(SOCKET fd) +{ + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLRDNORM; + pfd.revents = 0; + + switch (poll(&pfd, 1, 0)) { + case 0: + return (false); + case 1: + return (true); + default: + printf("BAD POLL RETURN!\n"); + abort(); + } +} + +TestMain("REQ pollable", { + + atexit(nng_fini); + + Convey("Given a connected REQ/REP pair", { + nng_socket req; + nng_socket rep; + nng_ctx ctx; + + So(nng_req0_open(&req) == 0); + So(nng_rep0_open(&rep) == 0); + So(nng_ctx_open(&ctx, req) == 0); + + Reset({ + nng_ctx_close(ctx); + nng_close(req); + nng_close(rep); + }); + So(nng_listen(rep, "inproc://ctx1", NULL, 0) == 0); + + Convey("REQ ctx not pollable", { + int fd; + So(nng_ctx_open(&ctx, req) == 0); + Reset({ nng_ctx_close(req); }); + So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) == + NNG_ENOTSUP); + So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) == + NNG_ENOTSUP); + }); + + Convey("REQ starts not writable", { + int fd; + + So(nng_getopt_int(req, NNG_OPT_SENDFD, &fd) == 0); + So(isready(fd) == false); + + Convey("And becomes readable on connect", { + So(nng_dial(req, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(isready(fd) == true); + + Convey("Not writable with message pending", { + for (int i = 0; i < 10; i++) { + nng_msg *m; + So(nng_msg_alloc(&m, 0) == 0); + // Fill intermediate queues. + if (nng_sendmsg(req, m, + NNG_FLAG_NONBLOCK) != + 0) { + nng_msg_free(m); + } + } + So(isready(fd) == false); + }); + }); + }); + + Convey("REQ starts not readable", { + int fd; + + So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0); + So(isready(fd) == false); + + Convey("And doesn't become readable on connect", { + So(nng_dial(req, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(isready(fd) == false); + }); + }); + + Convey("REQ becomes readable", { + int fd; + nng_msg *msg; + + So(nng_dial(req, "inproc://ctx1", NULL, 0) == 0); + + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0); + So(isready(fd) == false); + So(nng_msg_append(msg, "xyz", 3) == 0); + So(nng_sendmsg(req, msg, 0) == 0); + So(nng_recvmsg(rep, &msg, 0) == 0); // recv on rep + So(nng_sendmsg(rep, msg, 0) == 0); // echo it back + nng_msleep(200); // give time for message to arrive + So(isready(fd) == true); + Convey("And is no longer readable after receive", { + So(nng_recvmsg(req, &msg, 0) == 0); + nng_msg_free(msg); + So(isready(fd) == false); + }); + }); + }); +}); -- cgit v1.2.3-70-g09d2