diff options
Diffstat (limited to 'perf')
| -rw-r--r-- | perf/CMakeLists.txt | 33 | ||||
| -rw-r--r-- | perf/perf.c | 662 | ||||
| -rw-r--r-- | perf/pubdrop.c | 325 |
3 files changed, 0 insertions, 1020 deletions
diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt deleted file mode 100644 index 778e583b..00000000 --- a/perf/CMakeLists.txt +++ /dev/null @@ -1,33 +0,0 @@ -# -# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> -# -# This software is supplied under the terms of the MIT License, a -# copy of which should be located in the distribution where this -# file was obtained (LICENSE.txt). A copy of the license may also be -# found online at https://opensource.org/licenses/MIT. -# - -# Build performance tests. - -if (NNG_TESTS) - macro (add_nng_perf NAME) - add_executable (${NAME} perf.c) - target_link_libraries (${NAME} nng nng_private) - endmacro (add_nng_perf) - - add_nng_perf(remote_lat) - add_nng_perf(local_lat) - add_nng_perf(local_thr) - add_nng_perf(remote_thr) - add_nng_perf(inproc_thr) - add_nng_perf(inproc_lat) - - add_test (NAME nng.inproc_lat COMMAND inproc_lat 64 10000) - set_tests_properties (nng.inproc_lat PROPERTIES TIMEOUT 30) - - add_test (NAME nng.inproc_thr COMMAND inproc_thr 1400 10000) - set_tests_properties (nng.inproc_thr PROPERTIES TIMEOUT 30) - - add_executable (pubdrop pubdrop.c) - target_link_libraries(pubdrop nng nng_private) -endif () diff --git a/perf/perf.c b/perf/perf.c deleted file mode 100644 index accac621..00000000 --- a/perf/perf.c +++ /dev/null @@ -1,662 +0,0 @@ -// -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <ctype.h> -#include <stdarg.h> -#include <stdint.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include <nng/nng.h> -#include <nng/supplemental/util/options.h> -#include <nng/supplemental/util/platform.h> - -static void die(const char *, ...); -static int -no_open(nng_socket *arg) -{ - (void) arg; - die("Protocol not supported in this build!"); - return (NNG_ENOTSUP); -} - -typedef int (*open_func)(nng_socket *); - -static open_func open_server = no_open; -static open_func open_client = no_open; - -#if defined(NNG_HAVE_PAIR1) -#include <nng/protocol/pair1/pair.h> -#else -#define nng_pair1_open no_open -#endif - -#if defined(NNG_HAVE_PAIR0) -#include <nng/protocol/pair0/pair.h> -#else -#define nng_pair0_open no_open -#endif - -#if defined(NNG_HAVE_REQ0) -#include <nng/protocol/reqrep0/req.h> -#else -#define nng_req0_open no_open -#endif - -#if defined(NNG_HAVE_REP0) -#include <nng/protocol/reqrep0/rep.h> -#else -#define nng_rep0_open no_open -#endif - -#if defined(NNG_HAVE_BUS0) -#include <nng/protocol/bus0/bus.h> -#else -#define nng_bus0_open no_open -#endif - -#if defined(NNG_HAVE_PULL0) -#include <nng/protocol/pipeline0/pull.h> -#else -#define nng_pull0_open no_open -#endif - -#if defined(NNG_HAVE_PUSH0) -#include <nng/protocol/pipeline0/push.h> -#else -#define nng_push0_open no_open -#endif - -#if defined(NNG_HAVE_PUB0) -#include <nng/protocol/pubsub0/pub.h> -#else -#define nng_pub0_open no_open -#endif - -#if defined(NNG_HAVE_SUB0) -#include <nng/protocol/pubsub0/sub.h> -#else -#define nng_sub0_open no_open -#endif - -enum options { - OPT_PAIR0 = 1, - OPT_PAIR1, - OPT_REQREP0, - OPT_PUBSUB0, - OPT_PIPELINE0, - OPT_SURVEY0, - OPT_BUS0, - OPT_URL, -}; - -// These are not universally supported by the variants yet. -static nng_optspec opts[] = { - { .o_name = "pair1", .o_val = OPT_PAIR1 }, - { .o_name = "pair0", .o_val = OPT_PAIR0 }, - { .o_name = "reqrep0", .o_val = OPT_REQREP0 }, - { .o_name = "bus0", .o_val = OPT_BUS0 }, - { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 }, - { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 }, - { .o_name = "url", .o_val = OPT_URL, .o_arg = true }, - { .o_name = NULL, .o_val = 0 }, -}; - -static void latency_client(const char *, size_t, int); -static void latency_server(const char *, size_t, int); -static void throughput_client(const char *, size_t, int); -static void throughput_server(const char *, size_t, int); -static void do_remote_lat(int argc, char **argv); -static void do_local_lat(int argc, char **argv); -static void do_remote_thr(int argc, char **argv); -static void do_local_thr(int argc, char **argv); -static void do_inproc_thr(int argc, char **argv); -static void do_inproc_lat(int argc, char **argv); -static void die(const char *, ...); - -// perf implements the same performance tests found in the standard -// nanomsg & mangos performance tests. As with mangos, the decision -// about which test to run is determined by the program name (ARGV[0}]) -// that it is run under. -// -// Options are: -// -// - remote_lat - remote latency side (client, aka latency_client) -// - local_lat - local latency side (server, aka latency_server) -// - local_thr - local throughput side -// - remote_thr - remote throughput side -// - inproc_lat - inproc latency -// - inproc_thr - inproc throughput -// - -bool -matches(const char *arg, const char *name) -{ - const char *ptr = arg; - const char *x; - - while (((x = strchr(ptr, '/')) != NULL) || - ((x = strchr(ptr, '\\')) != NULL) || - ((x = strchr(ptr, ':')) != NULL)) { - ptr = x + 1; - } - for (;;) { - if (*name == '\0') { - break; - } - if (tolower(*ptr) != *name) { - return (false); - } - ptr++; - name++; - } - - switch (*ptr) { - case '\0': - /* FALLTHROUGH*/ - case '.': // extension; ignore it. - return (true); - default: // some other trailing bit. - return (false); - } -} - -const int PAIR0 = 0; -const int PAIR1 = 1; -const int REQREP = 2; - -int -main(int argc, char **argv) -{ - char *prog; - -#if defined(NNG_HAVE_PAIR1) - open_server = nng_pair1_open; - open_client = nng_pair1_open; -#elif defined(NNG_HAVE_PAIR0) - open_server = nng_pair0_open; - open_client = nng_pair0_open; -#endif - - // Allow -m <remote_lat> or whatever to override argv[0]. - if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) { - prog = argv[2]; - argv += 3; - argc -= 3; - } else { - prog = argv[0]; - argc--; - argv++; - } - if (matches(prog, "remote_lat") || matches(prog, "latency_client")) { - do_remote_lat(argc, argv); - } else if (matches(prog, "local_lat") || - matches(prog, "latency_server")) { - do_local_lat(argc, argv); - } else if (matches(prog, "local_thr") || - matches(prog, "throughput_server")) { - do_local_thr(argc, argv); - } else if (matches(prog, "remote_thr") || - matches(prog, "throughput_client")) { - do_remote_thr(argc, argv); - } else if (matches(prog, "inproc_thr")) { - do_inproc_thr(argc, argv); - } else if (matches(prog, "inproc_lat")) { - do_inproc_lat(argc, argv); - } else { - die("Unknown program mode? Use -m <mode>."); - } -} - -static void -die(const char *fmt, ...) -{ - va_list ap; - - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, "\n"); - exit(2); -} - -static int -parse_int(const char *arg, const char *what) -{ - long val; - char *eptr; - - val = strtol(arg, &eptr, 10); - // Must be a positive number less than around a billion. - if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) { - die("Invalid %s", what); - } - return ((int) val); -} - -void -do_local_lat(int argc, char **argv) -{ - long int msgsize; - long int trips; - - if (argc != 3) { - die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>"); - } - - msgsize = parse_int(argv[1], "message size"); - trips = parse_int(argv[2], "round-trips"); - - latency_server(argv[0], msgsize, trips); -} - -void -do_remote_lat(int argc, char **argv) -{ - int msgsize; - int trips; - - if (argc != 3) { - die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>"); - } - - msgsize = parse_int(argv[1], "message size"); - trips = parse_int(argv[2], "round-trips"); - - latency_client(argv[0], msgsize, trips); -} - -void -do_local_thr(int argc, char **argv) -{ - int msgsize; - int trips; - - if (argc != 3) { - die("Usage: local_thr <listen-addr> <msg-size> <count>"); - } - - msgsize = parse_int(argv[1], "message size"); - trips = parse_int(argv[2], "count"); - - throughput_server(argv[0], msgsize, trips); -} - -void -do_remote_thr(int argc, char **argv) -{ - int msgsize; - int trips; - - if (argc != 3) { - die("Usage: remote_thr <connect-to> <msg-size> <count>"); - } - - msgsize = parse_int(argv[1], "message size"); - trips = parse_int(argv[2], "count"); - - throughput_client(argv[0], msgsize, trips); -} - -struct inproc_args { - int count; - int msgsize; - const char *addr; - void (*func)(const char *, size_t, int); -}; - -static void -do_inproc(void *args) -{ - struct inproc_args *ia = args; - - ia->func(ia->addr, ia->msgsize, ia->count); -} - -void -do_inproc_lat(int argc, char **argv) -{ - nng_thread * thr; - struct inproc_args ia; - int rv; - int val; - int optidx; - char * arg; - char * addr; - - addr = "inproc://latency_test"; - - optidx = 0; - while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) == - 0) { - switch (val) { - case OPT_REQREP0: - open_client = nng_req0_open; - open_server = nng_rep0_open; - break; - case OPT_PAIR0: - open_client = nng_pair0_open; - open_server = nng_pair0_open; - break; - case OPT_PAIR1: - open_client = nng_pair1_open; - open_server = nng_pair1_open; - break; - case OPT_BUS0: - open_client = nng_bus0_open; - open_server = nng_bus0_open; - break; - - case OPT_URL: - addr = arg; - break; - default: - die("bad option"); - } - } - argc -= optidx; - argv += optidx; - - if (argc != 2) { - die("Usage: inproc_lat <msg-size> <count>"); - } - - ia.addr = addr; - ia.msgsize = parse_int(argv[0], "message size"); - ia.count = parse_int(argv[1], "count"); - ia.func = latency_server; - - if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) { - die("Cannot create thread: %s", nng_strerror(rv)); - } - - // Sleep a bit. - nng_msleep(100); - - latency_client(addr, ia.msgsize, ia.count); - nng_thread_destroy(thr); -} - -void -do_inproc_thr(int argc, char **argv) -{ - nng_thread * thr; - struct inproc_args ia; - int rv; - int optidx; - int val; - char * arg; - char * addr = "inproc://throughput-test"; - - optidx = 0; - while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) == - 0) { - switch (val) { -#if 0 - // For now these protocols simply do not work with - // throughput -- they don't work with backpressure properly. - // In the future we should support synchronizing in the same - // process, and alerting the sender both on completion of - // a single message, and on completion of all messages. - case OPT_REQREP0: - open_client = nng_req0_open; - open_server = nng_rep0_open; - break; -#endif - case OPT_PAIR0: - open_client = nng_pair0_open; - open_server = nng_pair0_open; - break; - case OPT_PAIR1: - open_client = nng_pair1_open; - open_server = nng_pair1_open; - break; - case OPT_PIPELINE0: - open_client = nng_pull0_open; - open_server = nng_push0_open; - break; - case OPT_URL: - addr = arg; - break; - default: - die("bad option"); - } - } - argc -= optidx; - argv += optidx; - - if (argc != 2) { - die("Usage: inproc_thr <msg-size> <count>"); - } - - ia.addr = addr; - ia.msgsize = parse_int(argv[0], "message size"); - ia.count = parse_int(argv[1], "count"); - ia.func = throughput_server; - - if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) { - die("Cannot create thread: %s", nng_strerror(rv)); - } - - // Sleep a bit. - nng_msleep(100); - - throughput_client(addr, ia.msgsize, ia.count); - nng_thread_destroy(thr); -} - -void -latency_client(const char *addr, size_t msgsize, int trips) -{ - nng_socket s; - nng_msg * msg; - nng_time start, end; - int rv; - int i; - float total; - float latency; - if ((rv = open_client(&s)) != 0) { - die("nng_socket: %s", nng_strerror(rv)); - } - - // XXX: set no delay - // XXX: other options (TLS in the future?, Linger?) - - if ((rv = nng_dial(s, addr, NULL, 0)) != 0) { - die("nng_dial: %s", nng_strerror(rv)); - } - - nng_msleep(100); - - if (nng_msg_alloc(&msg, msgsize) != 0) { - die("nng_msg_alloc: %s", nng_strerror(rv)); - } - - start = nng_clock(); - for (i = 0; i < trips; i++) { - if ((rv = nng_sendmsg(s, msg, 0)) != 0) { - die("nng_sendmsg: %s", nng_strerror(rv)); - } - - if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { - die("nng_recvmsg: %s", nng_strerror(rv)); - } - } - end = nng_clock(); - - nng_msg_free(msg); - nng_close(s); - - total = (float) ((end - start)) / 1000; - latency = ((float) ((total * 1000000)) / (float) (trips * 2)); - printf("total time: %.3f [s]\n", total); - printf("message size: %d [B]\n", (int) msgsize); - printf("round trip count: %d\n", trips); - printf("average latency: %.3f [us]\n", latency); -} - -void -latency_server(const char *addr, size_t msgsize, int trips) -{ - nng_socket s; - nng_msg * msg; - int rv; - int i; - - if ((rv = open_server(&s)) != 0) { - die("nng_socket: %s", nng_strerror(rv)); - } - - // XXX: set no delay - // XXX: other options (TLS in the future?, Linger?) - - if ((rv = nng_listen(s, addr, NULL, 0)) != 0) { - die("nng_listen: %s", nng_strerror(rv)); - } - - for (i = 0; i < trips; i++) { - if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { - die("nng_recvmsg: %s", nng_strerror(rv)); - } - if (nng_msg_len(msg) != msgsize) { - die("wrong message size: %lu != %lu", nng_msg_len(msg), - msgsize); - } - if ((rv = nng_sendmsg(s, msg, 0)) != 0) { - die("nng_sendmsg: %s", nng_strerror(rv)); - } - } - - // Wait a bit for things to drain... linger should do this. - // 100ms ought to be enough. - nng_msleep(100); - nng_close(s); -} - -// Our throughput story is quite a mess. Mostly I think because of the poor -// caching and message reuse. We should probably implement a message pooling -// API somewhere. - -void -throughput_server(const char *addr, size_t msgsize, int count) -{ - nng_socket s; - nng_msg * msg; - int rv; - int i; - uint64_t start, end; - float msgpersec, mbps, total; - - if ((rv = nng_pair_open(&s)) != 0) { - die("nng_socket: %s", nng_strerror(rv)); - } - rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128); - if (rv != 0) { - die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv)); - } - - // XXX: set no delay - // XXX: other options (TLS in the future?, Linger?) - - if ((rv = nng_listen(s, addr, NULL, 0)) != 0) { - die("nng_listen: %s", nng_strerror(rv)); - } - - // Receive first synchronization message. - if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { - die("nng_recvmsg: %s", nng_strerror(rv)); - } - nng_msg_free(msg); - start = nng_clock(); - - for (i = 0; i < count; i++) { - if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { - die("nng_recvmsg: %s", nng_strerror(rv)); - } - if (nng_msg_len(msg) != msgsize) { - die("wrong message size: %lu != %lu", nng_msg_len(msg), - msgsize); - } - nng_msg_free(msg); - } - end = nng_clock(); - // Send a synchronization message (empty) to the other side, - // and wait a bit to make sure it goes out the wire. - nng_send(s, "", 0, 0); - nng_msleep(200); - nng_close(s); - total = (float) ((end - start)) / 1000; - msgpersec = (float) (count) / total; - mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024); - printf("total time: %.3f [s]\n", total); - printf("message size: %d [B]\n", (int) msgsize); - printf("message count: %d\n", count); - printf("throughput: %.f [msg/s]\n", msgpersec); - printf("throughput: %.3f [Mb/s]\n", mbps); -} - -void -throughput_client(const char *addr, size_t msgsize, int count) -{ - nng_socket s; - nng_msg * msg; - int rv; - int i; - - // We send one extra zero length message to start the timer. - count++; - - if ((rv = nng_pair_open(&s)) != 0) { - die("nng_socket: %s", nng_strerror(rv)); - } - - // XXX: set no delay - // XXX: other options (TLS in the future?, Linger?) - - rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128); - if (rv != 0) { - die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv)); - } - - rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000); - if (rv != 0) { - die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv)); - } - - if ((rv = nng_dial(s, addr, NULL, 0)) != 0) { - die("nng_dial: %s", nng_strerror(rv)); - } - - if ((rv = nng_msg_alloc(&msg, 0)) != 0) { - die("nng_msg_alloc: %s", nng_strerror(rv)); - } - if ((rv = nng_sendmsg(s, msg, 0)) != 0) { - die("nng_sendmsg: %s", nng_strerror(rv)); - } - - for (i = 0; i < count; i++) { - if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) { - die("nng_msg_alloc: %s", nng_strerror(rv)); - } - - if ((rv = nng_sendmsg(s, msg, 0)) != 0) { - die("nng_sendmsg: %s", nng_strerror(rv)); - } - } - - // Attempt to get the completion indication from the other - // side. - if (nng_recvmsg(s, &msg, 0) == 0) { - nng_msg_free(msg); - } - - nng_close(s); -} diff --git a/perf/pubdrop.c b/perf/pubdrop.c deleted file mode 100644 index be51bb2f..00000000 --- a/perf/pubdrop.c +++ /dev/null @@ -1,325 +0,0 @@ -// -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <ctype.h> -#include <stdarg.h> -#include <stdint.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include <nng/nng.h> -#include <nng/supplemental/util/platform.h> - -// pubdrop - this is a simple testing utility that lets us measure PUB/SUB -// performance, including dropped messages, delivery across multiple threads, -// etc. It actually uses a wild card subscription for now. - -#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0) -#include <nng/protocol/pubsub0/pub.h> -#include <nng/protocol/pubsub0/sub.h> - -#else - -static void die(const char *, ...); - -static int -nng_pub0_open(nng_socket *arg) -{ - (void) arg; - die("Pub protocol enabled in this build!"); - return (NNG_ENOTSUP); -} - -static int -nng_sub0_open(nng_socket *arg) -{ - (void) arg; - die("Sub protocol enabled in this build!"); - return (NNG_ENOTSUP); -} - -#endif // NNG_HAVE_PUB0.... - -static void die(const char *, ...); -static void do_pubdrop(int argc, char **argv); -static uint64_t nperusec; -static volatile int x; - -void -work(void) -{ - x = rand(); -} - -void -usdelay(unsigned long long nusec) -{ - nusec *= nperusec; - while (nusec > 0) { - work(); - nusec--; - } -} - -int -main(int argc, char **argv) -{ - argc--; - argv++; - - // We calculate a delay factor to roughly delay 1 usec. We don't - // need this to be perfect, just reproducible on the same host. - unsigned long cnt = 1000000; - - nng_time beg = nng_clock(); - for (unsigned long i = 0; i < cnt; i++) { - work(); - } - nng_time end = nng_clock(); - nperusec = cnt / (1000 * (end - beg)); - - do_pubdrop(argc, argv); -} - -static void -die(const char *fmt, ...) -{ - va_list ap; - - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, "\n"); - exit(2); -} - -static int -parse_int(const char *arg, const char *what) -{ - long val; - char *eptr; - - val = strtol(arg, &eptr, 10); - // Must be a postive number less than around a billion. - if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) { - die("Invalid %s", what); - } - return ((int) val); -} - -struct pubdrop_args { - const char * addr; - bool start; - unsigned long long msgsize; - unsigned long long count; - unsigned long long intvl; - unsigned long long drops; - unsigned long long gaps; - unsigned long long errs; - unsigned long long recvs; - nng_time beg; - nng_time end; - nng_mtx * mtx; - nng_cv * cv; -}; - -static void -pub_server(void *arg) -{ - struct pubdrop_args *pa = arg; - nng_socket sock; - int rv; - nng_msg * msg; - nng_time start; - nng_time end; - - if ((rv = nng_pub0_open(&sock)) != 0) { - die("Cannot open sub: %s", nng_strerror(rv)); - } - if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) { - die("Cannot listen: %s", nng_strerror(rv)); - } - - nng_mtx_lock(pa->mtx); - while (!pa->start) { - nng_cv_wait(pa->cv); - } - nng_mtx_unlock(pa->mtx); - - start = nng_clock(); - for (uint64_t i = 0; i < pa->count; i++) { - // Unfortunately we need to allocate messages dynamically as we - // go. The other option would be to allocate them all up front, - // but that could be a rather excessive amount of memory. - if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) { - die("Message alloc failed"); - } - memcpy(nng_msg_body(msg), &i, sizeof(i)); - if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { - die("Sendmsg: %s", nng_strerror(rv)); - } - // It sure would be nice if we had a usec granularity option - // here. - if (pa->intvl > 0) { - usdelay((unsigned long long) pa->intvl); - } - } - - end = nng_clock(); - - nng_msleep(1000); // drain the queue - nng_close(sock); - - nng_mtx_lock(pa->mtx); - pa->beg = start; - pa->end = end; - nng_mtx_unlock(pa->mtx); -} - -static void -sub_client(void *arg) -{ - struct pubdrop_args *pa = arg; - nng_socket sock; - int rv; - nng_msg * msg; - unsigned long long recvs; - unsigned long long drops; - unsigned long long gaps; - unsigned long long errs; - unsigned long long expect; - - if ((rv = nng_sub0_open(&sock)) != 0) { - die("Cannot open sub: %s", nng_strerror(rv)); - } - if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) { - die("Cannot listen: %s", nng_strerror(rv)); - } - if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) { - die("setopt: %s", nng_strerror(rv)); - } - if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { - die("setopt: %s", nng_strerror(rv)); - } - if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) { - die("setopt: %s", nng_strerror(rv)); - } - - expect = 0; - recvs = drops = gaps = errs = 0; - - while (expect < pa->count) { - uint64_t got; - if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) { - if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) { - // Closed without receiving the last message - drops += (pa->count - expect); - gaps++; - break; - } - printf("ERROR: %s\n", nng_strerror(rv)); - errs++; - break; - } - recvs++; - memcpy(&got, nng_msg_body(msg), sizeof(got)); - nng_msg_free(msg); - if (got != expect) { - gaps++; - if (got > expect) { - drops += (got - expect); - } else { - die("Misordered delivery"); - } - } - expect = got + 1; - } - - nng_mtx_lock(pa->mtx); - pa->drops += drops; - pa->errs += errs; - pa->recvs += recvs; - pa->gaps += gaps; - nng_mtx_unlock(pa->mtx); -} - -static void -do_pubdrop(int argc, char **argv) -{ - nng_thread ** thrs; - struct pubdrop_args pa; - int rv; - int nsubs; - - if (argc != 5) { - die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> " - "<interval>"); - } - - memset(&pa, 0, sizeof(pa)); - pa.addr = argv[0]; - pa.msgsize = parse_int(argv[1], "message size"); - pa.count = parse_int(argv[2], "count"); - pa.intvl = parse_int(argv[4], "interval"); - nsubs = parse_int(argv[3], "#subscribers"); - - if (pa.msgsize < sizeof(uint64_t)) { - die("Message size too small."); - } - - thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1); - if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) || - ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) { - die("Startup: %s\n", nng_strerror(rv)); - }; - - if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) { - die("Cannot create pub thread: %s", nng_strerror(rv)); - } - - nng_msleep(100); // give time for listener to start... - - for (int i = 0; i < nsubs; i++) { - if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) != - 0) { - die("Cannot create sub thread: %s", nng_strerror(rv)); - } - } - - // Sleep a bit for conns to establish. - nng_msleep(2000); - nng_mtx_lock(pa.mtx); - pa.start = true; - nng_cv_wake(pa.cv); - nng_mtx_unlock(pa.mtx); - - for (int i = 0; i < nsubs + 1; i++) { - nng_thread_destroy(thrs[i]); - } - - nng_mtx_lock(pa.mtx); - - unsigned long long expect = nsubs * pa.count; - unsigned long long missing = nsubs ? expect - pa.recvs : 0; - double dur = (pa.end - pa.beg) / 1000.0; - - printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n", - pa.count, dur, pa.count / dur); - printf("Expected %llu messages total\n", expect); - printf("Received %llu messages total\n", pa.recvs); - printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur); - printf("Errors %llu total\n", pa.errs); - printf("Reported %llu dropped messages in %llu gaps\n", pa.drops, - pa.gaps); - printf("Dropped %llu messages (missing)\n", missing); - printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0); - - nng_mtx_unlock(pa.mtx); -} |
