diff options
| -rw-r--r-- | tests/multistress.c | 714 | ||||
| -rw-r--r-- | tests/reqstress.c | 209 |
2 files changed, 563 insertions, 360 deletions
diff --git a/tests/multistress.c b/tests/multistress.c index cbe16b0f..d39d2896 100644 --- a/tests/multistress.c +++ b/tests/multistress.c @@ -71,12 +71,17 @@ int naddresses; int allocaddrs; typedef struct test_case { - nng_socket socket; + nng_socket sock; const char *name; - nng_thread *thr; int nrecv; int nsend; int nfail; + nng_aio * recd; + nng_aio * sent; + nng_aio * woke; + char addr[NNG_MAXADDRLEN]; + char buf[32]; + } test_case; static test_case *cases; @@ -93,7 +98,7 @@ fatal(const char *msg, int rv) void error(test_case *c, const char *msg, int rv) { - if (rv == NNG_ECLOSED) { + if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) { return; } fprintf( @@ -114,86 +119,108 @@ getaddr(char *buf) // Request/Reply test. For this test, we open a server socket, // and bind it to each protocol. Then we run a bunch of clients -// against it. The +// against it. -// Simple rep echo server. +// REP server implemented via callbacks. static void -rep_server(void *arg) +rep0_recd(void *arg) { test_case *c = arg; - for (;;) { - int rv; - nng_msg * msg; - nng_socket rep = c->socket; - - if (nng_clock() > end_time) { - break; - } + int rv; - if ((rv = nng_recvmsg(rep, &msg, 0)) != 0) { - error(c, "recvmsg", rv); - return; - } - c->nrecv++; - if ((rv = nng_sendmsg(rep, msg, 0)) != 0) { - nng_msg_free(msg); - error(c, "sendmsg", rv); - return; - } - c->nsend++; + if ((rv = nng_aio_result(c->recd)) != 0) { + error(c, "recv", rv); + return; } + c->nrecv++; + nng_aio_set_msg(c->sent, nng_aio_get_msg(c->recd)); + nng_aio_set_msg(c->recd, NULL); + nng_send_aio(c->sock, c->sent); } static void -req_client(void *arg) +rep0_sent(void *arg) { test_case *c = arg; - for (;;) { - int rv; - nng_socket req = c->socket; - int num = 0; - nng_msg * msg; - char buf[32]; + int rv; - (void) snprintf(buf, sizeof(buf), "%u-%d", req.id, num++); + if ((rv = nng_aio_result(c->sent)) != 0) { + error(c, "send", rv); + nng_msg_free(nng_aio_get_msg(c->sent)); + nng_aio_set_msg(c->sent, NULL); + return; + } + c->nsend++; + nng_recv_aio(c->sock, c->recd); +} - nng_msleep(rand() % 10); - if (nng_clock() > end_time) { - break; - } +static void +req0_woke(void *arg) +{ + test_case *c = arg; + nng_msg * msg = NULL; + int rv; - if ((rv = nng_msg_alloc(&msg, 0)) != 0) { - error(c, "alloc fail", rv); - return; - } - if ((rv = nng_msg_append(msg, buf, strlen(buf) + 1)) != 0) { - nng_msg_free(msg); - error(c, "append fail", rv); - return; - } - if ((rv = nng_sendmsg(req, msg, 0)) != 0) { - nng_msg_free(msg); - error(c, "sendmsg", rv); - return; - } - c->nsend++; - if ((rv = nng_recvmsg(req, &msg, 0)) != 0) { - error(c, "recvmsg", rv); - return; - } - c->nrecv++; + if ((rv = nng_aio_result(c->woke)) != 0) { + error(c, "sleep", rv); + return; + } + (void) snprintf(c->buf, sizeof(c->buf), "%u-%d", c->sock.id, c->nsend); + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append(msg, c->buf, strlen(c->buf) + 1)) != 0)) { + error(c, "alloc", rv); + nng_msg_free(msg); + return; + } + nng_aio_set_msg(c->sent, msg); + nng_send_aio(c->sock, c->sent); +} - if (strcmp(nng_msg_body(msg), buf) != 0) { - error(c, "mismatched message", NNG_EINTERNAL); - return; - } +static void +req0_recd(void *arg) +{ + test_case *c = arg; + nng_msg * msg = NULL; + int rv; + if ((rv = nng_aio_result(c->recd)) != 0) { + error(c, "recv", rv); + return; + } + + msg = nng_aio_get_msg(c->recd); + if ((nng_msg_len(msg) != (strlen(c->buf) + 1)) || + (strcmp(c->buf, nng_msg_body(msg)) != 0)) { + error(c, "msg mismatch", rv); nng_msg_free(msg); + return; } + + nng_msg_free(msg); + memset(c->buf, 0, sizeof(c->buf)); + + c->nrecv++; + nng_sleep_aio(rand() % 10, c->woke); +} + +static void +req0_sent(void *arg) +{ + test_case *c = arg; + int rv; + + if ((rv = nng_aio_result(c->sent)) != 0) { + error(c, "send", rv); + nng_msg_free(nng_aio_get_msg(c->sent)); + nng_aio_set_msg(c->sent, NULL); + return; + } + c->nsend++; + nng_recv_aio(c->sock, c->recd); } void -reqrep_test(int ntests) +reqrep0_test(int ntests) { test_case *srv, *cli; int i; @@ -206,206 +233,296 @@ reqrep_test(int ntests) } srv = &cases[curcase++]; - srv->name = "rep"; + srv->name = "rep0"; - if ((rv = nng_rep0_open(&srv->socket)) != 0) { + if ((rv = nng_rep0_open(&srv->sock)) != 0) { fatal("nng_rep0_open", rv); } - if ((rv = nng_thread_create(&srv->thr, rep_server, srv)) != 0) { - fatal("nng_thread_create", rv); + if (((rv = nng_aio_alloc(&srv->sent, rep0_sent, srv)) != 0) || + ((rv = nng_aio_alloc(&srv->recd, rep0_recd, srv)) != 0)) { + fatal("nng_aio_alloc", rv); } + nng_recv_aio(srv->sock, srv->recd); + for (i = 1; i < ntests; i++) { - cli = &cases[curcase++]; - if ((rv = nng_req0_open(&cli->socket)) != 0) { + cli = &cases[curcase++]; + cli->name = "req0"; + + if (((rv = nng_aio_alloc(&cli->sent, req0_sent, cli)) != 0) || + ((rv = nng_aio_alloc(&cli->recd, req0_recd, cli)) != 0) || + ((rv = nng_aio_alloc(&cli->woke, req0_woke, cli)) != 0)) { + fatal("nng_aio_alloc", rv); + } + + if ((rv = nng_req0_open(&cli->sock)) != 0) { fatal("nng_req0_open", rv); } - cli->name = "req"; getaddr(addr); dprintf("DOING reqrep0 (req %u rep %u) address: %s\n", - cli->socket.id, srv->socket.id, addr); + cli->sock.id, srv->sock.id, addr); - if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) { + if ((rv = nng_listen(srv->sock, addr, NULL, 0)) != 0) { fatal("nng_listen", rv); } - if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) { + if ((rv = nng_dial(cli->sock, addr, NULL, 0)) != 0) { fatal("nng_dial", rv); } - if ((rv = nng_thread_create(&cli->thr, req_client, cli)) != - 0) { - fatal("nng_thread_create", rv); - } + nng_sleep_aio(1, cli->woke); } } -void -pair0_bouncer(void *arg) +// PAIRv0 test. We just bind two sockets together, and bounce messages at +// each other. As we don't need to run synchronously, the receive is not +// linked to the send. + +static void +pair0_recd(void *arg) { test_case *c = arg; - for (;;) { - nng_msg *msg; - int r; - int rv; - - nng_msleep(rand() % 10); - if (nng_clock() > end_time) { - break; - } - - if ((rv = nng_msg_alloc(&msg, 0)) != 0) { - error(c, "alloc", rv); - return; - } - - r = rand(); - if ((rv = nng_msg_append(msg, &r, sizeof(r))) != 0) { - nng_msg_free(msg); - error(c, "msg_append", rv); - return; - } + int rv; + if ((rv = nng_aio_result(c->recd)) != 0) { + error(c, "recv", rv); + return; + } + c->nrecv++; + nng_msg_free(nng_aio_get_msg(c->recd)); + nng_recv_aio(c->sock, c->recd); +} - if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) { - nng_msg_free(msg); - error(c, "sendmsg", rv); - return; - } - c->nsend++; +static void +pair0_woke(void *arg) +{ + test_case *c = arg; + nng_msg * msg = NULL; + int rv; - if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) { - error(c, "recvmsg", rv); - return; - } - c->nrecv++; + if ((rv = nng_aio_result(c->woke)) != 0) { + error(c, "sleep", rv); + return; + } + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append_u32(msg, (unsigned) rand())) != 0)) { + error(c, "alloc", rv); nng_msg_free(msg); + return; } + nng_aio_set_msg(c->sent, msg); + nng_send_aio(c->sock, c->sent); +} + +static void +pair0_sent(void *arg) +{ + test_case *c = arg; + int rv; + if ((rv = nng_aio_result(c->sent)) != 0) { + error(c, "send", rv); + nng_msg_free(nng_aio_get_msg(c->sent)); + nng_aio_set_msg(c->sent, NULL); + return; + } + c->nsend++; + nng_sleep_aio(rand() % 10, c->woke); } void -pair0_test(void) +pair0_test(int ntests) { test_case *srv, *cli; char addr[NNG_MAXADDRLEN]; int rv; + if (ntests < 2) { + return; + } srv = &cases[curcase++]; srv->name = "pair0"; - - if ((rv = nng_pair0_open(&srv->socket)) != 0) { - fatal("nng_pair0_open", rv); - } - - if ((rv = nng_thread_create(&srv->thr, pair0_bouncer, srv)) != 0) { - fatal("nng_thread_create", rv); - } - cli = &cases[curcase++]; cli->name = "pair0"; - if ((rv = nng_pair0_open(&cli->socket)) != 0) { + if (((rv = nng_pair0_open(&srv->sock)) != 0) || + ((rv = nng_pair0_open(&cli->sock)) != 0)) { fatal("nng_pair0_open", rv); } - if ((rv = nng_thread_create(&cli->thr, pair0_bouncer, cli)) != 0) { - fatal("nng_thread_create", rv); + if (((rv = nng_aio_alloc(&srv->sent, pair0_sent, srv)) != 0) || + ((rv = nng_aio_alloc(&srv->recd, pair0_recd, srv)) != 0) || + ((rv = nng_aio_alloc(&srv->woke, pair0_woke, srv)) != 0) || + ((rv = nng_aio_alloc(&cli->sent, pair0_sent, cli)) != 0) || + ((rv = nng_aio_alloc(&cli->recd, pair0_recd, cli)) != 0) || + ((rv = nng_aio_alloc(&cli->woke, pair0_woke, cli)) != 0)) { + fatal("nng_aio_alloc", rv); } getaddr(addr); - dprintf("DOING pair0 (%u, %u) address: %s\n", cli->socket.id, - srv->socket.id, addr); + dprintf("DOING pair0 (%u, %u) address: %s\n", cli->sock.id, + srv->sock.id, addr); - if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) { + if ((rv = nng_listen(srv->sock, addr, NULL, 0)) != 0) { fatal("nng_listen", rv); } - if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) { + if ((rv = nng_dial(cli->sock, addr, NULL, 0)) != 0) { fatal("nng_dial", rv); } + + nng_recv_aio(srv->sock, srv->recd); + nng_recv_aio(cli->sock, cli->recd); + nng_sleep_aio(1, srv->woke); + nng_sleep_aio(1, cli->woke); } -void -bus0_bouncer(void *arg) +// BUSv0 test. We just bind sockets together into a full mesh, and bounce +// messages at each other. As we don't need to run synchronously, the +// receive is not linked to the send. + +static void +bus0_recd(void *arg) { test_case *c = arg; - for (;;) { - nng_msg *msg; - int r; - int rv; - - nng_msleep(rand() % 10); - if (nng_clock() > end_time) { - break; - } - - if ((rv = nng_msg_alloc(&msg, 0)) != 0) { - error(c, "alloc", rv); - return; - } - - r = rand(); - if ((rv = nng_msg_append(msg, &r, sizeof(r))) != 0) { - nng_msg_free(msg); - error(c, "msg_append", rv); - return; - } + int rv; + if ((rv = nng_aio_result(c->recd)) != 0) { + error(c, "recv", rv); + return; + } + c->nrecv++; + nng_msg_free(nng_aio_get_msg(c->recd)); + nng_recv_aio(c->sock, c->recd); +} - if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) { - nng_msg_free(msg); - error(c, "sendmsg", rv); - return; - } - c->nsend++; +static void +bus0_woke(void *arg) +{ + test_case *c = arg; + nng_msg * msg = NULL; + int rv; - if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) { - error(c, "recvmsg", rv); - return; - } - c->nrecv++; + if ((rv = nng_aio_result(c->woke)) != 0) { + error(c, "sleep", rv); + return; + } + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append_u32(msg, (unsigned) rand())) != 0)) { nng_msg_free(msg); + error(c, "alloc", rv); + return; } + nng_aio_set_msg(c->sent, msg); + nng_send_aio(c->sock, c->sent); } -// We just do 1:1 for bus for now. -void -bus_test(void) +static void +bus0_sent(void *arg) { - test_case *srv, *cli; - char addr[NNG_MAXADDRLEN]; + test_case *c = arg; int rv; + if ((rv = nng_aio_result(c->sent)) != 0) { + nng_msg_free(nng_aio_get_msg(c->sent)); + nng_aio_set_msg(c->sent, NULL); + error(c, "send", rv); + return; + } + c->nsend++; + nng_sleep_aio(rand() % 10, c->woke); +} - srv = &cases[curcase++]; - srv->name = "bus0"; +void +bus0_test(int ntests) +{ - if ((rv = nng_bus0_open(&srv->socket)) != 0) { - fatal("nng_bus0_open", rv); + if (ntests < 2) { + return; } + for (int i = 0; i < ntests; i++) { + test_case *c = &cases[curcase + i]; + int rv; - if ((rv = nng_thread_create(&srv->thr, bus0_bouncer, srv)) != 0) { - fatal("nng_thread_create", rv); - } + getaddr(c->addr); - cli = &cases[curcase++]; - cli->name = "pair0"; + c->name = "bus0"; + if (((rv = nng_aio_alloc(&c->recd, bus0_recd, c)) != 0) || + ((rv = nng_aio_alloc(&c->sent, bus0_sent, c)) != 0) || + ((rv = nng_aio_alloc(&c->woke, bus0_woke, c)) != 0)) { + fatal("nng_aio_alloc", rv); + } + if ((rv = nng_bus0_open(&c->sock)) != 0) { + fatal("nng_bus0_open", rv); + } + if ((rv = nng_listen(c->sock, c->addr, NULL, 0)) != 0) { + fatal("nng_listen", rv); + } + dprintf("DOING bus0 (%u) address: %s\n", c->sock.id, c->addr); - if ((rv = nng_bus0_open(&cli->socket)) != 0) { - fatal("nng_bus0_open", rv); + // We dial to everyone else who already listened. + for (int j = 0; j < i; j++) { + rv = nng_dial( + c->sock, cases[curcase + j].addr, NULL, 0); + if (rv != 0) { + fatal("nng_dial", rv); + } + } } - if ((rv = nng_thread_create(&cli->thr, pair0_bouncer, cli)) != 0) { - fatal("nng_thread_create", rv); + for (int i = 0; i < ntests; i++) { + test_case *c = &cases[curcase++]; + nng_recv_aio(c->sock, c->recd); + nng_sleep_aio(1, c->woke); } +} - getaddr(addr); - dprintf("DOING bus0 (%u, %u) address: %s\n", cli->socket.id, - srv->socket.id, addr); +void +pub0_woke(void *arg) +{ + test_case *c = arg; + nng_msg * msg = NULL; + int rv; - if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) { - fatal("nng_listen", rv); + if ((rv = nng_aio_result(c->woke)) != 0) { + error(c, "sleep", rv); + return; } - if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) { - fatal("nng_dial", rv); + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append(msg, "SUB", 4)) != 0)) { + nng_msg_free(msg); + error(c, "alloc", rv); + return; + } + nng_aio_set_msg(c->sent, msg); + nng_send_aio(c->sock, c->sent); +} + +void +pub0_sent(void *arg) +{ + test_case *c = arg; + int rv; + if ((rv = nng_aio_result(c->sent)) != 0) { + nng_msg_free(nng_aio_get_msg(c->sent)); + nng_aio_set_msg(c->sent, NULL); + error(c, "send", rv); + return; } + c->nsend++; + nng_sleep_aio(rand() % 10, c->woke); +} + +void +sub0_recd(void *arg) +{ + test_case *c = arg; + int rv; + + if ((rv = nng_aio_result(c->recd)) != 0) { + error(c, "recv", rv); + return; + } + c->nrecv++; + nng_msg_free(nng_aio_get_msg(c->recd)); + nng_aio_set_msg(c->recd, NULL); + nng_recv_aio(c->sock, c->recd); } void @@ -432,7 +549,7 @@ pub0_sender(void *arg) return; } - if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) { + if ((rv = nng_sendmsg(c->sock, msg, 0)) != 0) { nng_msg_free(msg); error(c, "sendmsg", rv); return; @@ -442,36 +559,9 @@ pub0_sender(void *arg) } void -sub0_receiver(void *arg) +pubsub0_test(int ntests) { - test_case *c = arg; - int rv; - - if ((rv = nng_setopt(c->socket, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { - error(c, "subscribe", rv); - return; - } - for (;;) { - nng_msg *msg; - if (nng_clock() > end_time) { - break; - } - - if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) { - error(c, "recvmsg", rv); - return; - } - c->nrecv++; - nng_msg_free(msg); - } -} - -void -pubsub_test(int ntests) -{ - test_case *srv, *cli; - int i; - char addr[NNG_MAXADDRLEN]; + test_case *srv; int rv; if (ntests < 2) { @@ -480,39 +570,101 @@ pubsub_test(int ntests) } srv = &cases[curcase++]; - srv->name = "pub"; + srv->name = "pub0"; - if ((rv = nng_pub0_open(&srv->socket)) != 0) { + if ((rv = nng_pub0_open(&srv->sock)) != 0) { fatal("nng_pub0_open", rv); } - - if ((rv = nng_thread_create(&srv->thr, pub0_sender, srv)) != 0) { - fatal("nng_thread_create", rv); + if (((rv = nng_aio_alloc(&srv->sent, pub0_sent, srv)) != 0) || + ((rv = nng_aio_alloc(&srv->woke, pub0_woke, srv)) != 0)) { + fatal("nng_aio_alloc", rv); } - for (i = 1; i < ntests; i++) { - cli = &cases[curcase++]; - if ((rv = nng_sub0_open(&cli->socket)) != 0) { + for (int i = 1; i < ntests; i++) { + test_case *cli = &cases[curcase++]; + + cli->name = "sub0"; + getaddr(cli->addr); + + if ((rv = nng_sub0_open(&cli->sock)) != 0) { fatal("nng_sub0_open", rv); } + if ((rv = nng_aio_alloc(&cli->recd, sub0_recd, cli)) != 0) { + fatal("nng_aio_alloc", rv); + } + rv = nng_setopt(cli->sock, NNG_OPT_SUB_SUBSCRIBE, "", 0); + if (rv != 0) { + fatal("subscribe", rv); + } - cli->name = "sub"; - getaddr(addr); dprintf("DOING pubsub0 (pub %u sub %u) address: %s\n", - cli->socket.id, srv->socket.id, addr); + cli->sock.id, srv->sock.id, cli->addr); - if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) { + if ((rv = nng_listen(srv->sock, cli->addr, NULL, 0)) != 0) { fatal("nng_listen", rv); } - if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) { + if ((rv = nng_dial(cli->sock, cli->addr, NULL, 0)) != 0) { fatal("nng_dial", rv); } - if ((rv = nng_thread_create(&cli->thr, sub0_receiver, cli)) != - 0) { - fatal("nng_thread_create", rv); - } + nng_recv_aio(cli->sock, cli->recd); } + + nng_sleep_aio(1, srv->woke); +} + +void +push0_sent(void *arg) +{ + test_case *c = arg; + int rv; + + if ((rv = nng_aio_result(c->sent)) != 0) { + nng_msg_free(nng_aio_get_msg(c->sent)); + nng_aio_set_msg(c->sent, NULL); + error(c, "send", rv); + return; + } + + c->nsend++; + nng_sleep_aio(rand() % 10, c->woke); +} + +void +push0_woke(void *arg) +{ + test_case *c = arg; + nng_msg * msg = NULL; + int rv; + + if ((rv = nng_aio_result(c->woke)) != 0) { + error(c, "sleep", rv); + return; + } + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append_u32(msg, (unsigned) rand())) != 0)) { + nng_msg_free(msg); + error(c, "alloc", rv); + return; + } + nng_aio_set_msg(c->sent, msg); + nng_send_aio(c->sock, c->sent); +} + +void +pull0_recd(void *arg) +{ + test_case *c = arg; + int rv; + + if ((rv = nng_aio_result(c->recd)) != 0) { + error(c, "recv", rv); + return; + } + c->nrecv++; + nng_msg_free(nng_aio_get_msg(c->recd)); + nng_aio_set_msg(c->recd, NULL); + nng_recv_aio(c->sock, c->recd); } void @@ -539,7 +691,7 @@ pipeline0_pusher(void *arg) return; } - if ((rv = nng_sendmsg(c->socket, msg, 0)) != 0) { + if ((rv = nng_sendmsg(c->sock, msg, 0)) != 0) { nng_msg_free(msg); error(c, "sendmsg", rv); return; @@ -560,7 +712,7 @@ pipeline0_puller(void *arg) break; } - if ((rv = nng_recvmsg(c->socket, &msg, 0)) != 0) { + if ((rv = nng_recvmsg(c->sock, &msg, 0)) != 0) { error(c, "recvmsg", rv); return; } @@ -572,52 +724,52 @@ pipeline0_puller(void *arg) void pipeline0_test(int ntests) { - test_case *srv, *cli; - int i; - char addr[NNG_MAXADDRLEN]; + test_case *srv; int rv; - atexit(nng_fini); - if (ntests < 2) { // Need a client *and* a server. return; } srv = &cases[curcase++]; - srv->name = "push"; + srv->name = "push0"; - if ((rv = nng_push0_open(&srv->socket)) != 0) { + if ((rv = nng_push0_open(&srv->sock)) != 0) { fatal("nng_push0_open", rv); } - - if ((rv = nng_thread_create(&srv->thr, pipeline0_pusher, srv)) != 0) { - fatal("nng_thread_create", rv); + if (((rv = nng_aio_alloc(&srv->sent, push0_sent, srv)) != 0) || + ((rv = nng_aio_alloc(&srv->woke, push0_woke, srv)) != 0)) { + fatal("nng_aio_alloc", rv); } - for (i = 1; i < ntests; i++) { - cli = &cases[curcase++]; - if ((rv = nng_pull0_open(&cli->socket)) != 0) { - fatal("nng_pull0_open", rv); + for (int i = 1; i < ntests; i++) { + test_case *cli = &cases[curcase++]; + + cli->name = "pull0"; + getaddr(cli->addr); + + if ((rv = nng_pull0_open(&cli->sock)) != 0) { + fatal("nng_sub0_open", rv); + } + if ((rv = nng_aio_alloc(&cli->recd, pull0_recd, cli)) != 0) { + fatal("nng_aio_alloc", rv); } - cli->name = "pull"; - getaddr(addr); dprintf("DOING pipeline0 (pull %u push %u) address: %s\n", - cli->socket.id, srv->socket.id, addr); + cli->sock.id, srv->sock.id, cli->addr); - if ((rv = nng_listen(srv->socket, addr, NULL, 0)) != 0) { + if ((rv = nng_listen(srv->sock, cli->addr, NULL, 0)) != 0) { fatal("nng_listen", rv); } - if ((rv = nng_dial(cli->socket, addr, NULL, 0)) != 0) { + if ((rv = nng_dial(cli->sock, cli->addr, NULL, 0)) != 0) { fatal("nng_dial", rv); } - if ((rv = nng_thread_create( - &cli->thr, pipeline0_puller, cli)) != 0) { - fatal("nng_thread_create", rv); - } + nng_recv_aio(cli->sock, cli->recd); } + + nng_sleep_aio(1, srv->woke); } Main({ @@ -635,7 +787,7 @@ Main({ tmo = 30; } // We have to keep this relatively low by default because some - // platforms don't support large numbers of threads. + // platforms have limited resources. if (((str = ConveyGetEnv("STRESSPRESSURE")) == NULL) || ((ncases = atoi(str)) < 1)) { ncases = 32; @@ -656,18 +808,16 @@ Main({ } switch (rand() % 5) { case 0: - reqrep_test(x); + reqrep0_test(x); break; case 1: - pair0_test(); - x = 2; // pair is always 2 + pair0_test(x); break; case 2: - pubsub_test(x); + pubsub0_test(x); break; case 3: - bus_test(); - x = 2; // pair is always 2 + bus0_test(x); break; case 4: pipeline0_test(x); @@ -676,29 +826,37 @@ Main({ // that didn't work break; } - i -= x; + i = ncases - curcase; } dprintf("WAITING for %d sec...\n", tmo); nng_msleep(tmo * 1000); // sleep 30 sec + for (i = 0; i < ncases; i++) { + nng_aio_stop(cases[i].woke); + } nng_closeall(); Test("MultiProtocol/Transport Stress", { Convey("All tests worked", { for (i = 0; i < ncases; i++) { - if (cases[i].thr != NULL) { - nng_thread_destroy(cases[i].thr); - dprintf( - "RESULT socket %u (%s) sent %d " - "recd " - "%d fail %d\n", - cases[i].socket.id, cases[i].name, - cases[i].nsend, cases[i].nrecv, - cases[i].nfail); - So(cases[i].nfail == 0); - So(cases[i].nsend > 0 || - cases[i].nrecv > 0); + test_case *c = &cases[i]; + if (c->name == NULL) { + break; } + nng_aio_stop(c->sent); + nng_aio_stop(c->recd); + nng_aio_stop(c->woke); + nng_aio_free(c->sent); + nng_aio_free(c->recd); + nng_aio_free(c->woke); + + dprintf("RESULT socket %u (%s) sent %d " + "recd %d fail %d\n", + c->sock.id, c->name, c->nsend, c->nrecv, + c->nfail); + So(c->nfail == 0); + So((c->sent == NULL) || (c->nsend > 0)); + So((c->recd == NULL) || (c->nrecv > 0)); } }); }); diff --git a/tests/reqstress.c b/tests/reqstress.c index b06a60c5..1811063b 100644 --- a/tests/reqstress.c +++ b/tests/reqstress.c @@ -33,12 +33,11 @@ static int next_port = 20000; // port number kind of. -char tcp4_template[] = "tcp://127.0.0.1:%d"; -char tcp6_template[] = "tcp://[::1]:%d"; -char inproc_template[] = "inproc://nng_reqstress_%d"; -char ipc_template[] = "ipc:///tmp/nng_reqstress_%d"; -char ws_template[] = "ws://127.0.0.1:%d/nng_reqstress"; -nng_time end_time; +char tcp4_template[] = "tcp://127.0.0.1:%d"; +char tcp6_template[] = "tcp://[::1]:%d"; +char inproc_template[] = "inproc://nng_reqstress_%d"; +char ipc_template[] = "ipc:///tmp/nng_reqstress_%d"; +char ws_template[] = "ws://127.0.0.1:%d/nng_reqstress"; char *templates[] = { #ifdef NNG_TRANSPORT_TCP @@ -69,10 +68,13 @@ int allocaddrs; typedef struct test_case { nng_socket socket; const char *name; - nng_thread *thr; int nrecv; int nsend; int nfail; + nng_aio * recv_aio; + nng_aio * send_aio; + nng_aio * time_aio; + char buf[32]; } test_case; static test_case *cases; @@ -89,7 +91,7 @@ fatal(const char *msg, int rv) void error(test_case *c, const char *msg, int rv) { - if (rv == NNG_ECLOSED) { + if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED)) { return; } fprintf( @@ -110,83 +112,105 @@ getaddr(char *buf) // Request/Reply test. For this test, we open a server socket, // and bind it to each protocol. Then we run a bunch of clients -// against it. The +// against it. -// Simple rep echo server. +// REP server implemented via callbacks. static void -rep_server(void *arg) +rep_recv_cb(void *arg) { test_case *c = arg; - for (;;) { - int rv; - nng_msg * msg; - nng_socket rep = c->socket; - - if (nng_clock() > end_time) { - break; - } + int rv; - if ((rv = nng_recvmsg(rep, &msg, 0)) != 0) { - error(c, "recvmsg", rv); - return; - } - c->nrecv++; - if ((rv = nng_sendmsg(rep, msg, 0)) != 0) { - nng_msg_free(msg); - error(c, "sendmsg", rv); - return; - } - c->nsend++; + if ((rv = nng_aio_result(c->recv_aio)) != 0) { + error(c, "recv", rv); + return; } + c->nrecv++; + nng_aio_set_msg(c->send_aio, nng_aio_get_msg(c->recv_aio)); + nng_aio_set_msg(c->recv_aio, NULL); + nng_send_aio(c->socket, c->send_aio); } static void -req_client(void *arg) +rep_send_cb(void *arg) { test_case *c = arg; - for (;;) { - int rv; - nng_socket req = c->socket; - int num = 0; - nng_msg * msg; - char buf[32]; - - (void) snprintf(buf, sizeof(buf), "%u-%d", req.id, num++); - - nng_msleep(rand() % 10); - if (nng_clock() > end_time) { - break; - } + int rv; - if ((rv = nng_msg_alloc(&msg, 0)) != 0) { - error(c, "alloc fail", rv); - return; - } - if ((rv = nng_msg_append(msg, buf, strlen(buf) + 1)) != 0) { - nng_msg_free(msg); - error(c, "append fail", rv); - return; - } - if ((rv = nng_sendmsg(req, msg, 0)) != 0) { - nng_msg_free(msg); - error(c, "sendmsg", rv); - return; - } - c->nsend++; - if ((rv = nng_recvmsg(req, &msg, 0)) != 0) { - error(c, "recvmsg", rv); - return; - } - c->nrecv++; + if ((rv = nng_aio_result(c->send_aio)) != 0) { + error(c, "send", rv); + nng_msg_free(nng_aio_get_msg(c->send_aio)); + nng_aio_set_msg(c->send_aio, NULL); + return; + } + c->nsend++; + nng_recv_aio(c->socket, c->recv_aio); +} - if (strcmp(nng_msg_body(msg), buf) != 0) { - error(c, "mismatched message", NNG_EINTERNAL); - nng_msg_free(msg); - return; - } +static void +req_time_cb(void *arg) +{ + test_case *c = arg; + nng_msg * msg = NULL; + int rv; + + if ((rv = nng_aio_result(c->time_aio)) != 0) { + error(c, "sleep", rv); + return; + } + (void) snprintf( + c->buf, sizeof(c->buf), "%u-%d", c->socket.id, c->nsend); + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append(msg, c->buf, strlen(c->buf) + 1)) != 0)) { + error(c, "alloc", rv); + nng_msg_free(msg); + return; + } + nng_aio_set_msg(c->send_aio, msg); + nng_send_aio(c->socket, c->send_aio); +} +static void +req_recv_cb(void *arg) +{ + test_case *c = arg; + nng_msg * msg = NULL; + int rv; + + if ((rv = nng_aio_result(c->recv_aio)) != 0) { + error(c, "recv", rv); + return; + } + + msg = nng_aio_get_msg(c->recv_aio); + if ((nng_msg_len(msg) != (strlen(c->buf) + 1)) || + (strcmp(c->buf, nng_msg_body(msg)) != 0)) { + error(c, "msg mismatch", rv); nng_msg_free(msg); + return; + } + + nng_msg_free(msg); + memset(c->buf, 0, sizeof(c->buf)); + + c->nrecv++; + nng_sleep_aio(rand() % 10, c->time_aio); +} + +static void +req_send_cb(void *arg) +{ + test_case *c = arg; + int rv; + + if ((rv = nng_aio_result(c->send_aio)) != 0) { + error(c, "send", rv); + nng_msg_free(nng_aio_get_msg(c->send_aio)); + nng_aio_set_msg(c->send_aio, NULL); + return; } + c->nsend++; + nng_recv_aio(c->socket, c->recv_aio); } void @@ -209,12 +233,25 @@ reqrep_test(int ntests) fatal("nng_rep0_open", rv); } - if ((rv = nng_thread_create(&srv->thr, rep_server, srv)) != 0) { - fatal("nng_thread_create", rv); + if (((rv = nng_aio_alloc(&srv->send_aio, rep_send_cb, srv)) != 0) || + ((rv = nng_aio_alloc(&srv->recv_aio, rep_recv_cb, srv)) != 0)) { + fatal("nng_aio_alloc", rv); } + nng_recv_aio(srv->socket, srv->recv_aio); + for (i = 1; i < ntests; i++) { cli = &cases[curcase++]; + + if (((rv = nng_aio_alloc(&cli->send_aio, req_send_cb, cli)) != + 0) || + ((rv = nng_aio_alloc(&cli->recv_aio, req_recv_cb, cli)) != + 0) || + ((rv = nng_aio_alloc(&cli->time_aio, req_time_cb, cli)) != + 0)) { + fatal("nng_aio_alloc", rv); + } + if ((rv = nng_req0_open(&cli->socket)) != 0) { fatal("nng_req0_open", rv); } @@ -231,10 +268,7 @@ reqrep_test(int ntests) fatal("nng_dial", rv); } - if ((rv = nng_thread_create(&cli->thr, req_client, cli)) != - 0) { - fatal("nng_thread_create", rv); - } + nng_sleep_aio(1, cli->time_aio); } } @@ -250,7 +284,8 @@ Main({ tmo = 30; } // We have to keep this relatively low by default because some - // platforms don't support large numbers of threads. + // platforms don't support large numbers of sockets. (On macOS + // laptop I can run this with 500 though.) if (((str = ConveyGetEnv("STRESSPRESSURE")) == NULL) || ((ncases = atoi(str)) < 1)) { ncases = 32; @@ -265,8 +300,7 @@ Main({ i = ncases; - cases = calloc(ncases, sizeof(test_case)); - end_time = nng_clock() + (tmo * 1000); + cases = calloc(ncases, sizeof(test_case)); while (i > 1) { int x = rand() % NTEMPLATES; if (x > i) { @@ -278,17 +312,28 @@ Main({ dprintf("WAITING for %d sec...\n", tmo); nng_msleep(tmo * 1000); // sleep 30 sec + + // Close the timeouts first, before closing sockets. This tends to + // ensure that we complete exchanges. + for (i = 0; i < ncases; i++) { + nng_aio_stop(cases[i].time_aio); + } + nng_msleep(100); nng_closeall(); Test("Req/Rep Stress", { Convey("All tests worked", { for (i = 0; i < ncases; i++) { - if (cases[i].thr != NULL) { - nng_thread_destroy(cases[i].thr); + nng_aio_stop(cases[i].recv_aio); + nng_aio_stop(cases[i].send_aio); + nng_aio_stop(cases[i].time_aio); + nng_aio_free(cases[i].recv_aio); + nng_aio_free(cases[i].send_aio); + nng_aio_free(cases[i].time_aio); + if (cases[i].name != NULL) { dprintf( "RESULT socket %u (%s) sent %d " - "recd " - "%d fail %d\n", + "recd %d fail %d\n", cases[i].socket.id, cases[i].name, cases[i].nsend, cases[i].nrecv, cases[i].nfail); |
