aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0')
-rw-r--r--src/protocol/reqrep0/CMakeLists.txt1
-rw-r--r--src/protocol/reqrep0/rep.c39
-rw-r--r--src/protocol/reqrep0/rep_test.c6
-rw-r--r--src/protocol/reqrep0/req.c92
-rw-r--r--src/protocol/reqrep0/xrep.c17
-rw-r--r--src/protocol/reqrep0/xrep_test.c32
-rw-r--r--src/protocol/reqrep0/xreq.c130
-rw-r--r--src/protocol/reqrep0/xreq_test.c333
8 files changed, 490 insertions, 160 deletions
diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt
index 3cd8d366..4e1b7a8f 100644
--- a/src/protocol/reqrep0/CMakeLists.txt
+++ b/src/protocol/reqrep0/CMakeLists.txt
@@ -26,3 +26,4 @@ nng_defines_if(NNG_PROTO_REP0 NNG_HAVE_REP0)
nng_test(reqrep_test)
nng_test(rep_test)
nng_test(xrep_test)
+nng_test(xreq_test)
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index a29c3120..3e1a34a9 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -48,14 +48,14 @@ struct rep0_ctx {
// rep0_sock is our per-socket protocol private structure.
struct rep0_sock {
- nni_mtx lk;
- int ttl;
- nni_idhash * pipes;
- nni_list recvpipes; // list of pipes with data to receive
- nni_list recvq;
- rep0_ctx ctx;
- nni_pollable readable;
- nni_pollable writable;
+ nni_mtx lk;
+ nni_atomic_int ttl;
+ nni_idhash * pipes;
+ nni_list recvpipes; // list of pipes with data to receive
+ nni_list recvq;
+ rep0_ctx ctx;
+ nni_pollable readable;
+ nni_pollable writable;
};
// rep0_pipe is our per-pipe protocol private structure.
@@ -241,8 +241,8 @@ rep0_sock_init(void *arg, nni_sock *sock)
NNI_LIST_INIT(&s->recvq, rep0_ctx, rqnode);
NNI_LIST_INIT(&s->recvpipes, rep0_pipe, rnode);
-
- s->ttl = 8;
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
(void) rep0_ctx_init(&s->ctx, s);
@@ -506,7 +506,7 @@ rep0_pipe_recv_cb(void *arg)
for (;;) {
bool end;
- if (hops > s->ttl) {
+ if (hops > nni_atomic_get(&s->ttl)) {
// This isn't malformed, but it has gone
// through too many hops. Do not disconnect,
// because we can legitimately receive messages
@@ -574,19 +574,24 @@ drop:
}
static int
-rep0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
+rep0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
rep0_sock *s = arg;
+ int ttl;
+ int rv;
- return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t));
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
}
static int
-rep0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
+rep0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
rep0_sock *s = arg;
- return (nni_copyout_int(s->ttl, buf, szp, t));
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
}
static int
@@ -654,8 +659,8 @@ static nni_proto_ctx_ops rep0_ctx_ops = {
static nni_option rep0_sock_options[] = {
{
.o_name = NNG_OPT_MAXTTL,
- .o_get = rep0_sock_get_maxttl,
- .o_set = rep0_sock_set_maxttl,
+ .o_get = rep0_sock_get_max_ttl,
+ .o_set = rep0_sock_set_max_ttl,
},
{
.o_name = NNG_OPT_RECVFD,
diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c
index f339e68d..7c6d6ba0 100644
--- a/src/protocol/reqrep0/rep_test.c
+++ b/src/protocol/reqrep0/rep_test.c
@@ -117,7 +117,7 @@ test_rep_poll_readable(void)
TEST_CHECK(testutil_pollfd(fd) == true);
- // and receiving makes it no longer pollable
+ // and receiving makes it no longer ready
TEST_NNG_PASS(nng_recvmsg(rep, &msg, 0));
nng_msg_free(msg);
TEST_CHECK(testutil_pollfd(fd) == false);
@@ -129,7 +129,7 @@ test_rep_poll_readable(void)
}
void
-test_rep_context_not_pollable(void)
+test_rep_context_no_poll(void)
{
int fd;
nng_socket req;
@@ -439,7 +439,7 @@ TEST_LIST = {
{ "rep send bad state", test_rep_send_bad_state },
{ "rep poll readable", test_rep_poll_readable },
{ "rep poll writable", test_rep_poll_writeable },
- { "rep context not pollable", test_rep_context_not_pollable },
+ { "rep context does not poll", test_rep_context_no_poll },
{ "rep validate peer", test_rep_validate_peer },
{ "rep double recv", test_rep_double_recv },
{ "rep close pipe before send", test_rep_close_pipe_before_send },
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 14da7143..796bd71e 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -9,7 +9,6 @@
//
#include <stdio.h>
-#include <stdlib.h>
#include "core/nng_impl.h"
#include "nng/protocol/reqrep0/req.h"
@@ -56,19 +55,19 @@ struct req0_ctx {
// A req0_sock is our per-socket protocol private structure.
struct req0_sock {
- nni_duration retry;
- bool closed;
- int ttl;
- req0_ctx master; // base socket master
- nni_list ready_pipes;
- nni_list busy_pipes;
- nni_list stop_pipes;
- nni_list contexts;
- nni_list send_queue; // contexts waiting to send.
- nni_idhash * requests; // contexts by request ID
- nni_pollable readable;
- nni_pollable writable;
- nni_mtx mtx;
+ nni_duration retry;
+ bool closed;
+ nni_atomic_int ttl;
+ req0_ctx master; // base socket master
+ nni_list ready_pipes;
+ nni_list busy_pipes;
+ nni_list stop_pipes;
+ nni_list contexts;
+ nni_list send_queue; // contexts waiting to send.
+ nni_idhash * requests; // contexts by request ID
+ nni_pollable readable;
+ nni_pollable writable;
+ nni_mtx mtx;
};
// A req0_pipe is our per-pipe protocol private structure.
@@ -78,8 +77,8 @@ struct req0_pipe {
nni_list_node node;
nni_list contexts; // contexts with pending traffic
bool closed;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
};
static void req0_sock_fini(void *);
@@ -120,7 +119,8 @@ req0_sock_init(void *arg, nni_sock *sock)
nni_pollable_init(&s->writable);
nni_pollable_init(&s->readable);
- s->ttl = 8;
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
return (0);
}
@@ -172,8 +172,8 @@ req0_pipe_stop(void *arg)
req0_pipe *p = arg;
req0_sock *s = p->req;
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_send);
+ nni_aio_stop(&p->aio_recv);
+ nni_aio_stop(&p->aio_send);
nni_mtx_lock(&s->mtx);
nni_list_node_remove(&p->node);
nni_mtx_unlock(&s->mtx);
@@ -184,22 +184,17 @@ req0_pipe_fini(void *arg)
{
req0_pipe *p = arg;
- nni_aio_free(p->aio_recv);
- nni_aio_free(p->aio_send);
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_send);
}
static int
req0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
req0_pipe *p = arg;
- int rv;
-
- if (((rv = nni_aio_alloc(&p->aio_recv, req0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, req0_send_cb, p)) != 0)) {
- req0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_recv, req0_recv_cb, p);
+ nni_aio_init(&p->aio_send, req0_send_cb, p);
NNI_LIST_NODE_INIT(&p->node);
NNI_LIST_INIT(&p->contexts, req0_ctx, pipe_node);
p->pipe = pipe;
@@ -227,7 +222,7 @@ req0_pipe_start(void *arg)
req0_run_send_queue(s, NULL);
nni_mtx_unlock(&s->mtx);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -238,8 +233,8 @@ req0_pipe_close(void *arg)
req0_sock *s = p->req;
req0_ctx * ctx;
- nni_aio_close(p->aio_recv);
- nni_aio_close(p->aio_send);
+ nni_aio_close(&p->aio_recv);
+ nni_aio_close(&p->aio_send);
nni_mtx_lock(&s->mtx);
// This removes the node from either busy_pipes or ready_pipes.
@@ -277,10 +272,10 @@ req0_send_cb(void *arg)
nni_list send_list;
nni_aio_list_init(&send_list);
- if (nni_aio_result(p->aio_send) != 0) {
+ if (nni_aio_result(&p->aio_send) != 0) {
// We failed to send... clean up and deal with it.
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -319,13 +314,13 @@ req0_recv_cb(void *arg)
nni_aio * aio;
uint32_t id;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// We yank 4 bytes from front of body, and move them to the header.
@@ -334,17 +329,11 @@ req0_recv_cb(void *arg)
goto malformed;
}
id = nni_msg_trim_u32(msg);
- if (nni_msg_header_append_u32(msg, id) != 0) {
- // Arguably we could just discard and carry on. But
- // dropping the connection is probably more helpful since
- // it lets the other side see that a problem occurred.
- // Plus it gives us a chance to reclaim some memory.
- goto malformed;
- }
+ nni_msg_header_must_append_u32(msg, id);
// Schedule another receive while we are processing this.
nni_mtx_lock(&s->mtx);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
// Look for a context to receive it.
if ((nni_idhash_find(s->requests, id, (void **) &ctx) != 0) ||
@@ -526,8 +515,8 @@ req0_run_send_queue(req0_sock *s, nni_list *send_list)
}
}
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
}
}
@@ -763,14 +752,19 @@ static int
req0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
req0_sock *s = arg;
- return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t));
+ int ttl;
+ int rv;
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
}
static int
req0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
req0_sock *s = arg;
- return (nni_copyout_int(s->ttl, buf, szp, t));
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
}
static int
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index aac6d6b9..e5d96f02 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -40,7 +40,7 @@ struct xrep0_sock {
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx lk;
- nni_atomic_u64 ttl;
+ nni_atomic_int ttl;
nni_idhash * pipes;
nni_aio aio_getq;
};
@@ -74,8 +74,8 @@ xrep0_sock_init(void *arg, nni_sock *sock)
nni_mtx_init(&s->lk);
nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s);
- nni_atomic_init64(&s->ttl);
- nni_atomic_set64(&s->ttl, 8); // Per RFC
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8); // Per RFC
s->uwq = nni_sock_sendq(sock);
s->urq = nni_sock_recvq(sock);
@@ -295,17 +295,14 @@ xrep0_pipe_recv_cb(void *arg)
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// Store the pipe id in the header, first thing.
- if (nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)) != 0) {
- // Failure here causes us to drop the message.
- goto drop;
- }
+ nni_msg_header_must_append_u32(msg, nni_pipe_id(p->pipe));
// Move backtrace from body to header
hops = 1;
for (;;) {
bool end = 0;
uint8_t *body;
- if (hops > (int)nni_atomic_get64(&s->ttl)) {
+ if (hops > (int)nni_atomic_get(&s->ttl)) {
// This isn't malformed, but it has gone through
// too many hops. Do not disconnect, because we
// can legitimately receive messages with too many
@@ -364,7 +361,7 @@ xrep0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
int ttl;
int rv;
if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) {
- nni_atomic_set64(&s->ttl, (uint64_t) ttl);
+ nni_atomic_set(&s->ttl, ttl);
}
return (rv);
}
@@ -373,7 +370,7 @@ static int
xrep0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
xrep0_sock *s = arg;
- return (nni_copyout_int((int) nni_atomic_get64(&s->ttl), buf, szp, t));
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
}
static void
diff --git a/src/protocol/reqrep0/xrep_test.c b/src/protocol/reqrep0/xrep_test.c
index 93573be1..958f3240 100644
--- a/src/protocol/reqrep0/xrep_test.c
+++ b/src/protocol/reqrep0/xrep_test.c
@@ -20,7 +20,7 @@
#define NNI_PROTO(x, y) (((x) << 4u) | (y))
#endif
-void
+static void
test_xrep_identity(void)
{
nng_socket s;
@@ -41,7 +41,7 @@ test_xrep_identity(void)
TEST_CHECK(nng_close(s) == 0);
}
-void
+static void
test_xrep_raw(void)
{
nng_socket s;
@@ -53,7 +53,7 @@ test_xrep_raw(void)
TEST_NNG_PASS(nng_close(s));
}
-void
+static void
test_xrep_no_context(void)
{
nng_socket s;
@@ -64,7 +64,7 @@ test_xrep_no_context(void)
TEST_NNG_PASS(nng_close(s));
}
-void
+static void
test_xrep_poll_writeable(void)
{
int fd;
@@ -91,7 +91,7 @@ test_xrep_poll_writeable(void)
TEST_NNG_PASS(nng_close(rep));
}
-void
+static void
test_xrep_poll_readable(void)
{
int fd;
@@ -118,7 +118,7 @@ test_xrep_poll_readable(void)
TEST_CHECK(testutil_pollfd(fd) == true);
- // and receiving makes it no longer pollable
+ // and receiving makes it no longer ready
TEST_NNG_PASS(nng_recvmsg(rep, &msg, 0));
nng_msg_free(msg);
TEST_CHECK(testutil_pollfd(fd) == false);
@@ -127,7 +127,7 @@ test_xrep_poll_readable(void)
TEST_NNG_PASS(nng_close(rep));
}
-void
+static void
test_xrep_validate_peer(void)
{
nng_socket s1, s2;
@@ -158,7 +158,7 @@ test_xrep_validate_peer(void)
nng_stats_free(stats);
}
-void
+static void
test_xrep_close_pipe_before_send(void)
{
nng_socket rep;
@@ -190,7 +190,7 @@ test_xrep_close_pipe_before_send(void)
nng_aio_free(aio1);
}
-void
+static void
test_xrep_close_pipe_during_send(void)
{
nng_socket rep;
@@ -231,7 +231,7 @@ test_xrep_close_pipe_during_send(void)
TEST_NNG_PASS(nng_close(rep));
}
-void
+static void
test_xrep_close_during_recv(void)
{
nng_socket rep;
@@ -261,7 +261,7 @@ test_xrep_close_during_recv(void)
TEST_NNG_PASS(nng_close(rep));
}
-void
+static void
test_xrep_recv_aio_stopped(void)
{
nng_socket rep;
@@ -278,7 +278,7 @@ test_xrep_recv_aio_stopped(void)
nng_aio_free(aio);
}
-void
+static void
test_xrep_send_no_header(void)
{
nng_socket rep;
@@ -302,7 +302,7 @@ test_xrep_send_no_header(void)
TEST_NNG_PASS(nng_close(rep));
}
-void
+static void
test_xrep_recv_garbage(void)
{
nng_socket rep;
@@ -326,13 +326,13 @@ test_xrep_recv_garbage(void)
TEST_NNG_PASS(nng_close(rep));
}
-void
+static void
test_xrep_ttl_option(void)
{
nng_socket rep;
int v;
bool b;
- size_t sz = sizeof(v);
+ size_t sz;
const char *opt = NNG_OPT_MAXTTL;
TEST_NNG_PASS(nng_rep0_open_raw(&rep));
@@ -359,7 +359,7 @@ test_xrep_ttl_option(void)
TEST_CHECK(nng_close(rep) == 0);
}
-void
+static void
test_xrep_ttl_drop(void)
{
nng_socket rep;
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 15652f4f..ef24010e 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -11,7 +11,7 @@
#include <stdio.h>
#include "core/nng_impl.h"
-#include "nng/protocol/reqrep0/req.h"
+//#include "nng/protocol/reqrep0/req.h"
// Request protocol. The REQ protocol is the "request" side of a
// request-reply pair. This is useful for building RPC clients, for example.
@@ -29,19 +29,19 @@ typedef struct xreq0_sock xreq0_sock;
// An xreq0_sock is our per-socket protocol private structure.
struct xreq0_sock {
- nni_msgq *uwq;
- nni_msgq *urq;
- int ttl;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_atomic_int ttl;
};
// A req0_pipe is our per-pipe protocol private structure.
struct xreq0_pipe {
nni_pipe * pipe;
xreq0_sock *req;
- nni_aio * aio_getq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
- nni_aio * aio_putq;
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
};
static void xreq0_sock_fini(void *);
@@ -55,7 +55,8 @@ xreq0_sock_init(void *arg, nni_sock *sock)
{
xreq0_sock *s = arg;
- s->ttl = 8;
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
s->uwq = nni_sock_sendq(sock);
s->urq = nni_sock_recvq(sock);
@@ -85,10 +86,10 @@ xreq0_pipe_stop(void *arg)
{
xreq0_pipe *p = arg;
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_putq);
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_send);
+ nni_aio_stop(&p->aio_getq);
+ nni_aio_stop(&p->aio_putq);
+ nni_aio_stop(&p->aio_recv);
+ nni_aio_stop(&p->aio_send);
}
static void
@@ -96,25 +97,21 @@ xreq0_pipe_fini(void *arg)
{
xreq0_pipe *p = arg;
- nni_aio_free(p->aio_getq);
- nni_aio_free(p->aio_putq);
- nni_aio_free(p->aio_recv);
- nni_aio_free(p->aio_send);
+ nni_aio_fini(&p->aio_getq);
+ nni_aio_fini(&p->aio_putq);
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_send);
}
static int
xreq0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
xreq0_pipe *p = arg;
- int rv;
- if (((rv = nni_aio_alloc(&p->aio_getq, xreq0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_putq, xreq0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_recv, xreq0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, xreq0_send_cb, p)) != 0)) {
- xreq0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_getq, xreq0_getq_cb, p);
+ nni_aio_init(&p->aio_putq, xreq0_putq_cb, p);
+ nni_aio_init(&p->aio_recv, xreq0_recv_cb, p);
+ nni_aio_init(&p->aio_send, xreq0_send_cb, p);
p->pipe = pipe;
p->req = s;
@@ -131,8 +128,8 @@ xreq0_pipe_start(void *arg)
return (NNG_EPROTO);
}
- nni_msgq_aio_get(s->uwq, p->aio_getq);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -141,14 +138,14 @@ xreq0_pipe_close(void *arg)
{
xreq0_pipe *p = arg;
- nni_aio_close(p->aio_getq);
- nni_aio_close(p->aio_putq);
- nni_aio_close(p->aio_recv);
- nni_aio_close(p->aio_send);
+ nni_aio_close(&p->aio_getq);
+ nni_aio_close(&p->aio_putq);
+ nni_aio_close(&p->aio_recv);
+ nni_aio_close(&p->aio_send);
}
-// For raw mode we can just let the pipes "contend" via getq to get a
-// message from the upper write queue. The msgqueue implementation
+// For raw mode we can just let the pipes "contend" via get queue to get a
+// message from the upper write queue. The msg queue implementation
// actually provides ordering, so load will be spread automatically.
// (NB: We may have to revise this in the future if we want to provide some
// kind of priority.)
@@ -158,15 +155,15 @@ xreq0_getq_cb(void *arg)
{
xreq0_pipe *p = arg;
- if (nni_aio_result(p->aio_getq) != 0) {
+ if (nni_aio_result(&p->aio_getq) != 0) {
nni_pipe_close(p->pipe);
return;
}
- nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
+ nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq));
+ nni_aio_set_msg(&p->aio_getq, NULL);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_pipe_send(p->pipe, &p->aio_send);
}
static void
@@ -174,15 +171,15 @@ xreq0_send_cb(void *arg)
{
xreq0_pipe *p = arg;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
// Sent a message so we just need to look for another one.
- nni_msgq_aio_get(p->req->uwq, p->aio_getq);
+ nni_msgq_aio_get(p->req->uwq, &p->aio_getq);
}
static void
@@ -190,15 +187,15 @@ xreq0_putq_cb(void *arg)
{
xreq0_pipe *p = arg;
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
+ if (nni_aio_result(&p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_putq));
+ nni_aio_set_msg(&p->aio_putq, NULL);
nni_pipe_close(p->pipe);
return;
}
- nni_aio_set_msg(p->aio_putq, NULL);
+ nni_aio_set_msg(&p->aio_putq, NULL);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static void
@@ -209,13 +206,13 @@ xreq0_recv_cb(void *arg)
nni_msg * msg;
uint32_t id;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// We yank 4 bytes from front of body, and move them to the header.
@@ -226,15 +223,13 @@ xreq0_recv_cb(void *arg)
return;
}
id = nni_msg_trim_u32(msg);
- if (nni_msg_header_append_u32(msg, id) != 0) {
- // Probably ENOMEM, discard and carry on.
- nni_msg_free(msg);
- nni_pipe_recv(p->pipe, p->aio_recv);
- return;
- }
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(sock->urq, p->aio_putq);
+ // Since we got this from the transport, there had better be some
+ // room in the header for our stuff.
+ nni_msg_header_must_append_u32(msg, id);
+
+ nni_aio_set_msg(&p->aio_putq, msg);
+ nni_msgq_aio_put(sock->urq, &p->aio_putq);
}
static void
@@ -254,17 +249,22 @@ xreq0_sock_recv(void *arg, nni_aio *aio)
}
static int
-xreq0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
+xreq0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
xreq0_sock *s = arg;
- return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t));
+ int ttl;
+ int rv;
+ if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) {
+ nni_atomic_set(&s->ttl, ttl);
+ }
+ return (rv);
}
static int
-xreq0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
+xreq0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
xreq0_sock *s = arg;
- return (nni_copyout_int(s->ttl, buf, szp, t));
+ return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
}
static nni_proto_pipe_ops xreq0_pipe_ops = {
@@ -279,8 +279,8 @@ static nni_proto_pipe_ops xreq0_pipe_ops = {
static nni_option xreq0_sock_options[] = {
{
.o_name = NNG_OPT_MAXTTL,
- .o_get = xreq0_sock_get_maxttl,
- .o_set = xreq0_sock_set_maxttl,
+ .o_get = xreq0_sock_get_max_ttl,
+ .o_set = xreq0_sock_set_max_ttl,
},
// terminate list
{
@@ -310,7 +310,7 @@ static nni_proto xreq0_proto = {
};
int
-nng_req0_open_raw(nng_socket *sidp)
+nng_req0_open_raw(nng_socket *sock)
{
- return (nni_proto_open(sidp, &xreq0_proto));
+ return (nni_proto_open(sock, &xreq0_proto));
}
diff --git a/src/protocol/reqrep0/xreq_test.c b/src/protocol/reqrep0/xreq_test.c
new file mode 100644
index 00000000..7f14f0cf
--- /dev/null
+++ b/src/protocol/reqrep0/xreq_test.c
@@ -0,0 +1,333 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/reqrep0/req.h>
+
+#include <acutest.h>
+#include <testutil.h>
+
+#ifndef NNI_PROTO
+#define NNI_PROTO(x, y) (((x) << 4u) | (y))
+#endif
+
+static void
+test_xreq_identity(void)
+{
+ nng_socket s;
+ int p;
+ char * n;
+
+ TEST_CHECK(nng_req0_open_raw(&s) == 0);
+ TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0);
+ TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48
+ TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0);
+ TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49
+ TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0);
+ TEST_CHECK(strcmp(n, "req") == 0);
+ nng_strfree(n);
+ TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0);
+ TEST_CHECK(strcmp(n, "rep") == 0);
+ nng_strfree(n);
+ TEST_CHECK(nng_close(s) == 0);
+}
+
+static void
+test_xreq_raw(void)
+{
+ nng_socket s;
+ bool b;
+
+ TEST_NNG_PASS(nng_req0_open_raw(&s));
+ TEST_NNG_PASS(nng_getopt_bool(s, NNG_OPT_RAW, &b));
+ TEST_CHECK(b);
+ TEST_NNG_PASS(nng_close(s));
+}
+
+static void
+test_xreq_no_context(void)
+{
+ nng_socket s;
+ nng_ctx ctx;
+
+ TEST_NNG_PASS(nng_req0_open_raw(&s));
+ TEST_NNG_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP);
+ TEST_NNG_PASS(nng_close(s));
+}
+
+static void
+test_xreq_poll_writeable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_rep0_open(&rep));
+ TEST_NNG_PASS(nng_getopt_int(req, NNG_OPT_SENDFD, &fd));
+ TEST_CHECK(fd >= 0);
+
+ // We can't write until we have a connection.
+ TEST_CHECK(testutil_pollfd(fd) == false);
+
+ TEST_NNG_PASS(testutil_marry(req, rep));
+
+ // Now it's writable.
+ TEST_CHECK(testutil_pollfd(fd) == true);
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+}
+
+static void
+test_xreq_poll_readable(void)
+{
+ int fd;
+ nng_socket req;
+ nng_socket rep;
+ nng_msg * msg;
+
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_rep0_open(&rep));
+ TEST_NNG_PASS(nng_getopt_int(req, NNG_OPT_RECVFD, &fd));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ TEST_CHECK(fd >= 0);
+
+ // Not readable if not connected!
+ TEST_CHECK(testutil_pollfd(fd) == false);
+
+ // Even after connect (no message yet)
+ TEST_NNG_PASS(testutil_marry(req, rep));
+ TEST_CHECK(testutil_pollfd(fd) == false);
+
+ // But once we send messages, it is.
+ // We have to send a request, in order to send a reply.
+ TEST_NNG_PASS(nng_msg_alloc(&msg, 0));
+ // Request ID
+ TEST_NNG_PASS(nng_msg_append_u32(msg, 0x80000000));
+ TEST_NNG_PASS(nng_sendmsg(req, msg, 0));
+
+ TEST_NNG_PASS(nng_recvmsg(rep, &msg, 0));
+ TEST_NNG_PASS(nng_sendmsg(rep, msg, 0));
+
+ testutil_sleep(100);
+
+ TEST_CHECK(testutil_pollfd(fd) == true);
+
+ // and receiving makes it no longer ready
+ TEST_NNG_PASS(nng_recvmsg(req, &msg, 0));
+ nng_msg_free(msg);
+ TEST_CHECK(testutil_pollfd(fd) == false);
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+}
+
+static void
+test_xreq_validate_peer(void)
+{
+ nng_socket s1, s2;
+ nng_stat * stats;
+ nng_stat * reject;
+ char addr[64];
+
+ testutil_scratch_addr("inproc", sizeof(addr), addr);
+
+ TEST_NNG_PASS(nng_req0_open_raw(&s1));
+ TEST_NNG_PASS(nng_req0_open(&s2));
+
+ TEST_NNG_PASS(nng_listen(s1, addr, NULL, 0));
+ TEST_NNG_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK));
+
+ testutil_sleep(100);
+ TEST_NNG_PASS(nng_stats_get(&stats));
+
+ TEST_CHECK(stats != NULL);
+ TEST_CHECK((reject = nng_stat_find_socket(stats, s1)) != NULL);
+ TEST_CHECK((reject = nng_stat_find(reject, "reject")) != NULL);
+
+ TEST_CHECK(nng_stat_type(reject) == NNG_STAT_COUNTER);
+ TEST_CHECK(nng_stat_value(reject) > 0);
+
+ TEST_NNG_PASS(nng_close(s1));
+ TEST_NNG_PASS(nng_close(s2));
+ nng_stats_free(stats);
+}
+
+static void
+test_xreq_recv_aio_stopped(void)
+{
+ nng_socket req;
+ nng_aio * aio;
+
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL));
+
+ nng_aio_stop(aio);
+ nng_recv_aio(req, aio);
+ nng_aio_wait(aio);
+ TEST_NNG_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ TEST_NNG_PASS(nng_close(req));
+ nng_aio_free(aio);
+}
+
+static void
+test_xreq_recv_garbage(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ uint32_t req_id;
+
+ TEST_NNG_PASS(nng_rep0_open_raw(&rep));
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, 100));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+
+ TEST_NNG_PASS(testutil_marry(req, rep));
+
+ TEST_NNG_PASS(nng_msg_alloc(&m, 0));
+ TEST_NNG_PASS(nng_msg_append_u32(m, 0x80000000));
+ TEST_NNG_PASS(nng_sendmsg(req, m, 0));
+
+ TEST_NNG_PASS(nng_recvmsg(rep, &m, 0));
+
+ // The message will have a header that contains the 32-bit pipe ID,
+ // followed by the 32-bit request ID. We will discard the request
+ // ID before sending it out.
+ TEST_CHECK(nng_msg_header_len(m) == 8);
+ TEST_NNG_PASS(nng_msg_header_chop_u32(m, &req_id));
+ TEST_CHECK(req_id == 0x80000000);
+
+ TEST_NNG_PASS(nng_sendmsg(rep, m, 0));
+ TEST_NNG_FAIL(nng_recvmsg(req, &m, 0), NNG_ETIMEDOUT);
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+}
+
+static void
+test_xreq_close_during_recv(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ nng_pipe p1;
+ nng_pipe p2;
+
+ TEST_NNG_PASS(nng_rep0_open_raw(&rep));
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 100));
+ TEST_NNG_PASS(nng_setopt_int(req, NNG_OPT_RECVBUF, 5));
+ TEST_NNG_PASS(nng_setopt_int(rep, NNG_OPT_SENDBUF, 20));
+
+ TEST_NNG_PASS(testutil_marry_ex(req, rep, &p1, &p2));
+ TEST_CHECK(nng_pipe_id(p1) > 0);
+ TEST_CHECK(nng_pipe_id(p2) > 0);
+
+ for (unsigned i = 0; i < 20; i++) {
+ TEST_NNG_PASS(nng_msg_alloc(&m, 4));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2)));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, i | 0x80000000u));
+ testutil_sleep(10);
+ TEST_NNG_PASS(nng_sendmsg(rep, m, 0));
+ }
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+}
+
+static void
+test_xreq_close_pipe_during_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ nng_pipe p1;
+ nng_pipe p2;
+
+ TEST_NNG_PASS(nng_rep0_open_raw(&rep));
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 100));
+ TEST_NNG_PASS(nng_setopt_int(rep, NNG_OPT_RECVBUF, 5));
+ TEST_NNG_PASS(nng_setopt_int(req, NNG_OPT_SENDBUF, 20));
+
+ TEST_NNG_PASS(testutil_marry_ex(req, rep, &p1, &p2));
+ TEST_CHECK(nng_pipe_id(p1) > 0);
+ TEST_CHECK(nng_pipe_id(p2) > 0);
+
+ for (unsigned i = 0; i < 20; i++) {
+ TEST_NNG_PASS(nng_msg_alloc(&m, 4));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, i | 0x80000000u));
+ testutil_sleep(10);
+ TEST_NNG_PASS(nng_sendmsg(req, m, 0));
+ }
+
+ TEST_NNG_PASS(nng_pipe_close(p1));
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+}
+
+static void
+test_xreq_ttl_option(void)
+{
+ nng_socket rep;
+ int v;
+ bool b;
+ size_t sz = sizeof(v);
+ const char *opt = NNG_OPT_MAXTTL;
+
+ TEST_NNG_PASS(nng_req0_open_raw(&rep));
+
+ TEST_NNG_PASS(nng_setopt_int(rep, opt, 1));
+ TEST_NNG_FAIL(nng_setopt_int(rep, opt, 0), NNG_EINVAL);
+ TEST_NNG_FAIL(nng_setopt_int(rep, opt, -1), NNG_EINVAL);
+ TEST_NNG_FAIL(nng_setopt_int(rep, opt, 256), NNG_EINVAL);
+ TEST_NNG_PASS(nng_setopt_int(rep, opt, 3));
+ TEST_NNG_PASS(nng_getopt_int(rep, opt, &v));
+ TEST_CHECK(v == 3);
+ v = 0;
+ sz = sizeof(v);
+ TEST_NNG_PASS(nng_getopt(rep, opt, &v, &sz));
+ TEST_CHECK(v == 3);
+ TEST_CHECK(sz == sizeof(v));
+
+ TEST_CHECK(nng_setopt(rep, opt, "", 1) == NNG_EINVAL);
+ sz = 1;
+ TEST_CHECK(nng_getopt(rep, opt, &v, &sz) == NNG_EINVAL);
+ TEST_CHECK(nng_setopt_bool(rep, opt, true) == NNG_EBADTYPE);
+ TEST_CHECK(nng_getopt_bool(rep, opt, &b) == NNG_EBADTYPE);
+
+ TEST_CHECK(nng_close(rep) == 0);
+}
+
+TEST_LIST = {
+ { "xreq identity", test_xreq_identity },
+ { "xreq raw", test_xreq_raw },
+ { "xreq no context", test_xreq_no_context },
+ { "xreq poll readable", test_xreq_poll_readable },
+ { "xreq poll writable", test_xreq_poll_writeable },
+ { "xreq validate peer", test_xreq_validate_peer },
+ { "xreq recv aio stopped", test_xreq_recv_aio_stopped },
+ { "xreq recv garbage", test_xreq_recv_garbage },
+ { "xreq close during recv", test_xreq_close_during_recv },
+ { "xreq close pipe during send", test_xreq_close_pipe_during_send },
+ { "xreq ttl option", test_xreq_ttl_option },
+ { NULL, NULL },
+};