aboutsummaryrefslogtreecommitdiff
path: root/perf
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-02-24 22:04:16 -0800
committerGarrett D'Amore <garrett@damore.org>2019-02-26 21:09:54 -0800
commit5803db08e55ed9287dc59b3adc281b89c52c530f (patch)
tree9d2d65ed86be5c7b976fc3bdfc5ed5b375143641 /perf
parent9cf967e9d7fdab6ccf38f80d83e4bf3d1a5e1a67 (diff)
downloadnng-5803db08e55ed9287dc59b3adc281b89c52c530f.tar.gz
nng-5803db08e55ed9287dc59b3adc281b89c52c530f.tar.bz2
nng-5803db08e55ed9287dc59b3adc281b89c52c530f.zip
fixes #461 Context support for SUB
fixes #762 Pub/Sub very slow compared with nanomsg This introduces contexts for SUB, and converts both the cooked SUB and PUB protocols to use a new lightweight message queue that has significant performance benefits over the heavy-weight message queue. We've also added a test program, pubdrop, in the perf directory, which can be used for measuring pub/sub message rates and drop rates. Note that its quite easy to overwhelm a subscriber still. The SUB socket performance is still not completely where it needs to be. There are two remainging things to improve. Firsst we need to replace the naive linked list of topics with a proper PATRICIA trie. Second, we need to work on the low level POSIX poller code. (The Windows code is already quite good, and we outperform nanomsg on Windows.)
Diffstat (limited to 'perf')
-rw-r--r--perf/CMakeLists.txt5
-rw-r--r--perf/pubdrop.c325
2 files changed, 330 insertions, 0 deletions
diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt
index 64bc3b34..ff340034 100644
--- a/perf/CMakeLists.txt
+++ b/perf/CMakeLists.txt
@@ -49,4 +49,9 @@ if (NNG_TESTS)
add_test (NAME inproc_thr COMMAND inproc_thr 1400 10000)
set_tests_properties (inproc_thr PROPERTIES TIMEOUT 30)
+
+ add_executable (pubdrop pubdrop.c)
+ target_link_libraries(pubdrop ${PROJECT_NAME})
+ target_compile_definitions(pubdrop PUBLIC)
+
endif ()
diff --git a/perf/pubdrop.c b/perf/pubdrop.c
new file mode 100644
index 00000000..be51bb2f
--- /dev/null
+++ b/perf/pubdrop.c
@@ -0,0 +1,325 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/platform.h>
+
+// pubdrop - this is a simple testing utility that lets us measure PUB/SUB
+// performance, including dropped messages, delivery across multiple threads,
+// etc. It actually uses a wild card subscription for now.
+
+#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+
+#else
+
+static void die(const char *, ...);
+
+static int
+nng_pub0_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Pub protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+
+static int
+nng_sub0_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Sub protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+
+#endif // NNG_HAVE_PUB0....
+
+static void die(const char *, ...);
+static void do_pubdrop(int argc, char **argv);
+static uint64_t nperusec;
+static volatile int x;
+
+void
+work(void)
+{
+ x = rand();
+}
+
+void
+usdelay(unsigned long long nusec)
+{
+ nusec *= nperusec;
+ while (nusec > 0) {
+ work();
+ nusec--;
+ }
+}
+
+int
+main(int argc, char **argv)
+{
+ argc--;
+ argv++;
+
+ // We calculate a delay factor to roughly delay 1 usec. We don't
+ // need this to be perfect, just reproducible on the same host.
+ unsigned long cnt = 1000000;
+
+ nng_time beg = nng_clock();
+ for (unsigned long i = 0; i < cnt; i++) {
+ work();
+ }
+ nng_time end = nng_clock();
+ nperusec = cnt / (1000 * (end - beg));
+
+ do_pubdrop(argc, argv);
+}
+
+static void
+die(const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(2);
+}
+
+static int
+parse_int(const char *arg, const char *what)
+{
+ long val;
+ char *eptr;
+
+ val = strtol(arg, &eptr, 10);
+ // Must be a postive number less than around a billion.
+ if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) {
+ die("Invalid %s", what);
+ }
+ return ((int) val);
+}
+
+struct pubdrop_args {
+ const char * addr;
+ bool start;
+ unsigned long long msgsize;
+ unsigned long long count;
+ unsigned long long intvl;
+ unsigned long long drops;
+ unsigned long long gaps;
+ unsigned long long errs;
+ unsigned long long recvs;
+ nng_time beg;
+ nng_time end;
+ nng_mtx * mtx;
+ nng_cv * cv;
+};
+
+static void
+pub_server(void *arg)
+{
+ struct pubdrop_args *pa = arg;
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+
+ if ((rv = nng_pub0_open(&sock)) != 0) {
+ die("Cannot open sub: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) {
+ die("Cannot listen: %s", nng_strerror(rv));
+ }
+
+ nng_mtx_lock(pa->mtx);
+ while (!pa->start) {
+ nng_cv_wait(pa->cv);
+ }
+ nng_mtx_unlock(pa->mtx);
+
+ start = nng_clock();
+ for (uint64_t i = 0; i < pa->count; i++) {
+ // Unfortunately we need to allocate messages dynamically as we
+ // go. The other option would be to allocate them all up front,
+ // but that could be a rather excessive amount of memory.
+ if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) {
+ die("Message alloc failed");
+ }
+ memcpy(nng_msg_body(msg), &i, sizeof(i));
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ die("Sendmsg: %s", nng_strerror(rv));
+ }
+ // It sure would be nice if we had a usec granularity option
+ // here.
+ if (pa->intvl > 0) {
+ usdelay((unsigned long long) pa->intvl);
+ }
+ }
+
+ end = nng_clock();
+
+ nng_msleep(1000); // drain the queue
+ nng_close(sock);
+
+ nng_mtx_lock(pa->mtx);
+ pa->beg = start;
+ pa->end = end;
+ nng_mtx_unlock(pa->mtx);
+}
+
+static void
+sub_client(void *arg)
+{
+ struct pubdrop_args *pa = arg;
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ unsigned long long recvs;
+ unsigned long long drops;
+ unsigned long long gaps;
+ unsigned long long errs;
+ unsigned long long expect;
+
+ if ((rv = nng_sub0_open(&sock)) != 0) {
+ die("Cannot open sub: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) {
+ die("Cannot listen: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+
+ expect = 0;
+ recvs = drops = gaps = errs = 0;
+
+ while (expect < pa->count) {
+ uint64_t got;
+ if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) {
+ // Closed without receiving the last message
+ drops += (pa->count - expect);
+ gaps++;
+ break;
+ }
+ printf("ERROR: %s\n", nng_strerror(rv));
+ errs++;
+ break;
+ }
+ recvs++;
+ memcpy(&got, nng_msg_body(msg), sizeof(got));
+ nng_msg_free(msg);
+ if (got != expect) {
+ gaps++;
+ if (got > expect) {
+ drops += (got - expect);
+ } else {
+ die("Misordered delivery");
+ }
+ }
+ expect = got + 1;
+ }
+
+ nng_mtx_lock(pa->mtx);
+ pa->drops += drops;
+ pa->errs += errs;
+ pa->recvs += recvs;
+ pa->gaps += gaps;
+ nng_mtx_unlock(pa->mtx);
+}
+
+static void
+do_pubdrop(int argc, char **argv)
+{
+ nng_thread ** thrs;
+ struct pubdrop_args pa;
+ int rv;
+ int nsubs;
+
+ if (argc != 5) {
+ die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> "
+ "<interval>");
+ }
+
+ memset(&pa, 0, sizeof(pa));
+ pa.addr = argv[0];
+ pa.msgsize = parse_int(argv[1], "message size");
+ pa.count = parse_int(argv[2], "count");
+ pa.intvl = parse_int(argv[4], "interval");
+ nsubs = parse_int(argv[3], "#subscribers");
+
+ if (pa.msgsize < sizeof(uint64_t)) {
+ die("Message size too small.");
+ }
+
+ thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1);
+ if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) ||
+ ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) {
+ die("Startup: %s\n", nng_strerror(rv));
+ };
+
+ if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) {
+ die("Cannot create pub thread: %s", nng_strerror(rv));
+ }
+
+ nng_msleep(100); // give time for listener to start...
+
+ for (int i = 0; i < nsubs; i++) {
+ if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) !=
+ 0) {
+ die("Cannot create sub thread: %s", nng_strerror(rv));
+ }
+ }
+
+ // Sleep a bit for conns to establish.
+ nng_msleep(2000);
+ nng_mtx_lock(pa.mtx);
+ pa.start = true;
+ nng_cv_wake(pa.cv);
+ nng_mtx_unlock(pa.mtx);
+
+ for (int i = 0; i < nsubs + 1; i++) {
+ nng_thread_destroy(thrs[i]);
+ }
+
+ nng_mtx_lock(pa.mtx);
+
+ unsigned long long expect = nsubs * pa.count;
+ unsigned long long missing = nsubs ? expect - pa.recvs : 0;
+ double dur = (pa.end - pa.beg) / 1000.0;
+
+ printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n",
+ pa.count, dur, pa.count / dur);
+ printf("Expected %llu messages total\n", expect);
+ printf("Received %llu messages total\n", pa.recvs);
+ printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur);
+ printf("Errors %llu total\n", pa.errs);
+ printf("Reported %llu dropped messages in %llu gaps\n", pa.drops,
+ pa.gaps);
+ printf("Dropped %llu messages (missing)\n", missing);
+ printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0);
+
+ nng_mtx_unlock(pa.mtx);
+}