aboutsummaryrefslogtreecommitdiff
path: root/tests/reqstress.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-07 06:59:33 +0300
committerGarrett D'Amore <garrett@damore.org>2018-08-07 10:55:40 +0300
commit9804009c29664e303d81793e49e0ef2f3cf67b4f (patch)
tree4c5621a226bf35b1eb38716311aa1afdc0f85caf /tests/reqstress.c
parente81219db592d4f1622710136a11b8e8c4fe36c79 (diff)
downloadnng-9804009c29664e303d81793e49e0ef2f3cf67b4f.tar.gz
nng-9804009c29664e303d81793e49e0ef2f3cf67b4f.tar.bz2
nng-9804009c29664e303d81793e49e0ef2f3cf67b4f.zip
fixes #620 Stress tests are too stressful in CI/CD
This converts the tests to use async I/O callbacks instead of threads for running tests. This should greatly reduce the amount of pressure we apply to the system. On macOS the time to start up with a pressure of 500 is significantly less than under the old system. Plus, as we are no longer at the mercy of the scheduler, we're far more likely to get a successful test.
Diffstat (limited to 'tests/reqstress.c')
-rw-r--r--tests/reqstress.c209
1 file changed, 127 insertions, 82 deletions
diff --git a/tests/reqstress.c b/tests/reqstress.c
index b06a60c5..1811063b 100644
--- a/tests/reqstress.c
+++ b/tests/reqstress.c
@@ -33,12 +33,11 @@
static int next_port = 20000; // port number kind of.
-char tcp4_template[] = "tcp://127.0.0.1:%d";
-char tcp6_template[] = "tcp://[::1]:%d";
-char inproc_template[] = "inproc://nng_reqstress_%d";
-char ipc_template[] = "ipc:///tmp/nng_reqstress_%d";
-char ws_template[] = "ws://127.0.0.1:%d/nng_reqstress";
-nng_time end_time;
+char tcp4_template[] = "tcp://127.0.0.1:%d";
+char tcp6_template[] = "tcp://[::1]:%d";
+char inproc_template[] = "inproc://nng_reqstress_%d";
+char ipc_template[] = "ipc:///tmp/nng_reqstress_%d";
+char ws_template[] = "ws://127.0.0.1:%d/nng_reqstress";
char *templates[] = {
#ifdef NNG_TRANSPORT_TCP
@@ -69,10 +68,13 @@ int allocaddrs;
typedef struct test_case {
nng_socket socket;
const char *name;
- nng_thread *thr;
int nrecv;
int nsend;
int nfail;
+ nng_aio * recv_aio;
+ nng_aio * send_aio;
+ nng_aio * time_aio;
+ char buf[32];
} test_case;
static test_case *cases;
@@ -89,7 +91,7 @@ fatal(const char *msg, int rv)
void
error(test_case *c, const char *msg, int rv)
{
- if (rv == NNG_ECLOSED) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) {
return;
}
fprintf(
@@ -110,83 +112,105 @@ getaddr(char *buf)
// Request/Reply test. For this test, we open a server socket,
// and bind it to each protocol. Then we run a bunch of clients
-// against it. The
+// against it.
-// Simple rep echo server.
+// REP server implemented via callbacks.
static void
-rep_server(void *arg)
+rep_recv_cb(void *arg)
{
test_case *c = arg;
- for (;;) {
- int rv;
- nng_msg * msg;
- nng_socket rep = c->socket;
-
- if (nng_clock() > end_time) {
- break;
- }
+ int rv;
- if ((rv = nng_recvmsg(rep, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
- if ((rv = nng_sendmsg(rep, msg, 0)) != 0) {
- nng_msg_free(msg);
- error(c, "sendmsg", rv);
- return;
- }
- c->nsend++;
+ if ((rv = nng_aio_result(c->recv_aio)) != 0) {
+ error(c, "recv", rv);
+ return;
}
+ c->nrecv++;
+ nng_aio_set_msg(c->send_aio, nng_aio_get_msg(c->recv_aio));
+ nng_aio_set_msg(c->recv_aio, NULL);
+ nng_send_aio(c->socket, c->send_aio);
}
static void
-req_client(void *arg)
+rep_send_cb(void *arg)
{
test_case *c = arg;
- for (;;) {
- int rv;
- nng_socket req = c->socket;
- int num = 0;
- nng_msg * msg;
- char buf[32];
-
- (void) snprintf(buf, sizeof(buf), "%u-%d", req.id, num++);
-
- nng_msleep(rand() % 10);
- if (nng_clock() > end_time) {
- break;
- }
+ int rv;
- if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- error(c, "alloc fail", rv);
- return;
- }
- if ((rv = nng_msg_append(msg, buf, strlen(buf) + 1)) != 0) {
- nng_msg_free(msg);
- error(c, "append fail", rv);
- return;
- }
- if ((rv = nng_sendmsg(req, msg, 0)) != 0) {
- nng_msg_free(msg);
- error(c, "sendmsg", rv);
- return;
- }
- c->nsend++;
- if ((rv = nng_recvmsg(req, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
+ if ((rv = nng_aio_result(c->send_aio)) != 0) {
+ error(c, "send", rv);
+ nng_msg_free(nng_aio_get_msg(c->send_aio));
+ nng_aio_set_msg(c->send_aio, NULL);
+ return;
+ }
+ c->nsend++;
+ nng_recv_aio(c->socket, c->recv_aio);
+}
- if (strcmp(nng_msg_body(msg), buf) != 0) {
- error(c, "mismatched message", NNG_EINTERNAL);
- nng_msg_free(msg);
- return;
- }
+static void
+req_time_cb(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
+
+ if ((rv = nng_aio_result(c->time_aio)) != 0) {
+ error(c, "sleep", rv);
+ return;
+ }
+ (void) snprintf(
+ c->buf, sizeof(c->buf), "%u-%d", c->socket.id, c->nsend);
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, c->buf, strlen(c->buf) + 1)) != 0)) {
+ error(c, "alloc", rv);
+ nng_msg_free(msg);
+ return;
+ }
+ nng_aio_set_msg(c->send_aio, msg);
+ nng_send_aio(c->socket, c->send_aio);
+}
+static void
+req_recv_cb(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
+
+ if ((rv = nng_aio_result(c->recv_aio)) != 0) {
+ error(c, "recv", rv);
+ return;
+ }
+
+ msg = nng_aio_get_msg(c->recv_aio);
+ if ((nng_msg_len(msg) != (strlen(c->buf) + 1)) ||
+ (strcmp(c->buf, nng_msg_body(msg)) != 0)) {
+ error(c, "msg mismatch", rv);
nng_msg_free(msg);
+ return;
+ }
+
+ nng_msg_free(msg);
+ memset(c->buf, 0, sizeof(c->buf));
+
+ c->nrecv++;
+ nng_sleep_aio(rand() % 10, c->time_aio);
+}
+
+static void
+req_send_cb(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+
+ if ((rv = nng_aio_result(c->send_aio)) != 0) {
+ error(c, "send", rv);
+ nng_msg_free(nng_aio_get_msg(c->send_aio));
+ nng_aio_set_msg(c->send_aio, NULL);
+ return;
}
+ c->nsend++;
+ nng_recv_aio(c->socket, c->recv_aio);
}
void
@@ -209,12 +233,25 @@ reqrep_test(int ntests)
fatal("nng_rep0_open", rv);
}
- if ((rv = nng_thread_create(&srv->thr, rep_server, srv)) != 0) {
- fatal("nng_thread_create", rv);
+ if (((rv = nng_aio_alloc(&srv->send_aio, rep_send_cb, srv)) != 0) ||
+ ((rv = nng_aio_alloc(&srv->recv_aio, rep_recv_cb, srv)) != 0)) {
+ fatal("nng_aio_alloc", rv);
}
+ nng_recv_aio(srv->socket, srv->recv_aio);
+
for (i = 1; i < ntests; i++) {
cli = &cases[curcase++];
+
+ if (((rv = nng_aio_alloc(&cli->send_aio, req_send_cb, cli)) !=
+ 0) ||
+ ((rv = nng_aio_alloc(&cli->recv_aio, req_recv_cb, cli)) !=
+ 0) ||
+ ((rv = nng_aio_alloc(&cli->time_aio, req_time_cb, cli)) !=
+ 0)) {
+ fatal("nng_aio_alloc", rv);
+ }
+
if ((rv = nng_req0_open(&cli->socket)) != 0) {
fatal("nng_req0_open", rv);
}
@@ -231,10 +268,7 @@ reqrep_test(int ntests)
fatal("nng_dial", rv);
}
- if ((rv = nng_thread_create(&cli->thr, req_client, cli)) !=
- 0) {
- fatal("nng_thread_create", rv);
- }
+ nng_sleep_aio(1, cli->time_aio);
}
}
@@ -250,7 +284,8 @@ Main({
tmo = 30;
}
// We have to keep this relatively low by default because some
- // platforms don't support large numbers of threads.
+ // platforms don't support large numbers of sockets. (On macOS
+ // laptop I can run this with 500 though.)
if (((str = ConveyGetEnv("STRESSPRESSURE")) == NULL) ||
((ncases = atoi(str)) < 1)) {
ncases = 32;
@@ -265,8 +300,7 @@ Main({
i = ncases;
- cases = calloc(ncases, sizeof(test_case));
- end_time = nng_clock() + (tmo * 1000);
+ cases = calloc(ncases, sizeof(test_case));
while (i > 1) {
int x = rand() % NTEMPLATES;
if (x > i) {
@@ -278,17 +312,28 @@ Main({
dprintf("WAITING for %d sec...\n", tmo);
nng_msleep(tmo * 1000); // sleep 30 sec
+
+ // Close the timeouts first, before closing sockets. This tends to
+ // ensure that we complete exchanges.
+ for (i = 0; i < ncases; i++) {
+ nng_aio_stop(cases[i].time_aio);
+ }
+ nng_msleep(100);
nng_closeall();
Test("Req/Rep Stress", {
Convey("All tests worked", {
for (i = 0; i < ncases; i++) {
- if (cases[i].thr != NULL) {
- nng_thread_destroy(cases[i].thr);
+ nng_aio_stop(cases[i].recv_aio);
+ nng_aio_stop(cases[i].send_aio);
+ nng_aio_stop(cases[i].time_aio);
+ nng_aio_free(cases[i].recv_aio);
+ nng_aio_free(cases[i].send_aio);
+ nng_aio_free(cases[i].time_aio);
+ if (cases[i].name != NULL) {
dprintf(
"RESULT socket %u (%s) sent %d "
- "recd "
- "%d fail %d\n",
+ "recd %d fail %d\n",
cases[i].socket.id, cases[i].name,
cases[i].nsend, cases[i].nrecv,
cases[i].nfail);