aboutsummaryrefslogtreecommitdiff
path: root/perf
diff options
context:
space:
mode:
Diffstat (limited to 'perf')
-rw-r--r--perf/CMakeLists.txt33
-rw-r--r--perf/perf.c662
-rw-r--r--perf/pubdrop.c325
3 files changed, 0 insertions, 1020 deletions
diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt
deleted file mode 100644
index 778e583b..00000000
--- a/perf/CMakeLists.txt
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
-#
-# This software is supplied under the terms of the MIT License, a
-# copy of which should be located in the distribution where this
-# file was obtained (LICENSE.txt). A copy of the license may also be
-# found online at https://opensource.org/licenses/MIT.
-#
-
-# Build performance tests.
-
-if (NNG_TESTS)
- macro (add_nng_perf NAME)
- add_executable (${NAME} perf.c)
- target_link_libraries (${NAME} nng nng_private)
- endmacro (add_nng_perf)
-
- add_nng_perf(remote_lat)
- add_nng_perf(local_lat)
- add_nng_perf(local_thr)
- add_nng_perf(remote_thr)
- add_nng_perf(inproc_thr)
- add_nng_perf(inproc_lat)
-
- add_test (NAME nng.inproc_lat COMMAND inproc_lat 64 10000)
- set_tests_properties (nng.inproc_lat PROPERTIES TIMEOUT 30)
-
- add_test (NAME nng.inproc_thr COMMAND inproc_thr 1400 10000)
- set_tests_properties (nng.inproc_thr PROPERTIES TIMEOUT 30)
-
- add_executable (pubdrop pubdrop.c)
- target_link_libraries(pubdrop nng nng_private)
-endif ()
diff --git a/perf/perf.c b/perf/perf.c
deleted file mode 100644
index accac621..00000000
--- a/perf/perf.c
+++ /dev/null
@@ -1,662 +0,0 @@
-//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <ctype.h>
-#include <stdarg.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <nng/nng.h>
-#include <nng/supplemental/util/options.h>
-#include <nng/supplemental/util/platform.h>
-
-static void die(const char *, ...);
-static int
-no_open(nng_socket *arg)
-{
- (void) arg;
- die("Protocol not supported in this build!");
- return (NNG_ENOTSUP);
-}
-
-typedef int (*open_func)(nng_socket *);
-
-static open_func open_server = no_open;
-static open_func open_client = no_open;
-
-#if defined(NNG_HAVE_PAIR1)
-#include <nng/protocol/pair1/pair.h>
-#else
-#define nng_pair1_open no_open
-#endif
-
-#if defined(NNG_HAVE_PAIR0)
-#include <nng/protocol/pair0/pair.h>
-#else
-#define nng_pair0_open no_open
-#endif
-
-#if defined(NNG_HAVE_REQ0)
-#include <nng/protocol/reqrep0/req.h>
-#else
-#define nng_req0_open no_open
-#endif
-
-#if defined(NNG_HAVE_REP0)
-#include <nng/protocol/reqrep0/rep.h>
-#else
-#define nng_rep0_open no_open
-#endif
-
-#if defined(NNG_HAVE_BUS0)
-#include <nng/protocol/bus0/bus.h>
-#else
-#define nng_bus0_open no_open
-#endif
-
-#if defined(NNG_HAVE_PULL0)
-#include <nng/protocol/pipeline0/pull.h>
-#else
-#define nng_pull0_open no_open
-#endif
-
-#if defined(NNG_HAVE_PUSH0)
-#include <nng/protocol/pipeline0/push.h>
-#else
-#define nng_push0_open no_open
-#endif
-
-#if defined(NNG_HAVE_PUB0)
-#include <nng/protocol/pubsub0/pub.h>
-#else
-#define nng_pub0_open no_open
-#endif
-
-#if defined(NNG_HAVE_SUB0)
-#include <nng/protocol/pubsub0/sub.h>
-#else
-#define nng_sub0_open no_open
-#endif
-
-enum options {
- OPT_PAIR0 = 1,
- OPT_PAIR1,
- OPT_REQREP0,
- OPT_PUBSUB0,
- OPT_PIPELINE0,
- OPT_SURVEY0,
- OPT_BUS0,
- OPT_URL,
-};
-
-// These are not universally supported by the variants yet.
-static nng_optspec opts[] = {
- { .o_name = "pair1", .o_val = OPT_PAIR1 },
- { .o_name = "pair0", .o_val = OPT_PAIR0 },
- { .o_name = "reqrep0", .o_val = OPT_REQREP0 },
- { .o_name = "bus0", .o_val = OPT_BUS0 },
- { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
- { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
- { .o_name = "url", .o_val = OPT_URL, .o_arg = true },
- { .o_name = NULL, .o_val = 0 },
-};
-
-static void latency_client(const char *, size_t, int);
-static void latency_server(const char *, size_t, int);
-static void throughput_client(const char *, size_t, int);
-static void throughput_server(const char *, size_t, int);
-static void do_remote_lat(int argc, char **argv);
-static void do_local_lat(int argc, char **argv);
-static void do_remote_thr(int argc, char **argv);
-static void do_local_thr(int argc, char **argv);
-static void do_inproc_thr(int argc, char **argv);
-static void do_inproc_lat(int argc, char **argv);
-static void die(const char *, ...);
-
-// perf implements the same performance tests found in the standard
-// nanomsg & mangos performance tests. As with mangos, the decision
-// about which test to run is determined by the program name (ARGV[0}])
-// that it is run under.
-//
-// Options are:
-//
-// - remote_lat - remote latency side (client, aka latency_client)
-// - local_lat - local latency side (server, aka latency_server)
-// - local_thr - local throughput side
-// - remote_thr - remote throughput side
-// - inproc_lat - inproc latency
-// - inproc_thr - inproc throughput
-//
-
-bool
-matches(const char *arg, const char *name)
-{
- const char *ptr = arg;
- const char *x;
-
- while (((x = strchr(ptr, '/')) != NULL) ||
- ((x = strchr(ptr, '\\')) != NULL) ||
- ((x = strchr(ptr, ':')) != NULL)) {
- ptr = x + 1;
- }
- for (;;) {
- if (*name == '\0') {
- break;
- }
- if (tolower(*ptr) != *name) {
- return (false);
- }
- ptr++;
- name++;
- }
-
- switch (*ptr) {
- case '\0':
- /* FALLTHROUGH*/
- case '.': // extension; ignore it.
- return (true);
- default: // some other trailing bit.
- return (false);
- }
-}
-
-const int PAIR0 = 0;
-const int PAIR1 = 1;
-const int REQREP = 2;
-
-int
-main(int argc, char **argv)
-{
- char *prog;
-
-#if defined(NNG_HAVE_PAIR1)
- open_server = nng_pair1_open;
- open_client = nng_pair1_open;
-#elif defined(NNG_HAVE_PAIR0)
- open_server = nng_pair0_open;
- open_client = nng_pair0_open;
-#endif
-
- // Allow -m <remote_lat> or whatever to override argv[0].
- if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
- prog = argv[2];
- argv += 3;
- argc -= 3;
- } else {
- prog = argv[0];
- argc--;
- argv++;
- }
- if (matches(prog, "remote_lat") || matches(prog, "latency_client")) {
- do_remote_lat(argc, argv);
- } else if (matches(prog, "local_lat") ||
- matches(prog, "latency_server")) {
- do_local_lat(argc, argv);
- } else if (matches(prog, "local_thr") ||
- matches(prog, "throughput_server")) {
- do_local_thr(argc, argv);
- } else if (matches(prog, "remote_thr") ||
- matches(prog, "throughput_client")) {
- do_remote_thr(argc, argv);
- } else if (matches(prog, "inproc_thr")) {
- do_inproc_thr(argc, argv);
- } else if (matches(prog, "inproc_lat")) {
- do_inproc_lat(argc, argv);
- } else {
- die("Unknown program mode? Use -m <mode>.");
- }
-}
-
-static void
-die(const char *fmt, ...)
-{
- va_list ap;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, "\n");
- exit(2);
-}
-
-static int
-parse_int(const char *arg, const char *what)
-{
- long val;
- char *eptr;
-
- val = strtol(arg, &eptr, 10);
- // Must be a positive number less than around a billion.
- if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) {
- die("Invalid %s", what);
- }
- return ((int) val);
-}
-
-void
-do_local_lat(int argc, char **argv)
-{
- long int msgsize;
- long int trips;
-
- if (argc != 3) {
- die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>");
- }
-
- msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "round-trips");
-
- latency_server(argv[0], msgsize, trips);
-}
-
-void
-do_remote_lat(int argc, char **argv)
-{
- int msgsize;
- int trips;
-
- if (argc != 3) {
- die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>");
- }
-
- msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "round-trips");
-
- latency_client(argv[0], msgsize, trips);
-}
-
-void
-do_local_thr(int argc, char **argv)
-{
- int msgsize;
- int trips;
-
- if (argc != 3) {
- die("Usage: local_thr <listen-addr> <msg-size> <count>");
- }
-
- msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "count");
-
- throughput_server(argv[0], msgsize, trips);
-}
-
-void
-do_remote_thr(int argc, char **argv)
-{
- int msgsize;
- int trips;
-
- if (argc != 3) {
- die("Usage: remote_thr <connect-to> <msg-size> <count>");
- }
-
- msgsize = parse_int(argv[1], "message size");
- trips = parse_int(argv[2], "count");
-
- throughput_client(argv[0], msgsize, trips);
-}
-
-struct inproc_args {
- int count;
- int msgsize;
- const char *addr;
- void (*func)(const char *, size_t, int);
-};
-
-static void
-do_inproc(void *args)
-{
- struct inproc_args *ia = args;
-
- ia->func(ia->addr, ia->msgsize, ia->count);
-}
-
-void
-do_inproc_lat(int argc, char **argv)
-{
- nng_thread * thr;
- struct inproc_args ia;
- int rv;
- int val;
- int optidx;
- char * arg;
- char * addr;
-
- addr = "inproc://latency_test";
-
- optidx = 0;
- while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
- 0) {
- switch (val) {
- case OPT_REQREP0:
- open_client = nng_req0_open;
- open_server = nng_rep0_open;
- break;
- case OPT_PAIR0:
- open_client = nng_pair0_open;
- open_server = nng_pair0_open;
- break;
- case OPT_PAIR1:
- open_client = nng_pair1_open;
- open_server = nng_pair1_open;
- break;
- case OPT_BUS0:
- open_client = nng_bus0_open;
- open_server = nng_bus0_open;
- break;
-
- case OPT_URL:
- addr = arg;
- break;
- default:
- die("bad option");
- }
- }
- argc -= optidx;
- argv += optidx;
-
- if (argc != 2) {
- die("Usage: inproc_lat <msg-size> <count>");
- }
-
- ia.addr = addr;
- ia.msgsize = parse_int(argv[0], "message size");
- ia.count = parse_int(argv[1], "count");
- ia.func = latency_server;
-
- if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
- die("Cannot create thread: %s", nng_strerror(rv));
- }
-
- // Sleep a bit.
- nng_msleep(100);
-
- latency_client(addr, ia.msgsize, ia.count);
- nng_thread_destroy(thr);
-}
-
-void
-do_inproc_thr(int argc, char **argv)
-{
- nng_thread * thr;
- struct inproc_args ia;
- int rv;
- int optidx;
- int val;
- char * arg;
- char * addr = "inproc://throughput-test";
-
- optidx = 0;
- while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
- 0) {
- switch (val) {
-#if 0
- // For now these protocols simply do not work with
- // throughput -- they don't work with backpressure properly.
- // In the future we should support synchronizing in the same
- // process, and alerting the sender both on completion of
- // a single message, and on completion of all messages.
- case OPT_REQREP0:
- open_client = nng_req0_open;
- open_server = nng_rep0_open;
- break;
-#endif
- case OPT_PAIR0:
- open_client = nng_pair0_open;
- open_server = nng_pair0_open;
- break;
- case OPT_PAIR1:
- open_client = nng_pair1_open;
- open_server = nng_pair1_open;
- break;
- case OPT_PIPELINE0:
- open_client = nng_pull0_open;
- open_server = nng_push0_open;
- break;
- case OPT_URL:
- addr = arg;
- break;
- default:
- die("bad option");
- }
- }
- argc -= optidx;
- argv += optidx;
-
- if (argc != 2) {
- die("Usage: inproc_thr <msg-size> <count>");
- }
-
- ia.addr = addr;
- ia.msgsize = parse_int(argv[0], "message size");
- ia.count = parse_int(argv[1], "count");
- ia.func = throughput_server;
-
- if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
- die("Cannot create thread: %s", nng_strerror(rv));
- }
-
- // Sleep a bit.
- nng_msleep(100);
-
- throughput_client(addr, ia.msgsize, ia.count);
- nng_thread_destroy(thr);
-}
-
-void
-latency_client(const char *addr, size_t msgsize, int trips)
-{
- nng_socket s;
- nng_msg * msg;
- nng_time start, end;
- int rv;
- int i;
- float total;
- float latency;
- if ((rv = open_client(&s)) != 0) {
- die("nng_socket: %s", nng_strerror(rv));
- }
-
- // XXX: set no delay
- // XXX: other options (TLS in the future?, Linger?)
-
- if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
- die("nng_dial: %s", nng_strerror(rv));
- }
-
- nng_msleep(100);
-
- if (nng_msg_alloc(&msg, msgsize) != 0) {
- die("nng_msg_alloc: %s", nng_strerror(rv));
- }
-
- start = nng_clock();
- for (i = 0; i < trips; i++) {
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- die("nng_sendmsg: %s", nng_strerror(rv));
- }
-
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- die("nng_recvmsg: %s", nng_strerror(rv));
- }
- }
- end = nng_clock();
-
- nng_msg_free(msg);
- nng_close(s);
-
- total = (float) ((end - start)) / 1000;
- latency = ((float) ((total * 1000000)) / (float) (trips * 2));
- printf("total time: %.3f [s]\n", total);
- printf("message size: %d [B]\n", (int) msgsize);
- printf("round trip count: %d\n", trips);
- printf("average latency: %.3f [us]\n", latency);
-}
-
-void
-latency_server(const char *addr, size_t msgsize, int trips)
-{
- nng_socket s;
- nng_msg * msg;
- int rv;
- int i;
-
- if ((rv = open_server(&s)) != 0) {
- die("nng_socket: %s", nng_strerror(rv));
- }
-
- // XXX: set no delay
- // XXX: other options (TLS in the future?, Linger?)
-
- if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
- die("nng_listen: %s", nng_strerror(rv));
- }
-
- for (i = 0; i < trips; i++) {
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- die("nng_recvmsg: %s", nng_strerror(rv));
- }
- if (nng_msg_len(msg) != msgsize) {
- die("wrong message size: %lu != %lu", nng_msg_len(msg),
- msgsize);
- }
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- die("nng_sendmsg: %s", nng_strerror(rv));
- }
- }
-
- // Wait a bit for things to drain... linger should do this.
- // 100ms ought to be enough.
- nng_msleep(100);
- nng_close(s);
-}
-
-// Our throughput story is quite a mess. Mostly I think because of the poor
-// caching and message reuse. We should probably implement a message pooling
-// API somewhere.
-
-void
-throughput_server(const char *addr, size_t msgsize, int count)
-{
- nng_socket s;
- nng_msg * msg;
- int rv;
- int i;
- uint64_t start, end;
- float msgpersec, mbps, total;
-
- if ((rv = nng_pair_open(&s)) != 0) {
- die("nng_socket: %s", nng_strerror(rv));
- }
- rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128);
- if (rv != 0) {
- die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv));
- }
-
- // XXX: set no delay
- // XXX: other options (TLS in the future?, Linger?)
-
- if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
- die("nng_listen: %s", nng_strerror(rv));
- }
-
- // Receive first synchronization message.
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- die("nng_recvmsg: %s", nng_strerror(rv));
- }
- nng_msg_free(msg);
- start = nng_clock();
-
- for (i = 0; i < count; i++) {
- if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
- die("nng_recvmsg: %s", nng_strerror(rv));
- }
- if (nng_msg_len(msg) != msgsize) {
- die("wrong message size: %lu != %lu", nng_msg_len(msg),
- msgsize);
- }
- nng_msg_free(msg);
- }
- end = nng_clock();
- // Send a synchronization message (empty) to the other side,
- // and wait a bit to make sure it goes out the wire.
- nng_send(s, "", 0, 0);
- nng_msleep(200);
- nng_close(s);
- total = (float) ((end - start)) / 1000;
- msgpersec = (float) (count) / total;
- mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024);
- printf("total time: %.3f [s]\n", total);
- printf("message size: %d [B]\n", (int) msgsize);
- printf("message count: %d\n", count);
- printf("throughput: %.f [msg/s]\n", msgpersec);
- printf("throughput: %.3f [Mb/s]\n", mbps);
-}
-
-void
-throughput_client(const char *addr, size_t msgsize, int count)
-{
- nng_socket s;
- nng_msg * msg;
- int rv;
- int i;
-
- // We send one extra zero length message to start the timer.
- count++;
-
- if ((rv = nng_pair_open(&s)) != 0) {
- die("nng_socket: %s", nng_strerror(rv));
- }
-
- // XXX: set no delay
- // XXX: other options (TLS in the future?, Linger?)
-
- rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128);
- if (rv != 0) {
- die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv));
- }
-
- rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000);
- if (rv != 0) {
- die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv));
- }
-
- if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
- die("nng_dial: %s", nng_strerror(rv));
- }
-
- if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- die("nng_msg_alloc: %s", nng_strerror(rv));
- }
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- die("nng_sendmsg: %s", nng_strerror(rv));
- }
-
- for (i = 0; i < count; i++) {
- if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) {
- die("nng_msg_alloc: %s", nng_strerror(rv));
- }
-
- if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
- die("nng_sendmsg: %s", nng_strerror(rv));
- }
- }
-
- // Attempt to get the completion indication from the other
- // side.
- if (nng_recvmsg(s, &msg, 0) == 0) {
- nng_msg_free(msg);
- }
-
- nng_close(s);
-}
diff --git a/perf/pubdrop.c b/perf/pubdrop.c
deleted file mode 100644
index be51bb2f..00000000
--- a/perf/pubdrop.c
+++ /dev/null
@@ -1,325 +0,0 @@
-//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <ctype.h>
-#include <stdarg.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <nng/nng.h>
-#include <nng/supplemental/util/platform.h>
-
-// pubdrop - this is a simple testing utility that lets us measure PUB/SUB
-// performance, including dropped messages, delivery across multiple threads,
-// etc. It actually uses a wild card subscription for now.
-
-#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0)
-#include <nng/protocol/pubsub0/pub.h>
-#include <nng/protocol/pubsub0/sub.h>
-
-#else
-
-static void die(const char *, ...);
-
-static int
-nng_pub0_open(nng_socket *arg)
-{
- (void) arg;
- die("Pub protocol enabled in this build!");
- return (NNG_ENOTSUP);
-}
-
-static int
-nng_sub0_open(nng_socket *arg)
-{
- (void) arg;
- die("Sub protocol enabled in this build!");
- return (NNG_ENOTSUP);
-}
-
-#endif // NNG_HAVE_PUB0....
-
-static void die(const char *, ...);
-static void do_pubdrop(int argc, char **argv);
-static uint64_t nperusec;
-static volatile int x;
-
-void
-work(void)
-{
- x = rand();
-}
-
-void
-usdelay(unsigned long long nusec)
-{
- nusec *= nperusec;
- while (nusec > 0) {
- work();
- nusec--;
- }
-}
-
-int
-main(int argc, char **argv)
-{
- argc--;
- argv++;
-
- // We calculate a delay factor to roughly delay 1 usec. We don't
- // need this to be perfect, just reproducible on the same host.
- unsigned long cnt = 1000000;
-
- nng_time beg = nng_clock();
- for (unsigned long i = 0; i < cnt; i++) {
- work();
- }
- nng_time end = nng_clock();
- nperusec = cnt / (1000 * (end - beg));
-
- do_pubdrop(argc, argv);
-}
-
-static void
-die(const char *fmt, ...)
-{
- va_list ap;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, "\n");
- exit(2);
-}
-
-static int
-parse_int(const char *arg, const char *what)
-{
- long val;
- char *eptr;
-
- val = strtol(arg, &eptr, 10);
- // Must be a postive number less than around a billion.
- if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) {
- die("Invalid %s", what);
- }
- return ((int) val);
-}
-
-struct pubdrop_args {
- const char * addr;
- bool start;
- unsigned long long msgsize;
- unsigned long long count;
- unsigned long long intvl;
- unsigned long long drops;
- unsigned long long gaps;
- unsigned long long errs;
- unsigned long long recvs;
- nng_time beg;
- nng_time end;
- nng_mtx * mtx;
- nng_cv * cv;
-};
-
-static void
-pub_server(void *arg)
-{
- struct pubdrop_args *pa = arg;
- nng_socket sock;
- int rv;
- nng_msg * msg;
- nng_time start;
- nng_time end;
-
- if ((rv = nng_pub0_open(&sock)) != 0) {
- die("Cannot open sub: %s", nng_strerror(rv));
- }
- if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) {
- die("Cannot listen: %s", nng_strerror(rv));
- }
-
- nng_mtx_lock(pa->mtx);
- while (!pa->start) {
- nng_cv_wait(pa->cv);
- }
- nng_mtx_unlock(pa->mtx);
-
- start = nng_clock();
- for (uint64_t i = 0; i < pa->count; i++) {
- // Unfortunately we need to allocate messages dynamically as we
- // go. The other option would be to allocate them all up front,
- // but that could be a rather excessive amount of memory.
- if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) {
- die("Message alloc failed");
- }
- memcpy(nng_msg_body(msg), &i, sizeof(i));
- if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
- die("Sendmsg: %s", nng_strerror(rv));
- }
- // It sure would be nice if we had a usec granularity option
- // here.
- if (pa->intvl > 0) {
- usdelay((unsigned long long) pa->intvl);
- }
- }
-
- end = nng_clock();
-
- nng_msleep(1000); // drain the queue
- nng_close(sock);
-
- nng_mtx_lock(pa->mtx);
- pa->beg = start;
- pa->end = end;
- nng_mtx_unlock(pa->mtx);
-}
-
-static void
-sub_client(void *arg)
-{
- struct pubdrop_args *pa = arg;
- nng_socket sock;
- int rv;
- nng_msg * msg;
- unsigned long long recvs;
- unsigned long long drops;
- unsigned long long gaps;
- unsigned long long errs;
- unsigned long long expect;
-
- if ((rv = nng_sub0_open(&sock)) != 0) {
- die("Cannot open sub: %s", nng_strerror(rv));
- }
- if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) {
- die("Cannot listen: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
-
- expect = 0;
- recvs = drops = gaps = errs = 0;
-
- while (expect < pa->count) {
- uint64_t got;
- if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
- if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) {
- // Closed without receiving the last message
- drops += (pa->count - expect);
- gaps++;
- break;
- }
- printf("ERROR: %s\n", nng_strerror(rv));
- errs++;
- break;
- }
- recvs++;
- memcpy(&got, nng_msg_body(msg), sizeof(got));
- nng_msg_free(msg);
- if (got != expect) {
- gaps++;
- if (got > expect) {
- drops += (got - expect);
- } else {
- die("Misordered delivery");
- }
- }
- expect = got + 1;
- }
-
- nng_mtx_lock(pa->mtx);
- pa->drops += drops;
- pa->errs += errs;
- pa->recvs += recvs;
- pa->gaps += gaps;
- nng_mtx_unlock(pa->mtx);
-}
-
-static void
-do_pubdrop(int argc, char **argv)
-{
- nng_thread ** thrs;
- struct pubdrop_args pa;
- int rv;
- int nsubs;
-
- if (argc != 5) {
- die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> "
- "<interval>");
- }
-
- memset(&pa, 0, sizeof(pa));
- pa.addr = argv[0];
- pa.msgsize = parse_int(argv[1], "message size");
- pa.count = parse_int(argv[2], "count");
- pa.intvl = parse_int(argv[4], "interval");
- nsubs = parse_int(argv[3], "#subscribers");
-
- if (pa.msgsize < sizeof(uint64_t)) {
- die("Message size too small.");
- }
-
- thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1);
- if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) ||
- ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) {
- die("Startup: %s\n", nng_strerror(rv));
- };
-
- if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) {
- die("Cannot create pub thread: %s", nng_strerror(rv));
- }
-
- nng_msleep(100); // give time for listener to start...
-
- for (int i = 0; i < nsubs; i++) {
- if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) !=
- 0) {
- die("Cannot create sub thread: %s", nng_strerror(rv));
- }
- }
-
- // Sleep a bit for conns to establish.
- nng_msleep(2000);
- nng_mtx_lock(pa.mtx);
- pa.start = true;
- nng_cv_wake(pa.cv);
- nng_mtx_unlock(pa.mtx);
-
- for (int i = 0; i < nsubs + 1; i++) {
- nng_thread_destroy(thrs[i]);
- }
-
- nng_mtx_lock(pa.mtx);
-
- unsigned long long expect = nsubs * pa.count;
- unsigned long long missing = nsubs ? expect - pa.recvs : 0;
- double dur = (pa.end - pa.beg) / 1000.0;
-
- printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n",
- pa.count, dur, pa.count / dur);
- printf("Expected %llu messages total\n", expect);
- printf("Received %llu messages total\n", pa.recvs);
- printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur);
- printf("Errors %llu total\n", pa.errs);
- printf("Reported %llu dropped messages in %llu gaps\n", pa.drops,
- pa.gaps);
- printf("Dropped %llu messages (missing)\n", missing);
- printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0);
-
- nng_mtx_unlock(pa.mtx);
-}