aboutsummaryrefslogtreecommitdiff
path: root/perf
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-01-01 11:30:03 -0800
committerGarrett D'Amore <garrett@damore.org>2021-01-01 12:46:17 -0800
commited542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch)
tree673924ff077d468e6756529c2c204698d3faa47c /perf
parent1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff)
downloadnng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz
nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2
nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'perf')
-rw-r--r--perf/CMakeLists.txt33
-rw-r--r--perf/perf.c662
-rw-r--r--perf/pubdrop.c325
3 files changed, 0 insertions, 1020 deletions
diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt
deleted file mode 100644
index 778e583b..00000000
--- a/perf/CMakeLists.txt
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
-#
-# This software is supplied under the terms of the MIT License, a
-# copy of which should be located in the distribution where this
-# file was obtained (LICENSE.txt). A copy of the license may also be
-# found online at https://opensource.org/licenses/MIT.
-#
-
-# Build performance tests.
-
-if (NNG_TESTS)
- macro (add_nng_perf NAME)
- add_executable (${NAME} perf.c)
- target_link_libraries (${NAME} nng nng_private)
- endmacro (add_nng_perf)
-
- add_nng_perf(remote_lat)
- add_nng_perf(local_lat)
- add_nng_perf(local_thr)
- add_nng_perf(remote_thr)
- add_nng_perf(inproc_thr)
- add_nng_perf(inproc_lat)
-
- add_test (NAME nng.inproc_lat COMMAND inproc_lat 64 10000)
- set_tests_properties (nng.inproc_lat PROPERTIES TIMEOUT 30)
-
- add_test (NAME nng.inproc_thr COMMAND inproc_thr 1400 10000)
- set_tests_properties (nng.inproc_thr PROPERTIES TIMEOUT 30)
-
- add_executable (pubdrop pubdrop.c)
- target_link_libraries(pubdrop nng nng_private)
-endif ()
diff --git a/perf/perf.c b/perf/perf.c
deleted file mode 100644
index accac621..00000000
--- a/perf/perf.c
+++ /dev/null
@@ -1,662 +0,0 @@
-//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <ctype.h>
-#include <stdarg.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <nng/nng.h>
-#include <nng/supplemental/util/options.h>
-#include <nng/supplemental/util/platform.h>
-
-static void die(const char *, ...);
-static int
-no_open(nng_socket *arg)
-{
- (void) arg;
- die("Protocol not supported in this build!");
- return (NNG_ENOTSUP);
-}
-
-typedef int (*open_func)(nng_socket *);
-
-static open_func open_server = no_open;
-static open_func open_client = no_open;
-
-#if defined(NNG_HAVE_PAIR1)
-#include <nng/protocol/pair1/pair.h>
-#else
-#define nng_pair1_open no_open
-#endif
-
-#if defined(NNG_HAVE_PAIR0)
-#include <nng/protocol/pair0/pair.h>
-#else
-#define nng_pair0_open no_open
-#endif
-
-#if defined(NNG_HAVE_REQ0)
-#include <nng/protocol/reqrep0/req.h>
-#else
-#define nng_req0_open no_open
-#endif
-
-#if defined(NNG_HAVE_REP0)
-#include <nng/protocol/reqrep0/rep.h>
-#else
-#define nng_rep0_open no_open
-#endif
-
-#if defined(NNG_HAVE_BUS0)
-#include <nng/protocol/bus0/bus.h>
-#else
-#define nng_bus0_open no_open
-#endif
-
-#if defined(NNG_HAVE_PULL0)
-#include <nng/protocol/pipeline0/pull.h>
-#else
-#define nng_pull0_open no_open
-#endif
-
-#if defined(NNG_HAVE_PUSH0)
-#include <nng/protocol/pipeline0/push.h>
-#else
-#define nng_push0_open no_open
-#endif
-
-#if defined(NNG_HAVE_PUB0)
-#include <nng/protocol/pubsub0/pub.h>
-#else
-#define nng_pub0_open no_open
-#endif
-
-#if defined(NNG_HAVE_SUB0)
-#include <nng/protocol/pubsub0/sub.h>
-#else
-#define nng_sub0_open no_open
-#endif
-
-enum options {
- OPT_PAIR0 = 1,
- OPT_PAIR1,
- OPT_REQREP0,
- OPT_PUBSUB0,
- OPT_PIPELINE0,
- OPT_SURVEY0,
- OPT_BUS0,
- OPT_URL,
-};
-
-// These are not universally supported by the variants yet.
-static nng_optspec opts[] = {
- { .o_name = "pair1", .o_val = OPT_PAIR1 },
- { .o_name = "pair0", .o_val = OPT_PAIR0 },
- { .o_name = "reqrep0", .o_val = OPT_REQREP0 },
- { .o_name = "bus0", .o_val = OPT_BUS0 },
- { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
- { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
- { .o_name = "url", .o_val = OPT_URL, .o_arg = true },
- { .o_name = NULL, .o_val = 0 },
-};
-
-static void latency_client(const char *, size_t, int);
-static void latency_server(const char *, size_t, int);
-static void throughput_client(const char *, size_t, int);
-static void throughput_server(const char *, size_t, int);
-static void do_remote_lat(int argc, char **argv);
-static void do_local_lat(int argc, char **argv);
-static void do_remote_thr(int argc, char **argv);
-static void do_local_thr(int argc, char **argv);
-static void do_inproc_thr(int argc, char **argv);
-static void do_inproc_lat(int argc, char **argv);
-static void die(const char *, ...);
-
-// perf implements the same performance tests found in the standard
-// nanomsg & mangos performance tests. As with mangos, the decision
-// about which test to run is determined by the program name (ARGV[0}])
-// that it is run under.
-//
-// Options are:
-//
-// - remote_lat - remote latency side (client, aka latency_client)
-// - local_lat - local latency side (server, aka latency_server)
-// - local_thr - local throughput side
-// - remote_thr - remote throughput side
-// - inproc_lat - inproc latency
-// - inproc_thr - inproc throughput
-//
-
-bool
-matches(const char *arg, const char *name)
-{
- const char *ptr = arg;
- const char *x;
-
- while (((x = strchr(ptr, '/')) != NULL) ||
- ((x = strchr(ptr, '\\')) != NULL) ||
- ((x = strchr(ptr, ':')) != NULL)) {
- ptr = x + 1;
- }
- for (;;) {
- if (*name == '\0') {
- break;
- }
- if (tolower(*ptr) != *name) {
- return (false);
- }
- ptr++;
- name++;
- }
-
- switch (*ptr) {
- case '\0':
- /* FALLTHROUGH*/
- case '.': // extension; ignore it.
- return (true);
- default: // some other trailing bit.
- return (false);
- }
-}
-
-const int PAIR0 = 0;
-const int PAIR1 = 1;
-const int REQREP = 2;
-
-int
-main(int argc, char **argv)
-{
- char *prog;
-
-#if defined(NNG_HAVE_PAIR1)
- open_server = nng_pair1_open;
- open_client = nng_pair1_open;
-#elif defined(NNG_HAVE_PAIR0)
- open_server = nng_pair0_open;
- open_client = nng_pair0_open;
-#endif
-
- // Allow -m <remote_lat> or whatever to override argv[0].
- if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
- prog = argv[2];
- argv += 3;
- argc -= 3;
- } else {
- prog = argv[0];
- argc--;
- argv++;
- }
- if (matches(prog, "remote_lat") || matches(prog, "latency_client")) {
- do_remote_lat(argc, argv);
- } else if (matches(prog, "local_lat") ||
- matches(prog, "latency_server")) {
- do_local_lat(argc, argv);
- } else if (matches(prog, "local_thr") ||
- matches(prog, "throughput_server")) {
- do_local_thr(argc, argv);
- } else if (matches(prog, "remote_thr") ||
- matches(prog, "throughput_client")) {
- do_remote_thr(argc, argv);
- } else if (matches(prog, "inproc_thr")) {
- do_inproc_thr(argc, argv);
- } else if (matches(prog, "inproc_lat")) {
- do_inproc_lat(argc, argv);
- } else {
- die("Unknown program mode? Use -m <mode>.");
- }
-}
-
-static void
-die(const char *fmt, ...)
-{
- va_list ap;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, "\n");
- exit(2);
-}
-
-static int
-parse_int(const char *arg, const char *what)
-{
- long val;
- char *eptr;
-
- val = strtol(arg, &eptr, 10);
- // Must be a positive number less than around a billion.
- if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) {
- die("Invalid %s", what);
- }
- return ((int) val);
-}
-
-void
-do_local_lat(int argc, char **argv)
-{
- long int msgsize;
- long int trips;
-
- if (argc != 3) {
- die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>");
- }
-
- msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "round-trips");
-
- latency_server(argv[0], msgsize, trips);
-}
-
-void
-do_remote_lat(int argc, char **argv)
-{
- int msgsize;
- int trips;
-
- if (argc != 3) {
- die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>");
- }
-
- msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "round-trips");
-
- latency_client(argv[0], msgsize, trips);
-}
-
-void
-do_local_thr(int argc, char **argv)
-{
- int msgsize;
- int trips;
-
- if (argc != 3) {
- die("Usage: local_thr <listen-addr> <msg-size> <count>");
- }
-
- msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "count");
-
- throughput_server(argv[0], msgsize, trips);
-}
-
-void
-do_remote_thr(int argc, char **argv)
-{
- int msgsize;
- int trips;
-
- if (argc != 3) {
- die("Usage: remote_thr <connect-to> <msg-size> <count>");
- }
-
- msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "count");
-
- throughput_client(argv[0], msgsize, trips);
-}
-
-struct inproc_args {
- int count;
- int msgsize;
- const char *addr;
- void (*func)(const char *, size_t, int);
-};
-
-static void
-do_inproc(void *args)
-{
- struct inproc_args *ia = args;
-
- ia->func(ia->addr, ia->msgsize, ia->count);
-}
-
-void
-do_inproc_lat(int argc, char **argv)
-{
- nng_thread * thr;
- struct inproc_args ia;
- int rv;
- int val;
- int optidx;
- char * arg;
- char * addr;
-
- addr = "inproc://latency_test";
-
- optidx = 0;
- while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
- 0) {
- switch (val) {
- case OPT_REQREP0:
- open_client = nng_req0_open;
- open_server = nng_rep0_open;
- break;
- case OPT_PAIR0:
- open_client = nng_pair0_open;
- open_server = nng_pair0_open;
- break;
- case OPT_PAIR1:
- open_client = nng_pair1_open;
- open_server = nng_pair1_open;
- break;
- case OPT_BUS0:
- open_client = nng_bus0_open;
- open_server = nng_bus0_open;
- break;
-
- case OPT_URL:
- addr = arg;
- break;
- default:
- die("bad option");
- }
- }
- argc -= optidx;
- argv += optidx;
-
- if (argc != 2) {
- die("Usage: inproc_lat <msg-size> <count>");
- }
-
- ia.addr = addr;
- ia.msgsize = parse_int(argv[0], "message size");
- ia.count = parse_int(argv[1], "count");
- ia.func = latency_server;
-
- if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
- die("Cannot create thread: %s", nng_strerror(rv));
- }
-
- // Sleep a bit.
- nng_msleep(100);
-
- latency_client(addr, ia.msgsize, ia.count);
- nng_thread_destroy(thr);
-}
-
-void
-do_inproc_thr(int argc, char **argv)
-{
- nng_thread * thr;
- struct inproc_args ia;
- int rv;
- int optidx;
- int val;
- char * arg;
- char * addr = "inproc://throughput-test";
-
- optidx = 0;
- while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
- 0) {
- switch (val) {
-#if 0
- // For now these protocols simply do not work with
- // throughput -- they don't work with backpressure properly.
- // In the future we should support synchronizing in the same
- // process, and alerting the sender both on completion of
- // a single message, and on completion of all messages.
- case OPT_REQREP0:
- open_client = nng_req0_open;
- open_server = nng_rep0_open;
- break;
-#endif
- case OPT_PAIR0:
- open_client = nng_pair0_open;
- open_server = nng_pair0_open;
- break;
- case OPT_PAIR1:
- open_client = nng_pair1_open;
- open_server = nng_pair1_open;
- break;
- case OPT_PIPELINE0:
- open_client = nng_pull0_open;
- open_server = nng_push0_open;
- break;
- case OPT_URL:
- addr = arg;
- break;
- default:
- die("bad option");
- }
- }
- argc -= optidx;
- argv += optidx;
-
- if (argc != 2) {
- die("Usage: inproc_thr <msg-size> <count>");
- }
-
- ia.addr = addr;
- ia.msgsize = parse_int(argv[0], "message size");
- ia.count = parse_int(argv[1], "count");
- ia.func = throughput_server;
-
- if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
- die("Cannot create thread: %s", nng_strerror(rv));
- }
-
- // Sleep a bit.
- nng_msleep(100);
-
- throughput_client(addr, ia.msgsize, ia.count);
- nng_thread_destroy(thr);
-}
-
-void
-latency_client(const char *addr, size_t msgsize, int trips)
-{
- nng_socket s;
- nng_msg * msg;
- nng_time start, end;
- int rv;
- int i;
- float total;
- float latency;
- if ((rv = open_client(&s)) != 0) {
- die("nng_socket: %s", nng_strerror(rv));
- }
-
- // XXX: set no delay
- // XXX: other options (TLS in the future?, Linger?)
-
- if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
- die("nng_dial: %s", nng_strerror(rv));
- }
-
- nng_msleep(100);
-
- if (nng_msg_alloc(&msg, msgsize) != 0) {
- die("nng_msg_alloc: %s", nng_strerror(rv));
- }
-
- start = nng_clock();
- for (i = 0; i < trips; i++) {
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- die("nng_sendmsg: %s", nng_strerror(rv));
- }
-
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- die("nng_recvmsg: %s", nng_strerror(rv));
- }
- }
- end = nng_clock();
-
- nng_msg_free(msg);
- nng_close(s);
-
- total = (float) ((end - start)) / 1000;
- latency = ((float) ((total * 1000000)) / (float) (trips * 2));
- printf("total time: %.3f [s]\n", total);
- printf("message size: %d [B]\n", (int) msgsize);
- printf("round trip count: %d\n", trips);
- printf("average latency: %.3f [us]\n", latency);
-}
-
-void
-latency_server(const char *addr, size_t msgsize, int trips)
-{
- nng_socket s;
- nng_msg * msg;
- int rv;
- int i;
-
- if ((rv = open_server(&s)) != 0) {
- die("nng_socket: %s", nng_strerror(rv));
- }
-
- // XXX: set no delay
- // XXX: other options (TLS in the future?, Linger?)
-
- if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
- die("nng_listen: %s", nng_strerror(rv));
- }
-
- for (i = 0; i < trips; i++) {
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- die("nng_recvmsg: %s", nng_strerror(rv));
- }
- if (nng_msg_len(msg) != msgsize) {
- die("wrong message size: %lu != %lu", nng_msg_len(msg),
- msgsize);
- }
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- die("nng_sendmsg: %s", nng_strerror(rv));
- }
- }
-
- // Wait a bit for things to drain... linger should do this.
- // 100ms ought to be enough.
- nng_msleep(100);
- nng_close(s);
-}
-
-// Our throughput story is quite a mess. Mostly I think because of the poor
-// caching and message reuse. We should probably implement a message pooling
-// API somewhere.
-
-void
-throughput_server(const char *addr, size_t msgsize, int count)
-{
- nng_socket s;
- nng_msg * msg;
- int rv;
- int i;
- uint64_t start, end;
- float msgpersec, mbps, total;
-
- if ((rv = nng_pair_open(&s)) != 0) {
- die("nng_socket: %s", nng_strerror(rv));
- }
- rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128);
- if (rv != 0) {
- die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv));
- }
-
- // XXX: set no delay
- // XXX: other options (TLS in the future?, Linger?)
-
- if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
- die("nng_listen: %s", nng_strerror(rv));
- }
-
- // Receive first synchronization message.
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- die("nng_recvmsg: %s", nng_strerror(rv));
- }
- nng_msg_free(msg);
- start = nng_clock();
-
- for (i = 0; i < count; i++) {
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- die("nng_recvmsg: %s", nng_strerror(rv));
- }
- if (nng_msg_len(msg) != msgsize) {
- die("wrong message size: %lu != %lu", nng_msg_len(msg),
- msgsize);
- }
- nng_msg_free(msg);
- }
- end = nng_clock();
- // Send a synchronization message (empty) to the other side,
- // and wait a bit to make sure it goes out the wire.
- nng_send(s, "", 0, 0);
- nng_msleep(200);
- nng_close(s);
- total = (float) ((end - start)) / 1000;
- msgpersec = (float) (count) / total;
- mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024);
- printf("total time: %.3f [s]\n", total);
- printf("message size: %d [B]\n", (int) msgsize);
- printf("message count: %d\n", count);
- printf("throughput: %.f [msg/s]\n", msgpersec);
- printf("throughput: %.3f [Mb/s]\n", mbps);
-}
-
-void
-throughput_client(const char *addr, size_t msgsize, int count)
-{
- nng_socket s;
- nng_msg * msg;
- int rv;
- int i;
-
- // We send one extra zero length message to start the timer.
- count++;
-
- if ((rv = nng_pair_open(&s)) != 0) {
- die("nng_socket: %s", nng_strerror(rv));
- }
-
- // XXX: set no delay
- // XXX: other options (TLS in the future?, Linger?)
-
- rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128);
- if (rv != 0) {
- die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv));
- }
-
- rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000);
- if (rv != 0) {
- die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv));
- }
-
- if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
- die("nng_dial: %s", nng_strerror(rv));
- }
-
- if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- die("nng_msg_alloc: %s", nng_strerror(rv));
- }
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- die("nng_sendmsg: %s", nng_strerror(rv));
- }
-
- for (i = 0; i < count; i++) {
- if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) {
- die("nng_msg_alloc: %s", nng_strerror(rv));
- }
-
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- die("nng_sendmsg: %s", nng_strerror(rv));
- }
- }
-
- // Attempt to get the completion indication from the other
- // side.
- if (nng_recvmsg(s, &msg, 0) == 0) {
- nng_msg_free(msg);
- }
-
- nng_close(s);
-}
diff --git a/perf/pubdrop.c b/perf/pubdrop.c
deleted file mode 100644
index be51bb2f..00000000
--- a/perf/pubdrop.c
+++ /dev/null
@@ -1,325 +0,0 @@
-//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <ctype.h>
-#include <stdarg.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <nng/nng.h>
-#include <nng/supplemental/util/platform.h>
-
-// pubdrop - this is a simple testing utility that lets us measure PUB/SUB
-// performance, including dropped messages, delivery across multiple threads,
-// etc. It actually uses a wild card subscription for now.
-
-#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0)
-#include <nng/protocol/pubsub0/pub.h>
-#include <nng/protocol/pubsub0/sub.h>
-
-#else
-
-static void die(const char *, ...);
-
-static int
-nng_pub0_open(nng_socket *arg)
-{
- (void) arg;
- die("Pub protocol enabled in this build!");
- return (NNG_ENOTSUP);
-}
-
-static int
-nng_sub0_open(nng_socket *arg)
-{
- (void) arg;
- die("Sub protocol enabled in this build!");
- return (NNG_ENOTSUP);
-}
-
-#endif // NNG_HAVE_PUB0....
-
-static void die(const char *, ...);
-static void do_pubdrop(int argc, char **argv);
-static uint64_t nperusec;
-static volatile int x;
-
-void
-work(void)
-{
- x = rand();
-}
-
-void
-usdelay(unsigned long long nusec)
-{
- nusec *= nperusec;
- while (nusec > 0) {
- work();
- nusec--;
- }
-}
-
-int
-main(int argc, char **argv)
-{
- argc--;
- argv++;
-
- // We calculate a delay factor to roughly delay 1 usec. We don't
- // need this to be perfect, just reproducible on the same host.
- unsigned long cnt = 1000000;
-
- nng_time beg = nng_clock();
- for (unsigned long i = 0; i < cnt; i++) {
- work();
- }
- nng_time end = nng_clock();
- nperusec = cnt / (1000 * (end - beg));
-
- do_pubdrop(argc, argv);
-}
-
-static void
-die(const char *fmt, ...)
-{
- va_list ap;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, "\n");
- exit(2);
-}
-
-static int
-parse_int(const char *arg, const char *what)
-{
- long val;
- char *eptr;
-
- val = strtol(arg, &eptr, 10);
- // Must be a postive number less than around a billion.
- if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) {
- die("Invalid %s", what);
- }
- return ((int) val);
-}
-
-struct pubdrop_args {
- const char * addr;
- bool start;
- unsigned long long msgsize;
- unsigned long long count;
- unsigned long long intvl;
- unsigned long long drops;
- unsigned long long gaps;
- unsigned long long errs;
- unsigned long long recvs;
- nng_time beg;
- nng_time end;
- nng_mtx * mtx;
- nng_cv * cv;
-};
-
-static void
-pub_server(void *arg)
-{
- struct pubdrop_args *pa = arg;
- nng_socket sock;
- int rv;
- nng_msg * msg;
- nng_time start;
- nng_time end;
-
- if ((rv = nng_pub0_open(&sock)) != 0) {
- die("Cannot open sub: %s", nng_strerror(rv));
- }
- if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) {
- die("Cannot listen: %s", nng_strerror(rv));
- }
-
- nng_mtx_lock(pa->mtx);
- while (!pa->start) {
- nng_cv_wait(pa->cv);
- }
- nng_mtx_unlock(pa->mtx);
-
- start = nng_clock();
- for (uint64_t i = 0; i < pa->count; i++) {
- // Unfortunately we need to allocate messages dynamically as we
- // go. The other option would be to allocate them all up front,
- // but that could be a rather excessive amount of memory.
- if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) {
- die("Message alloc failed");
- }
- memcpy(nng_msg_body(msg), &i, sizeof(i));
- if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
- die("Sendmsg: %s", nng_strerror(rv));
- }
- // It sure would be nice if we had a usec granularity option
- // here.
- if (pa->intvl > 0) {
- usdelay((unsigned long long) pa->intvl);
- }
- }
-
- end = nng_clock();
-
- nng_msleep(1000); // drain the queue
- nng_close(sock);
-
- nng_mtx_lock(pa->mtx);
- pa->beg = start;
- pa->end = end;
- nng_mtx_unlock(pa->mtx);
-}
-
-static void
-sub_client(void *arg)
-{
- struct pubdrop_args *pa = arg;
- nng_socket sock;
- int rv;
- nng_msg * msg;
- unsigned long long recvs;
- unsigned long long drops;
- unsigned long long gaps;
- unsigned long long errs;
- unsigned long long expect;
-
- if ((rv = nng_sub0_open(&sock)) != 0) {
- die("Cannot open sub: %s", nng_strerror(rv));
- }
- if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) {
- die("Cannot listen: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
-
- expect = 0;
- recvs = drops = gaps = errs = 0;
-
- while (expect < pa->count) {
- uint64_t got;
- if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
- if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) {
- // Closed without receiving the last message
- drops += (pa->count - expect);
- gaps++;
- break;
- }
- printf("ERROR: %s\n", nng_strerror(rv));
- errs++;
- break;
- }
- recvs++;
- memcpy(&got, nng_msg_body(msg), sizeof(got));
- nng_msg_free(msg);
- if (got != expect) {
- gaps++;
- if (got > expect) {
- drops += (got - expect);
- } else {
- die("Misordered delivery");
- }
- }
- expect = got + 1;
- }
-
- nng_mtx_lock(pa->mtx);
- pa->drops += drops;
- pa->errs += errs;
- pa->recvs += recvs;
- pa->gaps += gaps;
- nng_mtx_unlock(pa->mtx);
-}
-
-static void
-do_pubdrop(int argc, char **argv)
-{
- nng_thread ** thrs;
- struct pubdrop_args pa;
- int rv;
- int nsubs;
-
- if (argc != 5) {
- die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> "
- "<interval>");
- }
-
- memset(&pa, 0, sizeof(pa));
- pa.addr = argv[0];
- pa.msgsize = parse_int(argv[1], "message size");
- pa.count = parse_int(argv[2], "count");
- pa.intvl = parse_int(argv[4], "interval");
- nsubs = parse_int(argv[3], "#subscribers");
-
- if (pa.msgsize < sizeof(uint64_t)) {
- die("Message size too small.");
- }
-
- thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1);
- if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) ||
- ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) {
- die("Startup: %s\n", nng_strerror(rv));
- };
-
- if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) {
- die("Cannot create pub thread: %s", nng_strerror(rv));
- }
-
- nng_msleep(100); // give time for listener to start...
-
- for (int i = 0; i < nsubs; i++) {
- if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) !=
- 0) {
- die("Cannot create sub thread: %s", nng_strerror(rv));
- }
- }
-
- // Sleep a bit for conns to establish.
- nng_msleep(2000);
- nng_mtx_lock(pa.mtx);
- pa.start = true;
- nng_cv_wake(pa.cv);
- nng_mtx_unlock(pa.mtx);
-
- for (int i = 0; i < nsubs + 1; i++) {
- nng_thread_destroy(thrs[i]);
- }
-
- nng_mtx_lock(pa.mtx);
-
- unsigned long long expect = nsubs * pa.count;
- unsigned long long missing = nsubs ? expect - pa.recvs : 0;
- double dur = (pa.end - pa.beg) / 1000.0;
-
- printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n",
- pa.count, dur, pa.count / dur);
- printf("Expected %llu messages total\n", expect);
- printf("Received %llu messages total\n", pa.recvs);
- printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur);
- printf("Errors %llu total\n", pa.errs);
- printf("Reported %llu dropped messages in %llu gaps\n", pa.drops,
- pa.gaps);
- printf("Dropped %llu messages (missing)\n", missing);
- printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0);
-
- nng_mtx_unlock(pa.mtx);
-}