diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-17 11:09:14 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-17 11:09:14 -0700 |
| commit | a2c3aae5303b698354fee8a0ebb455791879e367 (patch) | |
| tree | 765317b759c9adba0082a5d756a34d3ca3320b58 /tests/scalability.c | |
| parent | a7e3190449b4b60e70177582bd683973e097c6a1 (diff) | |
| download | nng-a2c3aae5303b698354fee8a0ebb455791879e367.tar.gz nng-a2c3aae5303b698354fee8a0ebb455791879e367.tar.bz2 nng-a2c3aae5303b698354fee8a0ebb455791879e367.zip | |
Scalability test fixes.
This fixes a potential nasty bug associated with the objhash table
resizing, and rewrites the scalability test to use just a single thread
handling some 2000 client sockets. This proves that the framework can
deal with vast numbers of sockets, regardless of the supported number
of operating system threads.
Diffstat (limited to 'tests/scalability.c')
| -rw-r--r-- | tests/scalability.c | 210 |
1 files changed, 98 insertions, 112 deletions
diff --git a/tests/scalability.c b/tests/scalability.c index 0999dc78..6a11ec44 100644 --- a/tests/scalability.c +++ b/tests/scalability.c @@ -12,154 +12,140 @@ #include <string.h> -static int count = 1; -static int nthrs = 100; -static char *addr = "inproc:///atscale"; +static int nclients = 2000; +static char *addr = "inproc:///atscale"; -static void -client(void *arg) +void +serve(void *arg) { - int *result = arg; - nng_socket s; - int rv; - uint64_t timeo; - nng_msg *msg; - int i; - - *result = 0; + nng_socket rep = *(nng_socket *) arg; + nng_msg * msg; - if ((rv = nng_open(&s, NNG_PROTO_REQ)) != 0) { - *result = rv; - return; - } + for (;;) { + if (nng_recvmsg(rep, &msg, 0) != 0) { + nng_close(rep); + return; + } - if ((rv = nng_dial(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) { - *result = rv; - nng_close(s); - return; + if (nng_sendmsg(rep, msg, 0) != 0) { + nng_close(rep); + return; + } } +} - timeo = 4000000; // 4 seconds - if (((rv = nng_setopt(s, NNG_OPT_RCVTIMEO, &timeo, sizeof (timeo))) != 0) || - ((rv = nng_setopt(s, NNG_OPT_SNDTIMEO, &timeo, sizeof (timeo))) != 0)) { - *result = rv; - nng_close(s); - return; +int +openclients(nng_socket *clients, int num) +{ + int rv; + int i; + for (i = 0; i < num; i++) { + if ((rv = nng_open(&clients[i], NNG_PROTO_REQ)) != 0) { + printf("open #%d: %s\n", i, nng_strerror(rv)); + return (rv); + } + rv = nng_dial(clients[i], addr, NULL, NNG_FLAG_SYNCH); + if (rv != 0) { + printf("dial #%d: %s\n", i, nng_strerror(rv)); + return (rv); + } } + return (0); +} - // Sleep for up to a 10ms before issuing requests to avoid saturating - // the CPU with bazillions of requests at the same time. - nng_usleep(10000); - - for (i = 0; i < count; i++) { - // Sleep for up to a 1ms before issuing requests to - // avoid saturating the CPU with bazillions of requests at - // the same time. - nng_usleep(rand() % 1000); +int +sendreqs(nng_socket *clients, int num) +{ + nng_msg *msg; + int rv; + int i; + for (i = 0; i < num; i++) { - // Reusing the same message causes problems as a result of - // header reuse. if ((rv = nng_msg_alloc(&msg, 0)) != 0) { - *result = rv; - nng_close(s); - return; + printf("alloc #%d: %s\n", i, nng_strerror(rv)); + return (rv); } - if ((rv = nng_sendmsg(s, msg, 0)) != 0) { - *result = rv; + if ((rv = nng_sendmsg(clients[i], msg, 0)) != 0) { nng_msg_free(msg); - nng_close(s); - return; - } - - if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { - *result = rv; - nng_close(s); - return; + printf("sendmsg #%d: %s", i, nng_strerror(rv)); + return (rv); } - - nng_msg_free(msg); } - - nng_close(s); - *result = 0; + return (0); } -void -serve(void *arg) +int +recvreps(nng_socket *clients, int num) { - nng_socket rep = *(nng_socket *)arg; nng_msg *msg; + int rv; + int i; + for (i = 0; i < num; i++) { - for (;;) { - if (nng_recvmsg(rep, &msg, 0) != 0) { - nng_close(rep); - return; + if ((rv = nng_recvmsg(clients[i], &msg, 0)) != 0) { + printf("sendmsg #%d: %s", i, nng_strerror(rv)); + return (rv); } + nng_msg_free(msg); + } + return (0); +} - if (nng_sendmsg(rep, msg, 0) != 0) { - nng_close(rep); - return; +void +closeclients(nng_socket *clients, int num) +{ + int i; + for (i = 0; i < num; i++) { + if (clients[i] > 0) { + nng_close(clients[i]); } } } Main({ - int rv; - void **clients; - void *server; - int *results; + int rv; + nng_socket *clients; + void * server; + int * results; - clients = calloc(nthrs, sizeof (void *)); - results = calloc(nthrs, sizeof (int)); + clients = calloc(nclients, sizeof(nng_socket)); + results = calloc(nclients, sizeof(int)); Test("Scalability", { - Convey("Given a server socket", { nng_socket rep; - int depth = 256; + int depth = 256; So(nng_open(&rep, NNG_PROTO_REP) == 0); - - Reset({ - nng_close(rep); - }) - - So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof (depth)) == 0); - So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof (depth)) == 0); + So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, + sizeof(depth)) == 0); + So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, + sizeof(depth)) == 0); So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0); So(nng_thread_create(&server, serve, &rep) == 0); - nng_usleep(100000); - - Convey("We can run many many clients", { - int fails = 0; - int i; - for (i = 0; i < nthrs; i++) { - if ((rv = nng_thread_create(&clients[i], client, &results[i])) != 0) { - printf("thread create failed: %s", nng_strerror(rv)); - break; - } - } - So(i == nthrs); - - for (i = 0; i < nthrs; i++) { - nng_thread_destroy(clients[i]); - fails += (results[i] == 0 ? 0 : 1); - if (results[i] != 0) { - printf("%d (%d): %s\n", - fails, i, - nng_strerror(results[i])); - } + Reset({ + nng_usleep(1000); + if (rep != 0) { + nng_close(rep); + rep = 0; } - So(fails == 0); - - nng_shutdown(rep); - - nng_thread_destroy(server); - }) - }) - - }) -}) + }); + + Convey("We can open many many clients", { + So(openclients(clients, nclients) == 0); + Reset({ closeclients(clients, nclients); }); + + Convey("And we send them messages", { + So(sendreqs(clients, nclients) == 0); + Convey("And they receive", { + So(recvreps(clients, + nclients) == 0); + }); + }); + }); + }); + }); +}); |
