aboutsummaryrefslogtreecommitdiff
path: root/tests/scalability.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-17 11:09:14 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-17 11:09:14 -0700
commita2c3aae5303b698354fee8a0ebb455791879e367 (patch)
tree765317b759c9adba0082a5d756a34d3ca3320b58 /tests/scalability.c
parenta7e3190449b4b60e70177582bd683973e097c6a1 (diff)
downloadnng-a2c3aae5303b698354fee8a0ebb455791879e367.tar.gz
nng-a2c3aae5303b698354fee8a0ebb455791879e367.tar.bz2
nng-a2c3aae5303b698354fee8a0ebb455791879e367.zip
Scalability test fixes.
This fixes a potential nasty bug associated with the objhash table resizing, and rewrites the scalability test to use just a single thread handling some 2000 client sockets. This proves that the framework can deal with vast numbers of sockets, regardless of the supported number of operating system threads.
Diffstat (limited to 'tests/scalability.c')
-rw-r--r--tests/scalability.c210
1 files changed, 98 insertions, 112 deletions
diff --git a/tests/scalability.c b/tests/scalability.c
index 0999dc78..6a11ec44 100644
--- a/tests/scalability.c
+++ b/tests/scalability.c
@@ -12,154 +12,140 @@
#include <string.h>
-static int count = 1;
-static int nthrs = 100;
-static char *addr = "inproc:///atscale";
+static int nclients = 2000;
+static char *addr = "inproc:///atscale";
-static void
-client(void *arg)
+void
+serve(void *arg)
{
- int *result = arg;
- nng_socket s;
- int rv;
- uint64_t timeo;
- nng_msg *msg;
- int i;
-
- *result = 0;
+ nng_socket rep = *(nng_socket *) arg;
+ nng_msg * msg;
- if ((rv = nng_open(&s, NNG_PROTO_REQ)) != 0) {
- *result = rv;
- return;
- }
+ for (;;) {
+ if (nng_recvmsg(rep, &msg, 0) != 0) {
+ nng_close(rep);
+ return;
+ }
- if ((rv = nng_dial(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) {
- *result = rv;
- nng_close(s);
- return;
+ if (nng_sendmsg(rep, msg, 0) != 0) {
+ nng_close(rep);
+ return;
+ }
}
+}
- timeo = 4000000; // 4 seconds
- if (((rv = nng_setopt(s, NNG_OPT_RCVTIMEO, &timeo, sizeof (timeo))) != 0) ||
- ((rv = nng_setopt(s, NNG_OPT_SNDTIMEO, &timeo, sizeof (timeo))) != 0)) {
- *result = rv;
- nng_close(s);
- return;
+int
+openclients(nng_socket *clients, int num)
+{
+ int rv;
+ int i;
+ for (i = 0; i < num; i++) {
+ if ((rv = nng_open(&clients[i], NNG_PROTO_REQ)) != 0) {
+ printf("open #%d: %s\n", i, nng_strerror(rv));
+ return (rv);
+ }
+ rv = nng_dial(clients[i], addr, NULL, NNG_FLAG_SYNCH);
+ if (rv != 0) {
+ printf("dial #%d: %s\n", i, nng_strerror(rv));
+ return (rv);
+ }
}
+ return (0);
+}
- // Sleep for up to a 10ms before issuing requests to avoid saturating
- // the CPU with bazillions of requests at the same time.
- nng_usleep(10000);
-
- for (i = 0; i < count; i++) {
- // Sleep for up to a 1ms before issuing requests to
- // avoid saturating the CPU with bazillions of requests at
- // the same time.
- nng_usleep(rand() % 1000);
+int
+sendreqs(nng_socket *clients, int num)
+{
+ nng_msg *msg;
+ int rv;
+ int i;
+ for (i = 0; i < num; i++) {
- // Reusing the same message causes problems as a result of
- // header reuse.
if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- *result = rv;
- nng_close(s);
- return;
+ printf("alloc #%d: %s\n", i, nng_strerror(rv));
+ return (rv);
}
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- *result = rv;
+ if ((rv = nng_sendmsg(clients[i], msg, 0)) != 0) {
nng_msg_free(msg);
- nng_close(s);
- return;
- }
-
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- *result = rv;
- nng_close(s);
- return;
+ printf("sendmsg #%d: %s", i, nng_strerror(rv));
+ return (rv);
}
-
- nng_msg_free(msg);
}
-
- nng_close(s);
- *result = 0;
+ return (0);
}
-void
-serve(void *arg)
+int
+recvreps(nng_socket *clients, int num)
{
- nng_socket rep = *(nng_socket *)arg;
nng_msg *msg;
+ int rv;
+ int i;
+ for (i = 0; i < num; i++) {
- for (;;) {
- if (nng_recvmsg(rep, &msg, 0) != 0) {
- nng_close(rep);
- return;
+ if ((rv = nng_recvmsg(clients[i], &msg, 0)) != 0) {
+ printf("sendmsg #%d: %s", i, nng_strerror(rv));
+ return (rv);
}
+ nng_msg_free(msg);
+ }
+ return (0);
+}
- if (nng_sendmsg(rep, msg, 0) != 0) {
- nng_close(rep);
- return;
+void
+closeclients(nng_socket *clients, int num)
+{
+ int i;
+ for (i = 0; i < num; i++) {
+ if (clients[i] > 0) {
+ nng_close(clients[i]);
}
}
}
Main({
- int rv;
- void **clients;
- void *server;
- int *results;
+ int rv;
+ nng_socket *clients;
+ void * server;
+ int * results;
- clients = calloc(nthrs, sizeof (void *));
- results = calloc(nthrs, sizeof (int));
+ clients = calloc(nclients, sizeof(nng_socket));
+ results = calloc(nclients, sizeof(int));
Test("Scalability", {
-
Convey("Given a server socket", {
nng_socket rep;
- int depth = 256;
+ int depth = 256;
So(nng_open(&rep, NNG_PROTO_REP) == 0);
-
- Reset({
- nng_close(rep);
- })
-
- So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof (depth)) == 0);
- So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof (depth)) == 0);
+ So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth,
+ sizeof(depth)) == 0);
+ So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth,
+ sizeof(depth)) == 0);
So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_thread_create(&server, serve, &rep) == 0);
- nng_usleep(100000);
-
- Convey("We can run many many clients", {
- int fails = 0;
- int i;
- for (i = 0; i < nthrs; i++) {
- if ((rv = nng_thread_create(&clients[i], client, &results[i])) != 0) {
- printf("thread create failed: %s", nng_strerror(rv));
- break;
- }
- }
- So(i == nthrs);
-
- for (i = 0; i < nthrs; i++) {
- nng_thread_destroy(clients[i]);
- fails += (results[i] == 0 ? 0 : 1);
- if (results[i] != 0) {
- printf("%d (%d): %s\n",
- fails, i,
- nng_strerror(results[i]));
- }
+ Reset({
+ nng_usleep(1000);
+ if (rep != 0) {
+ nng_close(rep);
+ rep = 0;
}
- So(fails == 0);
-
- nng_shutdown(rep);
-
- nng_thread_destroy(server);
- })
- })
-
- })
-})
+ });
+
+ Convey("We can open many many clients", {
+ So(openclients(clients, nclients) == 0);
+ Reset({ closeclients(clients, nclients); });
+
+ Convey("And we send them messages", {
+ So(sendreqs(clients, nclients) == 0);
+ Convey("And they receive", {
+ So(recvreps(clients,
+ nclients) == 0);
+ });
+ });
+ });
+ });
+ });
+});