From a2c3aae5303b698354fee8a0ebb455791879e367 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 17 Jul 2017 11:09:14 -0700 Subject: Scalability test fixes. This fixes a potential nasty bug associated with the objhash table resizing, and rewrites the scalability test to use just a single thread handling some 2000 client sockets. This proves that the framework can deal with vast numbers of sockets, regardless of the supported number of operating system threads. --- tests/scalability.c | 210 ++++++++++++++++++++++++---------------------------- 1 file changed, 98 insertions(+), 112 deletions(-) (limited to 'tests/scalability.c') diff --git a/tests/scalability.c b/tests/scalability.c index 0999dc78..6a11ec44 100644 --- a/tests/scalability.c +++ b/tests/scalability.c @@ -12,154 +12,140 @@ #include -static int count = 1; -static int nthrs = 100; -static char *addr = "inproc:///atscale"; +static int nclients = 2000; +static char *addr = "inproc:///atscale"; -static void -client(void *arg) +void +serve(void *arg) { - int *result = arg; - nng_socket s; - int rv; - uint64_t timeo; - nng_msg *msg; - int i; - - *result = 0; + nng_socket rep = *(nng_socket *) arg; + nng_msg * msg; - if ((rv = nng_open(&s, NNG_PROTO_REQ)) != 0) { - *result = rv; - return; - } + for (;;) { + if (nng_recvmsg(rep, &msg, 0) != 0) { + nng_close(rep); + return; + } - if ((rv = nng_dial(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) { - *result = rv; - nng_close(s); - return; + if (nng_sendmsg(rep, msg, 0) != 0) { + nng_close(rep); + return; + } } +} - timeo = 4000000; // 4 seconds - if (((rv = nng_setopt(s, NNG_OPT_RCVTIMEO, &timeo, sizeof (timeo))) != 0) || - ((rv = nng_setopt(s, NNG_OPT_SNDTIMEO, &timeo, sizeof (timeo))) != 0)) { - *result = rv; - nng_close(s); - return; +int +openclients(nng_socket *clients, int num) +{ + int rv; + int i; + for (i = 0; i < num; i++) { + if ((rv = nng_open(&clients[i], NNG_PROTO_REQ)) != 0) { + printf("open #%d: %s\n", i, nng_strerror(rv)); + return (rv); + } + rv = nng_dial(clients[i], addr, NULL, NNG_FLAG_SYNCH); + if (rv != 0) { + printf("dial #%d: %s\n", i, nng_strerror(rv)); + return (rv); + } } + return (0); +} - // Sleep for up to a 10ms before issuing requests to avoid saturating - // the CPU with bazillions of requests at the same time. - nng_usleep(10000); - - for (i = 0; i < count; i++) { - // Sleep for up to a 1ms before issuing requests to - // avoid saturating the CPU with bazillions of requests at - // the same time. - nng_usleep(rand() % 1000); +int +sendreqs(nng_socket *clients, int num) +{ + nng_msg *msg; + int rv; + int i; + for (i = 0; i < num; i++) { - // Reusing the same message causes problems as a result of - // header reuse. if ((rv = nng_msg_alloc(&msg, 0)) != 0) { - *result = rv; - nng_close(s); - return; + printf("alloc #%d: %s\n", i, nng_strerror(rv)); + return (rv); } - if ((rv = nng_sendmsg(s, msg, 0)) != 0) { - *result = rv; + if ((rv = nng_sendmsg(clients[i], msg, 0)) != 0) { nng_msg_free(msg); - nng_close(s); - return; - } - - if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { - *result = rv; - nng_close(s); - return; + printf("sendmsg #%d: %s", i, nng_strerror(rv)); + return (rv); } - - nng_msg_free(msg); } - - nng_close(s); - *result = 0; + return (0); } -void -serve(void *arg) +int +recvreps(nng_socket *clients, int num) { - nng_socket rep = *(nng_socket *)arg; nng_msg *msg; + int rv; + int i; + for (i = 0; i < num; i++) { - for (;;) { - if (nng_recvmsg(rep, &msg, 0) != 0) { - nng_close(rep); - return; + if ((rv = nng_recvmsg(clients[i], &msg, 0)) != 0) { + printf("sendmsg #%d: %s", i, nng_strerror(rv)); + return (rv); } + nng_msg_free(msg); + } + return (0); +} - if (nng_sendmsg(rep, msg, 0) != 0) { - nng_close(rep); - return; +void +closeclients(nng_socket *clients, int num) +{ + int i; + for (i = 0; i < num; i++) { + if (clients[i] > 0) { + nng_close(clients[i]); } } } Main({ - int rv; - void **clients; - void *server; - int *results; + int rv; + nng_socket *clients; + void * server; + int * results; - clients = calloc(nthrs, sizeof (void *)); - results = calloc(nthrs, sizeof (int)); + clients = calloc(nclients, sizeof(nng_socket)); + results = calloc(nclients, sizeof(int)); Test("Scalability", { - Convey("Given a server socket", { nng_socket rep; - int depth = 256; + int depth = 256; So(nng_open(&rep, NNG_PROTO_REP) == 0); - - Reset({ - nng_close(rep); - }) - - So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof (depth)) == 0); - So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof (depth)) == 0); + So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, + sizeof(depth)) == 0); + So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, + sizeof(depth)) == 0); So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_thread_create(&server, serve, &rep) == 0); - nng_usleep(100000); - - Convey("We can run many many clients", { - int fails = 0; - int i; - for (i = 0; i < nthrs; i++) { - if ((rv = nng_thread_create(&clients[i], client, &results[i])) != 0) { - printf("thread create failed: %s", nng_strerror(rv)); - break; - } - } - So(i == nthrs); - - for (i = 0; i < nthrs; i++) { - nng_thread_destroy(clients[i]); - fails += (results[i] == 0 ? 0 : 1); - if (results[i] != 0) { - printf("%d (%d): %s\n", - fails, i, - nng_strerror(results[i])); - } + Reset({ + nng_usleep(1000); + if (rep != 0) { + nng_close(rep); + rep = 0; } - So(fails == 0); - - nng_shutdown(rep); - - nng_thread_destroy(server); - }) - }) - - }) -}) + }); + + Convey("We can open many many clients", { + So(openclients(clients, nclients) == 0); + Reset({ closeclients(clients, nclients); }); + + Convey("And we send them messages", { + So(sendreqs(clients, nclients) == 0); + Convey("And they receive", { + So(recvreps(clients, + nclients) == 0); + }); + }); + }); + }); + }); +}); -- cgit v1.2.3-70-g09d2