aboutsummaryrefslogtreecommitdiff
path: root/src/sp/protocol/reqrep0
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-01-01 11:30:03 -0800
committerGarrett D'Amore <garrett@damore.org>2021-01-01 12:46:17 -0800
commited542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch)
tree673924ff077d468e6756529c2c204698d3faa47c /src/sp/protocol/reqrep0
parent1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff)
downloadnng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz
nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2
nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'src/sp/protocol/reqrep0')
-rw-r--r--src/sp/protocol/reqrep0/CMakeLists.txt25
-rw-r--r--src/sp/protocol/reqrep0/rep.c705
-rw-r--r--src/sp/protocol/reqrep0/rep_test.c669
-rw-r--r--src/sp/protocol/reqrep0/req.c869
-rw-r--r--src/sp/protocol/reqrep0/req_test.c968
-rw-r--r--src/sp/protocol/reqrep0/xrep.c432
-rw-r--r--src/sp/protocol/reqrep0/xrep_test.c434
-rw-r--r--src/sp/protocol/reqrep0/xreq.c319
-rw-r--r--src/sp/protocol/reqrep0/xreq_test.c367
9 files changed, 4788 insertions, 0 deletions
diff --git a/src/sp/protocol/reqrep0/CMakeLists.txt b/src/sp/protocol/reqrep0/CMakeLists.txt
new file mode 100644
index 00000000..a3cecfd0
--- /dev/null
+++ b/src/sp/protocol/reqrep0/CMakeLists.txt
@@ -0,0 +1,25 @@
+#
+# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Req/Rep protocol
+nng_directory(reqrep0)
+
+nng_sources_if(NNG_PROTO_REQ0 req.c xreq.c)
+nng_headers_if(NNG_PROTO_REQ0 nng/protocol/reqrep0/req.h)
+nng_defines_if(NNG_PROTO_REQ0 NNG_HAVE_REQ0)
+
+nng_sources_if(NNG_PROTO_REP0 rep.c xrep.c)
+nng_headers_if(NNG_PROTO_REP0 nng/protocol/reqrep0/rep.h)
+nng_defines_if(NNG_PROTO_REP0 NNG_HAVE_REP0)
+
+nng_test(req_test)
+nng_test(rep_test)
+nng_test(xrep_test)
+nng_test(xreq_test)
diff --git a/src/sp/protocol/reqrep0/rep.c b/src/sp/protocol/reqrep0/rep.c
new file mode 100644
index 00000000..aa32d249
--- /dev/null
+++ b/src/sp/protocol/reqrep0/rep.c
@@ -0,0 +1,705 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "nng/protocol/reqrep0/rep.h"
+
+// Response protocol. The REP protocol is the "reply" side of a
+// request-reply pair. This is useful for building RPC servers, for
+// example.
+
+typedef struct rep0_pipe rep0_pipe;
+typedef struct rep0_sock rep0_sock;
+typedef struct rep0_ctx rep0_ctx;
+
+static void rep0_pipe_send_cb(void *);
+static void rep0_pipe_recv_cb(void *);
+static void rep0_pipe_fini(void *);
+
+struct rep0_ctx {
+ rep0_sock * sock;
+ uint32_t pipe_id;
+ rep0_pipe * spipe; // send pipe
+ nni_aio * saio; // send aio
+ nni_aio * raio; // recv aio
+ nni_list_node sqnode;
+ nni_list_node rqnode;
+ size_t btrace_len;
+ uint32_t btrace[NNI_MAX_MAX_TTL + 1];
+};
+
+// rep0_sock is our per-socket protocol private structure.
+struct rep0_sock {
+ nni_mtx lk;
+ nni_atomic_int ttl;
+ nni_id_map pipes;
+ nni_list recvpipes; // list of pipes with data to receive
+ nni_list recvq;
+ rep0_ctx ctx;
+ nni_pollable readable;
+ nni_pollable writable;
+};
+
+// rep0_pipe is our per-pipe protocol private structure.
+struct rep0_pipe {
+ nni_pipe * pipe;
+ rep0_sock * rep;
+ uint32_t id;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_list_node rnode; // receivable list linkage
+ nni_list sendq; // contexts waiting to send
+ bool busy;
+ bool closed;
+};
+
+static void
+rep0_ctx_close(void *arg)
+{
+ rep0_ctx * ctx = arg;
+ rep0_sock *s = ctx->sock;
+ nni_aio * aio;
+
+ nni_mtx_lock(&s->lk);
+ if ((aio = ctx->saio) != NULL) {
+ rep0_pipe *pipe = ctx->spipe;
+ ctx->saio = NULL;
+ ctx->spipe = NULL;
+ nni_list_remove(&pipe->sendq, ctx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ if ((aio = ctx->raio) != NULL) {
+ nni_list_remove(&s->recvq, ctx);
+ ctx->raio = NULL;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&s->lk);
+}
+
+static void
+rep0_ctx_fini(void *arg)
+{
+ rep0_ctx *ctx = arg;
+
+ rep0_ctx_close(ctx);
+}
+
+static int
+rep0_ctx_init(void *carg, void *sarg)
+{
+ rep0_sock *s = sarg;
+ rep0_ctx * ctx = carg;
+
+ NNI_LIST_NODE_INIT(&ctx->sqnode);
+ NNI_LIST_NODE_INIT(&ctx->rqnode);
+ ctx->btrace_len = 0;
+ ctx->sock = s;
+ ctx->pipe_id = 0;
+
+ return (0);
+}
+
+static void
+rep0_ctx_cancel_send(nni_aio *aio, void *arg, int rv)
+{
+ rep0_ctx * ctx = arg;
+ rep0_sock *s = ctx->sock;
+
+ nni_mtx_lock(&s->lk);
+ if (ctx->saio != aio) {
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+ nni_list_node_remove(&ctx->sqnode);
+ ctx->saio = NULL;
+ nni_mtx_unlock(&s->lk);
+
+ nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers
+ nni_aio_finish_error(aio, rv);
+}
+
+static void
+rep0_ctx_send(void *arg, nni_aio *aio)
+{
+ rep0_ctx * ctx = arg;
+ rep0_sock *s = ctx->sock;
+ rep0_pipe *p;
+ nni_msg * msg;
+ int rv;
+ size_t len;
+ uint32_t p_id; // pipe id
+
+ msg = nni_aio_get_msg(aio);
+ nni_msg_header_clear(msg);
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&s->lk);
+ len = ctx->btrace_len;
+ p_id = ctx->pipe_id;
+
+ // Assert "completion" of the previous req request. This ensures
+ // exactly one send for one receive ordering.
+ ctx->btrace_len = 0;
+ ctx->pipe_id = 0;
+
+ if (ctx == &s->ctx) {
+ // No matter how this goes, we will no longer be able
+ // to send on the socket (root context). That's because
+ // we will have finished (successfully or otherwise) the
+ // reply for the single request we got.
+ nni_pollable_clear(&s->writable);
+ }
+ if (len == 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if ((p = nni_id_get(&s->pipes, p_id)) == NULL) {
+ // Pipe is gone. Make this look like a good send to avoid
+ // disrupting the state machine. We don't care if the peer
+ // lost interest in our reply.
+ nni_mtx_unlock(&s->lk);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+ nni_msg_free(msg);
+ return;
+ }
+ if (!p->busy) {
+ p->busy = true;
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
+ nni_mtx_unlock(&s->lk);
+
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ return;
+ }
+
+ if ((rv = nni_aio_schedule(aio, rep0_ctx_cancel_send, ctx)) != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ ctx->saio = aio;
+ ctx->spipe = p;
+ nni_list_append(&p->sendq, ctx);
+ nni_mtx_unlock(&s->lk);
+}
+
+static void
+rep0_sock_fini(void *arg)
+{
+ rep0_sock *s = arg;
+
+ nni_id_map_fini(&s->pipes);
+ rep0_ctx_fini(&s->ctx);
+ nni_pollable_fini(&s->writable);
+ nni_pollable_fini(&s->readable);
+ nni_mtx_fini(&s->lk);
+}
+
+static int
+rep0_sock_init(void *arg, nni_sock *sock)
+{
+ rep0_sock *s = arg;
+
+ NNI_ARG_UNUSED(sock);
+
+ nni_mtx_init(&s->lk);
+ nni_id_map_init(&s->pipes, 0, 0, false);
+ NNI_LIST_INIT(&s->recvq, rep0_ctx, rqnode);
+ NNI_LIST_INIT(&s->recvpipes, rep0_pipe, rnode);
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
+
+ (void) rep0_ctx_init(&s->ctx, s);
+
+ // We start off without being either readable or writable.
+ // Readability comes when there is something on the socket.
+ nni_pollable_init(&s->writable);
+ nni_pollable_init(&s->readable);
+
+ return (0);
+}
+
+static void
+rep0_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+rep0_sock_close(void *arg)
+{
+ rep0_sock *s = arg;
+
+ rep0_ctx_close(&s->ctx);
+}
+
+static void
+rep0_pipe_stop(void *arg)
+{
+ rep0_pipe *p = arg;
+
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
+}
+
+static void
+rep0_pipe_fini(void *arg)
+{
+ rep0_pipe *p = arg;
+ nng_msg * msg;
+
+ if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) {
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_msg_free(msg);
+ }
+
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
+}
+
+static int
+rep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
+{
+ rep0_pipe *p = arg;
+
+ nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p);
+
+ NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode);
+
+ p->id = nni_pipe_id(pipe);
+ p->pipe = pipe;
+ p->rep = s;
+ return (0);
+}
+
+static int
+rep0_pipe_start(void *arg)
+{
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ int rv;
+
+ if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
+ nni_mtx_lock(&s->lk);
+ rv = nni_id_set(&s->pipes, nni_pipe_id(p->pipe), p);
+ nni_mtx_unlock(&s->lk);
+ if (rv != 0) {
+ return (rv);
+ }
+ // By definition, we have not received a request yet on this pipe,
+ // so it cannot cause us to become writable.
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ return (0);
+}
+
+static void
+rep0_pipe_close(void *arg)
+{
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ rep0_ctx * ctx;
+
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
+
+ nni_mtx_lock(&s->lk);
+ p->closed = true;
+ if (nni_list_active(&s->recvpipes, p)) {
+ // We are no longer "receivable".
+ nni_list_remove(&s->recvpipes, p);
+ }
+ while ((ctx = nni_list_first(&p->sendq)) != NULL) {
+ nni_aio *aio;
+ nni_msg *msg;
+ // Pipe was closed. To avoid pushing an error back to the
+ // entire socket, we pretend we completed this successfully.
+ nni_list_remove(&p->sendq, ctx);
+ aio = ctx->saio;
+ ctx->saio = NULL;
+ msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+ nni_msg_free(msg);
+ }
+ if (p->id == s->ctx.pipe_id) {
+ // We "can" send. (Well, not really, but we will happily
+ // accept a message and discard it.)
+ nni_pollable_raise(&s->writable);
+ }
+ nni_id_remove(&s->pipes, nni_pipe_id(p->pipe));
+ nni_mtx_unlock(&s->lk);
+}
+
+static void
+rep0_pipe_send_cb(void *arg)
+{
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ rep0_ctx * ctx;
+ nni_aio * aio;
+ nni_msg * msg;
+ size_t len;
+
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ nni_mtx_lock(&s->lk);
+ p->busy = false;
+ if ((ctx = nni_list_first(&p->sendq)) == NULL) {
+ // Nothing else to send.
+ if (p->id == s->ctx.pipe_id) {
+ // Mark us ready for the other side to send!
+ nni_pollable_raise(&s->writable);
+ }
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+
+ nni_list_remove(&p->sendq, ctx);
+ aio = ctx->saio;
+ ctx->saio = NULL;
+ ctx->spipe = NULL;
+ p->busy = true;
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
+
+ nni_mtx_unlock(&s->lk);
+
+ nni_aio_finish_sync(aio, 0, len);
+}
+
+static void
+rep0_cancel_recv(nni_aio *aio, void *arg, int rv)
+{
+ rep0_ctx * ctx = arg;
+ rep0_sock *s = ctx->sock;
+
+ nni_mtx_lock(&s->lk);
+ if (ctx->raio == aio) {
+ nni_list_remove(&s->recvq, ctx);
+ ctx->raio = NULL;
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&s->lk);
+}
+
+static void
+rep0_ctx_recv(void *arg, nni_aio *aio)
+{
+ rep0_ctx * ctx = arg;
+ rep0_sock *s = ctx->sock;
+ rep0_pipe *p;
+ size_t len;
+ nni_msg * msg;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&s->lk);
+ if ((p = nni_list_first(&s->recvpipes)) == NULL) {
+ int rv;
+ if ((rv = nni_aio_schedule(aio, rep0_cancel_recv, ctx)) != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ if (ctx->raio != NULL) {
+ // Cannot have a second receive operation pending.
+ // This could be ESTATE, or we could cancel the first
+ // with ECANCELED. We elect the former.
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ ctx->raio = aio;
+ nni_list_append(&s->recvq, ctx);
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_list_remove(&s->recvpipes, p);
+ if (nni_list_empty(&s->recvpipes)) {
+ nni_pollable_clear(&s->readable);
+ }
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ if ((ctx == &s->ctx) && !p->busy) {
+ nni_pollable_raise(&s->writable);
+ }
+
+ len = nni_msg_header_len(msg);
+ memcpy(ctx->btrace, nni_msg_header(msg), len);
+ ctx->btrace_len = len;
+ ctx->pipe_id = nni_pipe_id(p->pipe);
+ nni_mtx_unlock(&s->lk);
+
+ nni_msg_header_clear(msg);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+}
+
+static void
+rep0_pipe_recv_cb(void *arg)
+{
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ rep0_ctx * ctx;
+ nni_msg * msg;
+ uint8_t * body;
+ nni_aio * aio;
+ size_t len;
+ int hops;
+ int ttl;
+
+ if (nni_aio_result(&p->aio_recv) != 0) {
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(&p->aio_recv);
+ ttl = nni_atomic_get(&s->ttl);
+
+ nni_msg_set_pipe(msg, p->id);
+
+ // Move backtrace from body to header
+ hops = 1;
+ for (;;) {
+ bool end;
+
+ if (hops > ttl) {
+ // This isn't malformed, but it has gone
+ // through too many hops. Do not disconnect,
+ // because we can legitimately receive messages
+ // with too many hops from devices, etc.
+ goto drop;
+ }
+ hops++;
+ if (nni_msg_len(msg) < 4) {
+ // Peer is speaking garbage. Kick it.
+ nni_msg_free(msg);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = ((body[0] & 0x80u) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
+ // Out of memory, so drop it.
+ goto drop;
+ }
+ nni_msg_trim(msg, 4);
+ if (end) {
+ break;
+ }
+ }
+
+ len = nni_msg_header_len(msg);
+
+ nni_mtx_lock(&s->lk);
+
+ if (p->closed) {
+ // If we are closed, then we can't return data.
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_mtx_unlock(&s->lk);
+ nni_msg_free(msg);
+ return;
+ }
+
+ if ((ctx = nni_list_first(&s->recvq)) == NULL) {
+ // No one waiting to receive yet, holding pattern.
+ nni_list_append(&s->recvpipes, p);
+ nni_pollable_raise(&s->readable);
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+
+ nni_list_remove(&s->recvq, ctx);
+ aio = ctx->raio;
+ ctx->raio = NULL;
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ if ((ctx == &s->ctx) && !p->busy) {
+ nni_pollable_raise(&s->writable);
+ }
+
+ // schedule another receive
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+
+ ctx->btrace_len = len;
+ memcpy(ctx->btrace, nni_msg_header(msg), len);
+ nni_msg_header_clear(msg);
+ ctx->pipe_id = p->id;
+
+ nni_mtx_unlock(&s->lk);
+
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish_sync(aio, 0, nni_msg_len(msg));
+ return;
+
+drop:
+ nni_msg_free(msg);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+}
+
+static int
+rep0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ rep0_sock *s = arg;
+ int ttl;
+ int rv;
+
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
+}
+
+static int
+rep0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ rep0_sock *s = arg;
+
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
+}
+
+static int
+rep0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ rep0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static int
+rep0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ rep0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) {
+ return (rv);
+ }
+
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static void
+rep0_sock_send(void *arg, nni_aio *aio)
+{
+ rep0_sock *s = arg;
+
+ rep0_ctx_send(&s->ctx, aio);
+}
+
+static void
+rep0_sock_recv(void *arg, nni_aio *aio)
+{
+ rep0_sock *s = arg;
+
+ rep0_ctx_recv(&s->ctx, aio);
+}
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe_ops rep0_pipe_ops = {
+ .pipe_size = sizeof(rep0_pipe),
+ .pipe_init = rep0_pipe_init,
+ .pipe_fini = rep0_pipe_fini,
+ .pipe_start = rep0_pipe_start,
+ .pipe_close = rep0_pipe_close,
+ .pipe_stop = rep0_pipe_stop,
+};
+
+static nni_proto_ctx_ops rep0_ctx_ops = {
+ .ctx_size = sizeof(rep0_ctx),
+ .ctx_init = rep0_ctx_init,
+ .ctx_fini = rep0_ctx_fini,
+ .ctx_send = rep0_ctx_send,
+ .ctx_recv = rep0_ctx_recv,
+};
+
+static nni_option rep0_sock_options[] = {
+ {
+ .o_name = NNG_OPT_MAXTTL,
+ .o_get = rep0_sock_get_max_ttl,
+ .o_set = rep0_sock_set_max_ttl,
+ },
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = rep0_sock_get_recvfd,
+ },
+ {
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = rep0_sock_get_sendfd,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops rep0_sock_ops = {
+ .sock_size = sizeof(rep0_sock),
+ .sock_init = rep0_sock_init,
+ .sock_fini = rep0_sock_fini,
+ .sock_open = rep0_sock_open,
+ .sock_close = rep0_sock_close,
+ .sock_options = rep0_sock_options,
+ .sock_send = rep0_sock_send,
+ .sock_recv = rep0_sock_recv,
+};
+
+static nni_proto rep0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME },
+ .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_sock_ops = &rep0_sock_ops,
+ .proto_pipe_ops = &rep0_pipe_ops,
+ .proto_ctx_ops = &rep0_ctx_ops,
+};
+
+int
+nng_rep0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &rep0_proto));
+}
diff --git a/src/sp/protocol/reqrep0/rep_test.c b/src/sp/protocol/reqrep0/rep_test.c
new file mode 100644
index 00000000..5a47e67a
--- /dev/null
+++ b/src/sp/protocol/reqrep0/rep_test.c
@@ -0,0 +1,669 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <nuts.h>
+
+static void
+test_rep_identity(void)
+{
+ nng_socket s;
+ int p1, p2;
+ char * n1;
+ char * n2;
+
+ NUTS_PASS(nng_rep0_open(&s));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p1));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p2));
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n1));
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n2));
+ NUTS_CLOSE(s);
+ NUTS_TRUE(p1 == NNG_REP0_SELF);
+ NUTS_TRUE(p2 == NNG_REP0_PEER);
+ NUTS_MATCH(n1, NNG_REP0_SELF_NAME);
+ NUTS_MATCH(n2, NNG_REP0_PEER_NAME);
+ nng_strfree(n1);
+ nng_strfree(n2);
+}
+
+void
+test_rep_send_bad_state(void)
+{
+ nng_socket rep;
+ nng_msg * msg = NULL;
+
+ NUTS_TRUE(nng_rep0_open(&rep) == 0);
+ NUTS_TRUE(nng_msg_alloc(&msg, 0) == 0);
+ NUTS_TRUE(nng_sendmsg(rep, msg, 0) == NNG_ESTATE);
+ nng_msg_free(msg);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_rep_poll_writeable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_get_int(rep, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not writable before connect.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_MARRY(req, rep);
+
+ // Still not writable.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // If we get a job, *then* we become writable
+ NUTS_SEND(req, "abc");
+ NUTS_RECV(rep, "abc");
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ // And is no longer writable once we send a message
+ NUTS_SEND(rep, "def");
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ // Even after receiving it
+ NUTS_RECV(req, "def");
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_rep_poll_readable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_get_int(rep, NNG_OPT_RECVFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not readable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(req, rep);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But once we send messages, it is.
+ // We have to send a request, in order to send a reply.
+ NUTS_SEND(req, "abc");
+ NUTS_SLEEP(100);
+
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ // and receiving makes it no longer ready
+ NUTS_PASS(nng_recvmsg(rep, &msg, 0));
+ nng_msg_free(msg);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // TODO verify unsolicited response
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_rep_context_no_poll(void)
+{
+ int fd;
+ nng_socket req;
+ nng_ctx ctx;
+
+ NUTS_PASS(nng_rep0_open(&req));
+ NUTS_PASS(nng_ctx_open(&ctx, req));
+ NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP);
+ NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP);
+ NUTS_PASS(nng_ctx_close(ctx));
+ NUTS_CLOSE(req);
+}
+
+void
+test_rep_validate_peer(void)
+{
+ nng_socket s1, s2;
+ nng_stat * stats;
+ nng_stat * reject;
+ char * addr;
+
+ NUTS_ADDR(addr, "inproc");
+ NUTS_PASS(nng_rep0_open(&s1));
+ NUTS_PASS(nng_rep0_open(&s2));
+
+ NUTS_PASS(nng_listen(s1, addr, NULL, 0));
+ NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK));
+
+ NUTS_SLEEP(100);
+ NUTS_PASS(nng_stats_get(&stats));
+
+ NUTS_TRUE(stats != NULL);
+ NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL);
+ NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL);
+
+ NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER);
+ NUTS_TRUE(nng_stat_value(reject) > 0);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+ nng_stats_free(stats);
+}
+
+void
+test_rep_double_recv(void)
+{
+ nng_socket s1;
+ nng_aio * aio1;
+ nng_aio * aio2;
+
+ NUTS_PASS(nng_rep0_open(&s1));
+ NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+ NUTS_PASS(nng_aio_alloc(&aio2, NULL, NULL));
+
+ nng_recv_aio(s1, aio1);
+ nng_recv_aio(s1, aio2);
+
+ nng_aio_wait(aio2);
+ NUTS_FAIL(nng_aio_result(aio2), NNG_ESTATE);
+ NUTS_CLOSE(s1);
+ NUTS_FAIL(nng_aio_result(aio1), NNG_ECLOSED);
+ nng_aio_free(aio1);
+ nng_aio_free(aio2);
+}
+
+void
+test_rep_close_pipe_before_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_pipe p;
+ nng_aio * aio1;
+ nng_msg * m;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+
+ NUTS_MARRY(req, rep);
+ NUTS_SEND(req, "test");
+
+ nng_recv_aio(rep, aio1);
+ nng_aio_wait(aio1);
+ NUTS_PASS(nng_aio_result(aio1));
+ NUTS_TRUE((m = nng_aio_get_msg(aio1)) != NULL);
+ p = nng_msg_get_pipe(m);
+ NUTS_PASS(nng_pipe_close(p));
+ NUTS_PASS(nng_sendmsg(rep, m, 0));
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+ nng_aio_free(aio1);
+}
+
+void
+test_rep_close_pipe_during_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_pipe p = NNG_PIPE_INITIALIZER;
+ nng_msg * m;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 200));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_SENDBUF, 20));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_RECVBUF, 20));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 20));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_RECVBUF, 1));
+
+ NUTS_MARRY(req, rep);
+
+ for (int i = 0; i < 100; i++) {
+ int rv;
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+ NUTS_PASS(nng_recvmsg(rep, &m, 0));
+ p = nng_msg_get_pipe(m);
+ rv = nng_sendmsg(rep, m, 0);
+ if (rv == NNG_ETIMEDOUT) {
+ // Queue is backed up, senders are busy.
+ nng_msg_free(m);
+ break;
+ }
+ NUTS_PASS(rv);
+ }
+ NUTS_PASS(nng_pipe_close(p));
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_rep_ctx_recv_aio_stopped(void)
+{
+ nng_socket rep;
+ nng_ctx ctx;
+ nng_aio * aio;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_ctx_open(&ctx, rep));
+
+ nng_aio_stop(aio);
+ nng_ctx_recv(ctx, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_PASS(nng_ctx_close(ctx));
+ NUTS_CLOSE(rep);
+ nng_aio_free(aio);
+}
+
+void
+test_rep_close_pipe_context_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_pipe p = NNG_PIPE_INITIALIZER;
+ nng_msg * m;
+ nng_ctx ctx[100];
+ nng_aio * aio[100];
+ int i;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ for (i = 0; i < 100; i++) {
+ NUTS_PASS(nng_ctx_open(&ctx[i], rep));
+ NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL));
+ }
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_SENDBUF, 1));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_RECVBUF, 1));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 1));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_RECVBUF, 1));
+
+ NUTS_MARRY(req, rep);
+
+ for (i = 0; i < 100; i++) {
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+ nng_ctx_recv(ctx[i], aio[i]);
+ }
+ for (i = 0; i < 100; i++) {
+ nng_aio_wait(aio[i]);
+ NUTS_PASS(nng_aio_result(aio[i]));
+ NUTS_TRUE((m = nng_aio_get_msg(aio[i])) != NULL);
+ p = nng_msg_get_pipe(m);
+ nng_aio_set_msg(aio[i], m);
+ nng_ctx_send(ctx[i], aio[i]);
+ }
+
+ // Note that REQ socket is not reading the results.
+ NUTS_PASS(nng_pipe_close(p));
+
+ for (i = 0; i < 100; i++) {
+ int rv;
+ nng_aio_wait(aio[i]);
+ rv = nng_aio_result(aio[i]);
+ if (rv != 0) {
+ NUTS_FAIL(rv, NNG_ECLOSED);
+ nng_msg_free(nng_aio_get_msg(aio[i]));
+ }
+ nng_aio_free(aio[i]);
+ NUTS_PASS(nng_ctx_close(ctx[i]));
+ }
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_rep_close_context_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ nng_ctx ctx[100];
+ nng_aio * aio[100];
+ int i;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ for (i = 0; i < 100; i++) {
+ NUTS_PASS(nng_ctx_open(&ctx[i], rep));
+ NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL));
+ }
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_SENDBUF, 1));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_RECVBUF, 1));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 1));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_RECVBUF, 1));
+
+ NUTS_MARRY(req, rep);
+
+ for (i = 0; i < 100; i++) {
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_append_u32(m, (unsigned) i | 0x80000000u));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+ nng_ctx_recv(ctx[i], aio[i]);
+ }
+ for (i = 0; i < 100; i++) {
+ nng_aio_wait(aio[i]);
+ NUTS_PASS(nng_aio_result(aio[i]));
+ NUTS_TRUE((m = nng_aio_get_msg(aio[i])) != NULL);
+ nng_aio_set_msg(aio[i], m);
+ nng_ctx_send(ctx[i], aio[i]);
+ }
+
+ // Note that REQ socket is not reading the results.
+ for (i = 0; i < 100; i++) {
+ int rv;
+ NUTS_PASS(nng_ctx_close(ctx[i]));
+ nng_aio_wait(aio[i]);
+ rv = nng_aio_result(aio[i]);
+ if (rv != 0) {
+ NUTS_FAIL(rv, NNG_ECLOSED);
+ nng_msg_free(nng_aio_get_msg(aio[i]));
+ }
+ nng_aio_free(aio[i]);
+ }
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_rep_close_recv(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_aio * aio;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY(req, rep);
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ nng_recv_aio(rep, aio);
+ NUTS_CLOSE(rep);
+ NUTS_CLOSE(req);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED);
+ nng_aio_free(aio);
+}
+
+struct rep_close_recv_cb_state {
+ nng_aio *aio;
+ nng_mtx *mtx;
+ nng_cv * cv;
+ int done;
+ int result;
+ nng_msg *msg;
+};
+
+static void
+rep_close_recv_cb(void *arg)
+{
+ struct rep_close_recv_cb_state *state = arg;
+
+ nng_mtx_lock(state->mtx);
+ state->result = nng_aio_result(state->aio);
+ state->msg = nng_aio_get_msg(state->aio);
+ state->done = true;
+ nng_cv_wake(state->cv);
+ nng_mtx_unlock(state->mtx);
+}
+
+void
+test_rep_close_recv_cb(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ struct rep_close_recv_cb_state state;
+
+ memset(&state, 0, sizeof(state));
+ NUTS_PASS(nng_mtx_alloc(&state.mtx));
+ NUTS_PASS(nng_cv_alloc(&state.cv, state.mtx));
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY(req, rep);
+ NUTS_PASS(nng_aio_alloc(&state.aio, rep_close_recv_cb, &state));
+ nng_recv_aio(rep, state.aio);
+ NUTS_CLOSE(rep);
+ NUTS_CLOSE(req);
+ nng_mtx_lock(state.mtx);
+ while (!state.done) {
+ NUTS_PASS(nng_cv_until(state.cv, nng_clock() + 1000));
+ }
+ nng_mtx_unlock(state.mtx);
+ NUTS_TRUE(state.done != 0);
+ NUTS_FAIL(nng_aio_result(state.aio), NNG_ECLOSED);
+ NUTS_TRUE(nng_aio_get_msg(state.aio) == NULL);
+ nng_aio_free(state.aio);
+ nng_cv_free(state.cv);
+ nng_mtx_free(state.mtx);
+}
+
+static void
+test_rep_ctx_recv_nonblock(void)
+{
+ nng_socket rep;
+ nng_ctx ctx;
+ nng_aio * aio;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_ctx_open(&ctx, rep));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_aio_set_timeout(aio, 0); // Instant timeout
+ nng_ctx_recv(ctx, aio);
+
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT);
+ NUTS_CLOSE(rep);
+ nng_aio_free(aio);
+}
+
+static void
+test_rep_ctx_send_nonblock(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_ctx ctx;
+ nng_aio * aio;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 2000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_ctx_open(&ctx, rep));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_MARRY(req, rep);
+
+ NUTS_SEND(req, "SEND");
+ nng_ctx_recv(ctx, aio);
+ nng_aio_wait(aio);
+ NUTS_PASS(nng_aio_result(aio));
+ // message carries over
+ msg = nng_aio_get_msg(aio);
+ nng_aio_set_msg(aio, msg);
+ nng_aio_set_timeout(aio, 0); // Instant timeout
+ nng_ctx_send(ctx, aio);
+
+ nng_aio_wait(aio);
+ NUTS_PASS(nng_aio_result(aio));
+ NUTS_CLOSE(rep);
+ NUTS_CLOSE(req);
+ nng_aio_free(aio);
+}
+
+static void
+test_rep_ctx_send_nonblock2(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_ctx rep_ctx[10];
+ nng_aio * rep_aio[10];
+ int num_good = 0;
+ int num_fail = 0;
+
+ // We are going to send a bunch of requests, receive them,
+ // but then see that non-block pressure exerts for some, but
+ // that at least one non-blocking send works.
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ for (int i = 0; i < 10; i++) {
+ NUTS_PASS(nng_ctx_open(&rep_ctx[i], rep));
+ NUTS_PASS(nng_aio_alloc(&rep_aio[i], NULL, NULL));
+ }
+ NUTS_MARRY(req, rep);
+
+ for (int i = 0; i < 10; i++) {
+ nng_msg *msg;
+ NUTS_PASS(nng_msg_alloc(&msg, 4));
+ NUTS_PASS(nng_msg_append_u32(msg, (unsigned) i | 0x80000000u));
+ nng_ctx_recv(rep_ctx[i], rep_aio[i]);
+ NUTS_PASS(nng_sendmsg(req, msg, 0));
+ }
+ for (int i = 0; i < 10; i++) {
+ nng_msg *msg;
+ nng_aio_wait(rep_aio[i]);
+ NUTS_PASS(nng_aio_result(rep_aio[i]));
+ msg = nng_aio_get_msg(rep_aio[i]);
+ nng_aio_set_timeout(rep_aio[i], 0);
+ nng_aio_set_msg(rep_aio[i], msg);
+ nng_ctx_send(rep_ctx[i], rep_aio[i]);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ int rv;
+ nng_aio_wait(rep_aio[i]);
+ rv = nng_aio_result(rep_aio[i]);
+ if (rv == 0) {
+ num_good++;
+ } else {
+ NUTS_FAIL(rv, NNG_ETIMEDOUT);
+ nng_msg_free(nng_aio_get_msg(rep_aio[i]));
+ num_fail++;
+ }
+ }
+
+ TEST_ASSERT(num_good > 0);
+ TEST_ASSERT(num_fail > 0);
+
+ for (int i = 0; i < 10; i++) {
+ nng_aio_free(rep_aio[i]);
+ nng_ctx_close(rep_ctx[i]);
+ }
+ NUTS_CLOSE(rep);
+ NUTS_CLOSE(req);
+}
+
+static void
+test_rep_send_nonblock(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ int rv;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_MARRY(req, rep);
+
+ NUTS_SEND(req, "SEND");
+ NUTS_RECV(rep, "SEND");
+
+ // Use the nonblock flag
+ rv = nng_send(rep, "RECV", 5, NNG_FLAG_NONBLOCK);
+
+ NUTS_PASS(rv);
+ NUTS_RECV(req, "RECV");
+ NUTS_CLOSE(rep);
+ NUTS_CLOSE(req);
+}
+
+void
+test_rep_recv_garbage(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 200));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 200));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY(req, rep);
+
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_append_u32(m, 1u));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+ NUTS_FAIL(nng_recvmsg(rep, &m, 0), NNG_ETIMEDOUT);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+NUTS_TESTS = {
+ { "rep identity", test_rep_identity },
+ { "rep send bad state", test_rep_send_bad_state },
+ { "rep poll readable", test_rep_poll_readable },
+ { "rep poll writable", test_rep_poll_writeable },
+ { "rep context does not poll", test_rep_context_no_poll },
+ { "rep validate peer", test_rep_validate_peer },
+ { "rep double recv", test_rep_double_recv },
+ { "rep send nonblock", test_rep_send_nonblock },
+ { "rep close pipe before send", test_rep_close_pipe_before_send },
+ { "rep close pipe during send", test_rep_close_pipe_during_send },
+ { "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped },
+ { "rep close pipe context send", test_rep_close_pipe_context_send },
+ { "rep close context send", test_rep_close_context_send },
+ { "rep close recv", test_rep_close_recv },
+ { "rep close recv cb", test_rep_close_recv_cb },
+ { "rep context send nonblock", test_rep_ctx_send_nonblock },
+ { "rep context send nonblock 2", test_rep_ctx_send_nonblock2 },
+ { "rep context recv nonblock", test_rep_ctx_recv_nonblock },
+ { "rep recv garbage", test_rep_recv_garbage },
+ { NULL, NULL },
+};
diff --git a/src/sp/protocol/reqrep0/req.c b/src/sp/protocol/reqrep0/req.c
new file mode 100644
index 00000000..cb3c9395
--- /dev/null
+++ b/src/sp/protocol/reqrep0/req.c
@@ -0,0 +1,869 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+#include <stdio.h>
+
+#include "core/nng_impl.h"
+#include "nng/protocol/reqrep0/req.h"
+
+// Request protocol. The REQ protocol is the "request" side of a
+// request-reply pair. This is useful for building RPC clients, for example.
+
+typedef struct req0_pipe req0_pipe;
+typedef struct req0_sock req0_sock;
+typedef struct req0_ctx req0_ctx;
+
+static void req0_run_send_queue(req0_sock *, nni_list *);
+static void req0_ctx_reset(req0_ctx *);
+static void req0_ctx_timeout(void *);
+static void req0_pipe_fini(void *);
+static void req0_ctx_fini(void *);
+static int req0_ctx_init(void *, void *);
+
+// A req0_ctx is a "context" for the request. It uses most of the
+// socket, but keeps track of its own outstanding replays, the request ID,
+// and so forth.
+struct req0_ctx {
+ req0_sock * sock;
+ nni_list_node sock_node; // node on the socket context list
+ nni_list_node send_node; // node on the send_queue
+ nni_list_node pipe_node; // node on the pipe list
+ uint32_t request_id; // request ID, without high bit set
+ nni_aio * recv_aio; // user aio waiting to recv - only one!
+ nni_aio * send_aio; // user aio waiting to send
+ nng_msg * req_msg; // request message (owned by protocol)
+ size_t req_len; // length of request message (for stats)
+ nng_msg * rep_msg; // reply message
+ nni_timer_node timer;
+ nni_duration retry;
+ bool conn_reset; // sent message w/o retry, peer disconnect
+};
+
+// A req0_sock is our per-socket protocol private structure.
+struct req0_sock {
+ nni_duration retry;
+ bool closed;
+ nni_atomic_int ttl;
+ req0_ctx master; // base socket master
+ nni_list ready_pipes;
+ nni_list busy_pipes;
+ nni_list stop_pipes;
+ nni_list contexts;
+ nni_list send_queue; // contexts waiting to send.
+ nni_id_map requests; // contexts by request ID
+ nni_pollable readable;
+ nni_pollable writable;
+ nni_mtx mtx;
+};
+
+// A req0_pipe is our per-pipe protocol private structure.
+struct req0_pipe {
+ nni_pipe * pipe;
+ req0_sock * req;
+ nni_list_node node;
+ nni_list contexts; // contexts with pending traffic
+ bool closed;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+};
+
+static void req0_sock_fini(void *);
+static void req0_send_cb(void *);
+static void req0_recv_cb(void *);
+
+static int
+req0_sock_init(void *arg, nni_sock *sock)
+{
+ req0_sock *s = arg;
+
+ NNI_ARG_UNUSED(sock);
+
+ // Request IDs are 32 bits, with the high order bit set.
+ // We start at a random point, to minimize likelihood of
+ // accidental collision across restarts.
+ nni_id_map_init(&s->requests, 0x80000000u, 0xffffffffu, true);
+
+ nni_mtx_init(&s->mtx);
+
+ NNI_LIST_INIT(&s->ready_pipes, req0_pipe, node);
+ NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node);
+ NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node);
+ NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node);
+ NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node);
+
+ // this is "semi random" start for request IDs.
+ s->retry = NNI_SECOND * 60;
+
+ (void) req0_ctx_init(&s->master, s);
+
+ nni_pollable_init(&s->writable);
+ nni_pollable_init(&s->readable);
+
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
+ return (0);
+}
+
+static void
+req0_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+req0_sock_close(void *arg)
+{
+ req0_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ s->closed = true;
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+req0_sock_fini(void *arg)
+{
+ req0_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ NNI_ASSERT(nni_list_empty(&s->busy_pipes));
+ NNI_ASSERT(nni_list_empty(&s->stop_pipes));
+ NNI_ASSERT(nni_list_empty(&s->ready_pipes));
+ nni_mtx_unlock(&s->mtx);
+
+ req0_ctx_fini(&s->master);
+ nni_pollable_fini(&s->readable);
+ nni_pollable_fini(&s->writable);
+ nni_id_map_fini(&s->requests);
+ nni_mtx_fini(&s->mtx);
+}
+
+static void
+req0_pipe_stop(void *arg)
+{
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
+
+ nni_aio_stop(&p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+ nni_mtx_lock(&s->mtx);
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+req0_pipe_fini(void *arg)
+{
+ req0_pipe *p = arg;
+
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+}
+
+static int
+req0_pipe_init(void *arg, nni_pipe *pipe, void *s)
+{
+ req0_pipe *p = arg;
+
+ nni_aio_init(&p->aio_recv, req0_recv_cb, p);
+ nni_aio_init(&p->aio_send, req0_send_cb, p);
+ NNI_LIST_NODE_INIT(&p->node);
+ NNI_LIST_INIT(&p->contexts, req0_ctx, pipe_node);
+ p->pipe = pipe;
+ p->req = s;
+ return (0);
+}
+
+static int
+req0_pipe_start(void *arg)
+{
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
+
+ if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) {
+ return (NNG_EPROTO);
+ }
+
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->ready_pipes, p);
+ nni_pollable_raise(&s->writable);
+ req0_run_send_queue(s, NULL);
+ nni_mtx_unlock(&s->mtx);
+
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ return (0);
+}
+
+static void
+req0_pipe_close(void *arg)
+{
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
+ req0_ctx * ctx;
+
+ nni_aio_close(&p->aio_recv);
+ nni_aio_close(&p->aio_send);
+
+ nni_mtx_lock(&s->mtx);
+ // This removes the node from either busy_pipes or ready_pipes.
+ // It doesn't much matter which. We stick the pipe on the stop
+ // list, so that we can wait for that to close down safely.
+ p->closed = true;
+ nni_list_node_remove(&p->node);
+ nni_list_append(&s->stop_pipes, p);
+ if (nni_list_empty(&s->ready_pipes)) {
+ nni_pollable_clear(&s->writable);
+ }
+
+ while ((ctx = nni_list_first(&p->contexts)) != NULL) {
+ nni_list_remove(&p->contexts, ctx);
+ nng_aio *aio;
+ if (ctx->retry <= 0) {
+ // If we can't retry, then just cancel the operation
+ // altogether. We should only be waiting for recv,
+ // because we will already have sent if we are here.
+ if ((aio = ctx->recv_aio) != NULL) {
+ ctx->recv_aio = NULL;
+ nni_aio_finish_error(aio, NNG_ECONNRESET);
+ req0_ctx_reset(ctx);
+ } else {
+ req0_ctx_reset(ctx);
+ ctx->conn_reset = true;
+ }
+ } else {
+ // Reset the timer on this so it expires immediately.
+ // This is actually easier than canceling the timer and
+ // running the send_queue separately. (In particular,
+ // it avoids a potential deadlock on cancelling the
+ // timer.)
+ nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO);
+ }
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+// For cooked mode, we use a context, and send out that way. This
+// completely bypasses the upper write queue. Each context keeps one
+// message pending; these are "scheduled" via the send_queue. The send_queue
+// is ordered, so FIFO ordering between contexts is provided for.
+
+static void
+req0_send_cb(void *arg)
+{
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
+ nni_aio * aio;
+ nni_list sent_list;
+
+ nni_aio_list_init(&sent_list);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ // We failed to send... clean up and deal with it.
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ // We completed a cooked send, so we need to reinsert ourselves
+ // in the ready list, and re-run the send_queue.
+
+ nni_mtx_lock(&s->mtx);
+ if (p->closed || s->closed) {
+ // This occurs if the req0_pipe_close has been called.
+ // In that case we don't want any more processing.
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_list_remove(&s->busy_pipes, p);
+ nni_list_append(&s->ready_pipes, p);
+ if (nni_list_empty(&s->send_queue)) {
+ nni_pollable_raise(&s->writable);
+ }
+ req0_run_send_queue(s, &sent_list);
+ nni_mtx_unlock(&s->mtx);
+
+ while ((aio = nni_list_first(&sent_list)) != NULL) {
+ nni_list_remove(&sent_list, aio);
+ nni_aio_finish_sync(aio, 0, 0);
+ }
+}
+
+static void
+req0_recv_cb(void *arg)
+{
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
+ req0_ctx * ctx;
+ nni_msg * msg;
+ nni_aio * aio;
+ uint32_t id;
+
+ if (nni_aio_result(&p->aio_recv) != 0) {
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+
+ // We yank 4 bytes from front of body, and move them to the header.
+ if (nni_msg_len(msg) < 4) {
+ // Malformed message.
+ goto malformed;
+ }
+ id = nni_msg_trim_u32(msg);
+
+ // Schedule another receive while we are processing this.
+ nni_mtx_lock(&s->mtx);
+
+ // NB: If close was called, then this will just abort.
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+
+ // Look for a context to receive it.
+ if (((ctx = nni_id_get(&s->requests, id)) == NULL) ||
+ (ctx->send_aio != NULL) || (ctx->rep_msg != NULL)) {
+ nni_mtx_unlock(&s->mtx);
+ // No waiting context, we have not sent the request out to
+ // the wire yet, or context already has a reply ready.
+ // Discard the message.
+ nni_msg_free(msg);
+ return;
+ }
+
+ // We have our match, so we can remove this.
+ nni_list_node_remove(&ctx->send_node);
+ nni_id_remove(&s->requests, id);
+ ctx->request_id = 0;
+ if (ctx->req_msg != NULL) {
+ nni_msg_free(ctx->req_msg);
+ ctx->req_msg = NULL;
+ }
+
+ // Is there an aio waiting for us?
+ if ((aio = ctx->recv_aio) != NULL) {
+ ctx->recv_aio = NULL;
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish_sync(aio, 0, nni_msg_len(msg));
+ } else {
+ // No AIO, so stash msg. Receive will pick it up later.
+ ctx->rep_msg = msg;
+ if (ctx == &s->master) {
+ nni_pollable_raise(&s->readable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ }
+ return;
+
+malformed:
+ nni_msg_free(msg);
+ nni_pipe_close(p->pipe);
+}
+
+static void
+req0_ctx_timeout(void *arg)
+{
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+
+ nni_mtx_lock(&s->mtx);
+ if ((ctx->req_msg != NULL) && (!s->closed)) {
+ if (!nni_list_node_active(&ctx->send_node)) {
+ nni_list_append(&s->send_queue, ctx);
+ }
+ req0_run_send_queue(s, NULL);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static int
+req0_ctx_init(void *arg, void *sock)
+{
+ req0_sock *s = sock;
+ req0_ctx * ctx = arg;
+
+ nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx);
+
+ nni_mtx_lock(&s->mtx);
+ ctx->sock = s;
+ ctx->recv_aio = NULL;
+ ctx->retry = s->retry;
+ nni_list_append(&s->contexts, ctx);
+ nni_mtx_unlock(&s->mtx);
+
+ return (0);
+}
+
+static void
+req0_ctx_fini(void *arg)
+{
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+ nni_aio * aio;
+
+ nni_mtx_lock(&s->mtx);
+ if ((aio = ctx->recv_aio) != NULL) {
+ ctx->recv_aio = NULL;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ if ((aio = ctx->send_aio) != NULL) {
+ ctx->send_aio = NULL;
+ nni_aio_set_msg(aio, ctx->req_msg);
+ ctx->req_msg = NULL;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ req0_ctx_reset(ctx);
+ nni_list_remove(&s->contexts, ctx);
+ nni_mtx_unlock(&s->mtx);
+
+ nni_timer_cancel(&ctx->timer);
+ nni_timer_fini(&ctx->timer);
+}
+
+static int
+req0_ctx_set_resend_time(void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ req0_ctx *ctx = arg;
+ return (nni_copyin_ms(&ctx->retry, buf, sz, t));
+}
+
+static int
+req0_ctx_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ req0_ctx *ctx = arg;
+ return (nni_copyout_ms(ctx->retry, buf, szp, t));
+}
+
+static void
+req0_run_send_queue(req0_sock *s, nni_list *sent_list)
+{
+ req0_ctx *ctx;
+ nni_aio * aio;
+
+ // Note: This routine should be called with the socket lock held.
+ while ((ctx = nni_list_first(&s->send_queue)) != NULL) {
+ req0_pipe *p;
+
+ if ((p = nni_list_first(&s->ready_pipes)) == NULL) {
+ return;
+ }
+
+ // We have a place to send it, so do the send.
+ // If a sending error occurs that causes the message to
+ // be dropped, we rely on the resend timer to pick it up.
+ // We also notify the completion callback if this is the
+ // first send attempt.
+ nni_list_remove(&s->send_queue, ctx);
+
+ // Schedule a resubmit timer. We only do this if we got
+ // a pipe to send to. Otherwise, we should get handled
+ // the next time that the send_queue is run. We don't do this
+ // if the retry is "disabled" with NNG_DURATION_INFINITE.
+ if (ctx->retry > 0) {
+ nni_timer_schedule(
+ &ctx->timer, nni_clock() + ctx->retry);
+ }
+
+ // Put us on the pipe list of active contexts.
+ // This gives the pipe a chance to kick a resubmit
+ // if the pipe is removed.
+ nni_list_node_remove(&ctx->pipe_node);
+ nni_list_append(&p->contexts, ctx);
+
+ nni_list_remove(&s->ready_pipes, p);
+ nni_list_append(&s->busy_pipes, p);
+ if (nni_list_empty(&s->ready_pipes)) {
+ nni_pollable_clear(&s->writable);
+ }
+
+ if ((aio = ctx->send_aio) != NULL) {
+ ctx->send_aio = NULL;
+ nni_aio_bump_count(aio, ctx->req_len);
+ // If the list was passed in, we want to do a
+ // synchronous completion later.
+ if (sent_list != NULL) {
+ nni_list_append(sent_list, aio);
+ } else {
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
+
+ // At this point, we will never give this message back to
+ // to the user, so we don't have to worry about making it
+ // unique. We can freely clone it.
+ nni_msg_clone(ctx->req_msg);
+ nni_aio_set_msg(&p->aio_send, ctx->req_msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
+ }
+}
+
+void
+req0_ctx_reset(req0_ctx *ctx)
+{
+ req0_sock *s = ctx->sock;
+ // Call with sock lock held!
+
+ // We cannot safely "wait" using nni_timer_cancel, but this removes
+ // any scheduled timer activation. If the timeout is already running
+ // concurrently, it will still run. It should do nothing, because
+ // we toss the request. There is still a very narrow race if the
+ // timeout fires, but doesn't actually start running before we
+ // both finish this function, *and* manage to reschedule another
+ // request. The consequence of that occurring is that the request
+ // will be emitted on the wire twice. This is not actually tragic.
+ nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER);
+
+ nni_list_node_remove(&ctx->pipe_node);
+ nni_list_node_remove(&ctx->send_node);
+ if (ctx->request_id != 0) {
+ nni_id_remove(&s->requests, ctx->request_id);
+ ctx->request_id = 0;
+ }
+ if (ctx->req_msg != NULL) {
+ nni_msg_free(ctx->req_msg);
+ ctx->req_msg = NULL;
+ }
+ if (ctx->rep_msg != NULL) {
+ nni_msg_free(ctx->rep_msg);
+ ctx->rep_msg = NULL;
+ }
+ ctx->conn_reset = false;
+}
+
+static void
+req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv)
+{
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+
+ nni_mtx_lock(&s->mtx);
+ if (ctx->recv_aio == aio) {
+ ctx->recv_aio = NULL;
+
+ // Cancellation of a pending receive is treated as aborting the
+ // entire state machine. This allows us to preserve the
+ // semantic of exactly one receive operation per send
+ // operation, and should be the least surprising for users. The
+ // main consequence is that if a receive operation is completed
+ // (in error or otherwise), the user must submit a new send
+ // operation to restart the state machine.
+ req0_ctx_reset(ctx);
+
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+req0_ctx_recv(void *arg, nni_aio *aio)
+{
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+ nni_msg * msg;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&s->mtx);
+ if ((ctx->recv_aio != NULL) ||
+ ((ctx->req_msg == NULL) && (ctx->rep_msg == NULL))) {
+ // We have already got a pending receive or have not
+ // tried to send a request yet.
+ // Either of these violate our basic state assumptions.
+ int rv;
+ if (ctx->conn_reset) {
+ ctx->conn_reset = false;
+ rv = NNG_ECONNRESET;
+ } else {
+ rv = NNG_ESTATE;
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if ((msg = ctx->rep_msg) == NULL) {
+ int rv;
+ rv = nni_aio_schedule(aio, req0_ctx_cancel_recv, ctx);
+ if (rv != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ ctx->recv_aio = aio;
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ ctx->rep_msg = NULL;
+
+ // We have got a message to pass up, yay!
+ nni_aio_set_msg(aio, msg);
+ if (ctx == &s->master) {
+ nni_pollable_clear(&s->readable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+}
+
+static void
+req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv)
+{
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+
+ nni_mtx_lock(&s->mtx);
+ if (ctx->send_aio == aio) {
+ // There should not be a pending reply, because we canceled
+ // it while we were waiting.
+ NNI_ASSERT(ctx->recv_aio == NULL);
+ ctx->send_aio = NULL;
+ // Restore the message back to the aio.
+ nni_aio_set_msg(aio, ctx->req_msg);
+ nni_msg_header_clear(ctx->req_msg);
+ ctx->req_msg = NULL;
+
+ // Cancellation of a pending receive is treated as aborting the
+ // entire state machine. This allows us to preserve the
+ // semantic of exactly one receive operation per send
+ // operation, and should be the least surprising for users. The
+ // main consequence is that if a receive operation is completed
+ // (in error or otherwise), the user must submit a new send
+ // operation to restart the state machine.
+ req0_ctx_reset(ctx);
+
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+req0_ctx_send(void *arg, nni_aio *aio)
+{
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+ nng_msg * msg = nni_aio_get_msg(aio);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&s->mtx);
+ if (s->closed) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ // Sending a new request cancels the old one, including any
+ // outstanding reply.
+ if (ctx->recv_aio != NULL) {
+ nni_aio_finish_error(ctx->recv_aio, NNG_ECANCELED);
+ ctx->recv_aio = NULL;
+ }
+ if (ctx->send_aio != NULL) {
+ nni_aio_set_msg(ctx->send_aio, ctx->req_msg);
+ nni_msg_header_clear(ctx->req_msg);
+ ctx->req_msg = NULL;
+ nni_aio_finish_error(ctx->send_aio, NNG_ECANCELED);
+ ctx->send_aio = NULL;
+ nni_list_remove(&s->send_queue, ctx);
+ }
+
+ // This resets the entire state machine.
+ req0_ctx_reset(ctx);
+
+ // Insert us on the per ID hash list, so that receives can find us.
+ if ((rv = nni_id_alloc(&s->requests, &ctx->request_id, ctx)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_msg_header_clear(msg);
+ nni_msg_header_append_u32(msg, ctx->request_id);
+
+ // If no pipes are ready, and the request was a poll (no background
+ // schedule), then fail it. Should be NNG_ETIMEDOUT.
+ rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx);
+ if ((rv != 0) && (nni_list_empty(&s->ready_pipes))) {
+ nni_id_remove(&s->requests, ctx->request_id);
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ ctx->req_len = nni_msg_len(msg);
+ ctx->req_msg = msg;
+ ctx->send_aio = aio;
+ nni_aio_set_msg(aio, NULL);
+
+ // Stick us on the send_queue list.
+ nni_list_append(&s->send_queue, ctx);
+
+ req0_run_send_queue(s, NULL);
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+req0_sock_send(void *arg, nni_aio *aio)
+{
+ req0_sock *s = arg;
+ req0_ctx_send(&s->master, aio);
+}
+
+static void
+req0_sock_recv(void *arg, nni_aio *aio)
+{
+ req0_sock *s = arg;
+ req0_ctx_recv(&s->master, aio);
+}
+
+static int
+req0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ req0_sock *s = arg;
+ int ttl;
+ int rv;
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
+}
+
+static int
+req0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ req0_sock *s = arg;
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
+}
+
+static int
+req0_sock_set_resend_time(
+ void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ req0_sock *s = arg;
+ int rv;
+ rv = req0_ctx_set_resend_time(&s->master, buf, sz, t);
+ s->retry = s->master.retry;
+ return (rv);
+}
+
+static int
+req0_sock_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ req0_sock *s = arg;
+ return (req0_ctx_get_resend_time(&s->master, buf, szp, t));
+}
+
+static int
+req0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ req0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static int
+req0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ req0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) {
+ return (rv);
+ }
+
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static nni_proto_pipe_ops req0_pipe_ops = {
+ .pipe_size = sizeof(req0_pipe),
+ .pipe_init = req0_pipe_init,
+ .pipe_fini = req0_pipe_fini,
+ .pipe_start = req0_pipe_start,
+ .pipe_close = req0_pipe_close,
+ .pipe_stop = req0_pipe_stop,
+};
+
+static nni_option req0_ctx_options[] = {
+ {
+ .o_name = NNG_OPT_REQ_RESENDTIME,
+ .o_get = req0_ctx_get_resend_time,
+ .o_set = req0_ctx_set_resend_time,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_proto_ctx_ops req0_ctx_ops = {
+ .ctx_size = sizeof(req0_ctx),
+ .ctx_init = req0_ctx_init,
+ .ctx_fini = req0_ctx_fini,
+ .ctx_recv = req0_ctx_recv,
+ .ctx_send = req0_ctx_send,
+ .ctx_options = req0_ctx_options,
+};
+
+static nni_option req0_sock_options[] = {
+ {
+ .o_name = NNG_OPT_MAXTTL,
+ .o_get = req0_sock_get_max_ttl,
+ .o_set = req0_sock_set_max_ttl,
+ },
+ {
+ .o_name = NNG_OPT_REQ_RESENDTIME,
+ .o_get = req0_sock_get_resend_time,
+ .o_set = req0_sock_set_resend_time,
+ },
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = req0_sock_get_recv_fd,
+ },
+ {
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = req0_sock_get_send_fd,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops req0_sock_ops = {
+ .sock_size = sizeof(req0_sock),
+ .sock_init = req0_sock_init,
+ .sock_fini = req0_sock_fini,
+ .sock_open = req0_sock_open,
+ .sock_close = req0_sock_close,
+ .sock_options = req0_sock_options,
+ .sock_send = req0_sock_send,
+ .sock_recv = req0_sock_recv,
+};
+
+static nni_proto req0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME },
+ .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_sock_ops = &req0_sock_ops,
+ .proto_pipe_ops = &req0_pipe_ops,
+ .proto_ctx_ops = &req0_ctx_ops,
+};
+
+int
+nng_req0_open(nng_socket *sock)
+{
+ return (nni_proto_open(sock, &req0_proto));
+}
diff --git a/src/sp/protocol/reqrep0/req_test.c b/src/sp/protocol/reqrep0/req_test.c
new file mode 100644
index 00000000..fb78efa0
--- /dev/null
+++ b/src/sp/protocol/reqrep0/req_test.c
@@ -0,0 +1,968 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <nuts.h>
+
+static void
+test_req_identity(void)
+{
+ nng_socket s;
+ int p;
+ char * n;
+
+ NUTS_PASS(nng_req0_open(&s));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p));
+ NUTS_TRUE(p == NNG_REQ0_SELF);
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p));
+ NUTS_TRUE(p == NNG_REQ0_PEER); // 49
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n));
+ NUTS_MATCH(n, NNG_REQ0_SELF_NAME);
+ nng_strfree(n);
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n));
+ NUTS_MATCH(n, NNG_REQ0_PEER_NAME);
+ nng_strfree(n);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_req_ttl_option(void)
+{
+ nng_socket req;
+ int v;
+ bool b;
+ size_t sz;
+ const char *opt = NNG_OPT_MAXTTL;
+
+ NUTS_PASS(nng_req0_open(&req));
+
+ NUTS_PASS(nng_socket_set_int(req, opt, 1));
+ NUTS_FAIL(nng_socket_set_int(req, opt, 0), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(req, opt, -1), NNG_EINVAL);
+ // This test will fail if the NNI_MAX_MAX_TTL is changed from the
+ // builtin default of 15.
+ NUTS_FAIL(nng_socket_set_int(req, opt, 16), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(req, opt, 256), NNG_EINVAL);
+ NUTS_PASS(nng_socket_set_int(req, opt, 3));
+ NUTS_PASS(nng_socket_get_int(req, opt, &v));
+ NUTS_TRUE(v == 3);
+ v = 0;
+ sz = sizeof(v);
+ NUTS_PASS(nng_socket_get(req, opt, &v, &sz));
+ NUTS_TRUE(v == 3);
+ NUTS_TRUE(sz == sizeof(v));
+
+ NUTS_FAIL(nng_socket_set(req, opt, "", 1), NNG_EINVAL);
+ sz = 1;
+ NUTS_FAIL(nng_socket_get(req, opt, &v, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_bool(req, opt, true), NNG_EBADTYPE);
+ NUTS_FAIL(nng_socket_get_bool(req, opt, &b), NNG_EBADTYPE);
+
+ NUTS_CLOSE(req);
+}
+
+static void
+test_req_resend_option(void)
+{
+ nng_socket req;
+ nng_duration d;
+ bool b;
+ size_t sz = sizeof(b);
+ const char * opt = NNG_OPT_REQ_RESENDTIME;
+
+ NUTS_PASS(nng_req0_open(&req));
+
+ NUTS_TRUE(nng_socket_set_ms(req, opt, 10) == 0);
+ NUTS_FAIL(nng_socket_set(req, opt, "", 1), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_get(req, opt, &b, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_bool(req, opt, true), NNG_EBADTYPE);
+ NUTS_FAIL(nng_socket_get_bool(req, opt, &b), NNG_EBADTYPE);
+
+ NUTS_PASS(nng_socket_get_ms(req, opt, &d));
+ NUTS_TRUE(d == 10);
+ NUTS_CLOSE(req);
+}
+
+void
+test_req_recv_bad_state(void)
+{
+ nng_socket req;
+ nng_msg * msg = NULL;
+
+ NUTS_TRUE(nng_req0_open(&req) == 0);
+ NUTS_TRUE(nng_recvmsg(req, &msg, 0) == NNG_ESTATE);
+ NUTS_NULL(msg);
+ NUTS_CLOSE(req);
+}
+
+static void
+test_req_recv_garbage(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ uint32_t req_id;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY(req, rep);
+
+ NUTS_PASS(nng_msg_alloc(&m, 0));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+
+ NUTS_PASS(nng_recvmsg(rep, &m, 0));
+
+ // The message will have a header that contains the 32-bit pipe ID,
+ // followed by the 32-bit request ID. We will discard the request
+ // ID before sending it out.
+ NUTS_TRUE(nng_msg_header_len(m) == 8);
+ NUTS_PASS(nng_msg_header_chop_u32(m, &req_id));
+
+ NUTS_PASS(nng_sendmsg(rep, m, 0));
+ NUTS_FAIL(nng_recvmsg(req, &m, 0), NNG_ETIMEDOUT);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+#define SECOND 1000
+
+void
+test_req_rep_exchange(void)
+{
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_TRUE(nng_req0_open(&req) == 0);
+ NUTS_TRUE(nng_rep0_open(&rep) == 0);
+
+ NUTS_TRUE(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, SECOND) == 0);
+ NUTS_TRUE(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, SECOND) == 0);
+ NUTS_TRUE(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, SECOND) == 0);
+ NUTS_TRUE(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, SECOND) == 0);
+
+ NUTS_MARRY(rep, req);
+
+ NUTS_SEND(req, "ping");
+ NUTS_RECV(rep, "ping");
+ NUTS_SEND(rep, "pong");
+ NUTS_RECV(req, "pong");
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_req_resend(void)
+{
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, 10));
+
+ NUTS_MARRY(rep, req);
+
+ NUTS_SEND(req, "ping");
+ NUTS_RECV(rep, "ping");
+ NUTS_RECV(rep, "ping");
+ NUTS_RECV(rep, "ping");
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_req_resend_reconnect(void)
+{
+ nng_socket req;
+ nng_socket rep1;
+ nng_socket rep2;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep1));
+ NUTS_PASS(nng_rep0_open(&rep2));
+
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep1, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep2, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep1, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep2, NNG_OPT_SENDTIMEO, SECOND));
+ // We intentionally set the retry time long; that way we only see
+ // the retry from loss of our original peer.
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, 60 * SECOND));
+
+ NUTS_MARRY(rep1, req);
+
+ NUTS_SEND(req, "ping");
+ NUTS_RECV(rep1, "ping");
+
+ NUTS_CLOSE(rep1);
+ NUTS_MARRY(rep2, req);
+
+ NUTS_RECV(rep2, "ping");
+ NUTS_SEND(rep2, "rep2");
+ NUTS_RECV(req, "rep2");
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep2);
+}
+
+void
+test_req_resend_disconnect(void)
+{
+ nng_socket req;
+ nng_socket rep1;
+ nng_socket rep2;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep1));
+ NUTS_PASS(nng_rep0_open(&rep2));
+
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep1, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep2, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep1, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep2, NNG_OPT_SENDTIMEO, SECOND));
+ // We intentionally set the retry time long; that way we only see
+ // the retry from loss of our original peer.
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, 60 * SECOND));
+
+ NUTS_MARRY(rep1, req);
+ NUTS_SEND(req, "ping");
+ NUTS_RECV(rep1, "ping");
+
+ NUTS_MARRY(rep2, req);
+ NUTS_CLOSE(rep1);
+
+ NUTS_RECV(rep2, "ping");
+ NUTS_SEND(rep2, "rep2");
+ NUTS_RECV(req, "rep2");
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep2);
+}
+
+void
+test_req_disconnect_no_retry(void)
+{
+ nng_socket req;
+ nng_socket rep1;
+ nng_socket rep2;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep1));
+ NUTS_PASS(nng_rep0_open(&rep2));
+
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep1, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep2, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep1, NNG_OPT_SENDTIMEO, SECOND / 10));
+ // Setting the resend time to zero so we will force an error
+ // if the peer disconnects without sending us an answer.
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, 0));
+
+ NUTS_MARRY(rep1, req);
+ NUTS_SEND(req, "ping");
+ NUTS_RECV(rep1, "ping");
+
+ NUTS_MARRY(rep2, req);
+ NUTS_CLOSE(rep1);
+
+ nng_msg *msg = NULL;
+ NUTS_FAIL(nng_recvmsg(req, &msg, 0), NNG_ECONNRESET);
+ NUTS_FAIL(nng_recvmsg(rep2, &msg, 0), NNG_ETIMEDOUT);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep2);
+}
+
+void
+test_req_disconnect_abort(void)
+{
+ nng_socket req;
+ nng_socket rep1;
+ nng_socket rep2;
+ nng_aio * aio;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep1));
+ NUTS_PASS(nng_rep0_open(&rep2));
+ NUTS_PASS(nng_aio_alloc(&aio, 0, 0));
+
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep1, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep2, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep1, NNG_OPT_SENDTIMEO, SECOND / 10));
+ // Setting the resend time to zero so we will force an error
+ // if the peer disconnects without sending us an answer.
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, 0));
+
+ NUTS_MARRY(rep1, req);
+ NUTS_SEND(req, "ping");
+ NUTS_RECV(rep1, "ping");
+ nng_recv_aio(req, aio);
+
+ NUTS_MARRY(rep2, req);
+ NUTS_CLOSE(rep1);
+
+ nng_msg *msg = NULL;
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECONNRESET);
+ NUTS_FAIL(nng_recvmsg(rep2, &msg, 0), NNG_ETIMEDOUT);
+ nng_aio_free(aio);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep2);
+}
+
+void
+test_req_cancel(void)
+{
+ nng_duration retry = SECOND;
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_PASS(nng_rep_open(&rep));
+ NUTS_PASS(nng_req_open(&req));
+
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 5 * SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 5 * SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, retry));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 16));
+
+ NUTS_MARRY(rep, req);
+
+ // Send req #1 (abc).
+ NUTS_SEND(req, "abc");
+
+ // Sleep a bit. This is so that we ensure that our request gets
+ // to the far side. (If we cancel too fast, then our outgoing send
+ // will be canceled before it gets to the peer.)
+ NUTS_SLEEP(100);
+
+ // Send the next next request ("def"). Note that
+ // the REP side server will have already buffered the receive
+ // request, and should simply be waiting for us to reply to abc.
+ NUTS_SEND(req, "def");
+
+ // Receive the first request (should be abc) on the REP server.
+ NUTS_RECV(rep, "abc");
+
+ // REP sends the reply to first command. This will be discarded
+ // by the REQ socket.
+ NUTS_SEND(rep, "abc");
+
+ // Now get the next command from the REP; should be "def".
+ NUTS_RECV(rep, "def");
+
+ // And send it back to REQ.
+ NUTS_SEND(rep, "def");
+
+ // And we got back only the second result.
+ NUTS_RECV(req, "def");
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_req_cancel_abort_recv(void)
+{
+ nng_aio * aio;
+ nng_duration retry = SECOND * 10; // 10s (kind of never)
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_PASS(nng_rep_open(&rep));
+ NUTS_PASS(nng_req_open(&req));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_REQ_RESENDTIME, retry));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 16));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 5 * SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 5 * SECOND));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 5 * SECOND));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 5 * SECOND));
+
+ NUTS_MARRY(rep, req);
+
+ // Send req #1 (abc).
+ NUTS_SEND(req, "abc");
+
+ // Wait for it to get ot the other side.
+ NUTS_SLEEP(100);
+
+ nng_aio_set_timeout(aio, 5 * SECOND);
+ nng_recv_aio(req, aio);
+
+ // Give time for this recv to post properly.
+ NUTS_SLEEP(100);
+
+ // Send the next next request ("def"). Note that
+ // the REP side server will have already buffered the receive
+ // request, and should simply be waiting for us to reply to
+ // abc.
+ NUTS_SEND(req, "def");
+
+ // Our pending I/O should have been canceled.
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+
+ // Receive the first request (should be abc) on the REP server.
+ NUTS_RECV(rep, "abc");
+
+ // REP sends the reply to first command. This will be
+ // discarded by the REQ socket.
+ NUTS_SEND(rep, "abc");
+
+ // Now get the next command from the REP; should be "def".
+ NUTS_RECV(rep, "def");
+
+ // And send it back to REQ.
+ NUTS_SEND(rep, "def");
+
+ // Try a req command. This should give back "def"
+ NUTS_RECV(req, "def");
+
+ nng_aio_free(aio);
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_req_cancel_post_recv(void)
+{
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_MARRY(req, rep);
+
+ NUTS_SEND(req, "ONE");
+ NUTS_RECV(rep, "ONE");
+ NUTS_SEND(rep, "one");
+ NUTS_SLEEP(100); // Make sure reply arrives!
+ NUTS_SEND(req, "TWO");
+ NUTS_RECV(rep, "TWO");
+ NUTS_SEND(rep, "two");
+ NUTS_RECV(req, "two");
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_req_poll_writeable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_get_int(req, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not writable before connect.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_MARRY(req, rep);
+
+ // It should be writable now.
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // Submit a bunch of jobs. Note that we have to stall a bit
+ // between each message to let it queue up.
+ for (int i = 0; i < 10; i++) {
+ int rv = nng_send(req, "", 0, NNG_FLAG_NONBLOCK);
+ if (rv == NNG_EAGAIN) {
+ break;
+ }
+ NUTS_PASS(rv);
+ NUTS_SLEEP(50);
+ }
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_req_poll_contention(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+ nng_aio * aio;
+ nng_ctx ctx[5];
+ nng_aio * ctx_aio[5];
+ nng_msg * ctx_msg[5];
+ nng_msg * msg;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 1));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_RECVBUF, 1));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+
+ for (int i = 0; i < 5; i++) {
+ NUTS_PASS(nng_ctx_open(&ctx[i], req));
+ NUTS_PASS(nng_aio_alloc(&ctx_aio[i], NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&ctx_msg[i], 0));
+ }
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+
+ NUTS_PASS(nng_socket_get_int(req, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not writable before connect.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ nng_aio_set_msg(aio, msg);
+ nng_send_aio(req, aio);
+ for (int i = 0; i < 5; i++) {
+ nng_aio_set_msg(ctx_aio[i], ctx_msg[i]);
+ nng_ctx_send(ctx[i], ctx_aio[i]);
+ }
+ NUTS_SLEEP(50); // so everything is queued steady state
+
+ NUTS_MARRY(req, rep);
+
+ // It should not be writable now.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_PASS(nng_recvmsg(rep, &msg, 0));
+ nng_msg_free(msg);
+
+ // Still not writeable...
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ for (int i = 0; i < 5; i++) {
+ NUTS_PASS(nng_recvmsg(rep, &msg, 0));
+ nng_msg_free(msg);
+ }
+ // It can take a little bit of time for the eased back-pressure
+ // to reflect across the network.
+ NUTS_SLEEP(100);
+
+ // Should be come writeable now...
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ for (int i = 0; i < 5; i++) {
+ nng_aio_free(ctx_aio[i]);
+ }
+ nng_aio_free(aio);
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+void
+test_req_poll_multi_pipe(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep1;
+ nng_socket rep2;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep1));
+ NUTS_PASS(nng_rep0_open(&rep2));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 1));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_PASS(nng_socket_get_int(req, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not writable before connect.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_MARRY(req, rep1);
+ NUTS_MARRY(req, rep2);
+
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+ NUTS_SEND(req, "ONE");
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep1);
+ NUTS_CLOSE(rep2);
+}
+
+void
+test_req_poll_readable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_get_int(req, NNG_OPT_RECVFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not readable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(req, rep);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But once we send messages, it is.
+ // We have to send a request, in order to send a reply.
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_msg_append(msg, "xyz", 3));
+ NUTS_PASS(nng_sendmsg(req, msg, 0));
+ NUTS_PASS(nng_recvmsg(rep, &msg, 0)); // recv on rep
+ NUTS_PASS(nng_sendmsg(rep, msg, 0)); // echo it back
+ NUTS_SLEEP(200); // give time for message to arrive
+
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ // and receiving makes it no longer ready
+ NUTS_PASS(nng_recvmsg(req, &msg, 0));
+ nng_msg_free(msg);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // TODO verify unsolicited response
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_req_ctx_no_poll(void)
+{
+ int fd;
+ nng_socket req;
+ nng_ctx ctx;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_ctx_open(&ctx, req));
+ NUTS_FAIL(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP);
+ NUTS_FAIL(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP);
+ NUTS_PASS(nng_ctx_close(ctx));
+ NUTS_CLOSE(req);
+}
+
+static void
+test_req_ctx_send_queued(void)
+{
+ nng_socket req;
+ nng_socket rep;
+ nng_ctx ctx[3];
+ nng_aio * aio[3];
+ nng_msg * msg[3];
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 100));
+
+ for (int i = 0; i < 3; i++) {
+ NUTS_PASS(nng_ctx_open(&ctx[i], req));
+ NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg[i], 0));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ nng_aio_set_msg(aio[i], msg[i]);
+ nng_ctx_send(ctx[i], aio[i]);
+ }
+
+ NUTS_MARRY(req, rep);
+
+ NUTS_SLEEP(50); // Only to ensure stuff queues up
+ for (int i = 0; i < 3; i++) {
+ nng_msg *m;
+ NUTS_PASS(nng_recvmsg(rep, &m, 0));
+ nng_msg_free(m);
+ }
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+ for (int i = 0; i < 3; i++) {
+ nng_aio_wait(aio[i]);
+ NUTS_PASS(nng_aio_result(aio[i]));
+ nng_aio_free(aio[i]);
+ }
+}
+
+static void
+test_req_ctx_send_close(void)
+{
+ nng_socket req;
+ nng_ctx ctx[3];
+ nng_aio * aio[3];
+ nng_msg * msg[3];
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ for (int i = 0; i < 3; i++) {
+ NUTS_PASS(nng_ctx_open(&ctx[i], req));
+ NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg[i], 0));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ nng_aio_set_msg(aio[i], msg[i]);
+ nng_ctx_send(ctx[i], aio[i]);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ nng_ctx_close(ctx[i]);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ nng_aio_wait(aio[i]);
+ NUTS_FAIL(nng_aio_result(aio[i]), NNG_ECLOSED);
+ nng_aio_free(aio[i]);
+ nng_msg_free(msg[i]);
+ }
+ NUTS_CLOSE(req);
+}
+
+static void
+test_req_ctx_send_abort(void)
+{
+ nng_socket req;
+ nng_ctx ctx[3];
+ nng_aio * aio[3];
+ nng_msg * msg[3];
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ for (int i = 0; i < 3; i++) {
+ NUTS_PASS(nng_ctx_open(&ctx[i], req));
+ NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg[i], 0));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ nng_aio_set_msg(aio[i], msg[i]);
+ nng_ctx_send(ctx[i], aio[i]);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ nng_aio_abort(aio[i], NNG_ECANCELED);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ nng_aio_wait(aio[i]);
+ NUTS_FAIL(nng_aio_result(aio[i]), NNG_ECANCELED);
+ nng_aio_free(aio[i]);
+ nng_msg_free(msg[i]);
+ }
+ NUTS_CLOSE(req);
+}
+
+static void
+test_req_ctx_send_twice(void)
+{
+ nng_socket req;
+ nng_ctx ctx;
+ nng_aio * aio[2];
+ nng_msg * msg[2];
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_ctx_open(&ctx, req));
+
+ for (int i = 0; i < 2; i++) {
+ NUTS_PASS(nng_aio_alloc(&aio[i], NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg[i], 0));
+ }
+
+ for (int i = 0; i < 2; i++) {
+ nng_aio_set_msg(aio[i], msg[i]);
+ nng_ctx_send(ctx, aio[i]);
+ NUTS_SLEEP(50);
+ }
+
+ NUTS_CLOSE(req);
+ nng_aio_wait(aio[0]);
+ nng_aio_wait(aio[1]);
+ NUTS_FAIL(nng_aio_result(aio[0]), NNG_ECANCELED);
+ NUTS_FAIL(nng_aio_result(aio[1]), NNG_ECLOSED);
+
+ for (int i = 0; i < 2; i++) {
+ nng_aio_free(aio[i]);
+ nng_msg_free(msg[i]);
+ }
+}
+
+static void
+test_req_ctx_recv_nonblock(void)
+{
+ nng_socket req;
+ nng_socket rep;
+ nng_ctx ctx;
+ nng_aio * aio;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_ctx_open(&ctx, req));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+
+ NUTS_MARRY(req, rep);
+
+ nng_aio_set_msg(aio, msg);
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ NUTS_PASS(nng_aio_result(aio));
+ nng_aio_set_timeout(aio, 0); // Instant timeout
+ nng_ctx_recv(ctx, aio);
+
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT);
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+ nng_aio_free(aio);
+}
+
+static void
+test_req_ctx_send_nonblock(void)
+{
+ nng_socket req;
+ nng_ctx ctx;
+ nng_aio * aio;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_ctx_open(&ctx, req));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+
+ nng_aio_set_msg(aio, msg);
+ nng_aio_set_timeout(aio, 0); // Instant timeout
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT);
+ NUTS_CLOSE(req);
+ nng_aio_free(aio);
+ nng_msg_free(msg);
+}
+
+static void
+test_req_ctx_recv_close_socket(void)
+{
+ nng_socket req;
+ nng_socket rep;
+ nng_ctx ctx;
+ nng_aio * aio;
+ nng_msg * m;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_ctx_open(&ctx, req));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_MARRY(req, rep);
+ NUTS_PASS(nng_msg_alloc(&m, 0));
+ nng_aio_set_msg(aio, m);
+ nng_ctx_send(ctx, aio);
+ nng_aio_wait(aio);
+ NUTS_PASS(nng_aio_result(aio));
+
+ nng_ctx_recv(ctx, aio);
+ nng_close(req);
+
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED);
+ nng_aio_free(aio);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_req_validate_peer(void)
+{
+ nng_socket s1, s2;
+ nng_stat * stats;
+ nng_stat * reject;
+ char * addr;
+
+ NUTS_ADDR(addr, "inproc");
+
+ NUTS_PASS(nng_req0_open(&s1));
+ NUTS_PASS(nng_req0_open(&s2));
+
+ NUTS_PASS(nng_listen(s1, addr, NULL, 0));
+ NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK));
+
+ NUTS_SLEEP(100);
+ NUTS_PASS(nng_stats_get(&stats));
+
+ NUTS_TRUE(stats != NULL);
+ NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL);
+ NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL);
+
+ NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER);
+ NUTS_TRUE(nng_stat_value(reject) > 0);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+ nng_stats_free(stats);
+}
+
+NUTS_TESTS = {
+ { "req identity", test_req_identity },
+ { "req ttl option", test_req_ttl_option },
+ { "req resend option", test_req_resend_option },
+ { "req recv bad state", test_req_recv_bad_state },
+ { "req recv garbage", test_req_recv_garbage },
+ { "req rep exchange", test_req_rep_exchange },
+ { "req resend", test_req_resend },
+ { "req resend disconnect", test_req_resend_disconnect },
+ { "req disconnect no retry", test_req_disconnect_no_retry },
+ { "req disconnect abort", test_req_disconnect_abort },
+ { "req resend reconnect", test_req_resend_reconnect },
+ { "req cancel", test_req_cancel },
+ { "req cancel abort recv", test_req_cancel_abort_recv },
+ { "req cancel post recv", test_req_cancel_post_recv },
+ { "req poll writable", test_req_poll_writeable },
+ { "req poll contention", test_req_poll_contention },
+ { "req poll multi pipe", test_req_poll_multi_pipe },
+ { "req poll readable", test_req_poll_readable },
+ { "req context send queued", test_req_ctx_send_queued },
+ { "req context send close", test_req_ctx_send_close },
+ { "req context send abort", test_req_ctx_send_abort },
+ { "req context send twice", test_req_ctx_send_twice },
+ { "req context does not poll", test_req_ctx_no_poll },
+ { "req context recv close socket", test_req_ctx_recv_close_socket },
+ { "req context recv nonblock", test_req_ctx_recv_nonblock },
+ { "req context send nonblock", test_req_ctx_send_nonblock },
+ { "req validate peer", test_req_validate_peer },
+ { NULL, NULL },
+};
diff --git a/src/sp/protocol/reqrep0/xrep.c b/src/sp/protocol/reqrep0/xrep.c
new file mode 100644
index 00000000..9737c600
--- /dev/null
+++ b/src/sp/protocol/reqrep0/xrep.c
@@ -0,0 +1,432 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "nng/protocol/reqrep0/rep.h"
+
+// Response protocol in raw mode. The REP protocol is the "reply" side of a
+// request-reply pair. This is useful for building RPC servers, for
+// example.
+
+typedef struct xrep0_pipe xrep0_pipe;
+typedef struct xrep0_sock xrep0_sock;
+
+static void xrep0_sock_getq_cb(void *);
+static void xrep0_pipe_getq_cb(void *);
+static void xrep0_pipe_putq_cb(void *);
+static void xrep0_pipe_send_cb(void *);
+static void xrep0_pipe_recv_cb(void *);
+static void xrep0_pipe_fini(void *);
+
+// xrep0_sock is our per-socket protocol private structure.
+struct xrep0_sock {
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_mtx lk;
+ nni_atomic_int ttl;
+ nni_id_map pipes;
+ nni_aio aio_getq;
+};
+
+// xrep0_pipe is our per-pipe protocol private structure.
+struct xrep0_pipe {
+ nni_pipe * pipe;
+ xrep0_sock *rep;
+ nni_msgq * sendq;
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
+};
+
+static void
+xrep0_sock_fini(void *arg)
+{
+ xrep0_sock *s = arg;
+
+ nni_aio_fini(&s->aio_getq);
+ nni_id_map_fini(&s->pipes);
+ nni_mtx_fini(&s->lk);
+}
+
+static int
+xrep0_sock_init(void *arg, nni_sock *sock)
+{
+ xrep0_sock *s = arg;
+
+ nni_mtx_init(&s->lk);
+ nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s);
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8); // Per RFC
+ s->uwq = nni_sock_sendq(sock);
+ s->urq = nni_sock_recvq(sock);
+
+ nni_id_map_init(&s->pipes, 0, 0, false);
+ return (0);
+}
+
+static void
+xrep0_sock_open(void *arg)
+{
+ xrep0_sock *s = arg;
+
+ // This starts us retrieving message from the upper write q.
+ nni_msgq_aio_get(s->uwq, &s->aio_getq);
+}
+
+static void
+xrep0_sock_close(void *arg)
+{
+ xrep0_sock *s = arg;
+
+ nni_aio_close(&s->aio_getq);
+}
+
+static void
+xrep0_pipe_stop(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ nni_aio_stop(&p->aio_getq);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
+ nni_aio_stop(&p->aio_putq);
+}
+
+static void
+xrep0_pipe_fini(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ nni_aio_fini(&p->aio_getq);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_putq);
+ nni_msgq_fini(p->sendq);
+}
+
+static int
+xrep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
+{
+ xrep0_pipe *p = arg;
+ int rv;
+
+ nni_aio_init(&p->aio_getq, xrep0_pipe_getq_cb, p);
+ nni_aio_init(&p->aio_send, xrep0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, xrep0_pipe_recv_cb, p);
+ nni_aio_init(&p->aio_putq, xrep0_pipe_putq_cb, p);
+
+ p->pipe = pipe;
+ p->rep = s;
+
+ // We want a pretty deep send queue on pipes. The rationale here is
+ // that the send rate will be mitigated by the receive rate.
+ // If a slow pipe (req pipe not reading its own responses!?)
+ // comes up, then we will start discarding its replies eventually,
+ // but it takes some time. It would be poor form for a peer to
+ // smash us with requests, but be unable to handle replies faster
+ // than we can forward them. If they do that, their replies get
+ // dropped. (From a DDoS perspective, it might be nice in the
+ // future if we had a way to exert back pressure to the send side --
+ // essentially don't let peers send requests faster than they are
+ // willing to receive replies. Something to think about for the
+ // future.)
+ if ((rv = nni_msgq_init(&p->sendq, 64)) != 0) {
+ xrep0_pipe_fini(p);
+ return (rv);
+ }
+ return (0);
+}
+
+static int
+xrep0_pipe_start(void *arg)
+{
+ xrep0_pipe *p = arg;
+ xrep0_sock *s = p->rep;
+ int rv;
+
+ if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
+ nni_mtx_lock(&s->lk);
+ rv = nni_id_set(&s->pipes, nni_pipe_id(p->pipe), p);
+ nni_mtx_unlock(&s->lk);
+ if (rv != 0) {
+ return (rv);
+ }
+
+ nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ return (0);
+}
+
+static void
+xrep0_pipe_close(void *arg)
+{
+ xrep0_pipe *p = arg;
+ xrep0_sock *s = p->rep;
+
+ nni_aio_close(&p->aio_getq);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
+ nni_aio_close(&p->aio_putq);
+ nni_msgq_close(p->sendq);
+
+ nni_mtx_lock(&s->lk);
+ nni_id_remove(&s->pipes, nni_pipe_id(p->pipe));
+ nni_mtx_unlock(&s->lk);
+}
+
+static void
+xrep0_sock_getq_cb(void *arg)
+{
+ xrep0_sock *s = arg;
+ nni_msgq * uwq = s->uwq;
+ nni_msg * msg;
+ uint32_t id;
+ xrep0_pipe *p;
+
+ // This watches for messages from the upper write queue,
+ // extracts the destination pipe, and forwards it to the appropriate
+ // destination pipe via a separate queue. This prevents a single bad
+ // or slow pipe from gumming up the works for the entire socket.
+
+ if (nni_aio_result(&s->aio_getq) != 0) {
+ // Closed socket?
+ return;
+ }
+
+ msg = nni_aio_get_msg(&s->aio_getq);
+ nni_aio_set_msg(&s->aio_getq, NULL);
+
+ // We yank the outgoing pipe id from the header
+ if (nni_msg_header_len(msg) < 4) {
+ nni_msg_free(msg);
+
+ // Look for another message on the upper write queue.
+ nni_msgq_aio_get(uwq, &s->aio_getq);
+ return;
+ }
+
+ id = nni_msg_header_trim_u32(msg);
+
+ // Look for the pipe, and attempt to put the message there
+ // (non-blocking) if we can. If we can't for any reason, then we
+ // free the message.
+ nni_mtx_lock(&s->lk);
+ if (((p = nni_id_get(&s->pipes, id)) == NULL) ||
+ (nni_msgq_tryput(p->sendq, msg) != 0)) {
+ nni_msg_free(msg);
+ }
+ nni_mtx_unlock(&s->lk);
+
+ // Now look for another message on the upper write queue.
+ nni_msgq_aio_get(uwq, &s->aio_getq);
+}
+
+static void
+xrep0_pipe_getq_cb(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ if (nni_aio_result(&p->aio_getq) != 0) {
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq));
+ nni_aio_set_msg(&p->aio_getq, NULL);
+
+ nni_pipe_send(p->pipe, &p->aio_send);
+}
+
+static void
+xrep0_pipe_send_cb(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ nni_msgq_aio_get(p->sendq, &p->aio_getq);
+}
+
+static void
+xrep0_pipe_recv_cb(void *arg)
+{
+ xrep0_pipe *p = arg;
+ xrep0_sock *s = p->rep;
+ nni_msg * msg;
+ int hops;
+ int ttl;
+
+ if (nni_aio_result(&p->aio_recv) != 0) {
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ ttl = nni_atomic_get(&s->ttl);
+
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+
+ // Store the pipe id in the header, first thing.
+ nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
+
+ // Move backtrace from body to header
+ hops = 1;
+ for (;;) {
+ bool end;
+ uint8_t *body;
+ if (hops > ttl) {
+ // This isn't malformed, but it has gone through
+ // too many hops. Do not disconnect, because we
+ // can legitimately receive messages with too many
+ // hops from devices, etc.
+ goto drop;
+ }
+ hops++;
+ if (nni_msg_len(msg) < 4) {
+ // Peer is speaking garbage. Kick it.
+ nni_msg_free(msg);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = ((body[0] & 0x80u) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
+ // Out of memory most likely, but keep going to
+ // avoid breaking things.
+ goto drop;
+ }
+ nni_msg_trim(msg, 4);
+ if (end) {
+ break;
+ }
+ }
+
+ // Go ahead and send it up.
+ nni_aio_set_msg(&p->aio_putq, msg);
+ nni_msgq_aio_put(s->urq, &p->aio_putq);
+ return;
+
+drop:
+ nni_msg_free(msg);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+}
+
+static void
+xrep0_pipe_putq_cb(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ if (nni_aio_result(&p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_putq));
+ nni_aio_set_msg(&p->aio_putq, NULL);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+}
+
+static int
+xrep0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ xrep0_sock *s = arg;
+ int ttl;
+ int rv;
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
+}
+
+static int
+xrep0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ xrep0_sock *s = arg;
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
+}
+
+static void
+xrep0_sock_send(void *arg, nni_aio *aio)
+{
+ xrep0_sock *s = arg;
+
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+xrep0_sock_recv(void *arg, nni_aio *aio)
+{
+ xrep0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe_ops xrep0_pipe_ops = {
+ .pipe_size = sizeof(xrep0_pipe),
+ .pipe_init = xrep0_pipe_init,
+ .pipe_fini = xrep0_pipe_fini,
+ .pipe_start = xrep0_pipe_start,
+ .pipe_close = xrep0_pipe_close,
+ .pipe_stop = xrep0_pipe_stop,
+};
+
+static nni_option xrep0_sock_options[] = {
+ {
+ .o_name = NNG_OPT_MAXTTL,
+ .o_get = xrep0_sock_get_maxttl,
+ .o_set = xrep0_sock_set_maxttl,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops xrep0_sock_ops = {
+ .sock_size = sizeof(xrep0_sock),
+ .sock_init = xrep0_sock_init,
+ .sock_fini = xrep0_sock_fini,
+ .sock_open = xrep0_sock_open,
+ .sock_close = xrep0_sock_close,
+ .sock_options = xrep0_sock_options,
+ .sock_send = xrep0_sock_send,
+ .sock_recv = xrep0_sock_recv,
+};
+
+static nni_proto xrep0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME },
+ .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &xrep0_sock_ops,
+ .proto_pipe_ops = &xrep0_pipe_ops,
+};
+
+int
+nng_rep0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &xrep0_proto));
+}
diff --git a/src/sp/protocol/reqrep0/xrep_test.c b/src/sp/protocol/reqrep0/xrep_test.c
new file mode 100644
index 00000000..6f1564eb
--- /dev/null
+++ b/src/sp/protocol/reqrep0/xrep_test.c
@@ -0,0 +1,434 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <nuts.h>
+
+static void
+test_xrep_identity(void)
+{
+ nng_socket s;
+ int p1, p2;
+ char * n1;
+ char * n2;
+
+ NUTS_PASS(nng_rep0_open_raw(&s));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p1));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p2));
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n1));
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n2));
+ NUTS_CLOSE(s);
+ NUTS_TRUE(p1 == NNG_REP0_SELF);
+ NUTS_TRUE(p2 == NNG_REP0_PEER);
+ NUTS_MATCH(n1, NNG_REP0_SELF_NAME);
+ NUTS_MATCH(n2, NNG_REP0_PEER_NAME);
+ nng_strfree(n1);
+ nng_strfree(n2);
+}
+
+static void
+test_xrep_raw(void)
+{
+ nng_socket s;
+ bool b;
+
+ NUTS_PASS(nng_rep0_open_raw(&s));
+ NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b));
+ NUTS_TRUE(b);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_xrep_no_context(void)
+{
+ nng_socket s;
+ nng_ctx ctx;
+
+ NUTS_PASS(nng_rep0_open_raw(&s));
+ NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_xrep_poll_writeable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_socket_get_int(rep, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // We are always writeable, even before connect. This is so that
+ // back-pressure from a bad peer can't trash others. We assume
+ // that peers won't send us requests faster than they can consume
+ // the answers. If they do, they will lose their answers.
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ NUTS_MARRY(req, rep);
+
+ // Now it's writable.
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xrep_poll_readable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_socket_get_int(rep, NNG_OPT_RECVFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not readable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(req, rep);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But once we send messages, it is.
+ // We have to send a request, in order to send a reply.
+ NUTS_SEND(req, "abc");
+ NUTS_SLEEP(100);
+
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ // and receiving makes it no longer ready
+ NUTS_PASS(nng_recvmsg(rep, &msg, 0));
+ nng_msg_free(msg);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xrep_validate_peer(void)
+{
+ nng_socket s1, s2;
+ nng_stat * stats;
+ nng_stat * reject;
+ char *addr;
+
+ NUTS_ADDR(addr, "inproc");
+
+ NUTS_PASS(nng_rep0_open_raw(&s1));
+ NUTS_PASS(nng_rep0_open(&s2));
+
+ NUTS_PASS(nng_listen(s1, addr, NULL, 0));
+ NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK));
+
+ NUTS_SLEEP(100);
+ NUTS_PASS(nng_stats_get(&stats));
+
+ NUTS_TRUE(stats != NULL);
+ NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL);
+ NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL);
+
+ NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER);
+ NUTS_TRUE(nng_stat_value(reject) > 0);
+
+ NUTS_PASS(nng_close(s1));
+ NUTS_PASS(nng_close(s2));
+ nng_stats_free(stats);
+}
+
+static void
+test_xrep_close_pipe_before_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_pipe p;
+ nng_aio * aio1;
+ nng_msg * m;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+
+ NUTS_MARRY(req, rep);
+ NUTS_SEND(req, "test");
+
+ nng_recv_aio(rep, aio1);
+ nng_aio_wait(aio1);
+ NUTS_PASS(nng_aio_result(aio1));
+ NUTS_TRUE((m = nng_aio_get_msg(aio1)) != NULL);
+ p = nng_msg_get_pipe(m);
+ NUTS_PASS(nng_pipe_close(p));
+ NUTS_PASS(nng_sendmsg(rep, m, 0));
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+ nng_aio_free(aio1);
+}
+
+static void
+test_xrep_close_pipe_during_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_pipe p;
+ nng_msg * m;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 200));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_SENDBUF, 20));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_RECVBUF, 20));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 20));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_RECVBUF, 1));
+
+ NUTS_MARRY(req, rep);
+
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_append_u32(m, (unsigned) 0x81000000u));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+ NUTS_PASS(nng_recvmsg(rep, &m, 0));
+ p = nng_msg_get_pipe(m);
+ nng_msg_free(m);
+
+ for (int i = 0; i < 100; i++) {
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p)));
+ NUTS_PASS(
+ nng_msg_header_append_u32(m, (unsigned) i | 0x80000000u));
+ // xrep does not exert back-pressure
+ NUTS_PASS(nng_sendmsg(rep, m, 0));
+ }
+ NUTS_PASS(nng_pipe_close(p));
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xrep_close_during_recv(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 100));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_RECVBUF, 5));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 20));
+
+ NUTS_MARRY(req, rep);
+
+ for (unsigned i = 0; i < 100; i++) {
+ int rv;
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u));
+ rv = nng_sendmsg(req, m, 0);
+ if (rv == NNG_ETIMEDOUT) {
+ nng_msg_free(m);
+ break;
+ }
+ }
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xrep_recv_aio_stopped(void)
+{
+ nng_socket rep;
+ nng_aio * aio;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_aio_stop(aio);
+ nng_recv_aio(rep, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_CLOSE(rep);
+ nng_aio_free(aio);
+}
+
+static void
+test_xrep_send_no_header(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY(req, rep);
+
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_sendmsg(rep, m, 0));
+ NUTS_FAIL(nng_recvmsg(rep, &m, 0), NNG_ETIMEDOUT);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xrep_recv_garbage(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 100));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY(req, rep);
+
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_append_u32(m, 1u));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+ NUTS_FAIL(nng_recvmsg(rep, &m, 0), NNG_ETIMEDOUT);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xrep_ttl_option(void)
+{
+ nng_socket rep;
+ int v;
+ bool b;
+ size_t sz;
+ const char *opt = NNG_OPT_MAXTTL;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+
+ NUTS_PASS(nng_socket_set_int(rep, opt, 1));
+ NUTS_FAIL(nng_socket_set_int(rep, opt, 0), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(rep, opt, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(rep, opt, 16), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(rep, opt, 256), NNG_EINVAL);
+ NUTS_PASS(nng_socket_set_int(rep, opt, 3));
+ NUTS_PASS(nng_socket_get_int(rep, opt, &v));
+ NUTS_TRUE(v == 3);
+ v = 0;
+ sz = sizeof(v);
+ NUTS_PASS(nng_socket_get(rep, opt, &v, &sz));
+ NUTS_TRUE(v == 3);
+ NUTS_TRUE(sz == sizeof(v));
+
+ NUTS_TRUE(nng_socket_set(rep, opt, "", 1) == NNG_EINVAL);
+ sz = 1;
+ NUTS_TRUE(nng_socket_get(rep, opt, &v, &sz) == NNG_EINVAL);
+ NUTS_TRUE(nng_socket_set_bool(rep, opt, true) == NNG_EBADTYPE);
+ NUTS_TRUE(nng_socket_get_bool(rep, opt, &b) == NNG_EBADTYPE);
+
+ NUTS_TRUE(nng_close(rep) == 0);
+}
+
+static void
+test_xrep_ttl_drop(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_MAXTTL, 3));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 200));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY(req, rep);
+
+ // Send messages. Note that xrep implicitly adds a hop on receive.
+
+ NUTS_PASS(nng_msg_alloc(&m, 0));
+ NUTS_PASS(nng_msg_append_u32(m, 1u)); // 2 hops
+ NUTS_PASS(nng_msg_append_u32(m, 0x80000001u));
+ NUTS_PASS(nng_msg_append(m, "PASS1", 6));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+
+ NUTS_PASS(nng_msg_alloc(&m, 0));
+ NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard!
+ NUTS_PASS(nng_msg_append_u32(m, 2u));
+ NUTS_PASS(nng_msg_append_u32(m, 3u));
+ NUTS_PASS(nng_msg_append_u32(m, 0x80000002u));
+ NUTS_PASS(nng_msg_append(m, "FAIL2", 6));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+
+ NUTS_PASS(nng_msg_alloc(&m, 0));
+ NUTS_PASS(nng_msg_append_u32(m, 1u)); // 3 hops - passes
+ NUTS_PASS(nng_msg_append_u32(m, 2u));
+ NUTS_PASS(nng_msg_append_u32(m, 0x80000003u));
+ NUTS_PASS(nng_msg_append(m, "PASS3", 6));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+
+ NUTS_PASS(nng_msg_alloc(&m, 0));
+ NUTS_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard!
+ NUTS_PASS(nng_msg_append_u32(m, 2u));
+ NUTS_PASS(nng_msg_append_u32(m, 3u));
+ NUTS_PASS(nng_msg_append_u32(m, 0x80000003u));
+ NUTS_PASS(nng_msg_append(m, "FAIL4", 6));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+
+ // So on receive we should see 80000001 and 80000003.
+ NUTS_PASS(nng_recvmsg(rep, &m, 0));
+ NUTS_TRUE(nng_msg_header_len(m) == 12);
+ NUTS_TRUE(nng_msg_len(m) == 6);
+ NUTS_TRUE(strcmp(nng_msg_body(m), "PASS1") == 0);
+ nng_msg_free(m);
+
+ NUTS_PASS(nng_recvmsg(rep, &m, 0));
+ NUTS_TRUE(nng_msg_header_len(m) == 16); // 3 hops + ID
+ NUTS_TRUE(nng_msg_len(m) == 6);
+ NUTS_TRUE(strcmp(nng_msg_body(m), "PASS3") == 0);
+ nng_msg_free(m);
+
+ NUTS_FAIL(nng_recvmsg(rep, &m, 0), NNG_ETIMEDOUT);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+NUTS_TESTS = {
+ { "xrep identity", test_xrep_identity },
+ { "xrep raw", test_xrep_raw },
+ { "xrep no context", test_xrep_no_context },
+ { "xrep poll readable", test_xrep_poll_readable },
+ { "xrep poll writable", test_xrep_poll_writeable },
+ { "xrep validate peer", test_xrep_validate_peer },
+ { "xrep close pipe before send", test_xrep_close_pipe_before_send },
+ { "xrep close pipe during send", test_xrep_close_pipe_during_send },
+ { "xrep close during recv", test_xrep_close_during_recv },
+ { "xrep recv aio stopped", test_xrep_recv_aio_stopped },
+ { "xrep send no header", test_xrep_send_no_header },
+ { "xrep recv garbage", test_xrep_recv_garbage },
+ { "xrep ttl option", test_xrep_ttl_option },
+ { "xrep ttl drop", test_xrep_ttl_drop },
+ { NULL, NULL },
+};
diff --git a/src/sp/protocol/reqrep0/xreq.c b/src/sp/protocol/reqrep0/xreq.c
new file mode 100644
index 00000000..bcb218bf
--- /dev/null
+++ b/src/sp/protocol/reqrep0/xreq.c
@@ -0,0 +1,319 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdio.h>
+
+#include "core/nng_impl.h"
+#include "nng/protocol/reqrep0/req.h"
+
+// Request protocol. The REQ protocol is the "request" side of a
+// request-reply pair. This is useful for building RPC clients, for example.
+
+typedef struct xreq0_pipe xreq0_pipe;
+typedef struct xreq0_sock xreq0_sock;
+
+// An xreq0_sock is our per-socket protocol private structure.
+struct xreq0_sock {
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_atomic_int ttl;
+};
+
+// A req0_pipe is our per-pipe protocol private structure.
+struct xreq0_pipe {
+ nni_pipe * pipe;
+ xreq0_sock *req;
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
+};
+
+static void xreq0_sock_fini(void *);
+static void xreq0_getq_cb(void *);
+static void xreq0_send_cb(void *);
+static void xreq0_recv_cb(void *);
+static void xreq0_putq_cb(void *);
+
+static int
+xreq0_sock_init(void *arg, nni_sock *sock)
+{
+ xreq0_sock *s = arg;
+
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
+ s->uwq = nni_sock_sendq(sock);
+ s->urq = nni_sock_recvq(sock);
+
+ return (0);
+}
+
+static void
+xreq0_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xreq0_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xreq0_sock_fini(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xreq0_pipe_stop(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ nni_aio_stop(&p->aio_getq);
+ nni_aio_stop(&p->aio_putq);
+ nni_aio_stop(&p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+}
+
+static void
+xreq0_pipe_fini(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ nni_aio_fini(&p->aio_getq);
+ nni_aio_fini(&p->aio_putq);
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+}
+
+static int
+xreq0_pipe_init(void *arg, nni_pipe *pipe, void *s)
+{
+ xreq0_pipe *p = arg;
+
+ nni_aio_init(&p->aio_getq, xreq0_getq_cb, p);
+ nni_aio_init(&p->aio_putq, xreq0_putq_cb, p);
+ nni_aio_init(&p->aio_recv, xreq0_recv_cb, p);
+ nni_aio_init(&p->aio_send, xreq0_send_cb, p);
+
+ p->pipe = pipe;
+ p->req = s;
+ return (0);
+}
+
+static int
+xreq0_pipe_start(void *arg)
+{
+ xreq0_pipe *p = arg;
+ xreq0_sock *s = p->req;
+
+ if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) {
+ return (NNG_EPROTO);
+ }
+
+ nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ return (0);
+}
+
+static void
+xreq0_pipe_close(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ nni_aio_close(&p->aio_getq);
+ nni_aio_close(&p->aio_putq);
+ nni_aio_close(&p->aio_recv);
+ nni_aio_close(&p->aio_send);
+}
+
+// For raw mode we can just let the pipes "contend" via get queue to get a
+// message from the upper write queue. The msg queue implementation
+// actually provides ordering, so load will be spread automatically.
+// (NB: We may have to revise this in the future if we want to provide some
+// kind of priority.)
+
+static void
+xreq0_getq_cb(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ if (nni_aio_result(&p->aio_getq) != 0) {
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq));
+ nni_aio_set_msg(&p->aio_getq, NULL);
+
+ nni_pipe_send(p->pipe, &p->aio_send);
+}
+
+static void
+xreq0_send_cb(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ // Sent a message so we just need to look for another one.
+ nni_msgq_aio_get(p->req->uwq, &p->aio_getq);
+}
+
+static void
+xreq0_putq_cb(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ if (nni_aio_result(&p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_putq));
+ nni_aio_set_msg(&p->aio_putq, NULL);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ nni_aio_set_msg(&p->aio_putq, NULL);
+
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+}
+
+static void
+xreq0_recv_cb(void *arg)
+{
+ xreq0_pipe *p = arg;
+ xreq0_sock *sock = p->req;
+ nni_msg * msg;
+ bool end;
+
+ if (nni_aio_result(&p->aio_recv) != 0) {
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+ end = false;
+
+ while (!end) {
+ uint8_t *body;
+
+ if (nni_msg_len(msg) < 4) {
+ // Peer gave us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = ((body[0] & 0x80u) != 0);
+
+ if (nng_msg_header_append(msg, body, sizeof (uint32_t)) != 0) {
+ // TODO: bump a no-memory stat
+ nni_msg_free(msg);
+ // Closing the pipe may release some memory.
+ // It at least gives an indication to the peer
+ // that we've lost the message.
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ nni_msg_trim(msg, sizeof (uint32_t));
+ }
+ nni_aio_set_msg(&p->aio_putq, msg);
+ nni_msgq_aio_put(sock->urq, &p->aio_putq);
+}
+
+static void
+xreq0_sock_send(void *arg, nni_aio *aio)
+{
+ xreq0_sock *s = arg;
+
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+xreq0_sock_recv(void *arg, nni_aio *aio)
+{
+ xreq0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+static int
+xreq0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ xreq0_sock *s = arg;
+ int ttl;
+ int rv;
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
+}
+
+static int
+xreq0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ xreq0_sock *s = arg;
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
+}
+
+static nni_proto_pipe_ops xreq0_pipe_ops = {
+ .pipe_size = sizeof(xreq0_pipe),
+ .pipe_init = xreq0_pipe_init,
+ .pipe_fini = xreq0_pipe_fini,
+ .pipe_start = xreq0_pipe_start,
+ .pipe_close = xreq0_pipe_close,
+ .pipe_stop = xreq0_pipe_stop,
+};
+
+static nni_option xreq0_sock_options[] = {
+ {
+ .o_name = NNG_OPT_MAXTTL,
+ .o_get = xreq0_sock_get_max_ttl,
+ .o_set = xreq0_sock_set_max_ttl,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops xreq0_sock_ops = {
+ .sock_size = sizeof(xreq0_sock),
+ .sock_init = xreq0_sock_init,
+ .sock_fini = xreq0_sock_fini,
+ .sock_open = xreq0_sock_open,
+ .sock_close = xreq0_sock_close,
+ .sock_options = xreq0_sock_options,
+ .sock_send = xreq0_sock_send,
+ .sock_recv = xreq0_sock_recv,
+};
+
+static nni_proto xreq0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME },
+ .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &xreq0_sock_ops,
+ .proto_pipe_ops = &xreq0_pipe_ops,
+ .proto_ctx_ops = NULL, // raw mode does not support contexts
+};
+
+int
+nng_req0_open_raw(nng_socket *sock)
+{
+ return (nni_proto_open(sock, &xreq0_proto));
+}
diff --git a/src/sp/protocol/reqrep0/xreq_test.c b/src/sp/protocol/reqrep0/xreq_test.c
new file mode 100644
index 00000000..8c850cba
--- /dev/null
+++ b/src/sp/protocol/reqrep0/xreq_test.c
@@ -0,0 +1,367 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <nuts.h>
+
+static void
+test_xreq_identity(void)
+{
+ nng_socket s;
+ int p1, p2;
+ char * n1;
+ char * n2;
+
+ NUTS_PASS(nng_req0_open_raw(&s));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p1));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p2));
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n1));
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n2));
+ NUTS_CLOSE(s);
+ NUTS_TRUE(p1 == NNG_REQ0_SELF);
+ NUTS_TRUE(p2 == NNG_REQ0_PEER);
+ NUTS_MATCH(n1, NNG_REQ0_SELF_NAME);
+ NUTS_MATCH(n2, NNG_REQ0_PEER_NAME);
+ nng_strfree(n1);
+ nng_strfree(n2);
+}
+
+static void
+test_xreq_raw(void)
+{
+ nng_socket s;
+ bool b;
+
+ NUTS_PASS(nng_req0_open_raw(&s));
+ NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b));
+ NUTS_TRUE(b);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_xreq_no_context(void)
+{
+ nng_socket s;
+ nng_ctx ctx;
+
+ NUTS_PASS(nng_req0_open_raw(&s));
+ NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_xreq_poll_writeable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_get_int(req, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // We can't write until we have a connection.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_MARRY(req, rep);
+
+ // Now it's writable.
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xreq_poll_readable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_rep0_open(&rep));
+ NUTS_PASS(nng_socket_get_int(req, NNG_OPT_RECVFD, &fd));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_TRUE(fd >= 0);
+
+ // Not readable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(req, rep);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But once we send messages, it is.
+ // We have to send a request, in order to send a reply.
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ // Request ID
+ NUTS_PASS(nng_msg_append_u32(msg, 0x80000000));
+ NUTS_PASS(nng_sendmsg(req, msg, 0));
+
+ NUTS_PASS(nng_recvmsg(rep, &msg, 0));
+ NUTS_PASS(nng_sendmsg(rep, msg, 0));
+
+ NUTS_SLEEP(100);
+
+ NUTS_TRUE(nuts_poll_fd(fd) == true);
+
+ // and receiving makes it no longer ready
+ NUTS_PASS(nng_recvmsg(req, &msg, 0));
+ nng_msg_free(msg);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xreq_validate_peer(void)
+{
+ nng_socket s1, s2;
+ nng_stat * stats;
+ nng_stat * reject;
+ char * addr;
+
+ NUTS_ADDR(addr, "inproc");
+
+ NUTS_PASS(nng_req0_open_raw(&s1));
+ NUTS_PASS(nng_req0_open(&s2));
+
+ NUTS_PASS(nng_listen(s1, addr, NULL, 0));
+ NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK));
+
+ NUTS_SLEEP(100);
+ NUTS_PASS(nng_stats_get(&stats));
+
+ NUTS_TRUE(stats != NULL);
+ NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL);
+ NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL);
+
+ NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER);
+ NUTS_TRUE(nng_stat_value(reject) > 0);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+ nng_stats_free(stats);
+}
+
+static void
+test_xreq_recv_aio_stopped(void)
+{
+ nng_socket req;
+ nng_aio * aio;
+
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_aio_stop(aio);
+ nng_recv_aio(req, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ NUTS_CLOSE(req);
+ nng_aio_free(aio);
+}
+
+static void
+test_xreq_recv_garbage(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ uint32_t req_id;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 100));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY(req, rep);
+
+ NUTS_PASS(nng_msg_alloc(&m, 0));
+ NUTS_PASS(nng_msg_append_u32(m, 0x80000000));
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+
+ NUTS_PASS(nng_recvmsg(rep, &m, 0));
+
+ // The message will have a header that contains the 32-bit pipe ID,
+ // followed by the 32-bit request ID. We will discard the request
+ // ID before sending it out.
+ NUTS_TRUE(nng_msg_header_len(m) == 8);
+ NUTS_PASS(nng_msg_header_chop_u32(m, &req_id));
+ NUTS_TRUE(req_id == 0x80000000);
+
+ NUTS_PASS(nng_sendmsg(rep, m, 0));
+ NUTS_FAIL(nng_recvmsg(req, &m, 0), NNG_ETIMEDOUT);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xreq_recv_header(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ nng_pipe p1, p2;
+ uint32_t id;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+
+ NUTS_MARRY_EX(req, rep, NULL, &p1, &p2);
+
+ // Simulate a few hops.
+ NUTS_PASS(nng_msg_alloc(&m, 0));
+ NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2)));
+ NUTS_PASS(nng_msg_header_append_u32(m, 0x2));
+ NUTS_PASS(nng_msg_header_append_u32(m, 0x1));
+ NUTS_PASS(nng_msg_header_append_u32(m, 0x80000123u));
+
+ NUTS_PASS(nng_sendmsg(rep, m, 0));
+
+ NUTS_PASS(nng_recvmsg(req, &m, 0));
+ NUTS_TRUE(nng_msg_header_len(m) == 12);
+ NUTS_PASS(nng_msg_header_trim_u32(m, &id));
+ NUTS_TRUE(id == 0x2);
+ NUTS_PASS(nng_msg_header_trim_u32(m, &id));
+ NUTS_TRUE(id == 0x1);
+ NUTS_PASS(nng_msg_header_trim_u32(m, &id));
+ NUTS_TRUE(id == 0x80000123u);
+
+ nng_msg_free(m);
+
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xreq_close_during_recv(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ nng_pipe p1;
+ nng_pipe p2;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 100));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_RECVBUF, 5));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_SENDBUF, 20));
+
+ NUTS_MARRY_EX(req, rep, NULL, &p1, &p2);
+ NUTS_TRUE(nng_pipe_id(p1) > 0);
+ NUTS_TRUE(nng_pipe_id(p2) > 0);
+
+ for (unsigned i = 0; i < 20; i++) {
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2)));
+ NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u));
+ NUTS_SLEEP(10);
+ NUTS_PASS(nng_sendmsg(rep, m, 0));
+ }
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xreq_close_pipe_during_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ nng_pipe p1;
+ nng_pipe p2;
+
+ NUTS_PASS(nng_rep0_open_raw(&rep));
+ NUTS_PASS(nng_req0_open_raw(&req));
+ NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 100));
+ NUTS_PASS(nng_socket_set_int(rep, NNG_OPT_RECVBUF, 5));
+ NUTS_PASS(nng_socket_set_int(req, NNG_OPT_SENDBUF, 20));
+
+ NUTS_MARRY_EX(req, rep, NULL, &p1, &p2);
+ NUTS_TRUE(nng_pipe_id(p1) > 0);
+ NUTS_TRUE(nng_pipe_id(p2) > 0);
+
+ for (unsigned i = 0; i < 20; i++) {
+ NUTS_PASS(nng_msg_alloc(&m, 4));
+ NUTS_PASS(nng_msg_header_append_u32(m, i | 0x80000000u));
+ NUTS_SLEEP(10);
+ NUTS_PASS(nng_sendmsg(req, m, 0));
+ }
+
+ NUTS_PASS(nng_pipe_close(p1));
+ NUTS_CLOSE(req);
+ NUTS_CLOSE(rep);
+}
+
+static void
+test_xreq_ttl_option(void)
+{
+ nng_socket rep;
+ int v;
+ bool b;
+ size_t sz;
+ const char *opt = NNG_OPT_MAXTTL;
+
+ NUTS_PASS(nng_req0_open_raw(&rep));
+
+ NUTS_PASS(nng_socket_set_int(rep, opt, 1));
+ NUTS_FAIL(nng_socket_set_int(rep, opt, 0), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(rep, opt, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(rep, opt, 16), NNG_EINVAL);
+ NUTS_FAIL(nng_socket_set_int(rep, opt, 256), NNG_EINVAL);
+ NUTS_PASS(nng_socket_set_int(rep, opt, 3));
+ NUTS_PASS(nng_socket_get_int(rep, opt, &v));
+ NUTS_TRUE(v == 3);
+ v = 0;
+ sz = sizeof(v);
+ NUTS_PASS(nng_socket_get(rep, opt, &v, &sz));
+ NUTS_TRUE(v == 3);
+ NUTS_TRUE(sz == sizeof(v));
+
+ NUTS_TRUE(nng_socket_set(rep, opt, "", 1) == NNG_EINVAL);
+ sz = 1;
+ NUTS_TRUE(nng_socket_get(rep, opt, &v, &sz) == NNG_EINVAL);
+ NUTS_TRUE(nng_socket_set_bool(rep, opt, true) == NNG_EBADTYPE);
+ NUTS_TRUE(nng_socket_get_bool(rep, opt, &b) == NNG_EBADTYPE);
+
+ NUTS_TRUE(nng_close(rep) == 0);
+}
+
+NUTS_TESTS = {
+ { "xreq identity", test_xreq_identity },
+ { "xreq raw", test_xreq_raw },
+ { "xreq no context", test_xreq_no_context },
+ { "xreq poll readable", test_xreq_poll_readable },
+ { "xreq poll writable", test_xreq_poll_writeable },
+ { "xreq validate peer", test_xreq_validate_peer },
+ { "xreq recv aio stopped", test_xreq_recv_aio_stopped },
+ { "xreq recv garbage", test_xreq_recv_garbage },
+ { "xreq recv header", test_xreq_recv_header },
+ { "xreq close during recv", test_xreq_close_during_recv },
+ { "xreq close pipe during send", test_xreq_close_pipe_during_send },
+ { "xreq ttl option", test_xreq_ttl_option },
+ { NULL, NULL },
+};