aboutsummaryrefslogtreecommitdiff
path: root/src/tools/perf/perf.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools/perf/perf.c')
-rw-r--r--src/tools/perf/perf.c662
1 files changed, 662 insertions, 0 deletions
diff --git a/src/tools/perf/perf.c b/src/tools/perf/perf.c
new file mode 100644
index 00000000..accac621
--- /dev/null
+++ b/src/tools/perf/perf.c
@@ -0,0 +1,662 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/options.h>
+#include <nng/supplemental/util/platform.h>
+
+static void die(const char *, ...);
+static int
+no_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Protocol not supported in this build!");
+ return (NNG_ENOTSUP);
+}
+
+typedef int (*open_func)(nng_socket *);
+
+static open_func open_server = no_open;
+static open_func open_client = no_open;
+
+#if defined(NNG_HAVE_PAIR1)
+#include <nng/protocol/pair1/pair.h>
+#else
+#define nng_pair1_open no_open
+#endif
+
+#if defined(NNG_HAVE_PAIR0)
+#include <nng/protocol/pair0/pair.h>
+#else
+#define nng_pair0_open no_open
+#endif
+
+#if defined(NNG_HAVE_REQ0)
+#include <nng/protocol/reqrep0/req.h>
+#else
+#define nng_req0_open no_open
+#endif
+
+#if defined(NNG_HAVE_REP0)
+#include <nng/protocol/reqrep0/rep.h>
+#else
+#define nng_rep0_open no_open
+#endif
+
+#if defined(NNG_HAVE_BUS0)
+#include <nng/protocol/bus0/bus.h>
+#else
+#define nng_bus0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PULL0)
+#include <nng/protocol/pipeline0/pull.h>
+#else
+#define nng_pull0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUSH0)
+#include <nng/protocol/pipeline0/push.h>
+#else
+#define nng_push0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#else
+#define nng_pub0_open no_open
+#endif
+
+#if defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/sub.h>
+#else
+#define nng_sub0_open no_open
+#endif
+
+enum options {
+ OPT_PAIR0 = 1,
+ OPT_PAIR1,
+ OPT_REQREP0,
+ OPT_PUBSUB0,
+ OPT_PIPELINE0,
+ OPT_SURVEY0,
+ OPT_BUS0,
+ OPT_URL,
+};
+
+// These are not universally supported by the variants yet.
+static nng_optspec opts[] = {
+ { .o_name = "pair1", .o_val = OPT_PAIR1 },
+ { .o_name = "pair0", .o_val = OPT_PAIR0 },
+ { .o_name = "reqrep0", .o_val = OPT_REQREP0 },
+ { .o_name = "bus0", .o_val = OPT_BUS0 },
+ { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
+ { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
+ { .o_name = "url", .o_val = OPT_URL, .o_arg = true },
+ { .o_name = NULL, .o_val = 0 },
+};
+
+static void latency_client(const char *, size_t, int);
+static void latency_server(const char *, size_t, int);
+static void throughput_client(const char *, size_t, int);
+static void throughput_server(const char *, size_t, int);
+static void do_remote_lat(int argc, char **argv);
+static void do_local_lat(int argc, char **argv);
+static void do_remote_thr(int argc, char **argv);
+static void do_local_thr(int argc, char **argv);
+static void do_inproc_thr(int argc, char **argv);
+static void do_inproc_lat(int argc, char **argv);
+static void die(const char *, ...);
+
+// perf implements the same performance tests found in the standard
+// nanomsg & mangos performance tests. As with mangos, the decision
+// about which test to run is determined by the program name (ARGV[0}])
+// that it is run under.
+//
+// Options are:
+//
+// - remote_lat - remote latency side (client, aka latency_client)
+// - local_lat - local latency side (server, aka latency_server)
+// - local_thr - local throughput side
+// - remote_thr - remote throughput side
+// - inproc_lat - inproc latency
+// - inproc_thr - inproc throughput
+//
+
+bool
+matches(const char *arg, const char *name)
+{
+ const char *ptr = arg;
+ const char *x;
+
+ while (((x = strchr(ptr, '/')) != NULL) ||
+ ((x = strchr(ptr, '\\')) != NULL) ||
+ ((x = strchr(ptr, ':')) != NULL)) {
+ ptr = x + 1;
+ }
+ for (;;) {
+ if (*name == '\0') {
+ break;
+ }
+ if (tolower(*ptr) != *name) {
+ return (false);
+ }
+ ptr++;
+ name++;
+ }
+
+ switch (*ptr) {
+ case '\0':
+ /* FALLTHROUGH*/
+ case '.': // extension; ignore it.
+ return (true);
+ default: // some other trailing bit.
+ return (false);
+ }
+}
+
+const int PAIR0 = 0;
+const int PAIR1 = 1;
+const int REQREP = 2;
+
+int
+main(int argc, char **argv)
+{
+ char *prog;
+
+#if defined(NNG_HAVE_PAIR1)
+ open_server = nng_pair1_open;
+ open_client = nng_pair1_open;
+#elif defined(NNG_HAVE_PAIR0)
+ open_server = nng_pair0_open;
+ open_client = nng_pair0_open;
+#endif
+
+ // Allow -m <remote_lat> or whatever to override argv[0].
+ if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
+ prog = argv[2];
+ argv += 3;
+ argc -= 3;
+ } else {
+ prog = argv[0];
+ argc--;
+ argv++;
+ }
+ if (matches(prog, "remote_lat") || matches(prog, "latency_client")) {
+ do_remote_lat(argc, argv);
+ } else if (matches(prog, "local_lat") ||
+ matches(prog, "latency_server")) {
+ do_local_lat(argc, argv);
+ } else if (matches(prog, "local_thr") ||
+ matches(prog, "throughput_server")) {
+ do_local_thr(argc, argv);
+ } else if (matches(prog, "remote_thr") ||
+ matches(prog, "throughput_client")) {
+ do_remote_thr(argc, argv);
+ } else if (matches(prog, "inproc_thr")) {
+ do_inproc_thr(argc, argv);
+ } else if (matches(prog, "inproc_lat")) {
+ do_inproc_lat(argc, argv);
+ } else {
+ die("Unknown program mode? Use -m <mode>.");
+ }
+}
+
+static void
+die(const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(2);
+}
+
+static int
+parse_int(const char *arg, const char *what)
+{
+ long val;
+ char *eptr;
+
+ val = strtol(arg, &eptr, 10);
+ // Must be a positive number less than around a billion.
+ if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) {
+ die("Invalid %s", what);
+ }
+ return ((int) val);
+}
+
+void
+do_local_lat(int argc, char **argv)
+{
+ long int msgsize;
+ long int trips;
+
+ if (argc != 3) {
+ die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "round-trips");
+
+ latency_server(argv[0], msgsize, trips);
+}
+
+void
+do_remote_lat(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "round-trips");
+
+ latency_client(argv[0], msgsize, trips);
+}
+
+void
+do_local_thr(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: local_thr <listen-addr> <msg-size> <count>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "count");
+
+ throughput_server(argv[0], msgsize, trips);
+}
+
+void
+do_remote_thr(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: remote_thr <connect-to> <msg-size> <count>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "count");
+
+ throughput_client(argv[0], msgsize, trips);
+}
+
+struct inproc_args {
+ int count;
+ int msgsize;
+ const char *addr;
+ void (*func)(const char *, size_t, int);
+};
+
+static void
+do_inproc(void *args)
+{
+ struct inproc_args *ia = args;
+
+ ia->func(ia->addr, ia->msgsize, ia->count);
+}
+
+void
+do_inproc_lat(int argc, char **argv)
+{
+ nng_thread * thr;
+ struct inproc_args ia;
+ int rv;
+ int val;
+ int optidx;
+ char * arg;
+ char * addr;
+
+ addr = "inproc://latency_test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_BUS0:
+ open_client = nng_bus0_open;
+ open_server = nng_bus0_open;
+ break;
+
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
+
+ if (argc != 2) {
+ die("Usage: inproc_lat <msg-size> <count>");
+ }
+
+ ia.addr = addr;
+ ia.msgsize = parse_int(argv[0], "message size");
+ ia.count = parse_int(argv[1], "count");
+ ia.func = latency_server;
+
+ if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
+ die("Cannot create thread: %s", nng_strerror(rv));
+ }
+
+ // Sleep a bit.
+ nng_msleep(100);
+
+ latency_client(addr, ia.msgsize, ia.count);
+ nng_thread_destroy(thr);
+}
+
+void
+do_inproc_thr(int argc, char **argv)
+{
+ nng_thread * thr;
+ struct inproc_args ia;
+ int rv;
+ int optidx;
+ int val;
+ char * arg;
+ char * addr = "inproc://throughput-test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+#if 0
+ // For now these protocols simply do not work with
+ // throughput -- they don't work with backpressure properly.
+ // In the future we should support synchronizing in the same
+ // process, and alerting the sender both on completion of
+ // a single message, and on completion of all messages.
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+#endif
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_PIPELINE0:
+ open_client = nng_pull0_open;
+ open_server = nng_push0_open;
+ break;
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
+
+ if (argc != 2) {
+ die("Usage: inproc_thr <msg-size> <count>");
+ }
+
+ ia.addr = addr;
+ ia.msgsize = parse_int(argv[0], "message size");
+ ia.count = parse_int(argv[1], "count");
+ ia.func = throughput_server;
+
+ if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
+ die("Cannot create thread: %s", nng_strerror(rv));
+ }
+
+ // Sleep a bit.
+ nng_msleep(100);
+
+ throughput_client(addr, ia.msgsize, ia.count);
+ nng_thread_destroy(thr);
+}
+
+void
+latency_client(const char *addr, size_t msgsize, int trips)
+{
+ nng_socket s;
+ nng_msg * msg;
+ nng_time start, end;
+ int rv;
+ int i;
+ float total;
+ float latency;
+ if ((rv = open_client(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
+ die("nng_dial: %s", nng_strerror(rv));
+ }
+
+ nng_msleep(100);
+
+ if (nng_msg_alloc(&msg, msgsize) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+
+ start = nng_clock();
+ for (i = 0; i < trips; i++) {
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ }
+ end = nng_clock();
+
+ nng_msg_free(msg);
+ nng_close(s);
+
+ total = (float) ((end - start)) / 1000;
+ latency = ((float) ((total * 1000000)) / (float) (trips * 2));
+ printf("total time: %.3f [s]\n", total);
+ printf("message size: %d [B]\n", (int) msgsize);
+ printf("round trip count: %d\n", trips);
+ printf("average latency: %.3f [us]\n", latency);
+}
+
+void
+latency_server(const char *addr, size_t msgsize, int trips)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+
+ if ((rv = open_server(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
+ die("nng_listen: %s", nng_strerror(rv));
+ }
+
+ for (i = 0; i < trips; i++) {
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
+ msgsize);
+ }
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+ }
+
+ // Wait a bit for things to drain... linger should do this.
+ // 100ms ought to be enough.
+ nng_msleep(100);
+ nng_close(s);
+}
+
+// Our throughput story is quite a mess. Mostly I think because of the poor
+// caching and message reuse. We should probably implement a message pooling
+// API somewhere.
+
+void
+throughput_server(const char *addr, size_t msgsize, int count)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+ uint64_t start, end;
+ float msgpersec, mbps, total;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+ rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
+ die("nng_listen: %s", nng_strerror(rv));
+ }
+
+ // Receive first synchronization message.
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ nng_msg_free(msg);
+ start = nng_clock();
+
+ for (i = 0; i < count; i++) {
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
+ msgsize);
+ }
+ nng_msg_free(msg);
+ }
+ end = nng_clock();
+ // Send a synchronization message (empty) to the other side,
+ // and wait a bit to make sure it goes out the wire.
+ nng_send(s, "", 0, 0);
+ nng_msleep(200);
+ nng_close(s);
+ total = (float) ((end - start)) / 1000;
+ msgpersec = (float) (count) / total;
+ mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024);
+ printf("total time: %.3f [s]\n", total);
+ printf("message size: %d [B]\n", (int) msgsize);
+ printf("message count: %d\n", count);
+ printf("throughput: %.f [msg/s]\n", msgpersec);
+ printf("throughput: %.3f [Mb/s]\n", mbps);
+}
+
+void
+throughput_client(const char *addr, size_t msgsize, int count)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+
+ // We send one extra zero length message to start the timer.
+ count++;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv));
+ }
+
+ rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
+ die("nng_dial: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+
+ for (i = 0; i < count; i++) {
+ if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+ }
+
+ // Attempt to get the completion indication from the other
+ // side.
+ if (nng_recvmsg(s, &msg, 0) == 0) {
+ nng_msg_free(msg);
+ }
+
+ nng_close(s);
+}