aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-11-06 13:35:27 -0800
committerGarrett D'Amore <garrett@damore.org>2017-11-06 13:35:27 -0800
commitac6019bfabac887274fb9d8b2a167df940ba6121 (patch)
tree06706a623de68b5a9bf604ae5a2cdc33e26a2e59 /tests
parentfb68c81e2c81deb70a9b6760fa8db9efb30b665e (diff)
downloadnng-ac6019bfabac887274fb9d8b2a167df940ba6121.tar.gz
nng-ac6019bfabac887274fb9d8b2a167df940ba6121.tar.bz2
nng-ac6019bfabac887274fb9d8b2a167df940ba6121.zip
Add pipeline test to the multistress test.
Diffstat (limited to 'tests')
-rw-r--r--tests/multistress.c105
1 files changed, 101 insertions, 4 deletions
diff --git a/tests/multistress.c b/tests/multistress.c
index 26bc8a6d..7088ac14 100644
--- a/tests/multistress.c
+++ b/tests/multistress.c
@@ -209,7 +209,7 @@ reqrep_test(int ntests)
cli->name = "req";
getaddr(addr);
- dprintf("DOING reqrep (req %d rep %d) address: %s\n",
+ dprintf("DOING reqrep0 (req %d rep %d) address: %s\n",
cli->socket, srv->socket, addr);
if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) {
@@ -465,7 +465,7 @@ pubsub_test(int ntests)
cli->name = "sub";
getaddr(addr);
- dprintf("DOING pubsub (pub %d sub %d) address: %s\n",
+ dprintf("DOING pubsub0 (pub %d sub %d) address: %s\n",
cli->socket, srv->socket, addr);
if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) {
@@ -482,6 +482,101 @@ pubsub_test(int ntests)
}
}
+void
+pipeline0_pusher(void *arg)
+{
+ test_case *c = arg;
+ for (;;) {
+ nng_msg *msg;
+ int rv;
+
+ nng_msleep(rand() % 10);
+
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ error(c, "alloc", rv);
+ return;
+ }
+
+ if ((rv = nng_msg_append(msg, "PUSH", 5)) != 0) {
+ error(c, "msg_append", rv);
+ return;
+ }
+
+ if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) {
+ error(c, "sendmsg", rv);
+ return;
+ }
+ c->nsend++;
+ }
+}
+
+void
+pipeline0_puller(void *arg)
+{
+ test_case *c = arg;
+ int rv;
+
+ for (;;) {
+ nng_msg *msg;
+
+ if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) {
+ error(c, "recvmsg", rv);
+ return;
+ }
+ c->nrecv++;
+ nng_msg_free(msg);
+ }
+}
+
+void
+pipeline0_test(int ntests)
+{
+ test_case *srv, *cli;
+ int i;
+ char addr[NNG_MAXADDRLEN];
+ int rv;
+
+ if (ntests < 2) {
+ // Need a client *and* a server.
+ return;
+ }
+
+ srv = &cases[curcase++];
+ srv->name = "push";
+
+ if ((rv = nng_push0_open(&srv->socket)) != 0) {
+ fatal("nng_push0_open", rv);
+ }
+
+ if ((rv = nng_thread_create(&srv->thr, pipeline0_pusher, srv)) != 0) {
+ fatal("nng_thread_create", rv);
+ }
+
+ for (i = 1; i < ntests; i++) {
+ cli = &cases[curcase++];
+ if ((rv = nng_pull0_open(&cli->socket)) != 0) {
+ fatal("nng_pull0_open", rv);
+ }
+
+ cli->name = "pull";
+ getaddr(addr);
+ dprintf("DOING pipeline0 (pull %d push %d) address: %s\n",
+ cli->socket, srv->socket, addr);
+
+ if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) {
+ fatal("nng_listen", rv);
+ }
+ if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) {
+ fatal("nng_dial", rv);
+ }
+
+ if ((rv = nng_thread_create(
+ &cli->thr, pipeline0_puller, cli)) != 0) {
+ fatal("nng_thread_create", rv);
+ }
+ }
+}
+
Main({
int i;
@@ -504,7 +599,7 @@ Main({
if (x > i) {
x = i;
}
- switch (rand() % 4) {
+ switch (rand() % 5) {
case 0:
reqrep_test(x);
break;
@@ -519,7 +614,9 @@ Main({
bus_test();
x = 2; // pair is always 2
break;
-
+ case 4:
+ pipeline0_test(x);
+ break;
default:
// that didn't work
break;