aboutsummaryrefslogtreecommitdiff
path: root/perf/pubdrop.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-01-01 11:30:03 -0800
committerGarrett D'Amore <garrett@damore.org>2021-01-01 12:46:17 -0800
commited542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch)
tree673924ff077d468e6756529c2c204698d3faa47c /perf/pubdrop.c
parent1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff)
downloadnng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz
nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2
nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'perf/pubdrop.c')
-rw-r--r--perf/pubdrop.c325
1 files changed, 0 insertions, 325 deletions
diff --git a/perf/pubdrop.c b/perf/pubdrop.c
deleted file mode 100644
index be51bb2f..00000000
--- a/perf/pubdrop.c
+++ /dev/null
@@ -1,325 +0,0 @@
-//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <ctype.h>
-#include <stdarg.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <nng/nng.h>
-#include <nng/supplemental/util/platform.h>
-
-// pubdrop - this is a simple testing utility that lets us measure PUB/SUB
-// performance, including dropped messages, delivery across multiple threads,
-// etc. It actually uses a wild card subscription for now.
-
-#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0)
-#include <nng/protocol/pubsub0/pub.h>
-#include <nng/protocol/pubsub0/sub.h>
-
-#else
-
-static void die(const char *, ...);
-
-static int
-nng_pub0_open(nng_socket *arg)
-{
- (void) arg;
- die("Pub protocol enabled in this build!");
- return (NNG_ENOTSUP);
-}
-
-static int
-nng_sub0_open(nng_socket *arg)
-{
- (void) arg;
- die("Sub protocol enabled in this build!");
- return (NNG_ENOTSUP);
-}
-
-#endif // NNG_HAVE_PUB0....
-
-static void die(const char *, ...);
-static void do_pubdrop(int argc, char **argv);
-static uint64_t nperusec;
-static volatile int x;
-
-void
-work(void)
-{
- x = rand();
-}
-
-void
-usdelay(unsigned long long nusec)
-{
- nusec *= nperusec;
- while (nusec > 0) {
- work();
- nusec--;
- }
-}
-
-int
-main(int argc, char **argv)
-{
- argc--;
- argv++;
-
- // We calculate a delay factor to roughly delay 1 usec. We don't
- // need this to be perfect, just reproducible on the same host.
- unsigned long cnt = 1000000;
-
- nng_time beg = nng_clock();
- for (unsigned long i = 0; i < cnt; i++) {
- work();
- }
- nng_time end = nng_clock();
- nperusec = cnt / (1000 * (end - beg));
-
- do_pubdrop(argc, argv);
-}
-
-static void
-die(const char *fmt, ...)
-{
- va_list ap;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, "\n");
- exit(2);
-}
-
-static int
-parse_int(const char *arg, const char *what)
-{
- long val;
- char *eptr;
-
- val = strtol(arg, &eptr, 10);
- // Must be a postive number less than around a billion.
- if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) {
- die("Invalid %s", what);
- }
- return ((int) val);
-}
-
-struct pubdrop_args {
- const char * addr;
- bool start;
- unsigned long long msgsize;
- unsigned long long count;
- unsigned long long intvl;
- unsigned long long drops;
- unsigned long long gaps;
- unsigned long long errs;
- unsigned long long recvs;
- nng_time beg;
- nng_time end;
- nng_mtx * mtx;
- nng_cv * cv;
-};
-
-static void
-pub_server(void *arg)
-{
- struct pubdrop_args *pa = arg;
- nng_socket sock;
- int rv;
- nng_msg * msg;
- nng_time start;
- nng_time end;
-
- if ((rv = nng_pub0_open(&sock)) != 0) {
- die("Cannot open sub: %s", nng_strerror(rv));
- }
- if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) {
- die("Cannot listen: %s", nng_strerror(rv));
- }
-
- nng_mtx_lock(pa->mtx);
- while (!pa->start) {
- nng_cv_wait(pa->cv);
- }
- nng_mtx_unlock(pa->mtx);
-
- start = nng_clock();
- for (uint64_t i = 0; i < pa->count; i++) {
- // Unfortunately we need to allocate messages dynamically as we
- // go. The other option would be to allocate them all up front,
- // but that could be a rather excessive amount of memory.
- if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) {
- die("Message alloc failed");
- }
- memcpy(nng_msg_body(msg), &i, sizeof(i));
- if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
- die("Sendmsg: %s", nng_strerror(rv));
- }
- // It sure would be nice if we had a usec granularity option
- // here.
- if (pa->intvl > 0) {
- usdelay((unsigned long long) pa->intvl);
- }
- }
-
- end = nng_clock();
-
- nng_msleep(1000); // drain the queue
- nng_close(sock);
-
- nng_mtx_lock(pa->mtx);
- pa->beg = start;
- pa->end = end;
- nng_mtx_unlock(pa->mtx);
-}
-
-static void
-sub_client(void *arg)
-{
- struct pubdrop_args *pa = arg;
- nng_socket sock;
- int rv;
- nng_msg * msg;
- unsigned long long recvs;
- unsigned long long drops;
- unsigned long long gaps;
- unsigned long long errs;
- unsigned long long expect;
-
- if ((rv = nng_sub0_open(&sock)) != 0) {
- die("Cannot open sub: %s", nng_strerror(rv));
- }
- if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) {
- die("Cannot listen: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
- if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) {
- die("setopt: %s", nng_strerror(rv));
- }
-
- expect = 0;
- recvs = drops = gaps = errs = 0;
-
- while (expect < pa->count) {
- uint64_t got;
- if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
- if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) {
- // Closed without receiving the last message
- drops += (pa->count - expect);
- gaps++;
- break;
- }
- printf("ERROR: %s\n", nng_strerror(rv));
- errs++;
- break;
- }
- recvs++;
- memcpy(&got, nng_msg_body(msg), sizeof(got));
- nng_msg_free(msg);
- if (got != expect) {
- gaps++;
- if (got > expect) {
- drops += (got - expect);
- } else {
- die("Misordered delivery");
- }
- }
- expect = got + 1;
- }
-
- nng_mtx_lock(pa->mtx);
- pa->drops += drops;
- pa->errs += errs;
- pa->recvs += recvs;
- pa->gaps += gaps;
- nng_mtx_unlock(pa->mtx);
-}
-
-static void
-do_pubdrop(int argc, char **argv)
-{
- nng_thread ** thrs;
- struct pubdrop_args pa;
- int rv;
- int nsubs;
-
- if (argc != 5) {
- die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> "
- "<interval>");
- }
-
- memset(&pa, 0, sizeof(pa));
- pa.addr = argv[0];
- pa.msgsize = parse_int(argv[1], "message size");
- pa.count = parse_int(argv[2], "count");
- pa.intvl = parse_int(argv[4], "interval");
- nsubs = parse_int(argv[3], "#subscribers");
-
- if (pa.msgsize < sizeof(uint64_t)) {
- die("Message size too small.");
- }
-
- thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1);
- if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) ||
- ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) {
- die("Startup: %s\n", nng_strerror(rv));
- };
-
- if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) {
- die("Cannot create pub thread: %s", nng_strerror(rv));
- }
-
- nng_msleep(100); // give time for listener to start...
-
- for (int i = 0; i < nsubs; i++) {
- if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) !=
- 0) {
- die("Cannot create sub thread: %s", nng_strerror(rv));
- }
- }
-
- // Sleep a bit for conns to establish.
- nng_msleep(2000);
- nng_mtx_lock(pa.mtx);
- pa.start = true;
- nng_cv_wake(pa.cv);
- nng_mtx_unlock(pa.mtx);
-
- for (int i = 0; i < nsubs + 1; i++) {
- nng_thread_destroy(thrs[i]);
- }
-
- nng_mtx_lock(pa.mtx);
-
- unsigned long long expect = nsubs * pa.count;
- unsigned long long missing = nsubs ? expect - pa.recvs : 0;
- double dur = (pa.end - pa.beg) / 1000.0;
-
- printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n",
- pa.count, dur, pa.count / dur);
- printf("Expected %llu messages total\n", expect);
- printf("Received %llu messages total\n", pa.recvs);
- printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur);
- printf("Errors %llu total\n", pa.errs);
- printf("Reported %llu dropped messages in %llu gaps\n", pa.drops,
- pa.gaps);
- printf("Dropped %llu messages (missing)\n", missing);
- printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0);
-
- nng_mtx_unlock(pa.mtx);
-}