aboutsummaryrefslogtreecommitdiff
path: root/src/tools/perf
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools/perf')
-rw-r--r--src/tools/perf/CMakeLists.txt35
-rw-r--r--src/tools/perf/perf.c662
-rw-r--r--src/tools/perf/pubdrop.c325
3 files changed, 1022 insertions, 0 deletions
diff --git a/src/tools/perf/CMakeLists.txt b/src/tools/perf/CMakeLists.txt
new file mode 100644
index 00000000..135544bb
--- /dev/null
+++ b/src/tools/perf/CMakeLists.txt
@@ -0,0 +1,35 @@
+#
+# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Build performance tests.
+
+nng_directory(perf)
+
+if (NNG_TESTS)
+ macro (add_nng_perf NAME)
+ add_executable (${NAME} perf.c)
+ target_link_libraries (${NAME} nng nng_private)
+ endmacro (add_nng_perf)
+
+ add_nng_perf(remote_lat)
+ add_nng_perf(local_lat)
+ add_nng_perf(local_thr)
+ add_nng_perf(remote_thr)
+ add_nng_perf(inproc_thr)
+ add_nng_perf(inproc_lat)
+
+ add_test (NAME nng.inproc_lat COMMAND inproc_lat 64 10000)
+ set_tests_properties (nng.inproc_lat PROPERTIES TIMEOUT 30)
+
+ add_test (NAME nng.inproc_thr COMMAND inproc_thr 1400 10000)
+ set_tests_properties (nng.inproc_thr PROPERTIES TIMEOUT 30)
+
+ add_executable (pubdrop pubdrop.c)
+ target_link_libraries(pubdrop nng nng_private)
+endif ()
diff --git a/src/tools/perf/perf.c b/src/tools/perf/perf.c
new file mode 100644
index 00000000..accac621
--- /dev/null
+++ b/src/tools/perf/perf.c
@@ -0,0 +1,662 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/options.h>
+#include <nng/supplemental/util/platform.h>
+
+static void die(const char *, ...);
+static int
+no_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Protocol not supported in this build!");
+ return (NNG_ENOTSUP);
+}
+
+typedef int (*open_func)(nng_socket *);
+
+static open_func open_server = no_open;
+static open_func open_client = no_open;
+
+#if defined(NNG_HAVE_PAIR1)
+#include <nng/protocol/pair1/pair.h>
+#else
+#define nng_pair1_open no_open
+#endif
+
+#if defined(NNG_HAVE_PAIR0)
+#include <nng/protocol/pair0/pair.h>
+#else
+#define nng_pair0_open no_open
+#endif
+
+#if defined(NNG_HAVE_REQ0)
+#include <nng/protocol/reqrep0/req.h>
+#else
+#define nng_req0_open no_open
+#endif
+
+#if defined(NNG_HAVE_REP0)
+#include <nng/protocol/reqrep0/rep.h>
+#else
+#define nng_rep0_open no_open
+#endif
+
+#if defined(NNG_HAVE_BUS0)
+#include <nng/protocol/bus0/bus.h>
+#else
+#define nng_bus0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PULL0)
+#include <nng/protocol/pipeline0/pull.h>
+#else
+#define nng_pull0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUSH0)
+#include <nng/protocol/pipeline0/push.h>
+#else
+#define nng_push0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#else
+#define nng_pub0_open no_open
+#endif
+
+#if defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/sub.h>
+#else
+#define nng_sub0_open no_open
+#endif
+
+enum options {
+ OPT_PAIR0 = 1,
+ OPT_PAIR1,
+ OPT_REQREP0,
+ OPT_PUBSUB0,
+ OPT_PIPELINE0,
+ OPT_SURVEY0,
+ OPT_BUS0,
+ OPT_URL,
+};
+
+// These are not universally supported by the variants yet.
+static nng_optspec opts[] = {
+ { .o_name = "pair1", .o_val = OPT_PAIR1 },
+ { .o_name = "pair0", .o_val = OPT_PAIR0 },
+ { .o_name = "reqrep0", .o_val = OPT_REQREP0 },
+ { .o_name = "bus0", .o_val = OPT_BUS0 },
+ { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
+ { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
+ { .o_name = "url", .o_val = OPT_URL, .o_arg = true },
+ { .o_name = NULL, .o_val = 0 },
+};
+
+static void latency_client(const char *, size_t, int);
+static void latency_server(const char *, size_t, int);
+static void throughput_client(const char *, size_t, int);
+static void throughput_server(const char *, size_t, int);
+static void do_remote_lat(int argc, char **argv);
+static void do_local_lat(int argc, char **argv);
+static void do_remote_thr(int argc, char **argv);
+static void do_local_thr(int argc, char **argv);
+static void do_inproc_thr(int argc, char **argv);
+static void do_inproc_lat(int argc, char **argv);
+static void die(const char *, ...);
+
+// perf implements the same performance tests found in the standard
+// nanomsg & mangos performance tests. As with mangos, the decision
+// about which test to run is determined by the program name (ARGV[0}])
+// that it is run under.
+//
+// Options are:
+//
+// - remote_lat - remote latency side (client, aka latency_client)
+// - local_lat - local latency side (server, aka latency_server)
+// - local_thr - local throughput side
+// - remote_thr - remote throughput side
+// - inproc_lat - inproc latency
+// - inproc_thr - inproc throughput
+//
+
+bool
+matches(const char *arg, const char *name)
+{
+ const char *ptr = arg;
+ const char *x;
+
+ while (((x = strchr(ptr, '/')) != NULL) ||
+ ((x = strchr(ptr, '\\')) != NULL) ||
+ ((x = strchr(ptr, ':')) != NULL)) {
+ ptr = x + 1;
+ }
+ for (;;) {
+ if (*name == '\0') {
+ break;
+ }
+ if (tolower(*ptr) != *name) {
+ return (false);
+ }
+ ptr++;
+ name++;
+ }
+
+ switch (*ptr) {
+ case '\0':
+ /* FALLTHROUGH*/
+ case '.': // extension; ignore it.
+ return (true);
+ default: // some other trailing bit.
+ return (false);
+ }
+}
+
+const int PAIR0 = 0;
+const int PAIR1 = 1;
+const int REQREP = 2;
+
+int
+main(int argc, char **argv)
+{
+ char *prog;
+
+#if defined(NNG_HAVE_PAIR1)
+ open_server = nng_pair1_open;
+ open_client = nng_pair1_open;
+#elif defined(NNG_HAVE_PAIR0)
+ open_server = nng_pair0_open;
+ open_client = nng_pair0_open;
+#endif
+
+ // Allow -m <remote_lat> or whatever to override argv[0].
+ if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
+ prog = argv[2];
+ argv += 3;
+ argc -= 3;
+ } else {
+ prog = argv[0];
+ argc--;
+ argv++;
+ }
+ if (matches(prog, "remote_lat") || matches(prog, "latency_client")) {
+ do_remote_lat(argc, argv);
+ } else if (matches(prog, "local_lat") ||
+ matches(prog, "latency_server")) {
+ do_local_lat(argc, argv);
+ } else if (matches(prog, "local_thr") ||
+ matches(prog, "throughput_server")) {
+ do_local_thr(argc, argv);
+ } else if (matches(prog, "remote_thr") ||
+ matches(prog, "throughput_client")) {
+ do_remote_thr(argc, argv);
+ } else if (matches(prog, "inproc_thr")) {
+ do_inproc_thr(argc, argv);
+ } else if (matches(prog, "inproc_lat")) {
+ do_inproc_lat(argc, argv);
+ } else {
+ die("Unknown program mode? Use -m <mode>.");
+ }
+}
+
+static void
+die(const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(2);
+}
+
+static int
+parse_int(const char *arg, const char *what)
+{
+ long val;
+ char *eptr;
+
+ val = strtol(arg, &eptr, 10);
+ // Must be a positive number less than around a billion.
+ if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) {
+ die("Invalid %s", what);
+ }
+ return ((int) val);
+}
+
+void
+do_local_lat(int argc, char **argv)
+{
+ long int msgsize;
+ long int trips;
+
+ if (argc != 3) {
+ die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "round-trips");
+
+ latency_server(argv[0], msgsize, trips);
+}
+
+void
+do_remote_lat(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "round-trips");
+
+ latency_client(argv[0], msgsize, trips);
+}
+
+void
+do_local_thr(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: local_thr <listen-addr> <msg-size> <count>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "count");
+
+ throughput_server(argv[0], msgsize, trips);
+}
+
+void
+do_remote_thr(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: remote_thr <connect-to> <msg-size> <count>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "count");
+
+ throughput_client(argv[0], msgsize, trips);
+}
+
+struct inproc_args {
+ int count;
+ int msgsize;
+ const char *addr;
+ void (*func)(const char *, size_t, int);
+};
+
+static void
+do_inproc(void *args)
+{
+ struct inproc_args *ia = args;
+
+ ia->func(ia->addr, ia->msgsize, ia->count);
+}
+
+void
+do_inproc_lat(int argc, char **argv)
+{
+ nng_thread * thr;
+ struct inproc_args ia;
+ int rv;
+ int val;
+ int optidx;
+ char * arg;
+ char * addr;
+
+ addr = "inproc://latency_test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_BUS0:
+ open_client = nng_bus0_open;
+ open_server = nng_bus0_open;
+ break;
+
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
+
+ if (argc != 2) {
+ die("Usage: inproc_lat <msg-size> <count>");
+ }
+
+ ia.addr = addr;
+ ia.msgsize = parse_int(argv[0], "message size");
+ ia.count = parse_int(argv[1], "count");
+ ia.func = latency_server;
+
+ if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
+ die("Cannot create thread: %s", nng_strerror(rv));
+ }
+
+ // Sleep a bit.
+ nng_msleep(100);
+
+ latency_client(addr, ia.msgsize, ia.count);
+ nng_thread_destroy(thr);
+}
+
+void
+do_inproc_thr(int argc, char **argv)
+{
+ nng_thread * thr;
+ struct inproc_args ia;
+ int rv;
+ int optidx;
+ int val;
+ char * arg;
+ char * addr = "inproc://throughput-test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+#if 0
+ // For now these protocols simply do not work with
+ // throughput -- they don't work with backpressure properly.
+ // In the future we should support synchronizing in the same
+ // process, and alerting the sender both on completion of
+ // a single message, and on completion of all messages.
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+#endif
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_PIPELINE0:
+ open_client = nng_pull0_open;
+ open_server = nng_push0_open;
+ break;
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
+
+ if (argc != 2) {
+ die("Usage: inproc_thr <msg-size> <count>");
+ }
+
+ ia.addr = addr;
+ ia.msgsize = parse_int(argv[0], "message size");
+ ia.count = parse_int(argv[1], "count");
+ ia.func = throughput_server;
+
+ if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
+ die("Cannot create thread: %s", nng_strerror(rv));
+ }
+
+ // Sleep a bit.
+ nng_msleep(100);
+
+ throughput_client(addr, ia.msgsize, ia.count);
+ nng_thread_destroy(thr);
+}
+
+void
+latency_client(const char *addr, size_t msgsize, int trips)
+{
+ nng_socket s;
+ nng_msg * msg;
+ nng_time start, end;
+ int rv;
+ int i;
+ float total;
+ float latency;
+ if ((rv = open_client(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
+ die("nng_dial: %s", nng_strerror(rv));
+ }
+
+ nng_msleep(100);
+
+ if (nng_msg_alloc(&msg, msgsize) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+
+ start = nng_clock();
+ for (i = 0; i < trips; i++) {
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ }
+ end = nng_clock();
+
+ nng_msg_free(msg);
+ nng_close(s);
+
+ total = (float) ((end - start)) / 1000;
+ latency = ((float) ((total * 1000000)) / (float) (trips * 2));
+ printf("total time: %.3f [s]\n", total);
+ printf("message size: %d [B]\n", (int) msgsize);
+ printf("round trip count: %d\n", trips);
+ printf("average latency: %.3f [us]\n", latency);
+}
+
+void
+latency_server(const char *addr, size_t msgsize, int trips)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+
+ if ((rv = open_server(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
+ die("nng_listen: %s", nng_strerror(rv));
+ }
+
+ for (i = 0; i < trips; i++) {
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
+ msgsize);
+ }
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+ }
+
+ // Wait a bit for things to drain... linger should do this.
+ // 100ms ought to be enough.
+ nng_msleep(100);
+ nng_close(s);
+}
+
+// Our throughput story is quite a mess. Mostly I think because of the poor
+// caching and message reuse. We should probably implement a message pooling
+// API somewhere.
+
+void
+throughput_server(const char *addr, size_t msgsize, int count)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+ uint64_t start, end;
+ float msgpersec, mbps, total;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+ rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
+ die("nng_listen: %s", nng_strerror(rv));
+ }
+
+ // Receive first synchronization message.
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ nng_msg_free(msg);
+ start = nng_clock();
+
+ for (i = 0; i < count; i++) {
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
+ msgsize);
+ }
+ nng_msg_free(msg);
+ }
+ end = nng_clock();
+ // Send a synchronization message (empty) to the other side,
+ // and wait a bit to make sure it goes out the wire.
+ nng_send(s, "", 0, 0);
+ nng_msleep(200);
+ nng_close(s);
+ total = (float) ((end - start)) / 1000;
+ msgpersec = (float) (count) / total;
+ mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024);
+ printf("total time: %.3f [s]\n", total);
+ printf("message size: %d [B]\n", (int) msgsize);
+ printf("message count: %d\n", count);
+ printf("throughput: %.f [msg/s]\n", msgpersec);
+ printf("throughput: %.3f [Mb/s]\n", mbps);
+}
+
+void
+throughput_client(const char *addr, size_t msgsize, int count)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+
+ // We send one extra zero length message to start the timer.
+ count++;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv));
+ }
+
+ rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
+ die("nng_dial: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+
+ for (i = 0; i < count; i++) {
+ if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+ }
+
+ // Attempt to get the completion indication from the other
+ // side.
+ if (nng_recvmsg(s, &msg, 0) == 0) {
+ nng_msg_free(msg);
+ }
+
+ nng_close(s);
+}
diff --git a/src/tools/perf/pubdrop.c b/src/tools/perf/pubdrop.c
new file mode 100644
index 00000000..be51bb2f
--- /dev/null
+++ b/src/tools/perf/pubdrop.c
@@ -0,0 +1,325 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/platform.h>
+
+// pubdrop - this is a simple testing utility that lets us measure PUB/SUB
+// performance, including dropped messages, delivery across multiple threads,
+// etc. It actually uses a wild card subscription for now.
+
+#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+
+#else
+
+static void die(const char *, ...);
+
+static int
+nng_pub0_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Pub protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+
+static int
+nng_sub0_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Sub protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+
+#endif // NNG_HAVE_PUB0....
+
+static void die(const char *, ...);
+static void do_pubdrop(int argc, char **argv);
+static uint64_t nperusec;
+static volatile int x;
+
+void
+work(void)
+{
+ x = rand();
+}
+
+void
+usdelay(unsigned long long nusec)
+{
+ nusec *= nperusec;
+ while (nusec > 0) {
+ work();
+ nusec--;
+ }
+}
+
+int
+main(int argc, char **argv)
+{
+ argc--;
+ argv++;
+
+ // We calculate a delay factor to roughly delay 1 usec. We don't
+ // need this to be perfect, just reproducible on the same host.
+ unsigned long cnt = 1000000;
+
+ nng_time beg = nng_clock();
+ for (unsigned long i = 0; i < cnt; i++) {
+ work();
+ }
+ nng_time end = nng_clock();
+ nperusec = cnt / (1000 * (end - beg));
+
+ do_pubdrop(argc, argv);
+}
+
+static void
+die(const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(2);
+}
+
+static int
+parse_int(const char *arg, const char *what)
+{
+ long val;
+ char *eptr;
+
+ val = strtol(arg, &eptr, 10);
+ // Must be a postive number less than around a billion.
+ if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) {
+ die("Invalid %s", what);
+ }
+ return ((int) val);
+}
+
+struct pubdrop_args {
+ const char * addr;
+ bool start;
+ unsigned long long msgsize;
+ unsigned long long count;
+ unsigned long long intvl;
+ unsigned long long drops;
+ unsigned long long gaps;
+ unsigned long long errs;
+ unsigned long long recvs;
+ nng_time beg;
+ nng_time end;
+ nng_mtx * mtx;
+ nng_cv * cv;
+};
+
+static void
+pub_server(void *arg)
+{
+ struct pubdrop_args *pa = arg;
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+
+ if ((rv = nng_pub0_open(&sock)) != 0) {
+ die("Cannot open sub: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) {
+ die("Cannot listen: %s", nng_strerror(rv));
+ }
+
+ nng_mtx_lock(pa->mtx);
+ while (!pa->start) {
+ nng_cv_wait(pa->cv);
+ }
+ nng_mtx_unlock(pa->mtx);
+
+ start = nng_clock();
+ for (uint64_t i = 0; i < pa->count; i++) {
+ // Unfortunately we need to allocate messages dynamically as we
+ // go. The other option would be to allocate them all up front,
+ // but that could be a rather excessive amount of memory.
+ if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) {
+ die("Message alloc failed");
+ }
+ memcpy(nng_msg_body(msg), &i, sizeof(i));
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ die("Sendmsg: %s", nng_strerror(rv));
+ }
+ // It sure would be nice if we had a usec granularity option
+ // here.
+ if (pa->intvl > 0) {
+ usdelay((unsigned long long) pa->intvl);
+ }
+ }
+
+ end = nng_clock();
+
+ nng_msleep(1000); // drain the queue
+ nng_close(sock);
+
+ nng_mtx_lock(pa->mtx);
+ pa->beg = start;
+ pa->end = end;
+ nng_mtx_unlock(pa->mtx);
+}
+
+static void
+sub_client(void *arg)
+{
+ struct pubdrop_args *pa = arg;
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ unsigned long long recvs;
+ unsigned long long drops;
+ unsigned long long gaps;
+ unsigned long long errs;
+ unsigned long long expect;
+
+ if ((rv = nng_sub0_open(&sock)) != 0) {
+ die("Cannot open sub: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) {
+ die("Cannot listen: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+
+ expect = 0;
+ recvs = drops = gaps = errs = 0;
+
+ while (expect < pa->count) {
+ uint64_t got;
+ if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) {
+ // Closed without receiving the last message
+ drops += (pa->count - expect);
+ gaps++;
+ break;
+ }
+ printf("ERROR: %s\n", nng_strerror(rv));
+ errs++;
+ break;
+ }
+ recvs++;
+ memcpy(&got, nng_msg_body(msg), sizeof(got));
+ nng_msg_free(msg);
+ if (got != expect) {
+ gaps++;
+ if (got > expect) {
+ drops += (got - expect);
+ } else {
+ die("Misordered delivery");
+ }
+ }
+ expect = got + 1;
+ }
+
+ nng_mtx_lock(pa->mtx);
+ pa->drops += drops;
+ pa->errs += errs;
+ pa->recvs += recvs;
+ pa->gaps += gaps;
+ nng_mtx_unlock(pa->mtx);
+}
+
+static void
+do_pubdrop(int argc, char **argv)
+{
+ nng_thread ** thrs;
+ struct pubdrop_args pa;
+ int rv;
+ int nsubs;
+
+ if (argc != 5) {
+ die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> "
+ "<interval>");
+ }
+
+ memset(&pa, 0, sizeof(pa));
+ pa.addr = argv[0];
+ pa.msgsize = parse_int(argv[1], "message size");
+ pa.count = parse_int(argv[2], "count");
+ pa.intvl = parse_int(argv[4], "interval");
+ nsubs = parse_int(argv[3], "#subscribers");
+
+ if (pa.msgsize < sizeof(uint64_t)) {
+ die("Message size too small.");
+ }
+
+ thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1);
+ if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) ||
+ ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) {
+ die("Startup: %s\n", nng_strerror(rv));
+ };
+
+ if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) {
+ die("Cannot create pub thread: %s", nng_strerror(rv));
+ }
+
+ nng_msleep(100); // give time for listener to start...
+
+ for (int i = 0; i < nsubs; i++) {
+ if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) !=
+ 0) {
+ die("Cannot create sub thread: %s", nng_strerror(rv));
+ }
+ }
+
+ // Sleep a bit for conns to establish.
+ nng_msleep(2000);
+ nng_mtx_lock(pa.mtx);
+ pa.start = true;
+ nng_cv_wake(pa.cv);
+ nng_mtx_unlock(pa.mtx);
+
+ for (int i = 0; i < nsubs + 1; i++) {
+ nng_thread_destroy(thrs[i]);
+ }
+
+ nng_mtx_lock(pa.mtx);
+
+ unsigned long long expect = nsubs * pa.count;
+ unsigned long long missing = nsubs ? expect - pa.recvs : 0;
+ double dur = (pa.end - pa.beg) / 1000.0;
+
+ printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n",
+ pa.count, dur, pa.count / dur);
+ printf("Expected %llu messages total\n", expect);
+ printf("Received %llu messages total\n", pa.recvs);
+ printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur);
+ printf("Errors %llu total\n", pa.errs);
+ printf("Reported %llu dropped messages in %llu gaps\n", pa.drops,
+ pa.gaps);
+ printf("Dropped %llu messages (missing)\n", missing);
+ printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0);
+
+ nng_mtx_unlock(pa.mtx);
+}