diff options
Diffstat (limited to 'src/tools/perf')
| -rw-r--r-- | src/tools/perf/CMakeLists.txt | 35 | ||||
| -rw-r--r-- | src/tools/perf/perf.c | 662 | ||||
| -rw-r--r-- | src/tools/perf/pubdrop.c | 325 |
3 files changed, 1022 insertions, 0 deletions
diff --git a/src/tools/perf/CMakeLists.txt b/src/tools/perf/CMakeLists.txt new file mode 100644 index 00000000..135544bb --- /dev/null +++ b/src/tools/perf/CMakeLists.txt @@ -0,0 +1,35 @@ +# +# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Build performance tests. + +nng_directory(perf) + +if (NNG_TESTS) + macro (add_nng_perf NAME) + add_executable (${NAME} perf.c) + target_link_libraries (${NAME} nng nng_private) + endmacro (add_nng_perf) + + add_nng_perf(remote_lat) + add_nng_perf(local_lat) + add_nng_perf(local_thr) + add_nng_perf(remote_thr) + add_nng_perf(inproc_thr) + add_nng_perf(inproc_lat) + + add_test (NAME nng.inproc_lat COMMAND inproc_lat 64 10000) + set_tests_properties (nng.inproc_lat PROPERTIES TIMEOUT 30) + + add_test (NAME nng.inproc_thr COMMAND inproc_thr 1400 10000) + set_tests_properties (nng.inproc_thr PROPERTIES TIMEOUT 30) + + add_executable (pubdrop pubdrop.c) + target_link_libraries(pubdrop nng nng_private) +endif () diff --git a/src/tools/perf/perf.c b/src/tools/perf/perf.c new file mode 100644 index 00000000..accac621 --- /dev/null +++ b/src/tools/perf/perf.c @@ -0,0 +1,662 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <stdarg.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <nng/nng.h> +#include <nng/supplemental/util/options.h> +#include <nng/supplemental/util/platform.h> + +static void die(const char *, ...); +static int +no_open(nng_socket *arg) +{ + (void) arg; + die("Protocol not supported in this build!"); + return (NNG_ENOTSUP); +} + +typedef int (*open_func)(nng_socket *); + +static open_func open_server = no_open; +static open_func open_client = no_open; + +#if defined(NNG_HAVE_PAIR1) +#include <nng/protocol/pair1/pair.h> +#else +#define nng_pair1_open no_open +#endif + +#if defined(NNG_HAVE_PAIR0) +#include <nng/protocol/pair0/pair.h> +#else +#define nng_pair0_open no_open +#endif + +#if defined(NNG_HAVE_REQ0) +#include <nng/protocol/reqrep0/req.h> +#else +#define nng_req0_open no_open +#endif + +#if defined(NNG_HAVE_REP0) +#include <nng/protocol/reqrep0/rep.h> +#else +#define nng_rep0_open no_open +#endif + +#if defined(NNG_HAVE_BUS0) +#include <nng/protocol/bus0/bus.h> +#else +#define nng_bus0_open no_open +#endif + +#if defined(NNG_HAVE_PULL0) +#include <nng/protocol/pipeline0/pull.h> +#else +#define nng_pull0_open no_open +#endif + +#if defined(NNG_HAVE_PUSH0) +#include <nng/protocol/pipeline0/push.h> +#else +#define nng_push0_open no_open +#endif + +#if defined(NNG_HAVE_PUB0) +#include <nng/protocol/pubsub0/pub.h> +#else +#define nng_pub0_open no_open +#endif + +#if defined(NNG_HAVE_SUB0) +#include <nng/protocol/pubsub0/sub.h> +#else +#define nng_sub0_open no_open +#endif + +enum options { + OPT_PAIR0 = 1, + OPT_PAIR1, + OPT_REQREP0, + OPT_PUBSUB0, + OPT_PIPELINE0, + OPT_SURVEY0, + OPT_BUS0, + OPT_URL, +}; + +// These are not universally supported by the variants yet. +static nng_optspec opts[] = { + { .o_name = "pair1", .o_val = OPT_PAIR1 }, + { .o_name = "pair0", .o_val = OPT_PAIR0 }, + { .o_name = "reqrep0", .o_val = OPT_REQREP0 }, + { .o_name = "bus0", .o_val = OPT_BUS0 }, + { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 }, + { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 }, + { .o_name = "url", .o_val = OPT_URL, .o_arg = true }, + { .o_name = NULL, .o_val = 0 }, +}; + +static void latency_client(const char *, size_t, int); +static void latency_server(const char *, size_t, int); +static void throughput_client(const char *, size_t, int); +static void throughput_server(const char *, size_t, int); +static void do_remote_lat(int argc, char **argv); +static void do_local_lat(int argc, char **argv); +static void do_remote_thr(int argc, char **argv); +static void do_local_thr(int argc, char **argv); +static void do_inproc_thr(int argc, char **argv); +static void do_inproc_lat(int argc, char **argv); +static void die(const char *, ...); + +// perf implements the same performance tests found in the standard +// nanomsg & mangos performance tests. As with mangos, the decision +// about which test to run is determined by the program name (ARGV[0}]) +// that it is run under. +// +// Options are: +// +// - remote_lat - remote latency side (client, aka latency_client) +// - local_lat - local latency side (server, aka latency_server) +// - local_thr - local throughput side +// - remote_thr - remote throughput side +// - inproc_lat - inproc latency +// - inproc_thr - inproc throughput +// + +bool +matches(const char *arg, const char *name) +{ + const char *ptr = arg; + const char *x; + + while (((x = strchr(ptr, '/')) != NULL) || + ((x = strchr(ptr, '\\')) != NULL) || + ((x = strchr(ptr, ':')) != NULL)) { + ptr = x + 1; + } + for (;;) { + if (*name == '\0') { + break; + } + if (tolower(*ptr) != *name) { + return (false); + } + ptr++; + name++; + } + + switch (*ptr) { + case '\0': + /* FALLTHROUGH*/ + case '.': // extension; ignore it. + return (true); + default: // some other trailing bit. + return (false); + } +} + +const int PAIR0 = 0; +const int PAIR1 = 1; +const int REQREP = 2; + +int +main(int argc, char **argv) +{ + char *prog; + +#if defined(NNG_HAVE_PAIR1) + open_server = nng_pair1_open; + open_client = nng_pair1_open; +#elif defined(NNG_HAVE_PAIR0) + open_server = nng_pair0_open; + open_client = nng_pair0_open; +#endif + + // Allow -m <remote_lat> or whatever to override argv[0]. + if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) { + prog = argv[2]; + argv += 3; + argc -= 3; + } else { + prog = argv[0]; + argc--; + argv++; + } + if (matches(prog, "remote_lat") || matches(prog, "latency_client")) { + do_remote_lat(argc, argv); + } else if (matches(prog, "local_lat") || + matches(prog, "latency_server")) { + do_local_lat(argc, argv); + } else if (matches(prog, "local_thr") || + matches(prog, "throughput_server")) { + do_local_thr(argc, argv); + } else if (matches(prog, "remote_thr") || + matches(prog, "throughput_client")) { + do_remote_thr(argc, argv); + } else if (matches(prog, "inproc_thr")) { + do_inproc_thr(argc, argv); + } else if (matches(prog, "inproc_lat")) { + do_inproc_lat(argc, argv); + } else { + die("Unknown program mode? Use -m <mode>."); + } +} + +static void +die(const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(2); +} + +static int +parse_int(const char *arg, const char *what) +{ + long val; + char *eptr; + + val = strtol(arg, &eptr, 10); + // Must be a positive number less than around a billion. + if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) { + die("Invalid %s", what); + } + return ((int) val); +} + +void +do_local_lat(int argc, char **argv) +{ + long int msgsize; + long int trips; + + if (argc != 3) { + die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>"); + } + + msgsize = parse_int(argv[1], "message size"); + trips = parse_int(argv[2], "round-trips"); + + latency_server(argv[0], msgsize, trips); +} + +void +do_remote_lat(int argc, char **argv) +{ + int msgsize; + int trips; + + if (argc != 3) { + die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>"); + } + + msgsize = parse_int(argv[1], "message size"); + trips = parse_int(argv[2], "round-trips"); + + latency_client(argv[0], msgsize, trips); +} + +void +do_local_thr(int argc, char **argv) +{ + int msgsize; + int trips; + + if (argc != 3) { + die("Usage: local_thr <listen-addr> <msg-size> <count>"); + } + + msgsize = parse_int(argv[1], "message size"); + trips = parse_int(argv[2], "count"); + + throughput_server(argv[0], msgsize, trips); +} + +void +do_remote_thr(int argc, char **argv) +{ + int msgsize; + int trips; + + if (argc != 3) { + die("Usage: remote_thr <connect-to> <msg-size> <count>"); + } + + msgsize = parse_int(argv[1], "message size"); + trips = parse_int(argv[2], "count"); + + throughput_client(argv[0], msgsize, trips); +} + +struct inproc_args { + int count; + int msgsize; + const char *addr; + void (*func)(const char *, size_t, int); +}; + +static void +do_inproc(void *args) +{ + struct inproc_args *ia = args; + + ia->func(ia->addr, ia->msgsize, ia->count); +} + +void +do_inproc_lat(int argc, char **argv) +{ + nng_thread * thr; + struct inproc_args ia; + int rv; + int val; + int optidx; + char * arg; + char * addr; + + addr = "inproc://latency_test"; + + optidx = 0; + while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) == + 0) { + switch (val) { + case OPT_REQREP0: + open_client = nng_req0_open; + open_server = nng_rep0_open; + break; + case OPT_PAIR0: + open_client = nng_pair0_open; + open_server = nng_pair0_open; + break; + case OPT_PAIR1: + open_client = nng_pair1_open; + open_server = nng_pair1_open; + break; + case OPT_BUS0: + open_client = nng_bus0_open; + open_server = nng_bus0_open; + break; + + case OPT_URL: + addr = arg; + break; + default: + die("bad option"); + } + } + argc -= optidx; + argv += optidx; + + if (argc != 2) { + die("Usage: inproc_lat <msg-size> <count>"); + } + + ia.addr = addr; + ia.msgsize = parse_int(argv[0], "message size"); + ia.count = parse_int(argv[1], "count"); + ia.func = latency_server; + + if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) { + die("Cannot create thread: %s", nng_strerror(rv)); + } + + // Sleep a bit. + nng_msleep(100); + + latency_client(addr, ia.msgsize, ia.count); + nng_thread_destroy(thr); +} + +void +do_inproc_thr(int argc, char **argv) +{ + nng_thread * thr; + struct inproc_args ia; + int rv; + int optidx; + int val; + char * arg; + char * addr = "inproc://throughput-test"; + + optidx = 0; + while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) == + 0) { + switch (val) { +#if 0 + // For now these protocols simply do not work with + // throughput -- they don't work with backpressure properly. + // In the future we should support synchronizing in the same + // process, and alerting the sender both on completion of + // a single message, and on completion of all messages. + case OPT_REQREP0: + open_client = nng_req0_open; + open_server = nng_rep0_open; + break; +#endif + case OPT_PAIR0: + open_client = nng_pair0_open; + open_server = nng_pair0_open; + break; + case OPT_PAIR1: + open_client = nng_pair1_open; + open_server = nng_pair1_open; + break; + case OPT_PIPELINE0: + open_client = nng_pull0_open; + open_server = nng_push0_open; + break; + case OPT_URL: + addr = arg; + break; + default: + die("bad option"); + } + } + argc -= optidx; + argv += optidx; + + if (argc != 2) { + die("Usage: inproc_thr <msg-size> <count>"); + } + + ia.addr = addr; + ia.msgsize = parse_int(argv[0], "message size"); + ia.count = parse_int(argv[1], "count"); + ia.func = throughput_server; + + if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) { + die("Cannot create thread: %s", nng_strerror(rv)); + } + + // Sleep a bit. + nng_msleep(100); + + throughput_client(addr, ia.msgsize, ia.count); + nng_thread_destroy(thr); +} + +void +latency_client(const char *addr, size_t msgsize, int trips) +{ + nng_socket s; + nng_msg * msg; + nng_time start, end; + int rv; + int i; + float total; + float latency; + if ((rv = open_client(&s)) != 0) { + die("nng_socket: %s", nng_strerror(rv)); + } + + // XXX: set no delay + // XXX: other options (TLS in the future?, Linger?) + + if ((rv = nng_dial(s, addr, NULL, 0)) != 0) { + die("nng_dial: %s", nng_strerror(rv)); + } + + nng_msleep(100); + + if (nng_msg_alloc(&msg, msgsize) != 0) { + die("nng_msg_alloc: %s", nng_strerror(rv)); + } + + start = nng_clock(); + for (i = 0; i < trips; i++) { + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + die("nng_sendmsg: %s", nng_strerror(rv)); + } + + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + die("nng_recvmsg: %s", nng_strerror(rv)); + } + } + end = nng_clock(); + + nng_msg_free(msg); + nng_close(s); + + total = (float) ((end - start)) / 1000; + latency = ((float) ((total * 1000000)) / (float) (trips * 2)); + printf("total time: %.3f [s]\n", total); + printf("message size: %d [B]\n", (int) msgsize); + printf("round trip count: %d\n", trips); + printf("average latency: %.3f [us]\n", latency); +} + +void +latency_server(const char *addr, size_t msgsize, int trips) +{ + nng_socket s; + nng_msg * msg; + int rv; + int i; + + if ((rv = open_server(&s)) != 0) { + die("nng_socket: %s", nng_strerror(rv)); + } + + // XXX: set no delay + // XXX: other options (TLS in the future?, Linger?) + + if ((rv = nng_listen(s, addr, NULL, 0)) != 0) { + die("nng_listen: %s", nng_strerror(rv)); + } + + for (i = 0; i < trips; i++) { + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + die("nng_recvmsg: %s", nng_strerror(rv)); + } + if (nng_msg_len(msg) != msgsize) { + die("wrong message size: %lu != %lu", nng_msg_len(msg), + msgsize); + } + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + die("nng_sendmsg: %s", nng_strerror(rv)); + } + } + + // Wait a bit for things to drain... linger should do this. + // 100ms ought to be enough. + nng_msleep(100); + nng_close(s); +} + +// Our throughput story is quite a mess. Mostly I think because of the poor +// caching and message reuse. We should probably implement a message pooling +// API somewhere. + +void +throughput_server(const char *addr, size_t msgsize, int count) +{ + nng_socket s; + nng_msg * msg; + int rv; + int i; + uint64_t start, end; + float msgpersec, mbps, total; + + if ((rv = nng_pair_open(&s)) != 0) { + die("nng_socket: %s", nng_strerror(rv)); + } + rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128); + if (rv != 0) { + die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv)); + } + + // XXX: set no delay + // XXX: other options (TLS in the future?, Linger?) + + if ((rv = nng_listen(s, addr, NULL, 0)) != 0) { + die("nng_listen: %s", nng_strerror(rv)); + } + + // Receive first synchronization message. + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + die("nng_recvmsg: %s", nng_strerror(rv)); + } + nng_msg_free(msg); + start = nng_clock(); + + for (i = 0; i < count; i++) { + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + die("nng_recvmsg: %s", nng_strerror(rv)); + } + if (nng_msg_len(msg) != msgsize) { + die("wrong message size: %lu != %lu", nng_msg_len(msg), + msgsize); + } + nng_msg_free(msg); + } + end = nng_clock(); + // Send a synchronization message (empty) to the other side, + // and wait a bit to make sure it goes out the wire. + nng_send(s, "", 0, 0); + nng_msleep(200); + nng_close(s); + total = (float) ((end - start)) / 1000; + msgpersec = (float) (count) / total; + mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024); + printf("total time: %.3f [s]\n", total); + printf("message size: %d [B]\n", (int) msgsize); + printf("message count: %d\n", count); + printf("throughput: %.f [msg/s]\n", msgpersec); + printf("throughput: %.3f [Mb/s]\n", mbps); +} + +void +throughput_client(const char *addr, size_t msgsize, int count) +{ + nng_socket s; + nng_msg * msg; + int rv; + int i; + + // We send one extra zero length message to start the timer. + count++; + + if ((rv = nng_pair_open(&s)) != 0) { + die("nng_socket: %s", nng_strerror(rv)); + } + + // XXX: set no delay + // XXX: other options (TLS in the future?, Linger?) + + rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128); + if (rv != 0) { + die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv)); + } + + rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000); + if (rv != 0) { + die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv)); + } + + if ((rv = nng_dial(s, addr, NULL, 0)) != 0) { + die("nng_dial: %s", nng_strerror(rv)); + } + + if ((rv = nng_msg_alloc(&msg, 0)) != 0) { + die("nng_msg_alloc: %s", nng_strerror(rv)); + } + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + die("nng_sendmsg: %s", nng_strerror(rv)); + } + + for (i = 0; i < count; i++) { + if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) { + die("nng_msg_alloc: %s", nng_strerror(rv)); + } + + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + die("nng_sendmsg: %s", nng_strerror(rv)); + } + } + + // Attempt to get the completion indication from the other + // side. + if (nng_recvmsg(s, &msg, 0) == 0) { + nng_msg_free(msg); + } + + nng_close(s); +} diff --git a/src/tools/perf/pubdrop.c b/src/tools/perf/pubdrop.c new file mode 100644 index 00000000..be51bb2f --- /dev/null +++ b/src/tools/perf/pubdrop.c @@ -0,0 +1,325 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <stdarg.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <nng/nng.h> +#include <nng/supplemental/util/platform.h> + +// pubdrop - this is a simple testing utility that lets us measure PUB/SUB +// performance, including dropped messages, delivery across multiple threads, +// etc. It actually uses a wild card subscription for now. + +#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0) +#include <nng/protocol/pubsub0/pub.h> +#include <nng/protocol/pubsub0/sub.h> + +#else + +static void die(const char *, ...); + +static int +nng_pub0_open(nng_socket *arg) +{ + (void) arg; + die("Pub protocol enabled in this build!"); + return (NNG_ENOTSUP); +} + +static int +nng_sub0_open(nng_socket *arg) +{ + (void) arg; + die("Sub protocol enabled in this build!"); + return (NNG_ENOTSUP); +} + +#endif // NNG_HAVE_PUB0.... + +static void die(const char *, ...); +static void do_pubdrop(int argc, char **argv); +static uint64_t nperusec; +static volatile int x; + +void +work(void) +{ + x = rand(); +} + +void +usdelay(unsigned long long nusec) +{ + nusec *= nperusec; + while (nusec > 0) { + work(); + nusec--; + } +} + +int +main(int argc, char **argv) +{ + argc--; + argv++; + + // We calculate a delay factor to roughly delay 1 usec. We don't + // need this to be perfect, just reproducible on the same host. + unsigned long cnt = 1000000; + + nng_time beg = nng_clock(); + for (unsigned long i = 0; i < cnt; i++) { + work(); + } + nng_time end = nng_clock(); + nperusec = cnt / (1000 * (end - beg)); + + do_pubdrop(argc, argv); +} + +static void +die(const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(2); +} + +static int +parse_int(const char *arg, const char *what) +{ + long val; + char *eptr; + + val = strtol(arg, &eptr, 10); + // Must be a postive number less than around a billion. + if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) { + die("Invalid %s", what); + } + return ((int) val); +} + +struct pubdrop_args { + const char * addr; + bool start; + unsigned long long msgsize; + unsigned long long count; + unsigned long long intvl; + unsigned long long drops; + unsigned long long gaps; + unsigned long long errs; + unsigned long long recvs; + nng_time beg; + nng_time end; + nng_mtx * mtx; + nng_cv * cv; +}; + +static void +pub_server(void *arg) +{ + struct pubdrop_args *pa = arg; + nng_socket sock; + int rv; + nng_msg * msg; + nng_time start; + nng_time end; + + if ((rv = nng_pub0_open(&sock)) != 0) { + die("Cannot open sub: %s", nng_strerror(rv)); + } + if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) { + die("Cannot listen: %s", nng_strerror(rv)); + } + + nng_mtx_lock(pa->mtx); + while (!pa->start) { + nng_cv_wait(pa->cv); + } + nng_mtx_unlock(pa->mtx); + + start = nng_clock(); + for (uint64_t i = 0; i < pa->count; i++) { + // Unfortunately we need to allocate messages dynamically as we + // go. The other option would be to allocate them all up front, + // but that could be a rather excessive amount of memory. + if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) { + die("Message alloc failed"); + } + memcpy(nng_msg_body(msg), &i, sizeof(i)); + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + die("Sendmsg: %s", nng_strerror(rv)); + } + // It sure would be nice if we had a usec granularity option + // here. + if (pa->intvl > 0) { + usdelay((unsigned long long) pa->intvl); + } + } + + end = nng_clock(); + + nng_msleep(1000); // drain the queue + nng_close(sock); + + nng_mtx_lock(pa->mtx); + pa->beg = start; + pa->end = end; + nng_mtx_unlock(pa->mtx); +} + +static void +sub_client(void *arg) +{ + struct pubdrop_args *pa = arg; + nng_socket sock; + int rv; + nng_msg * msg; + unsigned long long recvs; + unsigned long long drops; + unsigned long long gaps; + unsigned long long errs; + unsigned long long expect; + + if ((rv = nng_sub0_open(&sock)) != 0) { + die("Cannot open sub: %s", nng_strerror(rv)); + } + if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) { + die("Cannot listen: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + + expect = 0; + recvs = drops = gaps = errs = 0; + + while (expect < pa->count) { + uint64_t got; + if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) { + if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) { + // Closed without receiving the last message + drops += (pa->count - expect); + gaps++; + break; + } + printf("ERROR: %s\n", nng_strerror(rv)); + errs++; + break; + } + recvs++; + memcpy(&got, nng_msg_body(msg), sizeof(got)); + nng_msg_free(msg); + if (got != expect) { + gaps++; + if (got > expect) { + drops += (got - expect); + } else { + die("Misordered delivery"); + } + } + expect = got + 1; + } + + nng_mtx_lock(pa->mtx); + pa->drops += drops; + pa->errs += errs; + pa->recvs += recvs; + pa->gaps += gaps; + nng_mtx_unlock(pa->mtx); +} + +static void +do_pubdrop(int argc, char **argv) +{ + nng_thread ** thrs; + struct pubdrop_args pa; + int rv; + int nsubs; + + if (argc != 5) { + die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> " + "<interval>"); + } + + memset(&pa, 0, sizeof(pa)); + pa.addr = argv[0]; + pa.msgsize = parse_int(argv[1], "message size"); + pa.count = parse_int(argv[2], "count"); + pa.intvl = parse_int(argv[4], "interval"); + nsubs = parse_int(argv[3], "#subscribers"); + + if (pa.msgsize < sizeof(uint64_t)) { + die("Message size too small."); + } + + thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1); + if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) || + ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) { + die("Startup: %s\n", nng_strerror(rv)); + }; + + if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) { + die("Cannot create pub thread: %s", nng_strerror(rv)); + } + + nng_msleep(100); // give time for listener to start... + + for (int i = 0; i < nsubs; i++) { + if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) != + 0) { + die("Cannot create sub thread: %s", nng_strerror(rv)); + } + } + + // Sleep a bit for conns to establish. + nng_msleep(2000); + nng_mtx_lock(pa.mtx); + pa.start = true; + nng_cv_wake(pa.cv); + nng_mtx_unlock(pa.mtx); + + for (int i = 0; i < nsubs + 1; i++) { + nng_thread_destroy(thrs[i]); + } + + nng_mtx_lock(pa.mtx); + + unsigned long long expect = nsubs * pa.count; + unsigned long long missing = nsubs ? expect - pa.recvs : 0; + double dur = (pa.end - pa.beg) / 1000.0; + + printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n", + pa.count, dur, pa.count / dur); + printf("Expected %llu messages total\n", expect); + printf("Received %llu messages total\n", pa.recvs); + printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur); + printf("Errors %llu total\n", pa.errs); + printf("Reported %llu dropped messages in %llu gaps\n", pa.drops, + pa.gaps); + printf("Dropped %llu messages (missing)\n", missing); + printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0); + + nng_mtx_unlock(pa.mtx); +} |
