aboutsummaryrefslogtreecommitdiff
path: root/tests/scalability.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/scalability.c')
-rw-r--r--tests/scalability.c210
1 files changed, 98 insertions, 112 deletions
diff --git a/tests/scalability.c b/tests/scalability.c
index 0999dc78..6a11ec44 100644
--- a/tests/scalability.c
+++ b/tests/scalability.c
@@ -12,154 +12,140 @@
#include <string.h>
-static int count = 1;
-static int nthrs = 100;
-static char *addr = "inproc:///atscale";
+static int nclients = 2000;
+static char *addr = "inproc:///atscale";
-static void
-client(void *arg)
+void
+serve(void *arg)
{
- int *result = arg;
- nng_socket s;
- int rv;
- uint64_t timeo;
- nng_msg *msg;
- int i;
-
- *result = 0;
+ nng_socket rep = *(nng_socket *) arg;
+ nng_msg * msg;
- if ((rv = nng_open(&s, NNG_PROTO_REQ)) != 0) {
- *result = rv;
- return;
- }
+ for (;;) {
+ if (nng_recvmsg(rep, &msg, 0) != 0) {
+ nng_close(rep);
+ return;
+ }
- if ((rv = nng_dial(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) {
- *result = rv;
- nng_close(s);
- return;
+ if (nng_sendmsg(rep, msg, 0) != 0) {
+ nng_close(rep);
+ return;
+ }
}
+}
- timeo = 4000000; // 4 seconds
- if (((rv = nng_setopt(s, NNG_OPT_RCVTIMEO, &timeo, sizeof (timeo))) != 0) ||
- ((rv = nng_setopt(s, NNG_OPT_SNDTIMEO, &timeo, sizeof (timeo))) != 0)) {
- *result = rv;
- nng_close(s);
- return;
+int
+openclients(nng_socket *clients, int num)
+{
+ int rv;
+ int i;
+ for (i = 0; i < num; i++) {
+ if ((rv = nng_open(&clients[i], NNG_PROTO_REQ)) != 0) {
+ printf("open #%d: %s\n", i, nng_strerror(rv));
+ return (rv);
+ }
+ rv = nng_dial(clients[i], addr, NULL, NNG_FLAG_SYNCH);
+ if (rv != 0) {
+ printf("dial #%d: %s\n", i, nng_strerror(rv));
+ return (rv);
+ }
}
+ return (0);
+}
- // Sleep for up to a 10ms before issuing requests to avoid saturating
- // the CPU with bazillions of requests at the same time.
- nng_usleep(10000);
-
- for (i = 0; i < count; i++) {
- // Sleep for up to a 1ms before issuing requests to
- // avoid saturating the CPU with bazillions of requests at
- // the same time.
- nng_usleep(rand() % 1000);
+int
+sendreqs(nng_socket *clients, int num)
+{
+ nng_msg *msg;
+ int rv;
+ int i;
+ for (i = 0; i < num; i++) {
- // Reusing the same message causes problems as a result of
- // header reuse.
if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- *result = rv;
- nng_close(s);
- return;
+ printf("alloc #%d: %s\n", i, nng_strerror(rv));
+ return (rv);
}
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- *result = rv;
+ if ((rv = nng_sendmsg(clients[i], msg, 0)) != 0) {
nng_msg_free(msg);
- nng_close(s);
- return;
- }
-
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- *result = rv;
- nng_close(s);
- return;
+ printf("sendmsg #%d: %s", i, nng_strerror(rv));
+ return (rv);
}
-
- nng_msg_free(msg);
}
-
- nng_close(s);
- *result = 0;
+ return (0);
}
-void
-serve(void *arg)
+int
+recvreps(nng_socket *clients, int num)
{
- nng_socket rep = *(nng_socket *)arg;
nng_msg *msg;
+ int rv;
+ int i;
+ for (i = 0; i < num; i++) {
- for (;;) {
- if (nng_recvmsg(rep, &msg, 0) != 0) {
- nng_close(rep);
- return;
+ if ((rv = nng_recvmsg(clients[i], &msg, 0)) != 0) {
+ printf("sendmsg #%d: %s", i, nng_strerror(rv));
+ return (rv);
}
+ nng_msg_free(msg);
+ }
+ return (0);
+}
- if (nng_sendmsg(rep, msg, 0) != 0) {
- nng_close(rep);
- return;
+void
+closeclients(nng_socket *clients, int num)
+{
+ int i;
+ for (i = 0; i < num; i++) {
+ if (clients[i] > 0) {
+ nng_close(clients[i]);
}
}
}
Main({
- int rv;
- void **clients;
- void *server;
- int *results;
+ int rv;
+ nng_socket *clients;
+ void * server;
+ int * results;
- clients = calloc(nthrs, sizeof (void *));
- results = calloc(nthrs, sizeof (int));
+ clients = calloc(nclients, sizeof(nng_socket));
+ results = calloc(nclients, sizeof(int));
Test("Scalability", {
-
Convey("Given a server socket", {
nng_socket rep;
- int depth = 256;
+ int depth = 256;
So(nng_open(&rep, NNG_PROTO_REP) == 0);
-
- Reset({
- nng_close(rep);
- })
-
- So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof (depth)) == 0);
- So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof (depth)) == 0);
+ So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth,
+ sizeof(depth)) == 0);
+ So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth,
+ sizeof(depth)) == 0);
So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_thread_create(&server, serve, &rep) == 0);
- nng_usleep(100000);
-
- Convey("We can run many many clients", {
- int fails = 0;
- int i;
- for (i = 0; i < nthrs; i++) {
- if ((rv = nng_thread_create(&clients[i], client, &results[i])) != 0) {
- printf("thread create failed: %s", nng_strerror(rv));
- break;
- }
- }
- So(i == nthrs);
-
- for (i = 0; i < nthrs; i++) {
- nng_thread_destroy(clients[i]);
- fails += (results[i] == 0 ? 0 : 1);
- if (results[i] != 0) {
- printf("%d (%d): %s\n",
- fails, i,
- nng_strerror(results[i]));
- }
+ Reset({
+ nng_usleep(1000);
+ if (rep != 0) {
+ nng_close(rep);
+ rep = 0;
}
- So(fails == 0);
-
- nng_shutdown(rep);
-
- nng_thread_destroy(server);
- })
- })
-
- })
-})
+ });
+
+ Convey("We can open many many clients", {
+ So(openclients(clients, nclients) == 0);
+ Reset({ closeclients(clients, nclients); });
+
+ Convey("And we send them messages", {
+ So(sendreqs(clients, nclients) == 0);
+ Convey("And they receive", {
+ So(recvreps(clients,
+ nclients) == 0);
+ });
+ });
+ });
+ });
+ });
+});