From ed542ac45e00c9b2faa0b41f3c00de6e291e5678 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 1 Jan 2021 11:30:03 -0800 Subject: fixes #1345 Restructure the source tree This is not quite complete, but it sets the stage for other protocols (such as zmq or mqtt) to be added to the project. --- perf/pubdrop.c | 325 --------------------------------------------------------- 1 file changed, 325 deletions(-) delete mode 100644 perf/pubdrop.c (limited to 'perf/pubdrop.c') diff --git a/perf/pubdrop.c b/perf/pubdrop.c deleted file mode 100644 index be51bb2f..00000000 --- a/perf/pubdrop.c +++ /dev/null @@ -1,325 +0,0 @@ -// -// Copyright 2019 Staysail Systems, Inc. -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include -#include -#include -#include -#include -#include - -#include -#include - -// pubdrop - this is a simple testing utility that lets us measure PUB/SUB -// performance, including dropped messages, delivery across multiple threads, -// etc. It actually uses a wild card subscription for now. - -#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0) -#include -#include - -#else - -static void die(const char *, ...); - -static int -nng_pub0_open(nng_socket *arg) -{ - (void) arg; - die("Pub protocol enabled in this build!"); - return (NNG_ENOTSUP); -} - -static int -nng_sub0_open(nng_socket *arg) -{ - (void) arg; - die("Sub protocol enabled in this build!"); - return (NNG_ENOTSUP); -} - -#endif // NNG_HAVE_PUB0.... - -static void die(const char *, ...); -static void do_pubdrop(int argc, char **argv); -static uint64_t nperusec; -static volatile int x; - -void -work(void) -{ - x = rand(); -} - -void -usdelay(unsigned long long nusec) -{ - nusec *= nperusec; - while (nusec > 0) { - work(); - nusec--; - } -} - -int -main(int argc, char **argv) -{ - argc--; - argv++; - - // We calculate a delay factor to roughly delay 1 usec. We don't - // need this to be perfect, just reproducible on the same host. - unsigned long cnt = 1000000; - - nng_time beg = nng_clock(); - for (unsigned long i = 0; i < cnt; i++) { - work(); - } - nng_time end = nng_clock(); - nperusec = cnt / (1000 * (end - beg)); - - do_pubdrop(argc, argv); -} - -static void -die(const char *fmt, ...) -{ - va_list ap; - - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, "\n"); - exit(2); -} - -static int -parse_int(const char *arg, const char *what) -{ - long val; - char *eptr; - - val = strtol(arg, &eptr, 10); - // Must be a postive number less than around a billion. - if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) { - die("Invalid %s", what); - } - return ((int) val); -} - -struct pubdrop_args { - const char * addr; - bool start; - unsigned long long msgsize; - unsigned long long count; - unsigned long long intvl; - unsigned long long drops; - unsigned long long gaps; - unsigned long long errs; - unsigned long long recvs; - nng_time beg; - nng_time end; - nng_mtx * mtx; - nng_cv * cv; -}; - -static void -pub_server(void *arg) -{ - struct pubdrop_args *pa = arg; - nng_socket sock; - int rv; - nng_msg * msg; - nng_time start; - nng_time end; - - if ((rv = nng_pub0_open(&sock)) != 0) { - die("Cannot open sub: %s", nng_strerror(rv)); - } - if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) { - die("Cannot listen: %s", nng_strerror(rv)); - } - - nng_mtx_lock(pa->mtx); - while (!pa->start) { - nng_cv_wait(pa->cv); - } - nng_mtx_unlock(pa->mtx); - - start = nng_clock(); - for (uint64_t i = 0; i < pa->count; i++) { - // Unfortunately we need to allocate messages dynamically as we - // go. The other option would be to allocate them all up front, - // but that could be a rather excessive amount of memory. - if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) { - die("Message alloc failed"); - } - memcpy(nng_msg_body(msg), &i, sizeof(i)); - if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { - die("Sendmsg: %s", nng_strerror(rv)); - } - // It sure would be nice if we had a usec granularity option - // here. - if (pa->intvl > 0) { - usdelay((unsigned long long) pa->intvl); - } - } - - end = nng_clock(); - - nng_msleep(1000); // drain the queue - nng_close(sock); - - nng_mtx_lock(pa->mtx); - pa->beg = start; - pa->end = end; - nng_mtx_unlock(pa->mtx); -} - -static void -sub_client(void *arg) -{ - struct pubdrop_args *pa = arg; - nng_socket sock; - int rv; - nng_msg * msg; - unsigned long long recvs; - unsigned long long drops; - unsigned long long gaps; - unsigned long long errs; - unsigned long long expect; - - if ((rv = nng_sub0_open(&sock)) != 0) { - die("Cannot open sub: %s", nng_strerror(rv)); - } - if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) { - die("Cannot listen: %s", nng_strerror(rv)); - } - if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) { - die("setopt: %s", nng_strerror(rv)); - } - if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { - die("setopt: %s", nng_strerror(rv)); - } - if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) { - die("setopt: %s", nng_strerror(rv)); - } - - expect = 0; - recvs = drops = gaps = errs = 0; - - while (expect < pa->count) { - uint64_t got; - if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) { - if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) { - // Closed without receiving the last message - drops += (pa->count - expect); - gaps++; - break; - } - printf("ERROR: %s\n", nng_strerror(rv)); - errs++; - break; - } - recvs++; - memcpy(&got, nng_msg_body(msg), sizeof(got)); - nng_msg_free(msg); - if (got != expect) { - gaps++; - if (got > expect) { - drops += (got - expect); - } else { - die("Misordered delivery"); - } - } - expect = got + 1; - } - - nng_mtx_lock(pa->mtx); - pa->drops += drops; - pa->errs += errs; - pa->recvs += recvs; - pa->gaps += gaps; - nng_mtx_unlock(pa->mtx); -} - -static void -do_pubdrop(int argc, char **argv) -{ - nng_thread ** thrs; - struct pubdrop_args pa; - int rv; - int nsubs; - - if (argc != 5) { - die("Usage: pubdrop " - ""); - } - - memset(&pa, 0, sizeof(pa)); - pa.addr = argv[0]; - pa.msgsize = parse_int(argv[1], "message size"); - pa.count = parse_int(argv[2], "count"); - pa.intvl = parse_int(argv[4], "interval"); - nsubs = parse_int(argv[3], "#subscribers"); - - if (pa.msgsize < sizeof(uint64_t)) { - die("Message size too small."); - } - - thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1); - if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) || - ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) { - die("Startup: %s\n", nng_strerror(rv)); - }; - - if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) { - die("Cannot create pub thread: %s", nng_strerror(rv)); - } - - nng_msleep(100); // give time for listener to start... - - for (int i = 0; i < nsubs; i++) { - if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) != - 0) { - die("Cannot create sub thread: %s", nng_strerror(rv)); - } - } - - // Sleep a bit for conns to establish. - nng_msleep(2000); - nng_mtx_lock(pa.mtx); - pa.start = true; - nng_cv_wake(pa.cv); - nng_mtx_unlock(pa.mtx); - - for (int i = 0; i < nsubs + 1; i++) { - nng_thread_destroy(thrs[i]); - } - - nng_mtx_lock(pa.mtx); - - unsigned long long expect = nsubs * pa.count; - unsigned long long missing = nsubs ? expect - pa.recvs : 0; - double dur = (pa.end - pa.beg) / 1000.0; - - printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n", - pa.count, dur, pa.count / dur); - printf("Expected %llu messages total\n", expect); - printf("Received %llu messages total\n", pa.recvs); - printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur); - printf("Errors %llu total\n", pa.errs); - printf("Reported %llu dropped messages in %llu gaps\n", pa.drops, - pa.gaps); - printf("Dropped %llu messages (missing)\n", missing); - printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0); - - nng_mtx_unlock(pa.mtx); -} -- cgit v1.2.3-70-g09d2