aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-07 06:59:33 +0300
committerGarrett D'Amore <garrett@damore.org>2018-08-07 10:55:40 +0300
commit9804009c29664e303d81793e49e0ef2f3cf67b4f (patch)
tree4c5621a226bf35b1eb38716311aa1afdc0f85caf
parente81219db592d4f1622710136a11b8e8c4fe36c79 (diff)
downloadnng-9804009c29664e303d81793e49e0ef2f3cf67b4f.tar.gz
nng-9804009c29664e303d81793e49e0ef2f3cf67b4f.tar.bz2
nng-9804009c29664e303d81793e49e0ef2f3cf67b4f.zip
fixes #620 Stress tests are too stressful in CI/CD
This converts the tests to use async I/O callbacks instead of threads for running tests. This should greatly reduce the amount of pressure we apply to the system. On macOS the time to start up with a pressure of 500 is significantly less than under the old system. Plus, as we are no longer at the mercy of the scheduler, we're far more likely to get a successful test.
-rw-r--r--tests/multistress.c714
-rw-r--r--tests/reqstress.c209
2 files changed, 563 insertions, 360 deletions
diff --git a/tests/multistress.c b/tests/multistress.c
index cbe16b0f..d39d2896 100644
--- a/tests/multistress.c
+++ b/tests/multistress.c
@@ -71,12 +71,17 @@ int naddresses;
int allocaddrs;
typedef struct test_case {
- nng_socket socket;
+ nng_socket sock;
const char *name;
- nng_thread *thr;
int nrecv;
int nsend;
int nfail;
+ nng_aio * recd;
+ nng_aio * sent;
+ nng_aio * woke;
+ char addr[NNG_MAXADDRLEN];
+ char buf[32];
+
} test_case;
static test_case *cases;
@@ -93,7 +98,7 @@ fatal(const char *msg, int rv)
void
error(test_case *c, const char *msg, int rv)
{
- if (rv == NNG_ECLOSED) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) {
return;
}
fprintf(
@@ -114,86 +119,108 @@ getaddr(char *buf)
// Request/Reply test. For this test, we open a server socket,
// and bind it to each protocol. Then we run a bunch of clients
-// against it. The
+// against it.
-// Simple rep echo server.
+// REP server implemented via callbacks.
static void
-rep_server(void *arg)
+rep0_recd(void *arg)
{
test_case *c = arg;
- for (;;) {
- int rv;
- nng_msg * msg;
- nng_socket rep = c->socket;
-
- if (nng_clock() > end_time) {
- break;
- }
+ int rv;
- if ((rv = nng_recvmsg(rep, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
- if ((rv = nng_sendmsg(rep, msg, 0)) != 0) {
- nng_msg_free(msg);
- error(c, "sendmsg", rv);
- return;
- }
- c->nsend++;
+ if ((rv = nng_aio_result(c->recd)) != 0) {
+ error(c, "recv", rv);
+ return;
}
+ c->nrecv++;
+ nng_aio_set_msg(c->sent, nng_aio_get_msg(c->recd));
+ nng_aio_set_msg(c->recd, NULL);
+ nng_send_aio(c->sock, c->sent);
}
static void
-req_client(void *arg)
+rep0_sent(void *arg)
{
test_case *c = arg;
- for (;;) {
- int rv;
- nng_socket req = c->socket;
- int num = 0;
- nng_msg * msg;
- char buf[32];
+ int rv;
- (void) snprintf(buf, sizeof(buf), "%u-%d", req.id, num++);
+ if ((rv = nng_aio_result(c->sent)) != 0) {
+ error(c, "send", rv);
+ nng_msg_free(nng_aio_get_msg(c->sent));
+ nng_aio_set_msg(c->sent, NULL);
+ return;
+ }
+ c->nsend++;
+ nng_recv_aio(c->sock, c->recd);
+}
- nng_msleep(rand() % 10);
- if (nng_clock() > end_time) {
- break;
- }
+static void
+req0_woke(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
- if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- error(c, "alloc fail", rv);
- return;
- }
- if ((rv = nng_msg_append(msg, buf, strlen(buf) + 1)) != 0) {
- nng_msg_free(msg);
- error(c, "append fail", rv);
- return;
- }
- if ((rv = nng_sendmsg(req, msg, 0)) != 0) {
- nng_msg_free(msg);
- error(c, "sendmsg", rv);
- return;
- }
- c->nsend++;
- if ((rv = nng_recvmsg(req, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
+ if ((rv = nng_aio_result(c->woke)) != 0) {
+ error(c, "sleep", rv);
+ return;
+ }
+ (void) snprintf(c->buf, sizeof(c->buf), "%u-%d", c->sock.id, c->nsend);
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, c->buf, strlen(c->buf) + 1)) != 0)) {
+ error(c, "alloc", rv);
+ nng_msg_free(msg);
+ return;
+ }
+ nng_aio_set_msg(c->sent, msg);
+ nng_send_aio(c->sock, c->sent);
+}
- if (strcmp(nng_msg_body(msg), buf) != 0) {
- error(c, "mismatched message", NNG_EINTERNAL);
- return;
- }
+static void
+req0_recd(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
+ if ((rv = nng_aio_result(c->recd)) != 0) {
+ error(c, "recv", rv);
+ return;
+ }
+
+ msg = nng_aio_get_msg(c->recd);
+ if ((nng_msg_len(msg) != (strlen(c->buf) + 1)) ||
+ (strcmp(c->buf, nng_msg_body(msg)) != 0)) {
+ error(c, "msg mismatch", rv);
nng_msg_free(msg);
+ return;
}
+
+ nng_msg_free(msg);
+ memset(c->buf, 0, sizeof(c->buf));
+
+ c->nrecv++;
+ nng_sleep_aio(rand() % 10, c->woke);
+}
+
+static void
+req0_sent(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+
+ if ((rv = nng_aio_result(c->sent)) != 0) {
+ error(c, "send", rv);
+ nng_msg_free(nng_aio_get_msg(c->sent));
+ nng_aio_set_msg(c->sent, NULL);
+ return;
+ }
+ c->nsend++;
+ nng_recv_aio(c->sock, c->recd);
}
void
-reqrep_test(int ntests)
+reqrep0_test(int ntests)
{
test_case *srv, *cli;
int i;
@@ -206,206 +233,296 @@ reqrep_test(int ntests)
}
srv = &cases[curcase++];
- srv->name = "rep";
+ srv->name = "rep0";
- if ((rv = nng_rep0_open(&srv->socket)) != 0) {
+ if ((rv = nng_rep0_open(&srv->sock)) != 0) {
fatal("nng_rep0_open", rv);
}
- if ((rv = nng_thread_create(&srv->thr, rep_server, srv)) != 0) {
- fatal("nng_thread_create", rv);
+ if (((rv = nng_aio_alloc(&srv->sent, rep0_sent, srv)) != 0) ||
+ ((rv = nng_aio_alloc(&srv->recd, rep0_recd, srv)) != 0)) {
+ fatal("nng_aio_alloc", rv);
}
+ nng_recv_aio(srv->sock, srv->recd);
+
for (i = 1; i < ntests; i++) {
- cli = &cases[curcase++];
- if ((rv = nng_req0_open(&cli->socket)) != 0) {
+ cli = &cases[curcase++];
+ cli->name = "req0";
+
+ if (((rv = nng_aio_alloc(&cli->sent, req0_sent, cli)) != 0) ||
+ ((rv = nng_aio_alloc(&cli->recd, req0_recd, cli)) != 0) ||
+ ((rv = nng_aio_alloc(&cli->woke, req0_woke, cli)) != 0)) {
+ fatal("nng_aio_alloc", rv);
+ }
+
+ if ((rv = nng_req0_open(&cli->sock)) != 0) {
fatal("nng_req0_open", rv);
}
- cli->name = "req";
getaddr(addr);
dprintf("DOING reqrep0 (req %u rep %u) address: %s\n",
- cli->socket.id, srv->socket.id, addr);
+ cli->sock.id, srv->sock.id, addr);
- if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) {
+ if ((rv = nng_listen(srv->sock, addr, NULL, 0)) != 0) {
fatal("nng_listen", rv);
}
- if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) {
+ if ((rv = nng_dial(cli->sock, addr, NULL, 0)) != 0) {
fatal("nng_dial", rv);
}
- if ((rv = nng_thread_create(&cli->thr, req_client, cli)) !=
- 0) {
- fatal("nng_thread_create", rv);
- }
+ nng_sleep_aio(1, cli->woke);
}
}
-void
-pair0_bouncer(void *arg)
+// PAIRv0 test. We just bind two sockets together, and bounce messages at
+// each other. As we don't need to run synchronously, the receive is not
+// linked to the send.
+
+static void
+pair0_recd(void *arg)
{
test_case *c = arg;
- for (;;) {
- nng_msg *msg;
- int r;
- int rv;
-
- nng_msleep(rand() % 10);
- if (nng_clock() > end_time) {
- break;
- }
-
- if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- error(c, "alloc", rv);
- return;
- }
-
- r = rand();
- if ((rv = nng_msg_append(msg, &r, sizeof(r))) != 0) {
- nng_msg_free(msg);
- error(c, "msg_append", rv);
- return;
- }
+ int rv;
+ if ((rv = nng_aio_result(c->recd)) != 0) {
+ error(c, "recv", rv);
+ return;
+ }
+ c->nrecv++;
+ nng_msg_free(nng_aio_get_msg(c->recd));
+ nng_recv_aio(c->sock, c->recd);
+}
- if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) {
- nng_msg_free(msg);
- error(c, "sendmsg", rv);
- return;
- }
- c->nsend++;
+static void
+pair0_woke(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
- if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
+ if ((rv = nng_aio_result(c->woke)) != 0) {
+ error(c, "sleep", rv);
+ return;
+ }
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append_u32(msg, (unsigned) rand())) != 0)) {
+ error(c, "alloc", rv);
nng_msg_free(msg);
+ return;
}
+ nng_aio_set_msg(c->sent, msg);
+ nng_send_aio(c->sock, c->sent);
+}
+
+static void
+pair0_sent(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+ if ((rv = nng_aio_result(c->sent)) != 0) {
+ error(c, "send", rv);
+ nng_msg_free(nng_aio_get_msg(c->sent));
+ nng_aio_set_msg(c->sent, NULL);
+ return;
+ }
+ c->nsend++;
+ nng_sleep_aio(rand() % 10, c->woke);
}
void
-pair0_test(void)
+pair0_test(int ntests)
{
test_case *srv, *cli;
char addr[NNG_MAXADDRLEN];
int rv;
+ if (ntests < 2) {
+ return;
+ }
srv = &cases[curcase++];
srv->name = "pair0";
-
- if ((rv = nng_pair0_open(&srv->socket)) != 0) {
- fatal("nng_pair0_open", rv);
- }
-
- if ((rv = nng_thread_create(&srv->thr, pair0_bouncer, srv)) != 0) {
- fatal("nng_thread_create", rv);
- }
-
cli = &cases[curcase++];
cli->name = "pair0";
- if ((rv = nng_pair0_open(&cli->socket)) != 0) {
+ if (((rv = nng_pair0_open(&srv->sock)) != 0) ||
+ ((rv = nng_pair0_open(&cli->sock)) != 0)) {
fatal("nng_pair0_open", rv);
}
- if ((rv = nng_thread_create(&cli->thr, pair0_bouncer, cli)) != 0) {
- fatal("nng_thread_create", rv);
+ if (((rv = nng_aio_alloc(&srv->sent, pair0_sent, srv)) != 0) ||
+ ((rv = nng_aio_alloc(&srv->recd, pair0_recd, srv)) != 0) ||
+ ((rv = nng_aio_alloc(&srv->woke, pair0_woke, srv)) != 0) ||
+ ((rv = nng_aio_alloc(&cli->sent, pair0_sent, cli)) != 0) ||
+ ((rv = nng_aio_alloc(&cli->recd, pair0_recd, cli)) != 0) ||
+ ((rv = nng_aio_alloc(&cli->woke, pair0_woke, cli)) != 0)) {
+ fatal("nng_aio_alloc", rv);
}
getaddr(addr);
- dprintf("DOING pair0 (%u, %u) address: %s\n", cli->socket.id,
- srv->socket.id, addr);
+ dprintf("DOING pair0 (%u, %u) address: %s\n", cli->sock.id,
+ srv->sock.id, addr);
- if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) {
+ if ((rv = nng_listen(srv->sock, addr, NULL, 0)) != 0) {
fatal("nng_listen", rv);
}
- if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) {
+ if ((rv = nng_dial(cli->sock, addr, NULL, 0)) != 0) {
fatal("nng_dial", rv);
}
+
+ nng_recv_aio(srv->sock, srv->recd);
+ nng_recv_aio(cli->sock, cli->recd);
+ nng_sleep_aio(1, srv->woke);
+ nng_sleep_aio(1, cli->woke);
}
-void
-bus0_bouncer(void *arg)
+// BUSv0 test. We just bind sockets together into a full mesh, and bounce
+// messages at each other. As we don't need to run synchronously, the
+// receive is not linked to the send.
+
+static void
+bus0_recd(void *arg)
{
test_case *c = arg;
- for (;;) {
- nng_msg *msg;
- int r;
- int rv;
-
- nng_msleep(rand() % 10);
- if (nng_clock() > end_time) {
- break;
- }
-
- if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- error(c, "alloc", rv);
- return;
- }
-
- r = rand();
- if ((rv = nng_msg_append(msg, &r, sizeof(r))) != 0) {
- nng_msg_free(msg);
- error(c, "msg_append", rv);
- return;
- }
+ int rv;
+ if ((rv = nng_aio_result(c->recd)) != 0) {
+ error(c, "recv", rv);
+ return;
+ }
+ c->nrecv++;
+ nng_msg_free(nng_aio_get_msg(c->recd));
+ nng_recv_aio(c->sock, c->recd);
+}
- if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) {
- nng_msg_free(msg);
- error(c, "sendmsg", rv);
- return;
- }
- c->nsend++;
+static void
+bus0_woke(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
- if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
+ if ((rv = nng_aio_result(c->woke)) != 0) {
+ error(c, "sleep", rv);
+ return;
+ }
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append_u32(msg, (unsigned) rand())) != 0)) {
nng_msg_free(msg);
+ error(c, "alloc", rv);
+ return;
}
+ nng_aio_set_msg(c->sent, msg);
+ nng_send_aio(c->sock, c->sent);
}
-// We just do 1:1 for bus for now.
-void
-bus_test(void)
+static void
+bus0_sent(void *arg)
{
- test_case *srv, *cli;
- char addr[NNG_MAXADDRLEN];
+ test_case *c = arg;
int rv;
+ if ((rv = nng_aio_result(c->sent)) != 0) {
+ nng_msg_free(nng_aio_get_msg(c->sent));
+ nng_aio_set_msg(c->sent, NULL);
+ error(c, "send", rv);
+ return;
+ }
+ c->nsend++;
+ nng_sleep_aio(rand() % 10, c->woke);
+}
- srv = &cases[curcase++];
- srv->name = "bus0";
+void
+bus0_test(int ntests)
+{
- if ((rv = nng_bus0_open(&srv->socket)) != 0) {
- fatal("nng_bus0_open", rv);
+ if (ntests < 2) {
+ return;
}
+ for (int i = 0; i < ntests; i++) {
+ test_case *c = &cases[curcase + i];
+ int rv;
- if ((rv = nng_thread_create(&srv->thr, bus0_bouncer, srv)) != 0) {
- fatal("nng_thread_create", rv);
- }
+ getaddr(c->addr);
- cli = &cases[curcase++];
- cli->name = "pair0";
+ c->name = "bus0";
+ if (((rv = nng_aio_alloc(&c->recd, bus0_recd, c)) != 0) ||
+ ((rv = nng_aio_alloc(&c->sent, bus0_sent, c)) != 0) ||
+ ((rv = nng_aio_alloc(&c->woke, bus0_woke, c)) != 0)) {
+ fatal("nng_aio_alloc", rv);
+ }
+ if ((rv = nng_bus0_open(&c->sock)) != 0) {
+ fatal("nng_bus0_open", rv);
+ }
+ if ((rv = nng_listen(c->sock, c->addr, NULL, 0)) != 0) {
+ fatal("nng_listen", rv);
+ }
+ dprintf("DOING bus0 (%u) address: %s\n", c->sock.id, c->addr);
- if ((rv = nng_bus0_open(&cli->socket)) != 0) {
- fatal("nng_bus0_open", rv);
+ // We dial to everyone else who already listened.
+ for (int j = 0; j < i; j++) {
+ rv = nng_dial(
+ c->sock, cases[curcase + j].addr, NULL, 0);
+ if (rv != 0) {
+ fatal("nng_dial", rv);
+ }
+ }
}
- if ((rv = nng_thread_create(&cli->thr, pair0_bouncer, cli)) != 0) {
- fatal("nng_thread_create", rv);
+ for (int i = 0; i < ntests; i++) {
+ test_case *c = &cases[curcase++];
+ nng_recv_aio(c->sock, c->recd);
+ nng_sleep_aio(1, c->woke);
}
+}
- getaddr(addr);
- dprintf("DOING bus0 (%u, %u) address: %s\n", cli->socket.id,
- srv->socket.id, addr);
+void
+pub0_woke(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
- if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) {
- fatal("nng_listen", rv);
+ if ((rv = nng_aio_result(c->woke)) != 0) {
+ error(c, "sleep", rv);
+ return;
}
- if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) {
- fatal("nng_dial", rv);
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, "SUB", 4)) != 0)) {
+ nng_msg_free(msg);
+ error(c, "alloc", rv);
+ return;
+ }
+ nng_aio_set_msg(c->sent, msg);
+ nng_send_aio(c->sock, c->sent);
+}
+
+void
+pub0_sent(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+ if ((rv = nng_aio_result(c->sent)) != 0) {
+ nng_msg_free(nng_aio_get_msg(c->sent));
+ nng_aio_set_msg(c->sent, NULL);
+ error(c, "send", rv);
+ return;
}
+ c->nsend++;
+ nng_sleep_aio(rand() % 10, c->woke);
+}
+
+void
+sub0_recd(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+
+ if ((rv = nng_aio_result(c->recd)) != 0) {
+ error(c, "recv", rv);
+ return;
+ }
+ c->nrecv++;
+ nng_msg_free(nng_aio_get_msg(c->recd));
+ nng_aio_set_msg(c->recd, NULL);
+ nng_recv_aio(c->sock, c->recd);
}
void
@@ -432,7 +549,7 @@ pub0_sender(void *arg)
return;
}
- if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) {
+ if ((rv = nng_sendmsg(c->sock, msg, 0)) != 0) {
nng_msg_free(msg);
error(c, "sendmsg", rv);
return;
@@ -442,36 +559,9 @@ pub0_sender(void *arg)
}
void
-sub0_receiver(void *arg)
+pubsub0_test(int ntests)
{
- test_case *c = arg;
- int rv;
-
- if ((rv = nng_setopt(c->socket, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
- error(c, "subscribe", rv);
- return;
- }
- for (;;) {
- nng_msg *msg;
- if (nng_clock() > end_time) {
- break;
- }
-
- if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
- nng_msg_free(msg);
- }
-}
-
-void
-pubsub_test(int ntests)
-{
- test_case *srv, *cli;
- int i;
- char addr[NNG_MAXADDRLEN];
+ test_case *srv;
int rv;
if (ntests < 2) {
@@ -480,39 +570,101 @@ pubsub_test(int ntests)
}
srv = &cases[curcase++];
- srv->name = "pub";
+ srv->name = "pub0";
- if ((rv = nng_pub0_open(&srv->socket)) != 0) {
+ if ((rv = nng_pub0_open(&srv->sock)) != 0) {
fatal("nng_pub0_open", rv);
}
-
- if ((rv = nng_thread_create(&srv->thr, pub0_sender, srv)) != 0) {
- fatal("nng_thread_create", rv);
+ if (((rv = nng_aio_alloc(&srv->sent, pub0_sent, srv)) != 0) ||
+ ((rv = nng_aio_alloc(&srv->woke, pub0_woke, srv)) != 0)) {
+ fatal("nng_aio_alloc", rv);
}
- for (i = 1; i < ntests; i++) {
- cli = &cases[curcase++];
- if ((rv = nng_sub0_open(&cli->socket)) != 0) {
+ for (int i = 1; i < ntests; i++) {
+ test_case *cli = &cases[curcase++];
+
+ cli->name = "sub0";
+ getaddr(cli->addr);
+
+ if ((rv = nng_sub0_open(&cli->sock)) != 0) {
fatal("nng_sub0_open", rv);
}
+ if ((rv = nng_aio_alloc(&cli->recd, sub0_recd, cli)) != 0) {
+ fatal("nng_aio_alloc", rv);
+ }
+ rv = nng_setopt(cli->sock, NNG_OPT_SUB_SUBSCRIBE, "", 0);
+ if (rv != 0) {
+ fatal("subscribe", rv);
+ }
- cli->name = "sub";
- getaddr(addr);
dprintf("DOING pubsub0 (pub %u sub %u) address: %s\n",
- cli->socket.id, srv->socket.id, addr);
+ cli->sock.id, srv->sock.id, cli->addr);
- if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) {
+ if ((rv = nng_listen(srv->sock, cli->addr, NULL, 0)) != 0) {
fatal("nng_listen", rv);
}
- if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) {
+ if ((rv = nng_dial(cli->sock, cli->addr, NULL, 0)) != 0) {
fatal("nng_dial", rv);
}
- if ((rv = nng_thread_create(&cli->thr, sub0_receiver, cli)) !=
- 0) {
- fatal("nng_thread_create", rv);
- }
+ nng_recv_aio(cli->sock, cli->recd);
}
+
+ nng_sleep_aio(1, srv->woke);
+}
+
+void
+push0_sent(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+
+ if ((rv = nng_aio_result(c->sent)) != 0) {
+ nng_msg_free(nng_aio_get_msg(c->sent));
+ nng_aio_set_msg(c->sent, NULL);
+ error(c, "send", rv);
+ return;
+ }
+
+ c->nsend++;
+ nng_sleep_aio(rand() % 10, c->woke);
+}
+
+void
+push0_woke(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
+
+ if ((rv = nng_aio_result(c->woke)) != 0) {
+ error(c, "sleep", rv);
+ return;
+ }
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append_u32(msg, (unsigned) rand())) != 0)) {
+ nng_msg_free(msg);
+ error(c, "alloc", rv);
+ return;
+ }
+ nng_aio_set_msg(c->sent, msg);
+ nng_send_aio(c->sock, c->sent);
+}
+
+void
+pull0_recd(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+
+ if ((rv = nng_aio_result(c->recd)) != 0) {
+ error(c, "recv", rv);
+ return;
+ }
+ c->nrecv++;
+ nng_msg_free(nng_aio_get_msg(c->recd));
+ nng_aio_set_msg(c->recd, NULL);
+ nng_recv_aio(c->sock, c->recd);
}
void
@@ -539,7 +691,7 @@ pipeline0_pusher(void *arg)
return;
}
- if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) {
+ if ((rv = nng_sendmsg(c->sock, msg, 0)) != 0) {
nng_msg_free(msg);
error(c, "sendmsg", rv);
return;
@@ -560,7 +712,7 @@ pipeline0_puller(void *arg)
break;
}
- if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) {
+ if ((rv = nng_recvmsg(c->sock, &msg, 0)) != 0) {
error(c, "recvmsg", rv);
return;
}
@@ -572,52 +724,52 @@ pipeline0_puller(void *arg)
void
pipeline0_test(int ntests)
{
- test_case *srv, *cli;
- int i;
- char addr[NNG_MAXADDRLEN];
+ test_case *srv;
int rv;
- atexit(nng_fini);
-
if (ntests < 2) {
// Need a client *and* a server.
return;
}
srv = &cases[curcase++];
- srv->name = "push";
+ srv->name = "push0";
- if ((rv = nng_push0_open(&srv->socket)) != 0) {
+ if ((rv = nng_push0_open(&srv->sock)) != 0) {
fatal("nng_push0_open", rv);
}
-
- if ((rv = nng_thread_create(&srv->thr, pipeline0_pusher, srv)) != 0) {
- fatal("nng_thread_create", rv);
+ if (((rv = nng_aio_alloc(&srv->sent, push0_sent, srv)) != 0) ||
+ ((rv = nng_aio_alloc(&srv->woke, push0_woke, srv)) != 0)) {
+ fatal("nng_aio_alloc", rv);
}
- for (i = 1; i < ntests; i++) {
- cli = &cases[curcase++];
- if ((rv = nng_pull0_open(&cli->socket)) != 0) {
- fatal("nng_pull0_open", rv);
+ for (int i = 1; i < ntests; i++) {
+ test_case *cli = &cases[curcase++];
+
+ cli->name = "pull0";
+ getaddr(cli->addr);
+
+ if ((rv = nng_pull0_open(&cli->sock)) != 0) {
+ fatal("nng_sub0_open", rv);
+ }
+ if ((rv = nng_aio_alloc(&cli->recd, pull0_recd, cli)) != 0) {
+ fatal("nng_aio_alloc", rv);
}
- cli->name = "pull";
- getaddr(addr);
dprintf("DOING pipeline0 (pull %u push %u) address: %s\n",
- cli->socket.id, srv->socket.id, addr);
+ cli->sock.id, srv->sock.id, cli->addr);
- if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) {
+ if ((rv = nng_listen(srv->sock, cli->addr, NULL, 0)) != 0) {
fatal("nng_listen", rv);
}
- if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) {
+ if ((rv = nng_dial(cli->sock, cli->addr, NULL, 0)) != 0) {
fatal("nng_dial", rv);
}
- if ((rv = nng_thread_create(
- &cli->thr, pipeline0_puller, cli)) != 0) {
- fatal("nng_thread_create", rv);
- }
+ nng_recv_aio(cli->sock, cli->recd);
}
+
+ nng_sleep_aio(1, srv->woke);
}
Main({
@@ -635,7 +787,7 @@ Main({
tmo = 30;
}
// We have to keep this relatively low by default because some
- // platforms don't support large numbers of threads.
+ // platforms have limited resources.
if (((str = ConveyGetEnv("STRESSPRESSURE")) == NULL) ||
((ncases = atoi(str)) < 1)) {
ncases = 32;
@@ -656,18 +808,16 @@ Main({
}
switch (rand() % 5) {
case 0:
- reqrep_test(x);
+ reqrep0_test(x);
break;
case 1:
- pair0_test();
- x = 2; // pair is always 2
+ pair0_test(x);
break;
case 2:
- pubsub_test(x);
+ pubsub0_test(x);
break;
case 3:
- bus_test();
- x = 2; // pair is always 2
+ bus0_test(x);
break;
case 4:
pipeline0_test(x);
@@ -676,29 +826,37 @@ Main({
// that didn't work
break;
}
- i -= x;
+ i = ncases - curcase;
}
dprintf("WAITING for %d sec...\n", tmo);
nng_msleep(tmo * 1000); // sleep 30 sec
+ for (i = 0; i < ncases; i++) {
+ nng_aio_stop(cases[i].woke);
+ }
nng_closeall();
Test("MultiProtocol/Transport Stress", {
Convey("All tests worked", {
for (i = 0; i < ncases; i++) {
- if (cases[i].thr != NULL) {
- nng_thread_destroy(cases[i].thr);
- dprintf(
- "RESULT socket %u (%s) sent %d "
- "recd "
- "%d fail %d\n",
- cases[i].socket.id, cases[i].name,
- cases[i].nsend, cases[i].nrecv,
- cases[i].nfail);
- So(cases[i].nfail == 0);
- So(cases[i].nsend > 0 ||
- cases[i].nrecv > 0);
+ test_case *c = &cases[i];
+ if (c->name == NULL) {
+ break;
}
+ nng_aio_stop(c->sent);
+ nng_aio_stop(c->recd);
+ nng_aio_stop(c->woke);
+ nng_aio_free(c->sent);
+ nng_aio_free(c->recd);
+ nng_aio_free(c->woke);
+
+ dprintf("RESULT socket %u (%s) sent %d "
+ "recd %d fail %d\n",
+ c->sock.id, c->name, c->nsend, c->nrecv,
+ c->nfail);
+ So(c->nfail == 0);
+ So((c->sent == NULL) || (c->nsend > 0));
+ So((c->recd == NULL) || (c->nrecv > 0));
}
});
});
diff --git a/tests/reqstress.c b/tests/reqstress.c
index b06a60c5..1811063b 100644
--- a/tests/reqstress.c
+++ b/tests/reqstress.c
@@ -33,12 +33,11 @@
static int next_port = 20000; // port number kind of.
-char tcp4_template[] = "tcp://127.0.0.1:%d";
-char tcp6_template[] = "tcp://[::1]:%d";
-char inproc_template[] = "inproc://nng_reqstress_%d";
-char ipc_template[] = "ipc:///tmp/nng_reqstress_%d";
-char ws_template[] = "ws://127.0.0.1:%d/nng_reqstress";
-nng_time end_time;
+char tcp4_template[] = "tcp://127.0.0.1:%d";
+char tcp6_template[] = "tcp://[::1]:%d";
+char inproc_template[] = "inproc://nng_reqstress_%d";
+char ipc_template[] = "ipc:///tmp/nng_reqstress_%d";
+char ws_template[] = "ws://127.0.0.1:%d/nng_reqstress";
char *templates[] = {
#ifdef NNG_TRANSPORT_TCP
@@ -69,10 +68,13 @@ int allocaddrs;
typedef struct test_case {
nng_socket socket;
const char *name;
- nng_thread *thr;
int nrecv;
int nsend;
int nfail;
+ nng_aio * recv_aio;
+ nng_aio * send_aio;
+ nng_aio * time_aio;
+ char buf[32];
} test_case;
static test_case *cases;
@@ -89,7 +91,7 @@ fatal(const char *msg, int rv)
void
error(test_case *c, const char *msg, int rv)
{
- if (rv == NNG_ECLOSED) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) {
return;
}
fprintf(
@@ -110,83 +112,105 @@ getaddr(char *buf)
// Request/Reply test. For this test, we open a server socket,
// and bind it to each protocol. Then we run a bunch of clients
-// against it. The
+// against it.
-// Simple rep echo server.
+// REP server implemented via callbacks.
static void
-rep_server(void *arg)
+rep_recv_cb(void *arg)
{
test_case *c = arg;
- for (;;) {
- int rv;
- nng_msg * msg;
- nng_socket rep = c->socket;
-
- if (nng_clock() > end_time) {
- break;
- }
+ int rv;
- if ((rv = nng_recvmsg(rep, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
- if ((rv = nng_sendmsg(rep, msg, 0)) != 0) {
- nng_msg_free(msg);
- error(c, "sendmsg", rv);
- return;
- }
- c->nsend++;
+ if ((rv = nng_aio_result(c->recv_aio)) != 0) {
+ error(c, "recv", rv);
+ return;
}
+ c->nrecv++;
+ nng_aio_set_msg(c->send_aio, nng_aio_get_msg(c->recv_aio));
+ nng_aio_set_msg(c->recv_aio, NULL);
+ nng_send_aio(c->socket, c->send_aio);
}
static void
-req_client(void *arg)
+rep_send_cb(void *arg)
{
test_case *c = arg;
- for (;;) {
- int rv;
- nng_socket req = c->socket;
- int num = 0;
- nng_msg * msg;
- char buf[32];
-
- (void) snprintf(buf, sizeof(buf), "%u-%d", req.id, num++);
-
- nng_msleep(rand() % 10);
- if (nng_clock() > end_time) {
- break;
- }
+ int rv;
- if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- error(c, "alloc fail", rv);
- return;
- }
- if ((rv = nng_msg_append(msg, buf, strlen(buf) + 1)) != 0) {
- nng_msg_free(msg);
- error(c, "append fail", rv);
- return;
- }
- if ((rv = nng_sendmsg(req, msg, 0)) != 0) {
- nng_msg_free(msg);
- error(c, "sendmsg", rv);
- return;
- }
- c->nsend++;
- if ((rv = nng_recvmsg(req, &msg, 0)) != 0) {
- error(c, "recvmsg", rv);
- return;
- }
- c->nrecv++;
+ if ((rv = nng_aio_result(c->send_aio)) != 0) {
+ error(c, "send", rv);
+ nng_msg_free(nng_aio_get_msg(c->send_aio));
+ nng_aio_set_msg(c->send_aio, NULL);
+ return;
+ }
+ c->nsend++;
+ nng_recv_aio(c->socket, c->recv_aio);
+}
- if (strcmp(nng_msg_body(msg), buf) != 0) {
- error(c, "mismatched message", NNG_EINTERNAL);
- nng_msg_free(msg);
- return;
- }
+static void
+req_time_cb(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
+
+ if ((rv = nng_aio_result(c->time_aio)) != 0) {
+ error(c, "sleep", rv);
+ return;
+ }
+ (void) snprintf(
+ c->buf, sizeof(c->buf), "%u-%d", c->socket.id, c->nsend);
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, c->buf, strlen(c->buf) + 1)) != 0)) {
+ error(c, "alloc", rv);
+ nng_msg_free(msg);
+ return;
+ }
+ nng_aio_set_msg(c->send_aio, msg);
+ nng_send_aio(c->socket, c->send_aio);
+}
+static void
+req_recv_cb(void *arg)
+{
+ test_case *c = arg;
+ nng_msg * msg = NULL;
+ int rv;
+
+ if ((rv = nng_aio_result(c->recv_aio)) != 0) {
+ error(c, "recv", rv);
+ return;
+ }
+
+ msg = nng_aio_get_msg(c->recv_aio);
+ if ((nng_msg_len(msg) != (strlen(c->buf) + 1)) ||
+ (strcmp(c->buf, nng_msg_body(msg)) != 0)) {
+ error(c, "msg mismatch", rv);
nng_msg_free(msg);
+ return;
+ }
+
+ nng_msg_free(msg);
+ memset(c->buf, 0, sizeof(c->buf));
+
+ c->nrecv++;
+ nng_sleep_aio(rand() % 10, c->time_aio);
+}
+
+static void
+req_send_cb(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+
+ if ((rv = nng_aio_result(c->send_aio)) != 0) {
+ error(c, "send", rv);
+ nng_msg_free(nng_aio_get_msg(c->send_aio));
+ nng_aio_set_msg(c->send_aio, NULL);
+ return;
}
+ c->nsend++;
+ nng_recv_aio(c->socket, c->recv_aio);
}
void
@@ -209,12 +233,25 @@ reqrep_test(int ntests)
fatal("nng_rep0_open", rv);
}
- if ((rv = nng_thread_create(&srv->thr, rep_server, srv)) != 0) {
- fatal("nng_thread_create", rv);
+ if (((rv = nng_aio_alloc(&srv->send_aio, rep_send_cb, srv)) != 0) ||
+ ((rv = nng_aio_alloc(&srv->recv_aio, rep_recv_cb, srv)) != 0)) {
+ fatal("nng_aio_alloc", rv);
}
+ nng_recv_aio(srv->socket, srv->recv_aio);
+
for (i = 1; i < ntests; i++) {
cli = &cases[curcase++];
+
+ if (((rv = nng_aio_alloc(&cli->send_aio, req_send_cb, cli)) !=
+ 0) ||
+ ((rv = nng_aio_alloc(&cli->recv_aio, req_recv_cb, cli)) !=
+ 0) ||
+ ((rv = nng_aio_alloc(&cli->time_aio, req_time_cb, cli)) !=
+ 0)) {
+ fatal("nng_aio_alloc", rv);
+ }
+
if ((rv = nng_req0_open(&cli->socket)) != 0) {
fatal("nng_req0_open", rv);
}
@@ -231,10 +268,7 @@ reqrep_test(int ntests)
fatal("nng_dial", rv);
}
- if ((rv = nng_thread_create(&cli->thr, req_client, cli)) !=
- 0) {
- fatal("nng_thread_create", rv);
- }
+ nng_sleep_aio(1, cli->time_aio);
}
}
@@ -250,7 +284,8 @@ Main({
tmo = 30;
}
// We have to keep this relatively low by default because some
- // platforms don't support large numbers of threads.
+ // platforms don't support large numbers of sockets. (On macOS
+ // laptop I can run this with 500 though.)
if (((str = ConveyGetEnv("STRESSPRESSURE")) == NULL) ||
((ncases = atoi(str)) < 1)) {
ncases = 32;
@@ -265,8 +300,7 @@ Main({
i = ncases;
- cases = calloc(ncases, sizeof(test_case));
- end_time = nng_clock() + (tmo * 1000);
+ cases = calloc(ncases, sizeof(test_case));
while (i > 1) {
int x = rand() % NTEMPLATES;
if (x > i) {
@@ -278,17 +312,28 @@ Main({
dprintf("WAITING for %d sec...\n", tmo);
nng_msleep(tmo * 1000); // sleep 30 sec
+
+ // Close the timeouts first, before closing sockets. This tends to
+ // ensure that we complete exchanges.
+ for (i = 0; i < ncases; i++) {
+ nng_aio_stop(cases[i].time_aio);
+ }
+ nng_msleep(100);
nng_closeall();
Test("Req/Rep Stress", {
Convey("All tests worked", {
for (i = 0; i < ncases; i++) {
- if (cases[i].thr != NULL) {
- nng_thread_destroy(cases[i].thr);
+ nng_aio_stop(cases[i].recv_aio);
+ nng_aio_stop(cases[i].send_aio);
+ nng_aio_stop(cases[i].time_aio);
+ nng_aio_free(cases[i].recv_aio);
+ nng_aio_free(cases[i].send_aio);
+ nng_aio_free(cases[i].time_aio);
+ if (cases[i].name != NULL) {
dprintf(
"RESULT socket %u (%s) sent %d "
- "recd "
- "%d fail %d\n",
+ "recd %d fail %d\n",
cases[i].socket.id, cases[i].name,
cases[i].nsend, cases[i].nrecv,
cases[i].nfail);