diff options
Diffstat (limited to 'src/tools/perf/pubdrop.c')
| -rw-r--r-- | src/tools/perf/pubdrop.c | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/src/tools/perf/pubdrop.c b/src/tools/perf/pubdrop.c new file mode 100644 index 00000000..be51bb2f --- /dev/null +++ b/src/tools/perf/pubdrop.c @@ -0,0 +1,325 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <stdarg.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <nng/nng.h> +#include <nng/supplemental/util/platform.h> + +// pubdrop - this is a simple testing utility that lets us measure PUB/SUB +// performance, including dropped messages, delivery across multiple threads, +// etc. It actually uses a wild card subscription for now. + +#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0) +#include <nng/protocol/pubsub0/pub.h> +#include <nng/protocol/pubsub0/sub.h> + +#else + +static void die(const char *, ...); + +static int +nng_pub0_open(nng_socket *arg) +{ + (void) arg; + die("Pub protocol enabled in this build!"); + return (NNG_ENOTSUP); +} + +static int +nng_sub0_open(nng_socket *arg) +{ + (void) arg; + die("Sub protocol enabled in this build!"); + return (NNG_ENOTSUP); +} + +#endif // NNG_HAVE_PUB0.... + +static void die(const char *, ...); +static void do_pubdrop(int argc, char **argv); +static uint64_t nperusec; +static volatile int x; + +void +work(void) +{ + x = rand(); +} + +void +usdelay(unsigned long long nusec) +{ + nusec *= nperusec; + while (nusec > 0) { + work(); + nusec--; + } +} + +int +main(int argc, char **argv) +{ + argc--; + argv++; + + // We calculate a delay factor to roughly delay 1 usec. We don't + // need this to be perfect, just reproducible on the same host. + unsigned long cnt = 1000000; + + nng_time beg = nng_clock(); + for (unsigned long i = 0; i < cnt; i++) { + work(); + } + nng_time end = nng_clock(); + nperusec = cnt / (1000 * (end - beg)); + + do_pubdrop(argc, argv); +} + +static void +die(const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(2); +} + +static int +parse_int(const char *arg, const char *what) +{ + long val; + char *eptr; + + val = strtol(arg, &eptr, 10); + // Must be a postive number less than around a billion. + if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) { + die("Invalid %s", what); + } + return ((int) val); +} + +struct pubdrop_args { + const char * addr; + bool start; + unsigned long long msgsize; + unsigned long long count; + unsigned long long intvl; + unsigned long long drops; + unsigned long long gaps; + unsigned long long errs; + unsigned long long recvs; + nng_time beg; + nng_time end; + nng_mtx * mtx; + nng_cv * cv; +}; + +static void +pub_server(void *arg) +{ + struct pubdrop_args *pa = arg; + nng_socket sock; + int rv; + nng_msg * msg; + nng_time start; + nng_time end; + + if ((rv = nng_pub0_open(&sock)) != 0) { + die("Cannot open sub: %s", nng_strerror(rv)); + } + if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) { + die("Cannot listen: %s", nng_strerror(rv)); + } + + nng_mtx_lock(pa->mtx); + while (!pa->start) { + nng_cv_wait(pa->cv); + } + nng_mtx_unlock(pa->mtx); + + start = nng_clock(); + for (uint64_t i = 0; i < pa->count; i++) { + // Unfortunately we need to allocate messages dynamically as we + // go. The other option would be to allocate them all up front, + // but that could be a rather excessive amount of memory. + if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) { + die("Message alloc failed"); + } + memcpy(nng_msg_body(msg), &i, sizeof(i)); + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + die("Sendmsg: %s", nng_strerror(rv)); + } + // It sure would be nice if we had a usec granularity option + // here. + if (pa->intvl > 0) { + usdelay((unsigned long long) pa->intvl); + } + } + + end = nng_clock(); + + nng_msleep(1000); // drain the queue + nng_close(sock); + + nng_mtx_lock(pa->mtx); + pa->beg = start; + pa->end = end; + nng_mtx_unlock(pa->mtx); +} + +static void +sub_client(void *arg) +{ + struct pubdrop_args *pa = arg; + nng_socket sock; + int rv; + nng_msg * msg; + unsigned long long recvs; + unsigned long long drops; + unsigned long long gaps; + unsigned long long errs; + unsigned long long expect; + + if ((rv = nng_sub0_open(&sock)) != 0) { + die("Cannot open sub: %s", nng_strerror(rv)); + } + if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) { + die("Cannot listen: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + + expect = 0; + recvs = drops = gaps = errs = 0; + + while (expect < pa->count) { + uint64_t got; + if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) { + if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) { + // Closed without receiving the last message + drops += (pa->count - expect); + gaps++; + break; + } + printf("ERROR: %s\n", nng_strerror(rv)); + errs++; + break; + } + recvs++; + memcpy(&got, nng_msg_body(msg), sizeof(got)); + nng_msg_free(msg); + if (got != expect) { + gaps++; + if (got > expect) { + drops += (got - expect); + } else { + die("Misordered delivery"); + } + } + expect = got + 1; + } + + nng_mtx_lock(pa->mtx); + pa->drops += drops; + pa->errs += errs; + pa->recvs += recvs; + pa->gaps += gaps; + nng_mtx_unlock(pa->mtx); +} + +static void +do_pubdrop(int argc, char **argv) +{ + nng_thread ** thrs; + struct pubdrop_args pa; + int rv; + int nsubs; + + if (argc != 5) { + die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> " + "<interval>"); + } + + memset(&pa, 0, sizeof(pa)); + pa.addr = argv[0]; + pa.msgsize = parse_int(argv[1], "message size"); + pa.count = parse_int(argv[2], "count"); + pa.intvl = parse_int(argv[4], "interval"); + nsubs = parse_int(argv[3], "#subscribers"); + + if (pa.msgsize < sizeof(uint64_t)) { + die("Message size too small."); + } + + thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1); + if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) || + ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) { + die("Startup: %s\n", nng_strerror(rv)); + }; + + if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) { + die("Cannot create pub thread: %s", nng_strerror(rv)); + } + + nng_msleep(100); // give time for listener to start... + + for (int i = 0; i < nsubs; i++) { + if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) != + 0) { + die("Cannot create sub thread: %s", nng_strerror(rv)); + } + } + + // Sleep a bit for conns to establish. + nng_msleep(2000); + nng_mtx_lock(pa.mtx); + pa.start = true; + nng_cv_wake(pa.cv); + nng_mtx_unlock(pa.mtx); + + for (int i = 0; i < nsubs + 1; i++) { + nng_thread_destroy(thrs[i]); + } + + nng_mtx_lock(pa.mtx); + + unsigned long long expect = nsubs * pa.count; + unsigned long long missing = nsubs ? expect - pa.recvs : 0; + double dur = (pa.end - pa.beg) / 1000.0; + + printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n", + pa.count, dur, pa.count / dur); + printf("Expected %llu messages total\n", expect); + printf("Received %llu messages total\n", pa.recvs); + printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur); + printf("Errors %llu total\n", pa.errs); + printf("Reported %llu dropped messages in %llu gaps\n", pa.drops, + pa.gaps); + printf("Dropped %llu messages (missing)\n", missing); + printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0); + + nng_mtx_unlock(pa.mtx); +} |
