aboutsummaryrefslogtreecommitdiff
path: root/src/tools
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-01-01 11:30:03 -0800
committerGarrett D'Amore <garrett@damore.org>2021-01-01 12:46:17 -0800
commited542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch)
tree673924ff077d468e6756529c2c204698d3faa47c /src/tools
parent1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff)
downloadnng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz
nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2
nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'src/tools')
-rw-r--r--src/tools/CMakeLists.txt13
-rw-r--r--src/tools/nngcat/CMakeLists.txt41
-rw-r--r--src/tools/nngcat/nngcat.c1217
-rwxr-xr-xsrc/tools/nngcat/nngcat_ambiguous_test.sh31
-rwxr-xr-xsrc/tools/nngcat/nngcat_async_test.sh32
-rwxr-xr-xsrc/tools/nngcat/nngcat_dup_proto_test.sh23
-rwxr-xr-xsrc/tools/nngcat/nngcat_help_test.sh32
-rwxr-xr-xsrc/tools/nngcat/nngcat_incompat_test.sh73
-rwxr-xr-xsrc/tools/nngcat/nngcat_need_proto_test.sh23
-rwxr-xr-xsrc/tools/nngcat/nngcat_pubsub_test.sh45
-rwxr-xr-xsrc/tools/nngcat/nngcat_recvmaxsz_test.sh46
-rwxr-xr-xsrc/tools/nngcat/nngcat_stdin_pipe_test.sh44
-rwxr-xr-xsrc/tools/nngcat/nngcat_unlimited_test.sh46
-rw-r--r--src/tools/perf/CMakeLists.txt35
-rw-r--r--src/tools/perf/perf.c662
-rw-r--r--src/tools/perf/pubdrop.c325
16 files changed, 2688 insertions, 0 deletions
diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
new file mode 100644
index 00000000..0487d9ec
--- /dev/null
+++ b/src/tools/CMakeLists.txt
@@ -0,0 +1,13 @@
+#
+# Copyright 2021 Staysail Systems, Inc. <info@staystail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+nng_directory(tools)
+
+add_subdirectory(nngcat)
+add_subdirectory(perf)
diff --git a/src/tools/nngcat/CMakeLists.txt b/src/tools/nngcat/CMakeLists.txt
new file mode 100644
index 00000000..d522a045
--- /dev/null
+++ b/src/tools/nngcat/CMakeLists.txt
@@ -0,0 +1,41 @@
+#
+# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+nng_directory(nngcat)
+
+if (NNG_ENABLE_NNGCAT)
+ add_executable(nngcat nngcat.c)
+ target_include_directories(nngcat PUBLIC ${PROJECT_SOURCE_DIR}/src)
+ target_link_libraries(nngcat nng nng_private)
+ install(TARGETS nngcat RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
+ COMPONENT Tools)
+
+ if (NNG_TESTS AND CMAKE_SYSTEM_NAME MATCHES "Linux")
+ include(FindUnixCommands)
+ endif ()
+ # TODO: This should be refactored to use a test driver.
+ # We only run the tests on Linux for now, because the Darwin CI/CD is too brittle.
+ if (NNG_TESTS AND BASH AND CMAKE_SYSTEM_NAME MATCHES "Linux")
+ macro(add_nngcat_test NAME TIMEOUT)
+ add_test(NAME nng.${NAME} COMMAND ${BASH} ${CMAKE_CURRENT_SOURCE_DIR}/${NAME}_test.sh $<TARGET_FILE:nngcat>)
+ set_tests_properties(nng.${NAME} PROPERTIES TIMEOUT ${TIMEOUT})
+ endmacro()
+ add_nngcat_test(nngcat_async 10)
+ add_nngcat_test(nngcat_ambiguous 10)
+ add_nngcat_test(nngcat_need_proto 10)
+ add_nngcat_test(nngcat_dup_proto 10)
+ add_nngcat_test(nngcat_help 10)
+ add_nngcat_test(nngcat_incompat 10)
+ add_nngcat_test(nngcat_pubsub 20)
+ add_nngcat_test(nngcat_recvmaxsz 20)
+ add_nngcat_test(nngcat_unlimited 20)
+ add_nngcat_test(nngcat_stdin_pipe 20)
+ endif ()
+endif ()
diff --git a/src/tools/nngcat/nngcat.c b/src/tools/nngcat/nngcat.c
new file mode 100644
index 00000000..9e106069
--- /dev/null
+++ b/src/tools/nngcat/nngcat.c
@@ -0,0 +1,1217 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2020 Lager Data, Inc. <support@lagerdata.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/protocol/bus0/bus.h>
+#include <nng/protocol/pair0/pair.h>
+#include <nng/protocol/pair1/pair.h>
+#include <nng/protocol/pipeline0/pull.h>
+#include <nng/protocol/pipeline0/push.h>
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/protocol/survey0/respond.h>
+#include <nng/protocol/survey0/survey.h>
+#include <nng/supplemental/tls/tls.h>
+#include <nng/supplemental/util/options.h>
+#include <nng/supplemental/util/platform.h>
+#include <nng/transport/zerotier/zerotier.h>
+
+// Globals. We need this to avoid passing around everything.
+int format = 0;
+int proto = 0;
+int verbose = 0;
+int delay = 0;
+nng_duration interval = NNG_DURATION_INFINITE;
+nng_duration sendtimeo = NNG_DURATION_INFINITE;
+nng_duration recvtimeo = NNG_DURATION_INFINITE;
+void * data = NULL;
+size_t datalen = 0;
+int compat = 0;
+int async = 0;
+int insecure = 0;
+void * cacert = NULL;
+size_t cacertlen = 0;
+void * keyfile = NULL;
+size_t keylen = 0;
+void * certfile = NULL;
+size_t certlen = 0;
+const char * zthome = NULL;
+int count = 0;
+int recvmaxsz = -1;
+
+// Options, must start at 1 because zero is sentinel.
+enum options {
+ OPT_HELP = 1,
+ OPT_VERBOSE,
+ OPT_SILENT,
+ OPT_REP0,
+ OPT_REQ0,
+ OPT_PUSH0,
+ OPT_PULL0,
+ OPT_PUB0,
+ OPT_SUB0,
+ OPT_SURVEY0,
+ OPT_RESPOND0,
+ OPT_PAIR0,
+ OPT_PAIR1,
+ OPT_PAIR,
+ OPT_BUS0,
+ OPT_DIAL,
+ OPT_LISTEN,
+ OPT_DATA,
+ OPT_FILE,
+ OPT_SUBSCRIBE,
+ OPT_INTERVAL,
+ OPT_DELAY,
+ OPT_COUNT,
+ OPT_FORMAT,
+ OPT_RAW,
+ OPT_ASCII,
+ OPT_QUOTED,
+ OPT_MSGPACK,
+ OPT_HEX,
+ OPT_BLANK, // no printing, not an actual option.
+ OPT_COMPAT,
+ OPT_ASYNC,
+ OPT_RCV_TIMEO,
+ OPT_SND_TIMEO,
+ OPT_SOCK_NAME,
+ OPT_LISTEN_IPC,
+ OPT_DIAL_IPC,
+ OPT_LISTEN_LOCAL,
+ OPT_DIAL_LOCAL,
+ OPT_INSECURE,
+ OPT_CACERT,
+ OPT_KEYFILE,
+ OPT_CERTFILE,
+ OPT_VERSION,
+ OPT_RECVMAXSZ,
+ OPT_ZTHOME,
+};
+
+static nng_optspec opts[] = {
+ { .o_name = "help", .o_short = 'h', .o_val = OPT_HELP },
+ { .o_name = "verbose", .o_short = 'v', .o_val = OPT_VERBOSE },
+ { .o_name = "silent", .o_short = 'q', .o_val = OPT_SILENT },
+ { .o_name = "req0", .o_val = OPT_REQ0 },
+ { .o_name = "rep0", .o_val = OPT_REP0 },
+ { .o_name = "push0", .o_val = OPT_PUSH0 },
+ { .o_name = "pull0", .o_val = OPT_PULL0 },
+ { .o_name = "pub0", .o_val = OPT_PUB0 },
+ { .o_name = "sub", .o_val = OPT_SUB0 }, // explicit for alias
+ { .o_name = "sub0", .o_val = OPT_SUB0 },
+ { .o_name = "surveyor0", .o_val = OPT_SURVEY0 },
+ { .o_name = "respondent0", .o_val = OPT_RESPOND0 },
+ { .o_name = "pair", .o_val = OPT_PAIR },
+ { .o_name = "pair0", .o_val = OPT_PAIR0 },
+ { .o_name = "pair1", .o_val = OPT_PAIR1 },
+ { .o_name = "bus0", .o_val = OPT_BUS0 },
+ { .o_name = "dial", .o_val = OPT_DIAL, .o_arg = true },
+ { .o_name = "listen", .o_val = OPT_LISTEN, .o_arg = true },
+ { .o_name = "data", .o_short = 'D', .o_val = OPT_DATA, .o_arg = true },
+ { .o_name = "file", .o_short = 'F', .o_val = OPT_FILE, .o_arg = true },
+ { .o_name = "subscribe", .o_val = OPT_SUBSCRIBE, .o_arg = true },
+ { .o_name = "format", .o_val = OPT_FORMAT, .o_arg = true },
+ { .o_name = "ascii", .o_short = 'A', .o_val = OPT_ASCII },
+ { .o_name = "quoted", .o_short = 'Q', .o_val = OPT_QUOTED },
+ { .o_name = "raw", .o_val = OPT_RAW },
+ { .o_name = "hex", .o_val = OPT_HEX },
+ { .o_name = "compat", .o_val = OPT_COMPAT },
+ { .o_name = "async", .o_val = OPT_ASYNC },
+ { .o_name = "msgpack", .o_val = OPT_MSGPACK },
+
+ {
+ .o_name = "recv-maxsz",
+ .o_short = 'Z',
+ .o_val = OPT_RECVMAXSZ,
+ .o_arg = true,
+ },
+ {
+ .o_name = "count",
+ .o_short = 'C',
+ .o_val = OPT_COUNT,
+ .o_arg = true,
+ },
+ {
+ .o_name = "delay",
+ .o_short = 'd',
+ .o_val = OPT_DELAY,
+ .o_arg = true,
+ },
+ {
+ .o_name = "interval",
+ .o_short = 'i',
+ .o_val = OPT_INTERVAL,
+ .o_arg = true,
+ },
+ { .o_name = "recv-timeout", .o_val = OPT_RCV_TIMEO, .o_arg = true },
+ { .o_name = "send-timeout", .o_val = OPT_SND_TIMEO, .o_arg = true },
+ { .o_name = "socket-name", .o_val = OPT_SOCK_NAME, .o_arg = true },
+
+ // Legacy compatibility with nanocat.
+ { .o_name = "bind", .o_val = OPT_LISTEN, .o_arg = true },
+ { .o_name = "connect", .o_val = OPT_DIAL, .o_arg = true },
+ {
+ .o_name = "bind-ipc",
+ .o_short = 'X',
+ .o_val = OPT_LISTEN_IPC,
+ .o_arg = true,
+ },
+ {
+ .o_name = "connect-ipc",
+ .o_short = 'x',
+ .o_val = OPT_DIAL_IPC,
+ .o_arg = true,
+ },
+ {
+ .o_name = "bind-local",
+ .o_short = 'L',
+ .o_val = OPT_LISTEN_LOCAL,
+ .o_arg = true,
+ },
+ {
+ .o_name = "connect-local",
+ .o_short = 'l',
+ .o_val = OPT_DIAL_LOCAL,
+ .o_arg = true,
+ },
+ { .o_name = "insecure", .o_short = 'k', .o_val = OPT_INSECURE },
+ { .o_name = "cacert", .o_val = OPT_CACERT, .o_arg = true },
+ { .o_name = "key", .o_val = OPT_KEYFILE, .o_arg = true },
+ {
+ .o_name = "cert",
+ .o_short = 'E',
+ .o_val = OPT_CERTFILE,
+ .o_arg = true,
+ },
+ {
+ .o_name = "zt-home",
+ .o_val = OPT_ZTHOME,
+ .o_arg = true,
+ },
+ { .o_name = "version", .o_short = 'V', .o_val = OPT_VERSION },
+
+ // Sentinel.
+ { .o_name = NULL, .o_val = 0 },
+};
+
+static void
+fatal(const char *msg, ...)
+{
+ va_list ap;
+ va_start(ap, msg);
+ vfprintf(stderr, msg, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+static void
+help(void)
+{
+ printf(
+ "Usage: nngcat <proto> <addr>... [<fmt>] [<opts>...] [<src>]\n\n");
+ printf("<proto> must be one of:\n");
+ printf(" --req0 (or alias --req)\n");
+ printf(" --rep0 (or alias --rep)\n");
+ printf(" --pub0 (or alias --pub)\n");
+ printf(" --sub0 (or alias --sub)\n");
+ printf(" --push0 (or alias --push)\n");
+ printf(" --pull0 (or alias --pull)\n");
+ printf(" --surveyor0 (or alias --surveyor)\n");
+ printf(" --respondent0 (or alias --respondent)\n");
+ printf(" --pair0\n");
+ printf(" --pair1\n");
+ printf(" --pair (alias for either pair0 or pair1)\n");
+ printf("\n<addr> must be one or more of:\n");
+ printf(" --dial <url> (or alias --connect <url>)\n");
+ printf(" --listen <url> (or alias --bind <url>)\n");
+ printf(" --bind-local <port> (or alias -L <port>)\n");
+ printf(" --connect-local <port> (or alias -l <port>)\n");
+ printf(" --bind-ipc <path> (or alias -X <path>)\n");
+ printf(" --connect-ipc <path> (or alias -x <path>)\n");
+ printf("\n<fmt> may be one or none of:\n");
+ printf(" --format <no|ascii|hex|msgpack|quoted|raw>\n");
+ printf(" --ascii (or alias -A)\n");
+ printf(" --quoted (or alias -Q)\n");
+ printf(" --hex\n");
+ printf(" --msgpack\n");
+ printf(" --raw\n");
+ printf("\n<opts> may be any of:\n");
+ printf(" --subscribe <topic> (only with --sub protocol)\n");
+ printf(" --silent (or alias -q)\n");
+ printf(" --verbose (or alias -v)\n");
+ printf(" --count <num> (or alias -C <num>)\n");
+ printf(" --delay <secs> (or alias -d <secs>)\n");
+ printf(" --interval <secs> (or alias -i <secs>)\n");
+ printf(" --recv-timeout <secs>\n");
+ printf(" --send-timeout <secs>\n");
+ printf(" --recv-maxsz <size> (or alias -Z <size>)\n");
+ printf(" --compat\n");
+ printf(" --async\n");
+ printf(" --insecure (or alias -k)\n");
+ printf(" --cacert <file>\n");
+ printf(" --cert <file> (or alias -E <file>)\n");
+ printf(" --key <file>\n");
+ printf(" --zt-home <path>\n");
+ printf("\n<src> may be one of:\n");
+ printf(" --file <file> (or alias -F <file>). "
+ "Use - for standard input.\n");
+ printf(" --data <data> (or alias -D <data>)\n");
+ exit(1);
+}
+
+static int
+intarg(const char *val, int maxv)
+{
+ int v = 0;
+
+ if (val[0] == '\0') {
+ fatal("Empty integer argument.");
+ }
+ while (*val != '\0') {
+ if (!isdigit(*val)) {
+ fatal("Integer argument expected.");
+ }
+ v *= 10;
+ v += ((*val) - '0');
+ val++;
+ if (v > maxv) {
+ fatal("Integer argument too large.");
+ }
+ }
+ if (v < 0) {
+ fatal("Integer argument overflow.");
+ }
+ return (v);
+}
+
+// This reads a file into memory. Care is taken to ensure that
+// the buffer is one byte larger and contains a terminating
+// NUL. (Useful for key files and such.)
+static void
+loadfile(const char *path, void **datap, size_t *lenp)
+{
+ FILE * f;
+ size_t total_read = 0;
+ size_t allocation_size = BUFSIZ;
+ char * fdata;
+ char * realloc_result;
+
+ if (strcmp(path, "-") == 0) {
+ f = stdin;
+ } else {
+ if ((f = fopen(path, "rb")) == NULL) {
+ fatal(
+ "Cannot open file %s: %s", path, strerror(errno));
+ }
+ }
+
+ if ((fdata = malloc(allocation_size + 1)) == NULL) {
+ fatal("Out of memory.");
+ }
+
+ while (1) {
+ total_read += fread(
+ fdata + total_read, 1, allocation_size - total_read, f);
+ if (ferror(f)) {
+ if (errno == EINTR) {
+ continue;
+ }
+ fatal(
+ "Read from %s failed: %s", path, strerror(errno));
+ }
+ if (feof(f)) {
+ break;
+ }
+ if (total_read == allocation_size) {
+ if (allocation_size > SIZE_MAX / 2) {
+ fatal("Out of memory.");
+ }
+ allocation_size *= 2;
+ if ((realloc_result = realloc(
+ fdata, allocation_size + 1)) == NULL) {
+ free(fdata);
+ fatal("Out of memory.");
+ }
+ fdata = realloc_result;
+ }
+ }
+ if (f != stdin) {
+ fclose(f);
+ }
+ fdata[total_read] = '\0';
+ *datap = fdata;
+ *lenp = total_read;
+}
+
+static void
+configtls(nng_tls_config *tls)
+{
+ int rv = 0;
+ if (insecure) {
+ rv = nng_tls_config_auth_mode(tls, NNG_TLS_AUTH_MODE_NONE);
+ }
+ if ((rv == 0) && (certfile != NULL)) {
+ keyfile = keyfile ? keyfile : certfile;
+ rv = nng_tls_config_own_cert(tls, certfile, keyfile, NULL);
+ }
+ if ((rv == 0) && (cacert != NULL)) {
+ rv = nng_tls_config_ca_chain(tls, cacert, NULL);
+ }
+ if (rv != 0) {
+ fatal("Unable to configure TLS: %s", nng_strerror(rv));
+ }
+}
+
+struct addr {
+ struct addr *next;
+ int mode;
+ char * val;
+};
+
+struct addr **
+addaddr(struct addr **endp, int mode, const char *a)
+{
+ struct addr *na;
+
+ if (((na = malloc(sizeof(*na))) == NULL) ||
+ ((na->val = malloc(strlen(a) + 1)) == NULL)) {
+ fatal("Out of memory.");
+ }
+ na->mode = mode;
+ memcpy(na->val, a, strlen(a) + 1);
+ na->next = NULL;
+ *endp = na;
+ return (&na->next);
+}
+
+struct topic {
+ struct topic *next;
+ char * val;
+};
+
+struct topic **
+addtopic(struct topic **endp, const char *s)
+{
+ struct topic *t;
+
+ if (((t = malloc(sizeof(*t))) == NULL) ||
+ ((t->val = malloc(strlen(s) + 1)) == NULL)) {
+ fatal("Out of memory.");
+ }
+ memcpy(t->val, s, strlen(s) + 1);
+ t->next = NULL;
+ *endp = t;
+ return (&t->next);
+}
+
+void
+printmsg(char *buf, size_t len)
+{
+ switch (format) {
+ case OPT_BLANK: // Suppress contents.
+ return;
+ case OPT_RAW: // Just emit raw contents.
+ fwrite(buf, 1, len, stdout);
+ break;
+ case OPT_ASCII: // ASCII, but non-ASCII substituted with '.'
+ for (size_t i = 0; i < len; i++) {
+ if (isprint(buf[i])) {
+ putchar(buf[i]);
+ } else {
+ putchar('.');
+ }
+ }
+ break;
+ case OPT_QUOTED: // C style quoted strings.
+ putchar('"');
+ for (size_t i = 0; i < len; i++) {
+ switch (buf[i]) {
+ case '\n':
+ putchar('\\');
+ putchar('n');
+ break;
+ case '\r':
+ putchar('\\');
+ putchar('r');
+ break;
+ case '\t':
+ putchar('\\');
+ putchar('t');
+ break;
+ case '"':
+ case '\\':
+ putchar('\\');
+ putchar(buf[i]);
+ break;
+ default:
+ if (isprint(buf[i])) {
+ fputc(buf[i], stdout);
+ } else {
+ printf("\\x%02x", (uint8_t) buf[i]);
+ }
+ }
+ }
+ putchar('"');
+ putchar('\n');
+ break;
+ case OPT_MSGPACK: // MsgPack, we just encode prefix + len, then
+ // raw.
+ if (len < 256) {
+ putchar('\xc4');
+ putchar(len & 0xff);
+ } else if (len < 65536) {
+ putchar('\xc5');
+ putchar((len >> 8) & 0xff);
+ putchar(len & 0xff);
+ } else {
+ putchar('\xc6');
+ putchar((len >> 24) & 0xff);
+ putchar((len >> 16) & 0xff);
+ putchar((len >> 8) & 0xff);
+ putchar(len & 0xff);
+ }
+ fwrite(buf, 1, len, stdout);
+ break;
+ case OPT_HEX: // Hex, quoted C string encoded with hex
+ // literals.
+ putchar('"');
+ for (size_t i = 0; i < len; i++) {
+ printf("\\x%02x", (uint8_t) buf[i]);
+ }
+ putchar('"');
+ putchar('\n');
+ break;
+ }
+ fflush(stdout);
+}
+
+void
+recvloop(nng_socket sock)
+{
+ int iters = 0;
+ for (;;) {
+ int rv;
+ nng_msg *msg;
+
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
+ rv = nng_recvmsg(sock, &msg, 0);
+ iters++;
+ switch (rv) {
+ case NNG_ETIMEDOUT:
+ case NNG_ESTATE:
+ // Either a regular timeout, or we reached the
+ // end of an event like a survey completing.
+ return;
+ case 0:
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_free(msg);
+ continue;
+ default:
+ fatal("Receive error: %s", nng_strerror(rv));
+ }
+ }
+}
+
+void
+resploop(nng_socket sock)
+{
+ int iters = 0;
+ for (;;) {
+ int rv;
+ nng_msg *msg;
+
+ rv = nng_recvmsg(sock, &msg, 0);
+ if (rv != 0) {
+ fatal("Receive error: %s", nng_strerror(rv));
+ }
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_clear(msg);
+ if ((rv = nng_msg_append(msg, data, datalen)) != 0) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+
+ iters++;
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
+ }
+
+ nng_msleep(200);
+}
+
+void
+sendloop(nng_socket sock)
+{
+ int iters = 0;
+
+ if (data == NULL) {
+ fatal("No data to send (specify with --data or --file)");
+ }
+ if (delay > 0) {
+ nng_msleep(delay);
+ }
+
+ for (;;) {
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ nng_duration delta;
+
+ start = nng_clock();
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, data, datalen)) != 0)) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+ end = nng_clock();
+ delta = (nng_duration)(end - start);
+
+ iters++;
+ // By default, we don't loop.
+ if (((interval < 0) && (count == 0)) ||
+ ((count > 0) && (iters >= count))) {
+ break;
+ }
+
+ // We sleep, but we account for time spent, so that our
+ // interval appears more or less constant. Of course
+ // if we took more than the interval here, then we skip
+ // the sleep altogether.
+ if ((delta >= 0) && (delta < interval)) {
+ nng_msleep(interval - delta);
+ }
+ }
+ // Wait a bit to give queues a chance to drain.
+ nng_msleep(200);
+}
+
+void
+sendrecv(nng_socket sock)
+{
+ int iters = 0;
+
+ if (data == NULL) {
+ fatal("No data to send (specify with --data or --file)");
+ }
+ if (delay > 0) {
+ nng_msleep(delay);
+ }
+
+ // We start by sending a message, then we receive iteratively
+ // but we set a concrete timeout if interval is set, to ensure
+ // that we exit the receive loop, and can continue.
+ for (;;) {
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ nng_duration delta;
+
+ start = nng_clock();
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, data, datalen)) != 0)) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+ if ((interval < 0) && (count == 0)) {
+ // Only one iteration through.
+ recvloop(sock);
+ break;
+ }
+
+ // We would like to use recvloop, but we need to reset
+ // our timeout each time, as the timer counts down
+ // towards zero. Furthermore, with survey, we don't
+ // want to increment the iteration count.
+
+ for (;;) {
+ delta = (nng_duration)(nng_clock() - start);
+
+ nng_duration expire = interval - delta;
+ if ((recvtimeo >= 0) && (expire > recvtimeo)) {
+ expire = recvtimeo;
+ }
+ rv =
+ nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, expire);
+ if (rv != 0) {
+ fatal("Cannot set recv timeout: %s",
+ nng_strerror(rv));
+ }
+
+ rv = nng_recvmsg(sock, &msg, 0);
+ switch (rv) {
+ case 0:
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_free(msg);
+ continue;
+ case NNG_ETIMEDOUT:
+ case NNG_ESTATE:
+ // We're done receiving
+ break;
+ default:
+ fatal("Cannot receive: %s", nng_strerror(rv));
+ break;
+ }
+
+ // We're done receiving, break out.
+ break;
+ }
+
+ end = nng_clock();
+ delta = (nng_duration)(end - start);
+
+ iters++;
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
+
+ // We sleep, but we account for time spent, so that our
+ // interval appears more or less constant. Of course
+ // if we took more than the interval here, then we skip
+ // the sleep altogether.
+ if ((delta >= 0) && (delta < interval)) {
+ nng_msleep(interval - delta);
+ }
+ }
+}
+
+int
+main(int ac, char **av)
+{
+ int idx;
+ char * arg;
+ int val;
+ int rv;
+ char scratch[512];
+ struct addr * addrs = NULL;
+ struct addr ** addrend;
+ struct topic * topics = NULL;
+ struct topic **topicend;
+ nng_socket sock;
+ int port;
+
+ idx = 1;
+ addrend = &addrs;
+ topicend = &topics;
+
+ while ((rv = nng_opts_parse(ac, av, opts, &val, &arg, &idx)) == 0) {
+ switch (val) {
+ case OPT_HELP:
+ help();
+ break;
+ case OPT_REQ0:
+ case OPT_REP0:
+ case OPT_SUB0:
+ case OPT_PUB0:
+ case OPT_BUS0:
+ case OPT_SURVEY0:
+ case OPT_RESPOND0:
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ case OPT_PUSH0:
+ case OPT_PULL0:
+ if (proto != 0) {
+ fatal("Only one protocol may be "
+ "specified.");
+ }
+ proto = val;
+ break;
+ case OPT_DIAL:
+ case OPT_LISTEN:
+ addrend = addaddr(addrend, val, arg);
+ break;
+ case OPT_DIAL_LOCAL:
+ case OPT_LISTEN_LOCAL:
+ port = intarg(arg, 65536);
+ snprintf(scratch, sizeof(scratch),
+ "tcp://127.0.0.1:%d", port);
+ addrend = addaddr(addrend, val, scratch);
+ break;
+ case OPT_DIAL_IPC:
+ case OPT_LISTEN_IPC:
+ snprintf(scratch, sizeof(scratch), "ipc:///%s", arg);
+ addrend = addaddr(addrend, val, scratch);
+ break;
+ case OPT_COUNT:
+ count = intarg(arg, 0x7fffffff);
+ break;
+ case OPT_SUBSCRIBE:
+ topicend = addtopic(topicend, arg);
+ break;
+ case OPT_VERBOSE:
+ case OPT_SILENT:
+ verbose = val;
+ break;
+ case OPT_DELAY:
+ delay = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_INTERVAL:
+ interval = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_SND_TIMEO:
+ sendtimeo = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_RCV_TIMEO:
+ recvtimeo = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_RECVMAXSZ:
+ recvmaxsz = intarg(arg, 0x7fffffff);
+ if (recvmaxsz == 0) {
+ recvmaxsz = 0x7fffffff;
+ }
+ break;
+ case OPT_COMPAT:
+ compat = 1;
+ break;
+ case OPT_ASYNC:
+ async = NNG_FLAG_NONBLOCK;
+ break;
+ case OPT_ASCII:
+ case OPT_RAW:
+ case OPT_QUOTED:
+ case OPT_MSGPACK:
+ case OPT_HEX:
+ if (format != 0) {
+ fatal("Format may be specified only "
+ "once.");
+ }
+ format = val;
+ break;
+ case OPT_FORMAT:
+ if (format != 0) {
+ fatal("Format may be specified only "
+ "once.");
+ }
+ if (strcmp(arg, "no") == 0) {
+ format = OPT_BLANK;
+ } else if (strcmp(arg, "ascii") == 0) {
+ format = OPT_ASCII;
+ } else if (strcmp(arg, "hex") == 0) {
+ format = OPT_HEX;
+ } else if (strcmp(arg, "quoted") == 0) {
+ format = OPT_QUOTED;
+ } else if (strcmp(arg, "raw") == 0) {
+ format = OPT_RAW;
+ } else if (strcmp(arg, "msgpack") == 0) {
+ format = OPT_MSGPACK;
+ } else {
+ fatal("Invalid format specified.");
+ }
+ break;
+ case OPT_FILE:
+ if (data != NULL) {
+ fatal("Data (--file, --data) may be "
+ "specified "
+ "only once.");
+ }
+ loadfile(arg, &data, &datalen);
+ break;
+ case OPT_DATA:
+ if (data != NULL) {
+ fatal("Data (--file, --data) may be "
+ "specified "
+ "only once.");
+ }
+ if ((data = malloc(strlen(arg) + 1)) == NULL) {
+ fatal("Out of memory.");
+ }
+ memcpy(data, arg, strlen(arg) + 1);
+ datalen = strlen(arg);
+ break;
+ case OPT_CACERT:
+ if (cacert != NULL) {
+ fatal("CA Certificate (--cacert) may be "
+ "specified only once.");
+ }
+ loadfile(arg, &cacert, &cacertlen);
+ break;
+ case OPT_KEYFILE:
+ if (keyfile != NULL) {
+ fatal(
+ "Key (--key) may be specified only once.");
+ }
+ loadfile(arg, &keyfile, &keylen);
+ break;
+ case OPT_CERTFILE:
+ if (certfile != NULL) {
+ fatal("Cert (--cert) may be specified "
+ "only "
+ "once.");
+ }
+ loadfile(arg, &certfile, &certlen);
+ break;
+ case OPT_ZTHOME:
+ zthome = arg;
+ break;
+ case OPT_INSECURE:
+ insecure = 1;
+ break;
+ case OPT_VERSION:
+ printf("%s\n", nng_version());
+ exit(0);
+ }
+ }
+ switch (rv) {
+ case NNG_EINVAL:
+ fatal("Option %s is invalid.", av[idx]);
+ break;
+ case NNG_EAMBIGUOUS:
+ fatal("Option %s is ambiguous (specify in full).", av[idx]);
+ break;
+ case NNG_ENOARG:
+ fatal("Option %s requires argument.", av[idx]);
+ break;
+ default:
+ break;
+ }
+
+ if (addrs == NULL) {
+ fatal("No address specified.");
+ }
+
+ if (compat) {
+ if (async != 0) {
+ fatal("Options --async and --compat are "
+ "incompatible.");
+ }
+ if (count != 0) {
+ fatal("Options --count and --compat are "
+ "incompatible.");
+ }
+ if (proto == OPT_PAIR) {
+ proto = OPT_PAIR0;
+ }
+ if (proto == OPT_PAIR1) {
+ fatal("Protocol does not support --compat.");
+ }
+ async = NNG_FLAG_NONBLOCK;
+ } else {
+ if (proto == OPT_PAIR) {
+ proto = OPT_PAIR1;
+ }
+ }
+ if (proto == OPT_SUB0) {
+ if (topics == NULL) {
+ (void) addtopic(topicend, ""); // subscribe to all
+ }
+ } else {
+ if (topics != NULL) {
+ fatal("Protocol does not support --subscribe.");
+ }
+ }
+
+ switch (proto) {
+ case OPT_PULL0:
+ case OPT_SUB0:
+ if (data != NULL) {
+ fatal("Protocol does not support --file or "
+ "--data.");
+ }
+ if (interval >= 0) {
+ fatal("Protocol does not support --interval.");
+ }
+ break;
+ case OPT_PUSH0:
+ case OPT_PUB0:
+ if (format != 0) {
+ fatal("Protocol does not support --format "
+ "options.");
+ }
+ if (data == NULL) {
+ fatal("Protocol requires either --file or "
+ "--data.");
+ }
+ break;
+ case OPT_SURVEY0:
+ case OPT_REQ0:
+ if (data == NULL) {
+ fatal("Protocol requires either --file or "
+ "--data.");
+ }
+ break;
+ case OPT_REP0:
+ case OPT_RESPOND0:
+ if (interval >= 0) {
+ fatal("Protocol does not support --interval.");
+ }
+ break;
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ case OPT_BUS0:
+ if ((interval >= 0) && (data == NULL)) {
+ fatal("Option --interval requires either "
+ "--file or --data.");
+ }
+ break;
+ default:
+ // Will be caught in next switch statement.
+ break;
+ }
+
+ switch (proto) {
+ case OPT_REQ0:
+#ifdef NNG_HAVE_REQ0
+ rv = nng_req0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_REP0:
+#ifdef NNG_HAVE_REP0
+ rv = nng_rep0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_SUB0:
+#ifdef NNG_HAVE_SUB0
+ rv = nng_sub0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PUB0:
+#ifdef NNG_HAVE_PUB0
+ rv = nng_pub0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PAIR0:
+#ifdef NNG_HAVE_PAIR0
+ rv = nng_pair0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PAIR1:
+#ifdef NNG_HAVE_PAIR1
+ rv = nng_pair1_open(&sock);
+#else
+ fatal("Protocol not supported");
+#endif
+ break;
+ case OPT_BUS0:
+#ifdef NNG_HAVE_BUS0
+ rv = nng_bus0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PUSH0:
+#ifdef NNG_HAVE_PUSH0
+ rv = nng_push0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PULL0:
+#ifdef NNG_HAVE_PULL0
+ rv = nng_pull0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_SURVEY0:
+#ifdef NNG_HAVE_SURVEYOR0
+ rv = nng_surveyor0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_RESPOND0:
+#ifdef NNG_HAVE_RESPONDENT0
+ rv = nng_respondent0_open(&sock);
+#else
+ fatal("Protocol not supported");
+#endif
+ break;
+ case 0:
+ default:
+ fatal("No protocol specified.");
+ break;
+ }
+ if (rv != 0) {
+ fatal("Unable to open socket: %s", nng_strerror(rv));
+ }
+
+ for (struct topic *t = topics; t != NULL; t = t->next) {
+ rv = nng_socket_set(
+ sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val));
+ if (rv != 0) {
+ fatal("Unable to subscribe to topic %s: %s", t->val,
+ nng_strerror(rv));
+ }
+ }
+
+ if ((sendtimeo > 0) &&
+ ((rv = nng_socket_set_ms(sock, NNG_OPT_SENDTIMEO, sendtimeo)) !=
+ 0)) {
+ fatal("Unable to set send timeout: %s", nng_strerror(rv));
+ }
+ if ((recvtimeo > 0) &&
+ ((rv = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, recvtimeo)) !=
+ 0)) {
+ fatal("Unable to set send timeout: %s", nng_strerror(rv));
+ }
+
+ if ((recvmaxsz >= 0) &&
+ ((rv = nng_socket_set_size(sock, NNG_OPT_RECVMAXSZ, recvmaxsz)) !=
+ 0)) {
+ fatal("Unable to set max receive size: %s", nng_strerror(rv));
+ }
+
+ for (struct addr *a = addrs; a != NULL; a = a->next) {
+ char * act;
+ nng_listener l;
+ nng_dialer d;
+ nng_tls_config *tls;
+ switch (a->mode) {
+ case OPT_DIAL:
+ case OPT_DIAL_IPC:
+ case OPT_DIAL_LOCAL:
+ rv = nng_dialer_create(&d, sock, a->val);
+ if (rv != 0) {
+ fatal("Unable to create dialer for %s: %s",
+ a->val, nng_strerror(rv));
+ }
+ rv = nng_dialer_getopt_ptr(
+ d, NNG_OPT_TLS_CONFIG, (void **) &tls);
+ if (rv == 0) {
+ configtls(tls);
+ } else if (rv != NNG_ENOTSUP) {
+ fatal("Unable to get TLS config: %s",
+ nng_strerror(rv));
+ }
+ if (zthome != NULL) {
+ rv = nng_dialer_set(d, NNG_OPT_ZT_HOME,
+ zthome, strlen(zthome) + 1);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ fatal("Unable to set ZT home: %s",
+ nng_strerror(rv));
+ }
+ }
+ rv = nng_dialer_start(d, async);
+ act = "dial";
+ if ((rv == 0) && (verbose == OPT_VERBOSE)) {
+ char ustr[256];
+ size_t sz;
+ sz = sizeof(ustr);
+ if (nng_dialer_getopt(
+ d, NNG_OPT_URL, ustr, &sz) == 0) {
+ printf("Connected to: %s\n", ustr);
+ }
+ }
+ break;
+ case OPT_LISTEN:
+ case OPT_LISTEN_IPC:
+ case OPT_LISTEN_LOCAL:
+ rv = nng_listener_create(&l, sock, a->val);
+ if (rv != 0) {
+ fatal("Unable to create listener for %s: %s",
+ a->val, nng_strerror(rv));
+ }
+ rv = nng_listener_getopt_ptr(
+ l, NNG_OPT_TLS_CONFIG, (void **) &tls);
+ if (rv == 0) {
+ configtls(tls);
+ } else if (rv != NNG_ENOTSUP) {
+ fatal("Unable to get TLS config: %s",
+ nng_strerror(rv));
+ }
+ if (zthome != NULL) {
+ rv = nng_listener_set(l, NNG_OPT_ZT_HOME,
+ zthome, strlen(zthome) + 1);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ fatal("Unable to set ZT home: %s",
+ nng_strerror(rv));
+ }
+ }
+ rv = nng_listener_start(l, async);
+ act = "listen";
+ if ((rv == 0) && (verbose == OPT_VERBOSE)) {
+ char ustr[256];
+ size_t sz;
+ sz = sizeof(ustr);
+ if (nng_listener_getopt(
+ l, NNG_OPT_URL, ustr, &sz) == 0) {
+ printf("Listening at: %s\n", ustr);
+ }
+ }
+ break;
+ default:
+ fatal("Invalid address mode! (Bug!)");
+ }
+
+ if (rv != 0) {
+ fatal("Unable to %s on %s: %s", act, a->val,
+ nng_strerror(rv));
+ }
+ }
+
+ switch (proto) {
+ case OPT_SUB0:
+ case OPT_PULL0:
+ recvloop(sock);
+ break;
+ case OPT_REP0:
+ case OPT_RESPOND0:
+ if (data == NULL) {
+ recvloop(sock);
+ } else {
+ resploop(sock);
+ }
+ break;
+ case OPT_PUSH0:
+ case OPT_PUB0:
+ sendloop(sock);
+ break;
+ case OPT_BUS0:
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ if (data != NULL) {
+ sendrecv(sock);
+ } else {
+ recvloop(sock);
+ }
+ break;
+ case OPT_REQ0:
+ case OPT_SURVEY0:
+ sendrecv(sock);
+ break;
+ default:
+ fatal("Protocol handling unimplemented.");
+ }
+
+ exit(0);
+}
diff --git a/src/tools/nngcat/nngcat_ambiguous_test.sh b/src/tools/nngcat/nngcat_ambiguous_test.sh
new file mode 100755
index 00000000..414b6d19
--- /dev/null
+++ b/src/tools/nngcat/nngcat_ambiguous_test.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+CMD="${NNGCAT} --re --dial=tcp://127.0.0.1:27272"
+
+echo -n "Verify ambiguous options fail: "
+if ${CMD} >/dev/null 2>&1
+then
+ echo "Failed: ambigous accepted"
+ exit 1
+fi
+x=$(${CMD} 2>&1)
+if [[ ${x} =~ "ambiguous" ]]
+then
+ echo "pass"
+ exit 0
+fi
+
+echo "Failed: error did not match"
+exit 1
diff --git a/src/tools/nngcat/nngcat_async_test.sh b/src/tools/nngcat/nngcat_async_test.sh
new file mode 100755
index 00000000..2b03e522
--- /dev/null
+++ b/src/tools/nngcat/nngcat_async_test.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_async_test"
+
+echo -n "Verify async connect: "
+
+${NNGCAT} --async -d 1 --connect ${ADDR} --req0 -D "ping" &
+
+
+answer=$( ${NNGCAT} --rep0 --recv-timeout=3 --listen ${ADDR} -D "pong" --ascii 2>/dev/null )
+
+if [[ ${answer} == "ping" ]]
+then
+ echo "pass"
+ exit 0
+fi
+
+echo "Failed: req did not match"
+echo "RES: $answer"
+exit 1
diff --git a/src/tools/nngcat/nngcat_dup_proto_test.sh b/src/tools/nngcat/nngcat_dup_proto_test.sh
new file mode 100755
index 00000000..1513d01c
--- /dev/null
+++ b/src/tools/nngcat/nngcat_dup_proto_test.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+
+echo -n "Verify only a single protocol is allowed: "
+if ${NNGCAT} --pub0 --sub0 --dial=tcp://127.0.0.1:8989 >/dev/null 2>&1
+then
+ echo "Failed: duplicate protocols accepted"
+ exit 1
+fi
+echo "pass"
+exit 0
diff --git a/src/tools/nngcat/nngcat_help_test.sh b/src/tools/nngcat/nngcat_help_test.sh
new file mode 100755
index 00000000..95ed9e3e
--- /dev/null
+++ b/src/tools/nngcat/nngcat_help_test.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+
+echo -n "Verify nngcat help: "
+if ${NNGCAT} --help >/dev/null 2>&1
+then
+ echo "Failed: help didn't return 1"
+ exit 1
+fi
+x=$(${NNGCAT} --help 2>&1)
+if [[ ${x} =~ "Usage:" ]]
+then
+ echo "pass"
+ exit 0
+fi
+
+echo "Failed: usage did not match"
+echo "Output:"
+echo "$x"
+exit 1
diff --git a/src/tools/nngcat/nngcat_incompat_test.sh b/src/tools/nngcat/nngcat_incompat_test.sh
new file mode 100755
index 00000000..128b57ba
--- /dev/null
+++ b/src/tools/nngcat/nngcat_incompat_test.sh
@@ -0,0 +1,73 @@
+#!/bin/sh
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+
+echo "Verify incompatible options: "
+
+# Just bind something to this so other ones connect
+${NNGCAT} --pull0 --ascii -X /tmp/bogusipc &
+pid=$!
+
+trap "kill $pid && wait $pid 2>/dev/null" 0
+
+echo -n " --subscribe doesn't work with non-sub"
+if ${NNGCAT} --req0 -x /tmp/bogusipc --subscribe=oops >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --interval doesn't work with recv only: "
+if ${NNGCAT} --interval 1 --pull -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --pair1 doesn't work with --compat: "
+if ${NNGCAT} --compat --pair1 -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --count doesn't work with --compat: "
+if ${NNGCAT} --compat --count=1 --pair0 -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --count fails with non-integer: "
+if ${NNGCAT} --count=xyz --pair0 -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --file fails with non-existing file: "
+if ${NNGCAT} --async --file=/nosuchfilehere --push0 -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo "PASS."
+exit 0
diff --git a/src/tools/nngcat/nngcat_need_proto_test.sh b/src/tools/nngcat/nngcat_need_proto_test.sh
new file mode 100755
index 00000000..d6733cad
--- /dev/null
+++ b/src/tools/nngcat/nngcat_need_proto_test.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+
+echo -n "Verify a protocol is needed: "
+if ${NNGCAT} --dial=tcp://127.0.0.1:8989 >/dev/null 2>&1
+then
+ echo "Failed: protocol should be required"
+ exit 1
+fi
+echo "pass"
+exit 0
diff --git a/src/tools/nngcat/nngcat_pubsub_test.sh b/src/tools/nngcat/nngcat_pubsub_test.sh
new file mode 100755
index 00000000..b9ba90ed
--- /dev/null
+++ b/src/tools/nngcat/nngcat_pubsub_test.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_pub_sub_test"
+OUTPUT=/tmp/nngcat_pubsub_test.$$.out
+
+echo -n "Verify pub sub: "
+
+trap "rm $OUTPUT" 0
+
+${NNGCAT} --listen ${ADDR} --count=3 --recv-timeout=20 --sub0 --subscribe=one --subscribe=two --quoted > $OUTPUT 2>/dev/null &
+sleep 1
+# for speed of execution, run these in the background, they should be ignored
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "xyz" &
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 -D "none swam" &
+# these we care about, due to ordering (checksum) so run them serially
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 -D "one flew"
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "twofer test"
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "one more"
+
+wait $bgid 2>/dev/null
+
+sum=$(cksum ${OUTPUT})
+sum=${sum%% *}
+if [[ ${sum} == 3929078614 ]]
+then
+ echo "pass"
+ exit 0
+fi
+echo "FAIL: Checksum failed (Wanted 3929078614 got ${sum})"
+echo "OUTPUT:"
+cat ${OUTPUT}
+
+exit 1
diff --git a/src/tools/nngcat/nngcat_recvmaxsz_test.sh b/src/tools/nngcat/nngcat_recvmaxsz_test.sh
new file mode 100755
index 00000000..b5d4ff4a
--- /dev/null
+++ b/src/tools/nngcat/nngcat_recvmaxsz_test.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_recvmaxsz_test"
+OUTPUT=/tmp/nngcat_recvmaxsz_test.$$.out
+
+echo -n "Verify maximum receive size: "
+
+trap "rm $OUTPUT" 0
+
+${NNGCAT} --listen ${ADDR} --count=3 --recv-maxsz=5 --pull0 --quoted > $OUTPUT 2>/dev/null &
+sleep 1
+# for speed of execution, run these in the background, they should be ignored
+${NNGCAT} --connect ${ADDR} --push0 --data "one"
+${NNGCAT} --connect ${ADDR} --push0 --data "55555"
+${NNGCAT} --connect ${ADDR} --push0 --data "666666"
+${NNGCAT} --connect ${ADDR} --push0 --data "7777777"
+${NNGCAT} --connect ${ADDR} --push0 --data "88888"
+
+wait $bgid 2>/dev/null
+
+sum=$(cksum ${OUTPUT})
+sum=${sum%% *}
+
+# This matches 3 lines of "one", "55555", "88888".
+if [[ ${sum} == 4122906158 ]]
+then
+ echo "pass"
+ exit 0
+fi
+echo "FAIL: Checksum failed (Wanted 3929078614 got ${sum})"
+echo "OUTPUT:"
+cat ${OUTPUT}
+
+exit 1
diff --git a/src/tools/nngcat/nngcat_stdin_pipe_test.sh b/src/tools/nngcat/nngcat_stdin_pipe_test.sh
new file mode 100755
index 00000000..5fec0ab7
--- /dev/null
+++ b/src/tools/nngcat/nngcat_stdin_pipe_test.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+# Copyright 2020 Lager Data, Inc. <support@lagerdata.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_stdin_pipe_test"
+OUTPUT=/tmp/nngcat_stdin_pipe_test.$$.out
+
+echo -n "Verify reading from stdin pipe: "
+
+trap "rm $OUTPUT" 0
+
+${NNGCAT} --listen ${ADDR} --count=1 --recv-timeout=3 --recv-maxsz=0 --pull0 --raw > $OUTPUT 2>/dev/null &
+bgid=$!
+
+sleep 1
+# for speed of execution, run these in the background, they should be ignored
+echo "hello world" | ${NNGCAT} --connect ${ADDR} --delay=1 --push0 --file -
+wait "$bgid" 2>/dev/null
+
+sum=$(cksum ${OUTPUT})
+sum=${sum%% *}
+
+# This matches "hello world\n" since echo adds a trailing newline
+if [[ ${sum} == 3733384285 ]]
+then
+ echo "pass"
+ exit 0
+fi
+echo "FAIL: Checksum failed (Wanted 3733384285 got ${sum})"
+echo "OUTPUT:"
+ls -la ${OUTPUT}
+
+exit 1
diff --git a/src/tools/nngcat/nngcat_unlimited_test.sh b/src/tools/nngcat/nngcat_unlimited_test.sh
new file mode 100755
index 00000000..0486b9b1
--- /dev/null
+++ b/src/tools/nngcat/nngcat_unlimited_test.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_unlimited_test"
+INPUT=/tmp/nngcat_unlimited_test.$$.in
+OUTPUT=/tmp/nngcat_unlimited_test.$$.out
+
+echo -n "Verify unlimited receive size: "
+
+trap "rm $OUTPUT $INPUT" 0
+
+# 4 MB
+dd if=/dev/urandom of=${INPUT} bs=1024 count=4096 >/dev/null 2>&1
+goodsum=$(cksum ${INPUT})
+goodsum=${goodsum%% *}
+
+${NNGCAT} --listen ${ADDR} --count=1 --recv-maxsz=0 --pull0 --raw > $OUTPUT 2>/dev/null &
+sleep 1
+# for speed of execution, run these in the background, they should be ignored
+${NNGCAT} --connect ${ADDR} --delay=1 --push0 --file ${INPUT}
+wait $bgid 2>/dev/null
+
+sum=$(cksum ${OUTPUT})
+sum=${sum%% *}
+
+if [[ ${sum} == ${goodsum} ]]
+then
+ echo "pass"
+ exit 0
+fi
+echo "FAIL: Checksum failed (Wanted ${goodsum} got ${sum})"
+echo "OUTPUT:"
+ls -la ${OUTPUT}
+
+exit 1
diff --git a/src/tools/perf/CMakeLists.txt b/src/tools/perf/CMakeLists.txt
new file mode 100644
index 00000000..135544bb
--- /dev/null
+++ b/src/tools/perf/CMakeLists.txt
@@ -0,0 +1,35 @@
+#
+# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Build performance tests.
+
+nng_directory(perf)
+
+if (NNG_TESTS)
+ macro (add_nng_perf NAME)
+ add_executable (${NAME} perf.c)
+ target_link_libraries (${NAME} nng nng_private)
+ endmacro (add_nng_perf)
+
+ add_nng_perf(remote_lat)
+ add_nng_perf(local_lat)
+ add_nng_perf(local_thr)
+ add_nng_perf(remote_thr)
+ add_nng_perf(inproc_thr)
+ add_nng_perf(inproc_lat)
+
+ add_test (NAME nng.inproc_lat COMMAND inproc_lat 64 10000)
+ set_tests_properties (nng.inproc_lat PROPERTIES TIMEOUT 30)
+
+ add_test (NAME nng.inproc_thr COMMAND inproc_thr 1400 10000)
+ set_tests_properties (nng.inproc_thr PROPERTIES TIMEOUT 30)
+
+ add_executable (pubdrop pubdrop.c)
+ target_link_libraries(pubdrop nng nng_private)
+endif ()
diff --git a/src/tools/perf/perf.c b/src/tools/perf/perf.c
new file mode 100644
index 00000000..accac621
--- /dev/null
+++ b/src/tools/perf/perf.c
@@ -0,0 +1,662 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/options.h>
+#include <nng/supplemental/util/platform.h>
+
+static void die(const char *, ...);
+static int
+no_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Protocol not supported in this build!");
+ return (NNG_ENOTSUP);
+}
+
+typedef int (*open_func)(nng_socket *);
+
+static open_func open_server = no_open;
+static open_func open_client = no_open;
+
+#if defined(NNG_HAVE_PAIR1)
+#include <nng/protocol/pair1/pair.h>
+#else
+#define nng_pair1_open no_open
+#endif
+
+#if defined(NNG_HAVE_PAIR0)
+#include <nng/protocol/pair0/pair.h>
+#else
+#define nng_pair0_open no_open
+#endif
+
+#if defined(NNG_HAVE_REQ0)
+#include <nng/protocol/reqrep0/req.h>
+#else
+#define nng_req0_open no_open
+#endif
+
+#if defined(NNG_HAVE_REP0)
+#include <nng/protocol/reqrep0/rep.h>
+#else
+#define nng_rep0_open no_open
+#endif
+
+#if defined(NNG_HAVE_BUS0)
+#include <nng/protocol/bus0/bus.h>
+#else
+#define nng_bus0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PULL0)
+#include <nng/protocol/pipeline0/pull.h>
+#else
+#define nng_pull0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUSH0)
+#include <nng/protocol/pipeline0/push.h>
+#else
+#define nng_push0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#else
+#define nng_pub0_open no_open
+#endif
+
+#if defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/sub.h>
+#else
+#define nng_sub0_open no_open
+#endif
+
+enum options {
+ OPT_PAIR0 = 1,
+ OPT_PAIR1,
+ OPT_REQREP0,
+ OPT_PUBSUB0,
+ OPT_PIPELINE0,
+ OPT_SURVEY0,
+ OPT_BUS0,
+ OPT_URL,
+};
+
+// These are not universally supported by the variants yet.
+static nng_optspec opts[] = {
+ { .o_name = "pair1", .o_val = OPT_PAIR1 },
+ { .o_name = "pair0", .o_val = OPT_PAIR0 },
+ { .o_name = "reqrep0", .o_val = OPT_REQREP0 },
+ { .o_name = "bus0", .o_val = OPT_BUS0 },
+ { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
+ { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
+ { .o_name = "url", .o_val = OPT_URL, .o_arg = true },
+ { .o_name = NULL, .o_val = 0 },
+};
+
+static void latency_client(const char *, size_t, int);
+static void latency_server(const char *, size_t, int);
+static void throughput_client(const char *, size_t, int);
+static void throughput_server(const char *, size_t, int);
+static void do_remote_lat(int argc, char **argv);
+static void do_local_lat(int argc, char **argv);
+static void do_remote_thr(int argc, char **argv);
+static void do_local_thr(int argc, char **argv);
+static void do_inproc_thr(int argc, char **argv);
+static void do_inproc_lat(int argc, char **argv);
+static void die(const char *, ...);
+
+// perf implements the same performance tests found in the standard
+// nanomsg & mangos performance tests. As with mangos, the decision
+// about which test to run is determined by the program name (ARGV[0}])
+// that it is run under.
+//
+// Options are:
+//
+// - remote_lat - remote latency side (client, aka latency_client)
+// - local_lat - local latency side (server, aka latency_server)
+// - local_thr - local throughput side
+// - remote_thr - remote throughput side
+// - inproc_lat - inproc latency
+// - inproc_thr - inproc throughput
+//
+
+bool
+matches(const char *arg, const char *name)
+{
+ const char *ptr = arg;
+ const char *x;
+
+ while (((x = strchr(ptr, '/')) != NULL) ||
+ ((x = strchr(ptr, '\\')) != NULL) ||
+ ((x = strchr(ptr, ':')) != NULL)) {
+ ptr = x + 1;
+ }
+ for (;;) {
+ if (*name == '\0') {
+ break;
+ }
+ if (tolower(*ptr) != *name) {
+ return (false);
+ }
+ ptr++;
+ name++;
+ }
+
+ switch (*ptr) {
+ case '\0':
+ /* FALLTHROUGH*/
+ case '.': // extension; ignore it.
+ return (true);
+ default: // some other trailing bit.
+ return (false);
+ }
+}
+
+const int PAIR0 = 0;
+const int PAIR1 = 1;
+const int REQREP = 2;
+
+int
+main(int argc, char **argv)
+{
+ char *prog;
+
+#if defined(NNG_HAVE_PAIR1)
+ open_server = nng_pair1_open;
+ open_client = nng_pair1_open;
+#elif defined(NNG_HAVE_PAIR0)
+ open_server = nng_pair0_open;
+ open_client = nng_pair0_open;
+#endif
+
+ // Allow -m <remote_lat> or whatever to override argv[0].
+ if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
+ prog = argv[2];
+ argv += 3;
+ argc -= 3;
+ } else {
+ prog = argv[0];
+ argc--;
+ argv++;
+ }
+ if (matches(prog, "remote_lat") || matches(prog, "latency_client")) {
+ do_remote_lat(argc, argv);
+ } else if (matches(prog, "local_lat") ||
+ matches(prog, "latency_server")) {
+ do_local_lat(argc, argv);
+ } else if (matches(prog, "local_thr") ||
+ matches(prog, "throughput_server")) {
+ do_local_thr(argc, argv);
+ } else if (matches(prog, "remote_thr") ||
+ matches(prog, "throughput_client")) {
+ do_remote_thr(argc, argv);
+ } else if (matches(prog, "inproc_thr")) {
+ do_inproc_thr(argc, argv);
+ } else if (matches(prog, "inproc_lat")) {
+ do_inproc_lat(argc, argv);
+ } else {
+ die("Unknown program mode? Use -m <mode>.");
+ }
+}
+
+static void
+die(const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(2);
+}
+
+static int
+parse_int(const char *arg, const char *what)
+{
+ long val;
+ char *eptr;
+
+ val = strtol(arg, &eptr, 10);
+ // Must be a positive number less than around a billion.
+ if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) {
+ die("Invalid %s", what);
+ }
+ return ((int) val);
+}
+
+void
+do_local_lat(int argc, char **argv)
+{
+ long int msgsize;
+ long int trips;
+
+ if (argc != 3) {
+ die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "round-trips");
+
+ latency_server(argv[0], msgsize, trips);
+}
+
+void
+do_remote_lat(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "round-trips");
+
+ latency_client(argv[0], msgsize, trips);
+}
+
+void
+do_local_thr(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: local_thr <listen-addr> <msg-size> <count>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "count");
+
+ throughput_server(argv[0], msgsize, trips);
+}
+
+void
+do_remote_thr(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: remote_thr <connect-to> <msg-size> <count>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "count");
+
+ throughput_client(argv[0], msgsize, trips);
+}
+
+struct inproc_args {
+ int count;
+ int msgsize;
+ const char *addr;
+ void (*func)(const char *, size_t, int);
+};
+
+static void
+do_inproc(void *args)
+{
+ struct inproc_args *ia = args;
+
+ ia->func(ia->addr, ia->msgsize, ia->count);
+}
+
+void
+do_inproc_lat(int argc, char **argv)
+{
+ nng_thread * thr;
+ struct inproc_args ia;
+ int rv;
+ int val;
+ int optidx;
+ char * arg;
+ char * addr;
+
+ addr = "inproc://latency_test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_BUS0:
+ open_client = nng_bus0_open;
+ open_server = nng_bus0_open;
+ break;
+
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
+
+ if (argc != 2) {
+ die("Usage: inproc_lat <msg-size> <count>");
+ }
+
+ ia.addr = addr;
+ ia.msgsize = parse_int(argv[0], "message size");
+ ia.count = parse_int(argv[1], "count");
+ ia.func = latency_server;
+
+ if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
+ die("Cannot create thread: %s", nng_strerror(rv));
+ }
+
+ // Sleep a bit.
+ nng_msleep(100);
+
+ latency_client(addr, ia.msgsize, ia.count);
+ nng_thread_destroy(thr);
+}
+
+void
+do_inproc_thr(int argc, char **argv)
+{
+ nng_thread * thr;
+ struct inproc_args ia;
+ int rv;
+ int optidx;
+ int val;
+ char * arg;
+ char * addr = "inproc://throughput-test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+#if 0
+ // For now these protocols simply do not work with
+ // throughput -- they don't work with backpressure properly.
+ // In the future we should support synchronizing in the same
+ // process, and alerting the sender both on completion of
+ // a single message, and on completion of all messages.
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+#endif
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_PIPELINE0:
+ open_client = nng_pull0_open;
+ open_server = nng_push0_open;
+ break;
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
+
+ if (argc != 2) {
+ die("Usage: inproc_thr <msg-size> <count>");
+ }
+
+ ia.addr = addr;
+ ia.msgsize = parse_int(argv[0], "message size");
+ ia.count = parse_int(argv[1], "count");
+ ia.func = throughput_server;
+
+ if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
+ die("Cannot create thread: %s", nng_strerror(rv));
+ }
+
+ // Sleep a bit.
+ nng_msleep(100);
+
+ throughput_client(addr, ia.msgsize, ia.count);
+ nng_thread_destroy(thr);
+}
+
+void
+latency_client(const char *addr, size_t msgsize, int trips)
+{
+ nng_socket s;
+ nng_msg * msg;
+ nng_time start, end;
+ int rv;
+ int i;
+ float total;
+ float latency;
+ if ((rv = open_client(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
+ die("nng_dial: %s", nng_strerror(rv));
+ }
+
+ nng_msleep(100);
+
+ if (nng_msg_alloc(&msg, msgsize) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+
+ start = nng_clock();
+ for (i = 0; i < trips; i++) {
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ }
+ end = nng_clock();
+
+ nng_msg_free(msg);
+ nng_close(s);
+
+ total = (float) ((end - start)) / 1000;
+ latency = ((float) ((total * 1000000)) / (float) (trips * 2));
+ printf("total time: %.3f [s]\n", total);
+ printf("message size: %d [B]\n", (int) msgsize);
+ printf("round trip count: %d\n", trips);
+ printf("average latency: %.3f [us]\n", latency);
+}
+
+void
+latency_server(const char *addr, size_t msgsize, int trips)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+
+ if ((rv = open_server(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
+ die("nng_listen: %s", nng_strerror(rv));
+ }
+
+ for (i = 0; i < trips; i++) {
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
+ msgsize);
+ }
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+ }
+
+ // Wait a bit for things to drain... linger should do this.
+ // 100ms ought to be enough.
+ nng_msleep(100);
+ nng_close(s);
+}
+
+// Our throughput story is quite a mess. Mostly I think because of the poor
+// caching and message reuse. We should probably implement a message pooling
+// API somewhere.
+
+void
+throughput_server(const char *addr, size_t msgsize, int count)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+ uint64_t start, end;
+ float msgpersec, mbps, total;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+ rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
+ die("nng_listen: %s", nng_strerror(rv));
+ }
+
+ // Receive first synchronization message.
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ nng_msg_free(msg);
+ start = nng_clock();
+
+ for (i = 0; i < count; i++) {
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
+ msgsize);
+ }
+ nng_msg_free(msg);
+ }
+ end = nng_clock();
+ // Send a synchronization message (empty) to the other side,
+ // and wait a bit to make sure it goes out the wire.
+ nng_send(s, "", 0, 0);
+ nng_msleep(200);
+ nng_close(s);
+ total = (float) ((end - start)) / 1000;
+ msgpersec = (float) (count) / total;
+ mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024);
+ printf("total time: %.3f [s]\n", total);
+ printf("message size: %d [B]\n", (int) msgsize);
+ printf("message count: %d\n", count);
+ printf("throughput: %.f [msg/s]\n", msgpersec);
+ printf("throughput: %.3f [Mb/s]\n", mbps);
+}
+
+void
+throughput_client(const char *addr, size_t msgsize, int count)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+
+ // We send one extra zero length message to start the timer.
+ count++;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv));
+ }
+
+ rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
+ die("nng_dial: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+
+ for (i = 0; i < count; i++) {
+ if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+ }
+
+ // Attempt to get the completion indication from the other
+ // side.
+ if (nng_recvmsg(s, &msg, 0) == 0) {
+ nng_msg_free(msg);
+ }
+
+ nng_close(s);
+}
diff --git a/src/tools/perf/pubdrop.c b/src/tools/perf/pubdrop.c
new file mode 100644
index 00000000..be51bb2f
--- /dev/null
+++ b/src/tools/perf/pubdrop.c
@@ -0,0 +1,325 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/platform.h>
+
+// pubdrop - this is a simple testing utility that lets us measure PUB/SUB
+// performance, including dropped messages, delivery across multiple threads,
+// etc. It actually uses a wild card subscription for now.
+
+#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+
+#else
+
+static void die(const char *, ...);
+
+static int
+nng_pub0_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Pub protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+
+static int
+nng_sub0_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Sub protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+
+#endif // NNG_HAVE_PUB0....
+
+static void die(const char *, ...);
+static void do_pubdrop(int argc, char **argv);
+static uint64_t nperusec;
+static volatile int x;
+
+void
+work(void)
+{
+ x = rand();
+}
+
+void
+usdelay(unsigned long long nusec)
+{
+ nusec *= nperusec;
+ while (nusec > 0) {
+ work();
+ nusec--;
+ }
+}
+
+int
+main(int argc, char **argv)
+{
+ argc--;
+ argv++;
+
+ // We calculate a delay factor to roughly delay 1 usec. We don't
+ // need this to be perfect, just reproducible on the same host.
+ unsigned long cnt = 1000000;
+
+ nng_time beg = nng_clock();
+ for (unsigned long i = 0; i < cnt; i++) {
+ work();
+ }
+ nng_time end = nng_clock();
+ nperusec = cnt / (1000 * (end - beg));
+
+ do_pubdrop(argc, argv);
+}
+
+static void
+die(const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(2);
+}
+
+static int
+parse_int(const char *arg, const char *what)
+{
+ long val;
+ char *eptr;
+
+ val = strtol(arg, &eptr, 10);
+ // Must be a postive number less than around a billion.
+ if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) {
+ die("Invalid %s", what);
+ }
+ return ((int) val);
+}
+
+struct pubdrop_args {
+ const char * addr;
+ bool start;
+ unsigned long long msgsize;
+ unsigned long long count;
+ unsigned long long intvl;
+ unsigned long long drops;
+ unsigned long long gaps;
+ unsigned long long errs;
+ unsigned long long recvs;
+ nng_time beg;
+ nng_time end;
+ nng_mtx * mtx;
+ nng_cv * cv;
+};
+
+static void
+pub_server(void *arg)
+{
+ struct pubdrop_args *pa = arg;
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+
+ if ((rv = nng_pub0_open(&sock)) != 0) {
+ die("Cannot open sub: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) {
+ die("Cannot listen: %s", nng_strerror(rv));
+ }
+
+ nng_mtx_lock(pa->mtx);
+ while (!pa->start) {
+ nng_cv_wait(pa->cv);
+ }
+ nng_mtx_unlock(pa->mtx);
+
+ start = nng_clock();
+ for (uint64_t i = 0; i < pa->count; i++) {
+ // Unfortunately we need to allocate messages dynamically as we
+ // go. The other option would be to allocate them all up front,
+ // but that could be a rather excessive amount of memory.
+ if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) {
+ die("Message alloc failed");
+ }
+ memcpy(nng_msg_body(msg), &i, sizeof(i));
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ die("Sendmsg: %s", nng_strerror(rv));
+ }
+ // It sure would be nice if we had a usec granularity option
+ // here.
+ if (pa->intvl > 0) {
+ usdelay((unsigned long long) pa->intvl);
+ }
+ }
+
+ end = nng_clock();
+
+ nng_msleep(1000); // drain the queue
+ nng_close(sock);
+
+ nng_mtx_lock(pa->mtx);
+ pa->beg = start;
+ pa->end = end;
+ nng_mtx_unlock(pa->mtx);
+}
+
+static void
+sub_client(void *arg)
+{
+ struct pubdrop_args *pa = arg;
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ unsigned long long recvs;
+ unsigned long long drops;
+ unsigned long long gaps;
+ unsigned long long errs;
+ unsigned long long expect;
+
+ if ((rv = nng_sub0_open(&sock)) != 0) {
+ die("Cannot open sub: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) {
+ die("Cannot listen: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+
+ expect = 0;
+ recvs = drops = gaps = errs = 0;
+
+ while (expect < pa->count) {
+ uint64_t got;
+ if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) {
+ // Closed without receiving the last message
+ drops += (pa->count - expect);
+ gaps++;
+ break;
+ }
+ printf("ERROR: %s\n", nng_strerror(rv));
+ errs++;
+ break;
+ }
+ recvs++;
+ memcpy(&got, nng_msg_body(msg), sizeof(got));
+ nng_msg_free(msg);
+ if (got != expect) {
+ gaps++;
+ if (got > expect) {
+ drops += (got - expect);
+ } else {
+ die("Misordered delivery");
+ }
+ }
+ expect = got + 1;
+ }
+
+ nng_mtx_lock(pa->mtx);
+ pa->drops += drops;
+ pa->errs += errs;
+ pa->recvs += recvs;
+ pa->gaps += gaps;
+ nng_mtx_unlock(pa->mtx);
+}
+
+static void
+do_pubdrop(int argc, char **argv)
+{
+ nng_thread ** thrs;
+ struct pubdrop_args pa;
+ int rv;
+ int nsubs;
+
+ if (argc != 5) {
+ die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> "
+ "<interval>");
+ }
+
+ memset(&pa, 0, sizeof(pa));
+ pa.addr = argv[0];
+ pa.msgsize = parse_int(argv[1], "message size");
+ pa.count = parse_int(argv[2], "count");
+ pa.intvl = parse_int(argv[4], "interval");
+ nsubs = parse_int(argv[3], "#subscribers");
+
+ if (pa.msgsize < sizeof(uint64_t)) {
+ die("Message size too small.");
+ }
+
+ thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1);
+ if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) ||
+ ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) {
+ die("Startup: %s\n", nng_strerror(rv));
+ };
+
+ if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) {
+ die("Cannot create pub thread: %s", nng_strerror(rv));
+ }
+
+ nng_msleep(100); // give time for listener to start...
+
+ for (int i = 0; i < nsubs; i++) {
+ if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) !=
+ 0) {
+ die("Cannot create sub thread: %s", nng_strerror(rv));
+ }
+ }
+
+ // Sleep a bit for conns to establish.
+ nng_msleep(2000);
+ nng_mtx_lock(pa.mtx);
+ pa.start = true;
+ nng_cv_wake(pa.cv);
+ nng_mtx_unlock(pa.mtx);
+
+ for (int i = 0; i < nsubs + 1; i++) {
+ nng_thread_destroy(thrs[i]);
+ }
+
+ nng_mtx_lock(pa.mtx);
+
+ unsigned long long expect = nsubs * pa.count;
+ unsigned long long missing = nsubs ? expect - pa.recvs : 0;
+ double dur = (pa.end - pa.beg) / 1000.0;
+
+ printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n",
+ pa.count, dur, pa.count / dur);
+ printf("Expected %llu messages total\n", expect);
+ printf("Received %llu messages total\n", pa.recvs);
+ printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur);
+ printf("Errors %llu total\n", pa.errs);
+ printf("Reported %llu dropped messages in %llu gaps\n", pa.drops,
+ pa.gaps);
+ printf("Dropped %llu messages (missing)\n", missing);
+ printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0);
+
+ nng_mtx_unlock(pa.mtx);
+}