aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/reqrep0/rep.c14
-rw-r--r--src/protocol/reqrep0/rep_test.c124
2 files changed, 127 insertions, 11 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index f8c15fa6..142edb9b 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -36,15 +36,14 @@ static void rep0_pipe_fini(void *);
struct rep0_ctx {
rep0_sock * sock;
- char * btrace;
size_t btrace_len;
- size_t btrace_size;
uint32_t pipe_id;
rep0_pipe * spipe; // send pipe
nni_aio * saio; // send aio
nni_aio * raio; // recv aio
nni_list_node sqnode;
nni_list_node rqnode;
+ uint32_t btrace[256]; // backtrace buffer
};
// rep0_sock is our per-socket protocol private structure.
@@ -100,7 +99,6 @@ rep0_ctx_fini(void *arg)
rep0_ctx *ctx = arg;
rep0_ctx_close(ctx);
- nni_free(ctx->btrace, ctx->btrace_size);
NNI_FREE_STRUCT(ctx);
}
@@ -114,12 +112,6 @@ rep0_ctx_init(void **ctxp, void *sarg)
return (NNG_ENOMEM);
}
- // this is 1kB, which covers the worst case.
- ctx->btrace_size = 256 * sizeof(uint32_t);
- if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) {
- NNI_FREE_STRUCT(ctx);
- return (NNG_ENOMEM);
- }
NNI_LIST_NODE_INIT(&ctx->sqnode);
NNI_LIST_NODE_INIT(&ctx->rqnode);
ctx->btrace_len = 0;
@@ -541,7 +533,7 @@ rep0_pipe_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- bool end = false;
+ bool end;
if (hops > s->ttl) {
// This isn't malformed, but it has gone
@@ -559,7 +551,7 @@ rep0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = ((body[0] & 0x80) != 0);
+ end = ((body[0] & 0x80u) != 0);
if (nni_msg_header_append(msg, body, 4) != 0) {
// Out of memory, so drop it.
goto drop;
diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c
index 784cdeb7..6bbdfcd0 100644
--- a/src/protocol/reqrep0/rep_test.c
+++ b/src/protocol/reqrep0/rep_test.c
@@ -176,6 +176,126 @@ test_rep_validate_peer(void)
nng_stats_free(stats);
}
+void
+test_rep_double_recv(void)
+{
+ nng_socket s1;
+ nng_aio * aio1;
+ nng_aio * aio2;
+
+ TEST_NNG_PASS(nng_rep0_open(&s1));
+ TEST_NNG_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+ TEST_NNG_PASS(nng_aio_alloc(&aio2, NULL, NULL));
+
+ nng_recv_aio(s1, aio1);
+ nng_recv_aio(s1, aio2);
+
+ nng_aio_wait(aio2);
+ TEST_NNG_FAIL(nng_aio_result(aio2), NNG_ESTATE);
+ TEST_NNG_PASS(nng_close(s1));
+ TEST_NNG_FAIL(nng_aio_result(aio1), NNG_ECLOSED);
+ nng_aio_free(aio1);
+ nng_aio_free(aio2);
+}
+
+void
+test_rep_close_pipe_before_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_pipe p;
+ nng_aio * aio1;
+ nng_msg * m;
+
+ TEST_NNG_PASS(nng_rep0_open(&rep));
+ TEST_NNG_PASS(nng_req0_open(&req));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+
+ TEST_NNG_PASS(testutil_marry(req, rep));
+ TEST_NNG_SEND_STR(req, "test");
+
+ nng_recv_aio(rep, aio1);
+ nng_aio_wait(aio1);
+ TEST_NNG_PASS(nng_aio_result(aio1));
+ TEST_CHECK((m = nng_aio_get_msg(aio1)) != NULL);
+ p = nng_msg_get_pipe(m);
+ TEST_NNG_PASS(nng_pipe_close(p));
+ TEST_NNG_PASS(nng_sendmsg(rep, m, 0));
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+ nng_aio_free(aio1);
+}
+
+void
+test_rep_close_pipe_during_send(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_pipe p = NNG_PIPE_INITIALIZER;
+ nng_msg * m;
+
+ TEST_NNG_PASS(nng_rep0_open(&rep));
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 200));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_int(rep, NNG_OPT_SENDBUF, 20));
+ TEST_NNG_PASS(nng_setopt_int(rep, NNG_OPT_RECVBUF, 20));
+ TEST_NNG_PASS(nng_setopt_int(req, NNG_OPT_SENDBUF, 20));
+ TEST_NNG_PASS(nng_setopt_int(req, NNG_OPT_RECVBUF, 1));
+
+ TEST_NNG_PASS(testutil_marry(req, rep));
+
+ for (int i = 0; i < 100; i++) {
+ int rv;
+ TEST_NNG_PASS(nng_msg_alloc(&m, 4));
+ TEST_NNG_PASS(
+ nng_msg_append_u32(m, (unsigned) i | 0x80000000u));
+ TEST_NNG_PASS(nng_sendmsg(req, m, 0));
+ TEST_NNG_PASS(nng_recvmsg(rep, &m, 0));
+ p = nng_msg_get_pipe(m);
+ rv = nng_sendmsg(rep, m, 0);
+ if (rv == NNG_ETIMEDOUT) {
+ // Queue is backed up, senders are busy.
+ nng_msg_free(m);
+ break;
+ }
+ TEST_NNG_PASS(rv);
+ }
+ TEST_NNG_PASS(nng_pipe_close(p));
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+}
+
+void
+test_rep_recv_garbage(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+
+ TEST_NNG_PASS(nng_rep0_open(&rep));
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 200));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 200));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000));
+
+ TEST_NNG_PASS(testutil_marry(req, rep));
+
+ TEST_NNG_PASS(nng_msg_alloc(&m, 4));
+ TEST_NNG_PASS(nng_msg_append_u32(m, 1u));
+ TEST_NNG_PASS(nng_sendmsg(req, m, 0));
+ TEST_NNG_FAIL(nng_recvmsg(rep, &m, 0), NNG_ETIMEDOUT);
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+}
+
TEST_LIST = {
{ "rep identity", test_rep_identity },
{ "rep send bad state", test_rep_send_bad_state },
@@ -183,5 +303,9 @@ TEST_LIST = {
{ "rep poll writable", test_rep_poll_writeable },
{ "rep context not pollable", test_rep_context_not_pollable },
{ "rep validate peer", test_rep_validate_peer },
+ { "rep double recv", test_rep_double_recv },
+ { "rep close pipe before send", test_rep_close_pipe_before_send },
+ { "rep close pipe during send", test_rep_close_pipe_during_send },
+ { "rep recv garbage", test_rep_recv_garbage },
{ NULL, NULL },
};