aboutsummaryrefslogtreecommitdiff
path: root/src/tools
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools')
-rw-r--r--src/tools/CMakeLists.txt13
-rw-r--r--src/tools/nngcat/CMakeLists.txt41
-rw-r--r--src/tools/nngcat/nngcat.c1217
-rwxr-xr-xsrc/tools/nngcat/nngcat_ambiguous_test.sh31
-rwxr-xr-xsrc/tools/nngcat/nngcat_async_test.sh32
-rwxr-xr-xsrc/tools/nngcat/nngcat_dup_proto_test.sh23
-rwxr-xr-xsrc/tools/nngcat/nngcat_help_test.sh32
-rwxr-xr-xsrc/tools/nngcat/nngcat_incompat_test.sh73
-rwxr-xr-xsrc/tools/nngcat/nngcat_need_proto_test.sh23
-rwxr-xr-xsrc/tools/nngcat/nngcat_pubsub_test.sh45
-rwxr-xr-xsrc/tools/nngcat/nngcat_recvmaxsz_test.sh46
-rwxr-xr-xsrc/tools/nngcat/nngcat_stdin_pipe_test.sh44
-rwxr-xr-xsrc/tools/nngcat/nngcat_unlimited_test.sh46
-rw-r--r--src/tools/perf/CMakeLists.txt35
-rw-r--r--src/tools/perf/perf.c662
-rw-r--r--src/tools/perf/pubdrop.c325
16 files changed, 2688 insertions, 0 deletions
diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
new file mode 100644
index 00000000..0487d9ec
--- /dev/null
+++ b/src/tools/CMakeLists.txt
@@ -0,0 +1,13 @@
+#
+# Copyright 2021 Staysail Systems, Inc. <info@staystail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+nng_directory(tools)
+
+add_subdirectory(nngcat)
+add_subdirectory(perf)
diff --git a/src/tools/nngcat/CMakeLists.txt b/src/tools/nngcat/CMakeLists.txt
new file mode 100644
index 00000000..d522a045
--- /dev/null
+++ b/src/tools/nngcat/CMakeLists.txt
@@ -0,0 +1,41 @@
+#
+# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+nng_directory(nngcat)
+
+if (NNG_ENABLE_NNGCAT)
+ add_executable(nngcat nngcat.c)
+ target_include_directories(nngcat PUBLIC ${PROJECT_SOURCE_DIR}/src)
+ target_link_libraries(nngcat nng nng_private)
+ install(TARGETS nngcat RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
+ COMPONENT Tools)
+
+ if (NNG_TESTS AND CMAKE_SYSTEM_NAME MATCHES "Linux")
+ include(FindUnixCommands)
+ endif ()
+ # TODO: This should be refactored to use a test driver.
+ # We only run the tests on Linux for now, because the Darwin CI/CD is too brittle.
+ if (NNG_TESTS AND BASH AND CMAKE_SYSTEM_NAME MATCHES "Linux")
+ macro(add_nngcat_test NAME TIMEOUT)
+ add_test(NAME nng.${NAME} COMMAND ${BASH} ${CMAKE_CURRENT_SOURCE_DIR}/${NAME}_test.sh $<TARGET_FILE:nngcat>)
+ set_tests_properties(nng.${NAME} PROPERTIES TIMEOUT ${TIMEOUT})
+ endmacro()
+ add_nngcat_test(nngcat_async 10)
+ add_nngcat_test(nngcat_ambiguous 10)
+ add_nngcat_test(nngcat_need_proto 10)
+ add_nngcat_test(nngcat_dup_proto 10)
+ add_nngcat_test(nngcat_help 10)
+ add_nngcat_test(nngcat_incompat 10)
+ add_nngcat_test(nngcat_pubsub 20)
+ add_nngcat_test(nngcat_recvmaxsz 20)
+ add_nngcat_test(nngcat_unlimited 20)
+ add_nngcat_test(nngcat_stdin_pipe 20)
+ endif ()
+endif ()
diff --git a/src/tools/nngcat/nngcat.c b/src/tools/nngcat/nngcat.c
new file mode 100644
index 00000000..9e106069
--- /dev/null
+++ b/src/tools/nngcat/nngcat.c
@@ -0,0 +1,1217 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2020 Lager Data, Inc. <support@lagerdata.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/protocol/bus0/bus.h>
+#include <nng/protocol/pair0/pair.h>
+#include <nng/protocol/pair1/pair.h>
+#include <nng/protocol/pipeline0/pull.h>
+#include <nng/protocol/pipeline0/push.h>
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/protocol/survey0/respond.h>
+#include <nng/protocol/survey0/survey.h>
+#include <nng/supplemental/tls/tls.h>
+#include <nng/supplemental/util/options.h>
+#include <nng/supplemental/util/platform.h>
+#include <nng/transport/zerotier/zerotier.h>
+
+// Globals. We need this to avoid passing around everything.
+int format = 0;
+int proto = 0;
+int verbose = 0;
+int delay = 0;
+nng_duration interval = NNG_DURATION_INFINITE;
+nng_duration sendtimeo = NNG_DURATION_INFINITE;
+nng_duration recvtimeo = NNG_DURATION_INFINITE;
+void * data = NULL;
+size_t datalen = 0;
+int compat = 0;
+int async = 0;
+int insecure = 0;
+void * cacert = NULL;
+size_t cacertlen = 0;
+void * keyfile = NULL;
+size_t keylen = 0;
+void * certfile = NULL;
+size_t certlen = 0;
+const char * zthome = NULL;
+int count = 0;
+int recvmaxsz = -1;
+
+// Options, must start at 1 because zero is sentinel.
+enum options {
+ OPT_HELP = 1,
+ OPT_VERBOSE,
+ OPT_SILENT,
+ OPT_REP0,
+ OPT_REQ0,
+ OPT_PUSH0,
+ OPT_PULL0,
+ OPT_PUB0,
+ OPT_SUB0,
+ OPT_SURVEY0,
+ OPT_RESPOND0,
+ OPT_PAIR0,
+ OPT_PAIR1,
+ OPT_PAIR,
+ OPT_BUS0,
+ OPT_DIAL,
+ OPT_LISTEN,
+ OPT_DATA,
+ OPT_FILE,
+ OPT_SUBSCRIBE,
+ OPT_INTERVAL,
+ OPT_DELAY,
+ OPT_COUNT,
+ OPT_FORMAT,
+ OPT_RAW,
+ OPT_ASCII,
+ OPT_QUOTED,
+ OPT_MSGPACK,
+ OPT_HEX,
+ OPT_BLANK, // no printing, not an actual option.
+ OPT_COMPAT,
+ OPT_ASYNC,
+ OPT_RCV_TIMEO,
+ OPT_SND_TIMEO,
+ OPT_SOCK_NAME,
+ OPT_LISTEN_IPC,
+ OPT_DIAL_IPC,
+ OPT_LISTEN_LOCAL,
+ OPT_DIAL_LOCAL,
+ OPT_INSECURE,
+ OPT_CACERT,
+ OPT_KEYFILE,
+ OPT_CERTFILE,
+ OPT_VERSION,
+ OPT_RECVMAXSZ,
+ OPT_ZTHOME,
+};
+
+static nng_optspec opts[] = {
+ { .o_name = "help", .o_short = 'h', .o_val = OPT_HELP },
+ { .o_name = "verbose", .o_short = 'v', .o_val = OPT_VERBOSE },
+ { .o_name = "silent", .o_short = 'q', .o_val = OPT_SILENT },
+ { .o_name = "req0", .o_val = OPT_REQ0 },
+ { .o_name = "rep0", .o_val = OPT_REP0 },
+ { .o_name = "push0", .o_val = OPT_PUSH0 },
+ { .o_name = "pull0", .o_val = OPT_PULL0 },
+ { .o_name = "pub0", .o_val = OPT_PUB0 },
+ { .o_name = "sub", .o_val = OPT_SUB0 }, // explicit for alias
+ { .o_name = "sub0", .o_val = OPT_SUB0 },
+ { .o_name = "surveyor0", .o_val = OPT_SURVEY0 },
+ { .o_name = "respondent0", .o_val = OPT_RESPOND0 },
+ { .o_name = "pair", .o_val = OPT_PAIR },
+ { .o_name = "pair0", .o_val = OPT_PAIR0 },
+ { .o_name = "pair1", .o_val = OPT_PAIR1 },
+ { .o_name = "bus0", .o_val = OPT_BUS0 },
+ { .o_name = "dial", .o_val = OPT_DIAL, .o_arg = true },
+ { .o_name = "listen", .o_val = OPT_LISTEN, .o_arg = true },
+ { .o_name = "data", .o_short = 'D', .o_val = OPT_DATA, .o_arg = true },
+ { .o_name = "file", .o_short = 'F', .o_val = OPT_FILE, .o_arg = true },
+ { .o_name = "subscribe", .o_val = OPT_SUBSCRIBE, .o_arg = true },
+ { .o_name = "format", .o_val = OPT_FORMAT, .o_arg = true },
+ { .o_name = "ascii", .o_short = 'A', .o_val = OPT_ASCII },
+ { .o_name = "quoted", .o_short = 'Q', .o_val = OPT_QUOTED },
+ { .o_name = "raw", .o_val = OPT_RAW },
+ { .o_name = "hex", .o_val = OPT_HEX },
+ { .o_name = "compat", .o_val = OPT_COMPAT },
+ { .o_name = "async", .o_val = OPT_ASYNC },
+ { .o_name = "msgpack", .o_val = OPT_MSGPACK },
+
+ {
+ .o_name = "recv-maxsz",
+ .o_short = 'Z',
+ .o_val = OPT_RECVMAXSZ,
+ .o_arg = true,
+ },
+ {
+ .o_name = "count",
+ .o_short = 'C',
+ .o_val = OPT_COUNT,
+ .o_arg = true,
+ },
+ {
+ .o_name = "delay",
+ .o_short = 'd',
+ .o_val = OPT_DELAY,
+ .o_arg = true,
+ },
+ {
+ .o_name = "interval",
+ .o_short = 'i',
+ .o_val = OPT_INTERVAL,
+ .o_arg = true,
+ },
+ { .o_name = "recv-timeout", .o_val = OPT_RCV_TIMEO, .o_arg = true },
+ { .o_name = "send-timeout", .o_val = OPT_SND_TIMEO, .o_arg = true },
+ { .o_name = "socket-name", .o_val = OPT_SOCK_NAME, .o_arg = true },
+
+ // Legacy compatibility with nanocat.
+ { .o_name = "bind", .o_val = OPT_LISTEN, .o_arg = true },
+ { .o_name = "connect", .o_val = OPT_DIAL, .o_arg = true },
+ {
+ .o_name = "bind-ipc",
+ .o_short = 'X',
+ .o_val = OPT_LISTEN_IPC,
+ .o_arg = true,
+ },
+ {
+ .o_name = "connect-ipc",
+ .o_short = 'x',
+ .o_val = OPT_DIAL_IPC,
+ .o_arg = true,
+ },
+ {
+ .o_name = "bind-local",
+ .o_short = 'L',
+ .o_val = OPT_LISTEN_LOCAL,
+ .o_arg = true,
+ },
+ {
+ .o_name = "connect-local",
+ .o_short = 'l',
+ .o_val = OPT_DIAL_LOCAL,
+ .o_arg = true,
+ },
+ { .o_name = "insecure", .o_short = 'k', .o_val = OPT_INSECURE },
+ { .o_name = "cacert", .o_val = OPT_CACERT, .o_arg = true },
+ { .o_name = "key", .o_val = OPT_KEYFILE, .o_arg = true },
+ {
+ .o_name = "cert",
+ .o_short = 'E',
+ .o_val = OPT_CERTFILE,
+ .o_arg = true,
+ },
+ {
+ .o_name = "zt-home",
+ .o_val = OPT_ZTHOME,
+ .o_arg = true,
+ },
+ { .o_name = "version", .o_short = 'V', .o_val = OPT_VERSION },
+
+ // Sentinel.
+ { .o_name = NULL, .o_val = 0 },
+};
+
+static void
+fatal(const char *msg, ...)
+{
+ va_list ap;
+ va_start(ap, msg);
+ vfprintf(stderr, msg, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+static void
+help(void)
+{
+ printf(
+ "Usage: nngcat <proto> <addr>... [<fmt>] [<opts>...] [<src>]\n\n");
+ printf("<proto> must be one of:\n");
+ printf(" --req0 (or alias --req)\n");
+ printf(" --rep0 (or alias --rep)\n");
+ printf(" --pub0 (or alias --pub)\n");
+ printf(" --sub0 (or alias --sub)\n");
+ printf(" --push0 (or alias --push)\n");
+ printf(" --pull0 (or alias --pull)\n");
+ printf(" --surveyor0 (or alias --surveyor)\n");
+ printf(" --respondent0 (or alias --respondent)\n");
+ printf(" --pair0\n");
+ printf(" --pair1\n");
+ printf(" --pair (alias for either pair0 or pair1)\n");
+ printf("\n<addr> must be one or more of:\n");
+ printf(" --dial <url> (or alias --connect <url>)\n");
+ printf(" --listen <url> (or alias --bind <url>)\n");
+ printf(" --bind-local <port> (or alias -L <port>)\n");
+ printf(" --connect-local <port> (or alias -l <port>)\n");
+ printf(" --bind-ipc <path> (or alias -X <path>)\n");
+ printf(" --connect-ipc <path> (or alias -x <path>)\n");
+ printf("\n<fmt> may be one or none of:\n");
+ printf(" --format <no|ascii|hex|msgpack|quoted|raw>\n");
+ printf(" --ascii (or alias -A)\n");
+ printf(" --quoted (or alias -Q)\n");
+ printf(" --hex\n");
+ printf(" --msgpack\n");
+ printf(" --raw\n");
+ printf("\n<opts> may be any of:\n");
+ printf(" --subscribe <topic> (only with --sub protocol)\n");
+ printf(" --silent (or alias -q)\n");
+ printf(" --verbose (or alias -v)\n");
+ printf(" --count <num> (or alias -C <num>)\n");
+ printf(" --delay <secs> (or alias -d <secs>)\n");
+ printf(" --interval <secs> (or alias -i <secs>)\n");
+ printf(" --recv-timeout <secs>\n");
+ printf(" --send-timeout <secs>\n");
+ printf(" --recv-maxsz <size> (or alias -Z <size>)\n");
+ printf(" --compat\n");
+ printf(" --async\n");
+ printf(" --insecure (or alias -k)\n");
+ printf(" --cacert <file>\n");
+ printf(" --cert <file> (or alias -E <file>)\n");
+ printf(" --key <file>\n");
+ printf(" --zt-home <path>\n");
+ printf("\n<src> may be one of:\n");
+ printf(" --file <file> (or alias -F <file>). "
+ "Use - for standard input.\n");
+ printf(" --data <data> (or alias -D <data>)\n");
+ exit(1);
+}
+
+static int
+intarg(const char *val, int maxv)
+{
+ int v = 0;
+
+ if (val[0] == '\0') {
+ fatal("Empty integer argument.");
+ }
+ while (*val != '\0') {
+ if (!isdigit(*val)) {
+ fatal("Integer argument expected.");
+ }
+ v *= 10;
+ v += ((*val) - '0');
+ val++;
+ if (v > maxv) {
+ fatal("Integer argument too large.");
+ }
+ }
+ if (v < 0) {
+ fatal("Integer argument overflow.");
+ }
+ return (v);
+}
+
+// This reads a file into memory. Care is taken to ensure that
+// the buffer is one byte larger and contains a terminating
+// NUL. (Useful for key files and such.)
+static void
+loadfile(const char *path, void **datap, size_t *lenp)
+{
+ FILE * f;
+ size_t total_read = 0;
+ size_t allocation_size = BUFSIZ;
+ char * fdata;
+ char * realloc_result;
+
+ if (strcmp(path, "-") == 0) {
+ f = stdin;
+ } else {
+ if ((f = fopen(path, "rb")) == NULL) {
+ fatal(
+ "Cannot open file %s: %s", path, strerror(errno));
+ }
+ }
+
+ if ((fdata = malloc(allocation_size + 1)) == NULL) {
+ fatal("Out of memory.");
+ }
+
+ while (1) {
+ total_read += fread(
+ fdata + total_read, 1, allocation_size - total_read, f);
+ if (ferror(f)) {
+ if (errno == EINTR) {
+ continue;
+ }
+ fatal(
+ "Read from %s failed: %s", path, strerror(errno));
+ }
+ if (feof(f)) {
+ break;
+ }
+ if (total_read == allocation_size) {
+ if (allocation_size > SIZE_MAX / 2) {
+ fatal("Out of memory.");
+ }
+ allocation_size *= 2;
+ if ((realloc_result = realloc(
+ fdata, allocation_size + 1)) == NULL) {
+ free(fdata);
+ fatal("Out of memory.");
+ }
+ fdata = realloc_result;
+ }
+ }
+ if (f != stdin) {
+ fclose(f);
+ }
+ fdata[total_read] = '\0';
+ *datap = fdata;
+ *lenp = total_read;
+}
+
+static void
+configtls(nng_tls_config *tls)
+{
+ int rv = 0;
+ if (insecure) {
+ rv = nng_tls_config_auth_mode(tls, NNG_TLS_AUTH_MODE_NONE);
+ }
+ if ((rv == 0) && (certfile != NULL)) {
+ keyfile = keyfile ? keyfile : certfile;
+ rv = nng_tls_config_own_cert(tls, certfile, keyfile, NULL);
+ }
+ if ((rv == 0) && (cacert != NULL)) {
+ rv = nng_tls_config_ca_chain(tls, cacert, NULL);
+ }
+ if (rv != 0) {
+ fatal("Unable to configure TLS: %s", nng_strerror(rv));
+ }
+}
+
+struct addr {
+ struct addr *next;
+ int mode;
+ char * val;
+};
+
+struct addr **
+addaddr(struct addr **endp, int mode, const char *a)
+{
+ struct addr *na;
+
+ if (((na = malloc(sizeof(*na))) == NULL) ||
+ ((na->val = malloc(strlen(a) + 1)) == NULL)) {
+ fatal("Out of memory.");
+ }
+ na->mode = mode;
+ memcpy(na->val, a, strlen(a) + 1);
+ na->next = NULL;
+ *endp = na;
+ return (&na->next);
+}
+
+struct topic {
+ struct topic *next;
+ char * val;
+};
+
+struct topic **
+addtopic(struct topic **endp, const char *s)
+{
+ struct topic *t;
+
+ if (((t = malloc(sizeof(*t))) == NULL) ||
+ ((t->val = malloc(strlen(s) + 1)) == NULL)) {
+ fatal("Out of memory.");
+ }
+ memcpy(t->val, s, strlen(s) + 1);
+ t->next = NULL;
+ *endp = t;
+ return (&t->next);
+}
+
+void
+printmsg(char *buf, size_t len)
+{
+ switch (format) {
+ case OPT_BLANK: // Suppress contents.
+ return;
+ case OPT_RAW: // Just emit raw contents.
+ fwrite(buf, 1, len, stdout);
+ break;
+ case OPT_ASCII: // ASCII, but non-ASCII substituted with '.'
+ for (size_t i = 0; i < len; i++) {
+ if (isprint(buf[i])) {
+ putchar(buf[i]);
+ } else {
+ putchar('.');
+ }
+ }
+ break;
+ case OPT_QUOTED: // C style quoted strings.
+ putchar('"');
+ for (size_t i = 0; i < len; i++) {
+ switch (buf[i]) {
+ case '\n':
+ putchar('\\');
+ putchar('n');
+ break;
+ case '\r':
+ putchar('\\');
+ putchar('r');
+ break;
+ case '\t':
+ putchar('\\');
+ putchar('t');
+ break;
+ case '"':
+ case '\\':
+ putchar('\\');
+ putchar(buf[i]);
+ break;
+ default:
+ if (isprint(buf[i])) {
+ fputc(buf[i], stdout);
+ } else {
+ printf("\\x%02x", (uint8_t) buf[i]);
+ }
+ }
+ }
+ putchar('"');
+ putchar('\n');
+ break;
+ case OPT_MSGPACK: // MsgPack, we just encode prefix + len, then
+ // raw.
+ if (len < 256) {
+ putchar('\xc4');
+ putchar(len & 0xff);
+ } else if (len < 65536) {
+ putchar('\xc5');
+ putchar((len >> 8) & 0xff);
+ putchar(len & 0xff);
+ } else {
+ putchar('\xc6');
+ putchar((len >> 24) & 0xff);
+ putchar((len >> 16) & 0xff);
+ putchar((len >> 8) & 0xff);
+ putchar(len & 0xff);
+ }
+ fwrite(buf, 1, len, stdout);
+ break;
+ case OPT_HEX: // Hex, quoted C string encoded with hex
+ // literals.
+ putchar('"');
+ for (size_t i = 0; i < len; i++) {
+ printf("\\x%02x", (uint8_t) buf[i]);
+ }
+ putchar('"');
+ putchar('\n');
+ break;
+ }
+ fflush(stdout);
+}
+
+void
+recvloop(nng_socket sock)
+{
+ int iters = 0;
+ for (;;) {
+ int rv;
+ nng_msg *msg;
+
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
+ rv = nng_recvmsg(sock, &msg, 0);
+ iters++;
+ switch (rv) {
+ case NNG_ETIMEDOUT:
+ case NNG_ESTATE:
+ // Either a regular timeout, or we reached the
+ // end of an event like a survey completing.
+ return;
+ case 0:
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_free(msg);
+ continue;
+ default:
+ fatal("Receive error: %s", nng_strerror(rv));
+ }
+ }
+}
+
+void
+resploop(nng_socket sock)
+{
+ int iters = 0;
+ for (;;) {
+ int rv;
+ nng_msg *msg;
+
+ rv = nng_recvmsg(sock, &msg, 0);
+ if (rv != 0) {
+ fatal("Receive error: %s", nng_strerror(rv));
+ }
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_clear(msg);
+ if ((rv = nng_msg_append(msg, data, datalen)) != 0) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+
+ iters++;
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
+ }
+
+ nng_msleep(200);
+}
+
+void
+sendloop(nng_socket sock)
+{
+ int iters = 0;
+
+ if (data == NULL) {
+ fatal("No data to send (specify with --data or --file)");
+ }
+ if (delay > 0) {
+ nng_msleep(delay);
+ }
+
+ for (;;) {
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ nng_duration delta;
+
+ start = nng_clock();
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, data, datalen)) != 0)) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+ end = nng_clock();
+ delta = (nng_duration)(end - start);
+
+ iters++;
+ // By default, we don't loop.
+ if (((interval < 0) && (count == 0)) ||
+ ((count > 0) && (iters >= count))) {
+ break;
+ }
+
+ // We sleep, but we account for time spent, so that our
+ // interval appears more or less constant. Of course
+ // if we took more than the interval here, then we skip
+ // the sleep altogether.
+ if ((delta >= 0) && (delta < interval)) {
+ nng_msleep(interval - delta);
+ }
+ }
+ // Wait a bit to give queues a chance to drain.
+ nng_msleep(200);
+}
+
+void
+sendrecv(nng_socket sock)
+{
+ int iters = 0;
+
+ if (data == NULL) {
+ fatal("No data to send (specify with --data or --file)");
+ }
+ if (delay > 0) {
+ nng_msleep(delay);
+ }
+
+ // We start by sending a message, then we receive iteratively
+ // but we set a concrete timeout if interval is set, to ensure
+ // that we exit the receive loop, and can continue.
+ for (;;) {
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ nng_duration delta;
+
+ start = nng_clock();
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, data, datalen)) != 0)) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+ if ((interval < 0) && (count == 0)) {
+ // Only one iteration through.
+ recvloop(sock);
+ break;
+ }
+
+ // We would like to use recvloop, but we need to reset
+ // our timeout each time, as the timer counts down
+ // towards zero. Furthermore, with survey, we don't
+ // want to increment the iteration count.
+
+ for (;;) {
+ delta = (nng_duration)(nng_clock() - start);
+
+ nng_duration expire = interval - delta;
+ if ((recvtimeo >= 0) && (expire > recvtimeo)) {
+ expire = recvtimeo;
+ }
+ rv =
+ nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, expire);
+ if (rv != 0) {
+ fatal("Cannot set recv timeout: %s",
+ nng_strerror(rv));
+ }
+
+ rv = nng_recvmsg(sock, &msg, 0);
+ switch (rv) {
+ case 0:
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_free(msg);
+ continue;
+ case NNG_ETIMEDOUT:
+ case NNG_ESTATE:
+ // We're done receiving
+ break;
+ default:
+ fatal("Cannot receive: %s", nng_strerror(rv));
+ break;
+ }
+
+ // We're done receiving, break out.
+ break;
+ }
+
+ end = nng_clock();
+ delta = (nng_duration)(end - start);
+
+ iters++;
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
+
+ // We sleep, but we account for time spent, so that our
+ // interval appears more or less constant. Of course
+ // if we took more than the interval here, then we skip
+ // the sleep altogether.
+ if ((delta >= 0) && (delta < interval)) {
+ nng_msleep(interval - delta);
+ }
+ }
+}
+
+int
+main(int ac, char **av)
+{
+ int idx;
+ char * arg;
+ int val;
+ int rv;
+ char scratch[512];
+ struct addr * addrs = NULL;
+ struct addr ** addrend;
+ struct topic * topics = NULL;
+ struct topic **topicend;
+ nng_socket sock;
+ int port;
+
+ idx = 1;
+ addrend = &addrs;
+ topicend = &topics;
+
+ while ((rv = nng_opts_parse(ac, av, opts, &val, &arg, &idx)) == 0) {
+ switch (val) {
+ case OPT_HELP:
+ help();
+ break;
+ case OPT_REQ0:
+ case OPT_REP0:
+ case OPT_SUB0:
+ case OPT_PUB0:
+ case OPT_BUS0:
+ case OPT_SURVEY0:
+ case OPT_RESPOND0:
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ case OPT_PUSH0:
+ case OPT_PULL0:
+ if (proto != 0) {
+ fatal("Only one protocol may be "
+ "specified.");
+ }
+ proto = val;
+ break;
+ case OPT_DIAL:
+ case OPT_LISTEN:
+ addrend = addaddr(addrend, val, arg);
+ break;
+ case OPT_DIAL_LOCAL:
+ case OPT_LISTEN_LOCAL:
+ port = intarg(arg, 65536);
+ snprintf(scratch, sizeof(scratch),
+ "tcp://127.0.0.1:%d", port);
+ addrend = addaddr(addrend, val, scratch);
+ break;
+ case OPT_DIAL_IPC:
+ case OPT_LISTEN_IPC:
+ snprintf(scratch, sizeof(scratch), "ipc:///%s", arg);
+ addrend = addaddr(addrend, val, scratch);
+ break;
+ case OPT_COUNT:
+ count = intarg(arg, 0x7fffffff);
+ break;
+ case OPT_SUBSCRIBE:
+ topicend = addtopic(topicend, arg);
+ break;
+ case OPT_VERBOSE:
+ case OPT_SILENT:
+ verbose = val;
+ break;
+ case OPT_DELAY:
+ delay = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_INTERVAL:
+ interval = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_SND_TIMEO:
+ sendtimeo = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_RCV_TIMEO:
+ recvtimeo = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_RECVMAXSZ:
+ recvmaxsz = intarg(arg, 0x7fffffff);
+ if (recvmaxsz == 0) {
+ recvmaxsz = 0x7fffffff;
+ }
+ break;
+ case OPT_COMPAT:
+ compat = 1;
+ break;
+ case OPT_ASYNC:
+ async = NNG_FLAG_NONBLOCK;
+ break;
+ case OPT_ASCII:
+ case OPT_RAW:
+ case OPT_QUOTED:
+ case OPT_MSGPACK:
+ case OPT_HEX:
+ if (format != 0) {
+ fatal("Format may be specified only "
+ "once.");
+ }
+ format = val;
+ break;
+ case OPT_FORMAT:
+ if (format != 0) {
+ fatal("Format may be specified only "
+ "once.");
+ }
+ if (strcmp(arg, "no") == 0) {
+ format = OPT_BLANK;
+ } else if (strcmp(arg, "ascii") == 0) {
+ format = OPT_ASCII;
+ } else if (strcmp(arg, "hex") == 0) {
+ format = OPT_HEX;
+ } else if (strcmp(arg, "quoted") == 0) {
+ format = OPT_QUOTED;
+ } else if (strcmp(arg, "raw") == 0) {
+ format = OPT_RAW;
+ } else if (strcmp(arg, "msgpack") == 0) {
+ format = OPT_MSGPACK;
+ } else {
+ fatal("Invalid format specified.");
+ }
+ break;
+ case OPT_FILE:
+ if (data != NULL) {
+ fatal("Data (--file, --data) may be "
+ "specified "
+ "only once.");
+ }
+ loadfile(arg, &data, &datalen);
+ break;
+ case OPT_DATA:
+ if (data != NULL) {
+ fatal("Data (--file, --data) may be "
+ "specified "
+ "only once.");
+ }
+ if ((data = malloc(strlen(arg) + 1)) == NULL) {
+ fatal("Out of memory.");
+ }
+ memcpy(data, arg, strlen(arg) + 1);
+ datalen = strlen(arg);
+ break;
+ case OPT_CACERT:
+ if (cacert != NULL) {
+ fatal("CA Certificate (--cacert) may be "
+ "specified only once.");
+ }
+ loadfile(arg, &cacert, &cacertlen);
+ break;
+ case OPT_KEYFILE:
+ if (keyfile != NULL) {
+ fatal(
+ "Key (--key) may be specified only once.");
+ }
+ loadfile(arg, &keyfile, &keylen);
+ break;
+ case OPT_CERTFILE:
+ if (certfile != NULL) {
+ fatal("Cert (--cert) may be specified "
+ "only "
+ "once.");
+ }
+ loadfile(arg, &certfile, &certlen);
+ break;
+ case OPT_ZTHOME:
+ zthome = arg;
+ break;
+ case OPT_INSECURE:
+ insecure = 1;
+ break;
+ case OPT_VERSION:
+ printf("%s\n", nng_version());
+ exit(0);
+ }
+ }
+ switch (rv) {
+ case NNG_EINVAL:
+ fatal("Option %s is invalid.", av[idx]);
+ break;
+ case NNG_EAMBIGUOUS:
+ fatal("Option %s is ambiguous (specify in full).", av[idx]);
+ break;
+ case NNG_ENOARG:
+ fatal("Option %s requires argument.", av[idx]);
+ break;
+ default:
+ break;
+ }
+
+ if (addrs == NULL) {
+ fatal("No address specified.");
+ }
+
+ if (compat) {
+ if (async != 0) {
+ fatal("Options --async and --compat are "
+ "incompatible.");
+ }
+ if (count != 0) {
+ fatal("Options --count and --compat are "
+ "incompatible.");
+ }
+ if (proto == OPT_PAIR) {
+ proto = OPT_PAIR0;
+ }
+ if (proto == OPT_PAIR1) {
+ fatal("Protocol does not support --compat.");
+ }
+ async = NNG_FLAG_NONBLOCK;
+ } else {
+ if (proto == OPT_PAIR) {
+ proto = OPT_PAIR1;
+ }
+ }
+ if (proto == OPT_SUB0) {
+ if (topics == NULL) {
+ (void) addtopic(topicend, ""); // subscribe to all
+ }
+ } else {
+ if (topics != NULL) {
+ fatal("Protocol does not support --subscribe.");
+ }
+ }
+
+ switch (proto) {
+ case OPT_PULL0:
+ case OPT_SUB0:
+ if (data != NULL) {
+ fatal("Protocol does not support --file or "
+ "--data.");
+ }
+ if (interval >= 0) {
+ fatal("Protocol does not support --interval.");
+ }
+ break;
+ case OPT_PUSH0:
+ case OPT_PUB0:
+ if (format != 0) {
+ fatal("Protocol does not support --format "
+ "options.");
+ }
+ if (data == NULL) {
+ fatal("Protocol requires either --file or "
+ "--data.");
+ }
+ break;
+ case OPT_SURVEY0:
+ case OPT_REQ0:
+ if (data == NULL) {
+ fatal("Protocol requires either --file or "
+ "--data.");
+ }
+ break;
+ case OPT_REP0:
+ case OPT_RESPOND0:
+ if (interval >= 0) {
+ fatal("Protocol does not support --interval.");
+ }
+ break;
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ case OPT_BUS0:
+ if ((interval >= 0) && (data == NULL)) {
+ fatal("Option --interval requires either "
+ "--file or --data.");
+ }
+ break;
+ default:
+ // Will be caught in next switch statement.
+ break;
+ }
+
+ switch (proto) {
+ case OPT_REQ0:
+#ifdef NNG_HAVE_REQ0
+ rv = nng_req0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_REP0:
+#ifdef NNG_HAVE_REP0
+ rv = nng_rep0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_SUB0:
+#ifdef NNG_HAVE_SUB0
+ rv = nng_sub0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PUB0:
+#ifdef NNG_HAVE_PUB0
+ rv = nng_pub0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PAIR0:
+#ifdef NNG_HAVE_PAIR0
+ rv = nng_pair0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PAIR1:
+#ifdef NNG_HAVE_PAIR1
+ rv = nng_pair1_open(&sock);
+#else
+ fatal("Protocol not supported");
+#endif
+ break;
+ case OPT_BUS0:
+#ifdef NNG_HAVE_BUS0
+ rv = nng_bus0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PUSH0:
+#ifdef NNG_HAVE_PUSH0
+ rv = nng_push0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_PULL0:
+#ifdef NNG_HAVE_PULL0
+ rv = nng_pull0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_SURVEY0:
+#ifdef NNG_HAVE_SURVEYOR0
+ rv = nng_surveyor0_open(&sock);
+#else
+ fatal("Protocol not supported.");
+#endif
+ break;
+ case OPT_RESPOND0:
+#ifdef NNG_HAVE_RESPONDENT0
+ rv = nng_respondent0_open(&sock);
+#else
+ fatal("Protocol not supported");
+#endif
+ break;
+ case 0:
+ default:
+ fatal("No protocol specified.");
+ break;
+ }
+ if (rv != 0) {
+ fatal("Unable to open socket: %s", nng_strerror(rv));
+ }
+
+ for (struct topic *t = topics; t != NULL; t = t->next) {
+ rv = nng_socket_set(
+ sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val));
+ if (rv != 0) {
+ fatal("Unable to subscribe to topic %s: %s", t->val,
+ nng_strerror(rv));
+ }
+ }
+
+ if ((sendtimeo > 0) &&
+ ((rv = nng_socket_set_ms(sock, NNG_OPT_SENDTIMEO, sendtimeo)) !=
+ 0)) {
+ fatal("Unable to set send timeout: %s", nng_strerror(rv));
+ }
+ if ((recvtimeo > 0) &&
+ ((rv = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, recvtimeo)) !=
+ 0)) {
+ fatal("Unable to set send timeout: %s", nng_strerror(rv));
+ }
+
+ if ((recvmaxsz >= 0) &&
+ ((rv = nng_socket_set_size(sock, NNG_OPT_RECVMAXSZ, recvmaxsz)) !=
+ 0)) {
+ fatal("Unable to set max receive size: %s", nng_strerror(rv));
+ }
+
+ for (struct addr *a = addrs; a != NULL; a = a->next) {
+ char * act;
+ nng_listener l;
+ nng_dialer d;
+ nng_tls_config *tls;
+ switch (a->mode) {
+ case OPT_DIAL:
+ case OPT_DIAL_IPC:
+ case OPT_DIAL_LOCAL:
+ rv = nng_dialer_create(&d, sock, a->val);
+ if (rv != 0) {
+ fatal("Unable to create dialer for %s: %s",
+ a->val, nng_strerror(rv));
+ }
+ rv = nng_dialer_getopt_ptr(
+ d, NNG_OPT_TLS_CONFIG, (void **) &tls);
+ if (rv == 0) {
+ configtls(tls);
+ } else if (rv != NNG_ENOTSUP) {
+ fatal("Unable to get TLS config: %s",
+ nng_strerror(rv));
+ }
+ if (zthome != NULL) {
+ rv = nng_dialer_set(d, NNG_OPT_ZT_HOME,
+ zthome, strlen(zthome) + 1);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ fatal("Unable to set ZT home: %s",
+ nng_strerror(rv));
+ }
+ }
+ rv = nng_dialer_start(d, async);
+ act = "dial";
+ if ((rv == 0) && (verbose == OPT_VERBOSE)) {
+ char ustr[256];
+ size_t sz;
+ sz = sizeof(ustr);
+ if (nng_dialer_getopt(
+ d, NNG_OPT_URL, ustr, &sz) == 0) {
+ printf("Connected to: %s\n", ustr);
+ }
+ }
+ break;
+ case OPT_LISTEN:
+ case OPT_LISTEN_IPC:
+ case OPT_LISTEN_LOCAL:
+ rv = nng_listener_create(&l, sock, a->val);
+ if (rv != 0) {
+ fatal("Unable to create listener for %s: %s",
+ a->val, nng_strerror(rv));
+ }
+ rv = nng_listener_getopt_ptr(
+ l, NNG_OPT_TLS_CONFIG, (void **) &tls);
+ if (rv == 0) {
+ configtls(tls);
+ } else if (rv != NNG_ENOTSUP) {
+ fatal("Unable to get TLS config: %s",
+ nng_strerror(rv));
+ }
+ if (zthome != NULL) {
+ rv = nng_listener_set(l, NNG_OPT_ZT_HOME,
+ zthome, strlen(zthome) + 1);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ fatal("Unable to set ZT home: %s",
+ nng_strerror(rv));
+ }
+ }
+ rv = nng_listener_start(l, async);
+ act = "listen";
+ if ((rv == 0) && (verbose == OPT_VERBOSE)) {
+ char ustr[256];
+ size_t sz;
+ sz = sizeof(ustr);
+ if (nng_listener_getopt(
+ l, NNG_OPT_URL, ustr, &sz) == 0) {
+ printf("Listening at: %s\n", ustr);
+ }
+ }
+ break;
+ default:
+ fatal("Invalid address mode! (Bug!)");
+ }
+
+ if (rv != 0) {
+ fatal("Unable to %s on %s: %s", act, a->val,
+ nng_strerror(rv));
+ }
+ }
+
+ switch (proto) {
+ case OPT_SUB0:
+ case OPT_PULL0:
+ recvloop(sock);
+ break;
+ case OPT_REP0:
+ case OPT_RESPOND0:
+ if (data == NULL) {
+ recvloop(sock);
+ } else {
+ resploop(sock);
+ }
+ break;
+ case OPT_PUSH0:
+ case OPT_PUB0:
+ sendloop(sock);
+ break;
+ case OPT_BUS0:
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ if (data != NULL) {
+ sendrecv(sock);
+ } else {
+ recvloop(sock);
+ }
+ break;
+ case OPT_REQ0:
+ case OPT_SURVEY0:
+ sendrecv(sock);
+ break;
+ default:
+ fatal("Protocol handling unimplemented.");
+ }
+
+ exit(0);
+}
diff --git a/src/tools/nngcat/nngcat_ambiguous_test.sh b/src/tools/nngcat/nngcat_ambiguous_test.sh
new file mode 100755
index 00000000..414b6d19
--- /dev/null
+++ b/src/tools/nngcat/nngcat_ambiguous_test.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+CMD="${NNGCAT} --re --dial=tcp://127.0.0.1:27272"
+
+echo -n "Verify ambiguous options fail: "
+if ${CMD} >/dev/null 2>&1
+then
+ echo "Failed: ambigous accepted"
+ exit 1
+fi
+x=$(${CMD} 2>&1)
+if [[ ${x} =~ "ambiguous" ]]
+then
+ echo "pass"
+ exit 0
+fi
+
+echo "Failed: error did not match"
+exit 1
diff --git a/src/tools/nngcat/nngcat_async_test.sh b/src/tools/nngcat/nngcat_async_test.sh
new file mode 100755
index 00000000..2b03e522
--- /dev/null
+++ b/src/tools/nngcat/nngcat_async_test.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_async_test"
+
+echo -n "Verify async connect: "
+
+${NNGCAT} --async -d 1 --connect ${ADDR} --req0 -D "ping" &
+
+
+answer=$( ${NNGCAT} --rep0 --recv-timeout=3 --listen ${ADDR} -D "pong" --ascii 2>/dev/null )
+
+if [[ ${answer} == "ping" ]]
+then
+ echo "pass"
+ exit 0
+fi
+
+echo "Failed: req did not match"
+echo "RES: $answer"
+exit 1
diff --git a/src/tools/nngcat/nngcat_dup_proto_test.sh b/src/tools/nngcat/nngcat_dup_proto_test.sh
new file mode 100755
index 00000000..1513d01c
--- /dev/null
+++ b/src/tools/nngcat/nngcat_dup_proto_test.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+
+echo -n "Verify only a single protocol is allowed: "
+if ${NNGCAT} --pub0 --sub0 --dial=tcp://127.0.0.1:8989 >/dev/null 2>&1
+then
+ echo "Failed: duplicate protocols accepted"
+ exit 1
+fi
+echo "pass"
+exit 0
diff --git a/src/tools/nngcat/nngcat_help_test.sh b/src/tools/nngcat/nngcat_help_test.sh
new file mode 100755
index 00000000..95ed9e3e
--- /dev/null
+++ b/src/tools/nngcat/nngcat_help_test.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+
+echo -n "Verify nngcat help: "
+if ${NNGCAT} --help >/dev/null 2>&1
+then
+ echo "Failed: help didn't return 1"
+ exit 1
+fi
+x=$(${NNGCAT} --help 2>&1)
+if [[ ${x} =~ "Usage:" ]]
+then
+ echo "pass"
+ exit 0
+fi
+
+echo "Failed: usage did not match"
+echo "Output:"
+echo "$x"
+exit 1
diff --git a/src/tools/nngcat/nngcat_incompat_test.sh b/src/tools/nngcat/nngcat_incompat_test.sh
new file mode 100755
index 00000000..128b57ba
--- /dev/null
+++ b/src/tools/nngcat/nngcat_incompat_test.sh
@@ -0,0 +1,73 @@
+#!/bin/sh
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+
+echo "Verify incompatible options: "
+
+# Just bind something to this so other ones connect
+${NNGCAT} --pull0 --ascii -X /tmp/bogusipc &
+pid=$!
+
+trap "kill $pid && wait $pid 2>/dev/null" 0
+
+echo -n " --subscribe doesn't work with non-sub"
+if ${NNGCAT} --req0 -x /tmp/bogusipc --subscribe=oops >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --interval doesn't work with recv only: "
+if ${NNGCAT} --interval 1 --pull -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --pair1 doesn't work with --compat: "
+if ${NNGCAT} --compat --pair1 -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --count doesn't work with --compat: "
+if ${NNGCAT} --compat --count=1 --pair0 -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --count fails with non-integer: "
+if ${NNGCAT} --count=xyz --pair0 -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo -n " --file fails with non-existing file: "
+if ${NNGCAT} --async --file=/nosuchfilehere --push0 -x /tmp/bogusipc >/dev/null 2>&1
+then
+ echo "fail"
+ exit 1
+fi
+echo "pass"
+
+echo "PASS."
+exit 0
diff --git a/src/tools/nngcat/nngcat_need_proto_test.sh b/src/tools/nngcat/nngcat_need_proto_test.sh
new file mode 100755
index 00000000..d6733cad
--- /dev/null
+++ b/src/tools/nngcat/nngcat_need_proto_test.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+
+echo -n "Verify a protocol is needed: "
+if ${NNGCAT} --dial=tcp://127.0.0.1:8989 >/dev/null 2>&1
+then
+ echo "Failed: protocol should be required"
+ exit 1
+fi
+echo "pass"
+exit 0
diff --git a/src/tools/nngcat/nngcat_pubsub_test.sh b/src/tools/nngcat/nngcat_pubsub_test.sh
new file mode 100755
index 00000000..b9ba90ed
--- /dev/null
+++ b/src/tools/nngcat/nngcat_pubsub_test.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_pub_sub_test"
+OUTPUT=/tmp/nngcat_pubsub_test.$$.out
+
+echo -n "Verify pub sub: "
+
+trap "rm $OUTPUT" 0
+
+${NNGCAT} --listen ${ADDR} --count=3 --recv-timeout=20 --sub0 --subscribe=one --subscribe=two --quoted > $OUTPUT 2>/dev/null &
+sleep 1
+# for speed of execution, run these in the background, they should be ignored
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "xyz" &
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 -D "none swam" &
+# these we care about, due to ordering (checksum) so run them serially
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 -D "one flew"
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "twofer test"
+${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "one more"
+
+wait $bgid 2>/dev/null
+
+sum=$(cksum ${OUTPUT})
+sum=${sum%% *}
+if [[ ${sum} == 3929078614 ]]
+then
+ echo "pass"
+ exit 0
+fi
+echo "FAIL: Checksum failed (Wanted 3929078614 got ${sum})"
+echo "OUTPUT:"
+cat ${OUTPUT}
+
+exit 1
diff --git a/src/tools/nngcat/nngcat_recvmaxsz_test.sh b/src/tools/nngcat/nngcat_recvmaxsz_test.sh
new file mode 100755
index 00000000..b5d4ff4a
--- /dev/null
+++ b/src/tools/nngcat/nngcat_recvmaxsz_test.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_recvmaxsz_test"
+OUTPUT=/tmp/nngcat_recvmaxsz_test.$$.out
+
+echo -n "Verify maximum receive size: "
+
+trap "rm $OUTPUT" 0
+
+${NNGCAT} --listen ${ADDR} --count=3 --recv-maxsz=5 --pull0 --quoted > $OUTPUT 2>/dev/null &
+sleep 1
+# for speed of execution, run these in the background, they should be ignored
+${NNGCAT} --connect ${ADDR} --push0 --data "one"
+${NNGCAT} --connect ${ADDR} --push0 --data "55555"
+${NNGCAT} --connect ${ADDR} --push0 --data "666666"
+${NNGCAT} --connect ${ADDR} --push0 --data "7777777"
+${NNGCAT} --connect ${ADDR} --push0 --data "88888"
+
+wait $bgid 2>/dev/null
+
+sum=$(cksum ${OUTPUT})
+sum=${sum%% *}
+
+# This matches 3 lines of "one", "55555", "88888".
+if [[ ${sum} == 4122906158 ]]
+then
+ echo "pass"
+ exit 0
+fi
+echo "FAIL: Checksum failed (Wanted 3929078614 got ${sum})"
+echo "OUTPUT:"
+cat ${OUTPUT}
+
+exit 1
diff --git a/src/tools/nngcat/nngcat_stdin_pipe_test.sh b/src/tools/nngcat/nngcat_stdin_pipe_test.sh
new file mode 100755
index 00000000..5fec0ab7
--- /dev/null
+++ b/src/tools/nngcat/nngcat_stdin_pipe_test.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+# Copyright 2020 Lager Data, Inc. <support@lagerdata.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_stdin_pipe_test"
+OUTPUT=/tmp/nngcat_stdin_pipe_test.$$.out
+
+echo -n "Verify reading from stdin pipe: "
+
+trap "rm $OUTPUT" 0
+
+${NNGCAT} --listen ${ADDR} --count=1 --recv-timeout=3 --recv-maxsz=0 --pull0 --raw > $OUTPUT 2>/dev/null &
+bgid=$!
+
+sleep 1
+# for speed of execution, run these in the background, they should be ignored
+echo "hello world" | ${NNGCAT} --connect ${ADDR} --delay=1 --push0 --file -
+wait "$bgid" 2>/dev/null
+
+sum=$(cksum ${OUTPUT})
+sum=${sum%% *}
+
+# This matches "hello world\n" since echo adds a trailing newline
+if [[ ${sum} == 3733384285 ]]
+then
+ echo "pass"
+ exit 0
+fi
+echo "FAIL: Checksum failed (Wanted 3733384285 got ${sum})"
+echo "OUTPUT:"
+ls -la ${OUTPUT}
+
+exit 1
diff --git a/src/tools/nngcat/nngcat_unlimited_test.sh b/src/tools/nngcat/nngcat_unlimited_test.sh
new file mode 100755
index 00000000..0486b9b1
--- /dev/null
+++ b/src/tools/nngcat/nngcat_unlimited_test.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+NNGCAT=${NNGCAT:=$1}
+NNGCAT=${NNGCAT:-./nngcat}
+ADDR="ipc:///tmp/nngcat_unlimited_test"
+INPUT=/tmp/nngcat_unlimited_test.$$.in
+OUTPUT=/tmp/nngcat_unlimited_test.$$.out
+
+echo -n "Verify unlimited receive size: "
+
+trap "rm $OUTPUT $INPUT" 0
+
+# 4 MB
+dd if=/dev/urandom of=${INPUT} bs=1024 count=4096 >/dev/null 2>&1
+goodsum=$(cksum ${INPUT})
+goodsum=${goodsum%% *}
+
+${NNGCAT} --listen ${ADDR} --count=1 --recv-maxsz=0 --pull0 --raw > $OUTPUT 2>/dev/null &
+sleep 1
+# for speed of execution, run these in the background, they should be ignored
+${NNGCAT} --connect ${ADDR} --delay=1 --push0 --file ${INPUT}
+wait $bgid 2>/dev/null
+
+sum=$(cksum ${OUTPUT})
+sum=${sum%% *}
+
+if [[ ${sum} == ${goodsum} ]]
+then
+ echo "pass"
+ exit 0
+fi
+echo "FAIL: Checksum failed (Wanted ${goodsum} got ${sum})"
+echo "OUTPUT:"
+ls -la ${OUTPUT}
+
+exit 1
diff --git a/src/tools/perf/CMakeLists.txt b/src/tools/perf/CMakeLists.txt
new file mode 100644
index 00000000..135544bb
--- /dev/null
+++ b/src/tools/perf/CMakeLists.txt
@@ -0,0 +1,35 @@
+#
+# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Build performance tests.
+
+nng_directory(perf)
+
+if (NNG_TESTS)
+ macro (add_nng_perf NAME)
+ add_executable (${NAME} perf.c)
+ target_link_libraries (${NAME} nng nng_private)
+ endmacro (add_nng_perf)
+
+ add_nng_perf(remote_lat)
+ add_nng_perf(local_lat)
+ add_nng_perf(local_thr)
+ add_nng_perf(remote_thr)
+ add_nng_perf(inproc_thr)
+ add_nng_perf(inproc_lat)
+
+ add_test (NAME nng.inproc_lat COMMAND inproc_lat 64 10000)
+ set_tests_properties (nng.inproc_lat PROPERTIES TIMEOUT 30)
+
+ add_test (NAME nng.inproc_thr COMMAND inproc_thr 1400 10000)
+ set_tests_properties (nng.inproc_thr PROPERTIES TIMEOUT 30)
+
+ add_executable (pubdrop pubdrop.c)
+ target_link_libraries(pubdrop nng nng_private)
+endif ()
diff --git a/src/tools/perf/perf.c b/src/tools/perf/perf.c
new file mode 100644
index 00000000..accac621
--- /dev/null
+++ b/src/tools/perf/perf.c
@@ -0,0 +1,662 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/options.h>
+#include <nng/supplemental/util/platform.h>
+
+static void die(const char *, ...);
+static int
+no_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Protocol not supported in this build!");
+ return (NNG_ENOTSUP);
+}
+
+typedef int (*open_func)(nng_socket *);
+
+static open_func open_server = no_open;
+static open_func open_client = no_open;
+
+#if defined(NNG_HAVE_PAIR1)
+#include <nng/protocol/pair1/pair.h>
+#else
+#define nng_pair1_open no_open
+#endif
+
+#if defined(NNG_HAVE_PAIR0)
+#include <nng/protocol/pair0/pair.h>
+#else
+#define nng_pair0_open no_open
+#endif
+
+#if defined(NNG_HAVE_REQ0)
+#include <nng/protocol/reqrep0/req.h>
+#else
+#define nng_req0_open no_open
+#endif
+
+#if defined(NNG_HAVE_REP0)
+#include <nng/protocol/reqrep0/rep.h>
+#else
+#define nng_rep0_open no_open
+#endif
+
+#if defined(NNG_HAVE_BUS0)
+#include <nng/protocol/bus0/bus.h>
+#else
+#define nng_bus0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PULL0)
+#include <nng/protocol/pipeline0/pull.h>
+#else
+#define nng_pull0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUSH0)
+#include <nng/protocol/pipeline0/push.h>
+#else
+#define nng_push0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#else
+#define nng_pub0_open no_open
+#endif
+
+#if defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/sub.h>
+#else
+#define nng_sub0_open no_open
+#endif
+
+enum options {
+ OPT_PAIR0 = 1,
+ OPT_PAIR1,
+ OPT_REQREP0,
+ OPT_PUBSUB0,
+ OPT_PIPELINE0,
+ OPT_SURVEY0,
+ OPT_BUS0,
+ OPT_URL,
+};
+
+// These are not universally supported by the variants yet.
+static nng_optspec opts[] = {
+ { .o_name = "pair1", .o_val = OPT_PAIR1 },
+ { .o_name = "pair0", .o_val = OPT_PAIR0 },
+ { .o_name = "reqrep0", .o_val = OPT_REQREP0 },
+ { .o_name = "bus0", .o_val = OPT_BUS0 },
+ { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
+ { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
+ { .o_name = "url", .o_val = OPT_URL, .o_arg = true },
+ { .o_name = NULL, .o_val = 0 },
+};
+
+static void latency_client(const char *, size_t, int);
+static void latency_server(const char *, size_t, int);
+static void throughput_client(const char *, size_t, int);
+static void throughput_server(const char *, size_t, int);
+static void do_remote_lat(int argc, char **argv);
+static void do_local_lat(int argc, char **argv);
+static void do_remote_thr(int argc, char **argv);
+static void do_local_thr(int argc, char **argv);
+static void do_inproc_thr(int argc, char **argv);
+static void do_inproc_lat(int argc, char **argv);
+static void die(const char *, ...);
+
+// perf implements the same performance tests found in the standard
+// nanomsg & mangos performance tests. As with mangos, the decision
+// about which test to run is determined by the program name (ARGV[0}])
+// that it is run under.
+//
+// Options are:
+//
+// - remote_lat - remote latency side (client, aka latency_client)
+// - local_lat - local latency side (server, aka latency_server)
+// - local_thr - local throughput side
+// - remote_thr - remote throughput side
+// - inproc_lat - inproc latency
+// - inproc_thr - inproc throughput
+//
+
+bool
+matches(const char *arg, const char *name)
+{
+ const char *ptr = arg;
+ const char *x;
+
+ while (((x = strchr(ptr, '/')) != NULL) ||
+ ((x = strchr(ptr, '\\')) != NULL) ||
+ ((x = strchr(ptr, ':')) != NULL)) {
+ ptr = x + 1;
+ }
+ for (;;) {
+ if (*name == '\0') {
+ break;
+ }
+ if (tolower(*ptr) != *name) {
+ return (false);
+ }
+ ptr++;
+ name++;
+ }
+
+ switch (*ptr) {
+ case '\0':
+ /* FALLTHROUGH*/
+ case '.': // extension; ignore it.
+ return (true);
+ default: // some other trailing bit.
+ return (false);
+ }
+}
+
+const int PAIR0 = 0;
+const int PAIR1 = 1;
+const int REQREP = 2;
+
+int
+main(int argc, char **argv)
+{
+ char *prog;
+
+#if defined(NNG_HAVE_PAIR1)
+ open_server = nng_pair1_open;
+ open_client = nng_pair1_open;
+#elif defined(NNG_HAVE_PAIR0)
+ open_server = nng_pair0_open;
+ open_client = nng_pair0_open;
+#endif
+
+ // Allow -m <remote_lat> or whatever to override argv[0].
+ if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
+ prog = argv[2];
+ argv += 3;
+ argc -= 3;
+ } else {
+ prog = argv[0];
+ argc--;
+ argv++;
+ }
+ if (matches(prog, "remote_lat") || matches(prog, "latency_client")) {
+ do_remote_lat(argc, argv);
+ } else if (matches(prog, "local_lat") ||
+ matches(prog, "latency_server")) {
+ do_local_lat(argc, argv);
+ } else if (matches(prog, "local_thr") ||
+ matches(prog, "throughput_server")) {
+ do_local_thr(argc, argv);
+ } else if (matches(prog, "remote_thr") ||
+ matches(prog, "throughput_client")) {
+ do_remote_thr(argc, argv);
+ } else if (matches(prog, "inproc_thr")) {
+ do_inproc_thr(argc, argv);
+ } else if (matches(prog, "inproc_lat")) {
+ do_inproc_lat(argc, argv);
+ } else {
+ die("Unknown program mode? Use -m <mode>.");
+ }
+}
+
+static void
+die(const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(2);
+}
+
+static int
+parse_int(const char *arg, const char *what)
+{
+ long val;
+ char *eptr;
+
+ val = strtol(arg, &eptr, 10);
+ // Must be a positive number less than around a billion.
+ if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) {
+ die("Invalid %s", what);
+ }
+ return ((int) val);
+}
+
+void
+do_local_lat(int argc, char **argv)
+{
+ long int msgsize;
+ long int trips;
+
+ if (argc != 3) {
+ die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "round-trips");
+
+ latency_server(argv[0], msgsize, trips);
+}
+
+void
+do_remote_lat(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "round-trips");
+
+ latency_client(argv[0], msgsize, trips);
+}
+
+void
+do_local_thr(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: local_thr <listen-addr> <msg-size> <count>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "count");
+
+ throughput_server(argv[0], msgsize, trips);
+}
+
+void
+do_remote_thr(int argc, char **argv)
+{
+ int msgsize;
+ int trips;
+
+ if (argc != 3) {
+ die("Usage: remote_thr <connect-to> <msg-size> <count>");
+ }
+
+ msgsize = parse_int(argv[1], "message size");
+ trips = parse_int(argv[2], "count");
+
+ throughput_client(argv[0], msgsize, trips);
+}
+
+struct inproc_args {
+ int count;
+ int msgsize;
+ const char *addr;
+ void (*func)(const char *, size_t, int);
+};
+
+static void
+do_inproc(void *args)
+{
+ struct inproc_args *ia = args;
+
+ ia->func(ia->addr, ia->msgsize, ia->count);
+}
+
+void
+do_inproc_lat(int argc, char **argv)
+{
+ nng_thread * thr;
+ struct inproc_args ia;
+ int rv;
+ int val;
+ int optidx;
+ char * arg;
+ char * addr;
+
+ addr = "inproc://latency_test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_BUS0:
+ open_client = nng_bus0_open;
+ open_server = nng_bus0_open;
+ break;
+
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
+
+ if (argc != 2) {
+ die("Usage: inproc_lat <msg-size> <count>");
+ }
+
+ ia.addr = addr;
+ ia.msgsize = parse_int(argv[0], "message size");
+ ia.count = parse_int(argv[1], "count");
+ ia.func = latency_server;
+
+ if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
+ die("Cannot create thread: %s", nng_strerror(rv));
+ }
+
+ // Sleep a bit.
+ nng_msleep(100);
+
+ latency_client(addr, ia.msgsize, ia.count);
+ nng_thread_destroy(thr);
+}
+
+void
+do_inproc_thr(int argc, char **argv)
+{
+ nng_thread * thr;
+ struct inproc_args ia;
+ int rv;
+ int optidx;
+ int val;
+ char * arg;
+ char * addr = "inproc://throughput-test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+#if 0
+ // For now these protocols simply do not work with
+ // throughput -- they don't work with backpressure properly.
+ // In the future we should support synchronizing in the same
+ // process, and alerting the sender both on completion of
+ // a single message, and on completion of all messages.
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+#endif
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_PIPELINE0:
+ open_client = nng_pull0_open;
+ open_server = nng_push0_open;
+ break;
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
+
+ if (argc != 2) {
+ die("Usage: inproc_thr <msg-size> <count>");
+ }
+
+ ia.addr = addr;
+ ia.msgsize = parse_int(argv[0], "message size");
+ ia.count = parse_int(argv[1], "count");
+ ia.func = throughput_server;
+
+ if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
+ die("Cannot create thread: %s", nng_strerror(rv));
+ }
+
+ // Sleep a bit.
+ nng_msleep(100);
+
+ throughput_client(addr, ia.msgsize, ia.count);
+ nng_thread_destroy(thr);
+}
+
+void
+latency_client(const char *addr, size_t msgsize, int trips)
+{
+ nng_socket s;
+ nng_msg * msg;
+ nng_time start, end;
+ int rv;
+ int i;
+ float total;
+ float latency;
+ if ((rv = open_client(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
+ die("nng_dial: %s", nng_strerror(rv));
+ }
+
+ nng_msleep(100);
+
+ if (nng_msg_alloc(&msg, msgsize) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+
+ start = nng_clock();
+ for (i = 0; i < trips; i++) {
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ }
+ end = nng_clock();
+
+ nng_msg_free(msg);
+ nng_close(s);
+
+ total = (float) ((end - start)) / 1000;
+ latency = ((float) ((total * 1000000)) / (float) (trips * 2));
+ printf("total time: %.3f [s]\n", total);
+ printf("message size: %d [B]\n", (int) msgsize);
+ printf("round trip count: %d\n", trips);
+ printf("average latency: %.3f [us]\n", latency);
+}
+
+void
+latency_server(const char *addr, size_t msgsize, int trips)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+
+ if ((rv = open_server(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
+ die("nng_listen: %s", nng_strerror(rv));
+ }
+
+ for (i = 0; i < trips; i++) {
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
+ msgsize);
+ }
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+ }
+
+ // Wait a bit for things to drain... linger should do this.
+ // 100ms ought to be enough.
+ nng_msleep(100);
+ nng_close(s);
+}
+
+// Our throughput story is quite a mess. Mostly I think because of the poor
+// caching and message reuse. We should probably implement a message pooling
+// API somewhere.
+
+void
+throughput_server(const char *addr, size_t msgsize, int count)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+ uint64_t start, end;
+ float msgpersec, mbps, total;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+ rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
+ die("nng_listen: %s", nng_strerror(rv));
+ }
+
+ // Receive first synchronization message.
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ nng_msg_free(msg);
+ start = nng_clock();
+
+ for (i = 0; i < count; i++) {
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ die("nng_recvmsg: %s", nng_strerror(rv));
+ }
+ if (nng_msg_len(msg) != msgsize) {
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
+ msgsize);
+ }
+ nng_msg_free(msg);
+ }
+ end = nng_clock();
+ // Send a synchronization message (empty) to the other side,
+ // and wait a bit to make sure it goes out the wire.
+ nng_send(s, "", 0, 0);
+ nng_msleep(200);
+ nng_close(s);
+ total = (float) ((end - start)) / 1000;
+ msgpersec = (float) (count) / total;
+ mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024);
+ printf("total time: %.3f [s]\n", total);
+ printf("message size: %d [B]\n", (int) msgsize);
+ printf("message count: %d\n", count);
+ printf("throughput: %.f [msg/s]\n", msgpersec);
+ printf("throughput: %.3f [Mb/s]\n", mbps);
+}
+
+void
+throughput_client(const char *addr, size_t msgsize, int count)
+{
+ nng_socket s;
+ nng_msg * msg;
+ int rv;
+ int i;
+
+ // We send one extra zero length message to start the timer.
+ count++;
+
+ if ((rv = nng_pair_open(&s)) != 0) {
+ die("nng_socket: %s", nng_strerror(rv));
+ }
+
+ // XXX: set no delay
+ // XXX: other options (TLS in the future?, Linger?)
+
+ rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv));
+ }
+
+ rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000);
+ if (rv != 0) {
+ die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
+ die("nng_dial: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+
+ for (i = 0; i < count; i++) {
+ if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) {
+ die("nng_msg_alloc: %s", nng_strerror(rv));
+ }
+
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ die("nng_sendmsg: %s", nng_strerror(rv));
+ }
+ }
+
+ // Attempt to get the completion indication from the other
+ // side.
+ if (nng_recvmsg(s, &msg, 0) == 0) {
+ nng_msg_free(msg);
+ }
+
+ nng_close(s);
+}
diff --git a/src/tools/perf/pubdrop.c b/src/tools/perf/pubdrop.c
new file mode 100644
index 00000000..be51bb2f
--- /dev/null
+++ b/src/tools/perf/pubdrop.c
@@ -0,0 +1,325 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/platform.h>
+
+// pubdrop - this is a simple testing utility that lets us measure PUB/SUB
+// performance, including dropped messages, delivery across multiple threads,
+// etc. It actually uses a wild card subscription for now.
+
+#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+
+#else
+
+static void die(const char *, ...);
+
+static int
+nng_pub0_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Pub protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+
+static int
+nng_sub0_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Sub protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+
+#endif // NNG_HAVE_PUB0....
+
+static void die(const char *, ...);
+static void do_pubdrop(int argc, char **argv);
+static uint64_t nperusec;
+static volatile int x;
+
+void
+work(void)
+{
+ x = rand();
+}
+
+void
+usdelay(unsigned long long nusec)
+{
+ nusec *= nperusec;
+ while (nusec > 0) {
+ work();
+ nusec--;
+ }
+}
+
+int
+main(int argc, char **argv)
+{
+ argc--;
+ argv++;
+
+ // We calculate a delay factor to roughly delay 1 usec. We don't
+ // need this to be perfect, just reproducible on the same host.
+ unsigned long cnt = 1000000;
+
+ nng_time beg = nng_clock();
+ for (unsigned long i = 0; i < cnt; i++) {
+ work();
+ }
+ nng_time end = nng_clock();
+ nperusec = cnt / (1000 * (end - beg));
+
+ do_pubdrop(argc, argv);
+}
+
+static void
+die(const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(2);
+}
+
+static int
+parse_int(const char *arg, const char *what)
+{
+ long val;
+ char *eptr;
+
+ val = strtol(arg, &eptr, 10);
+ // Must be a postive number less than around a billion.
+ if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) {
+ die("Invalid %s", what);
+ }
+ return ((int) val);
+}
+
+struct pubdrop_args {
+ const char * addr;
+ bool start;
+ unsigned long long msgsize;
+ unsigned long long count;
+ unsigned long long intvl;
+ unsigned long long drops;
+ unsigned long long gaps;
+ unsigned long long errs;
+ unsigned long long recvs;
+ nng_time beg;
+ nng_time end;
+ nng_mtx * mtx;
+ nng_cv * cv;
+};
+
+static void
+pub_server(void *arg)
+{
+ struct pubdrop_args *pa = arg;
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+
+ if ((rv = nng_pub0_open(&sock)) != 0) {
+ die("Cannot open sub: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) {
+ die("Cannot listen: %s", nng_strerror(rv));
+ }
+
+ nng_mtx_lock(pa->mtx);
+ while (!pa->start) {
+ nng_cv_wait(pa->cv);
+ }
+ nng_mtx_unlock(pa->mtx);
+
+ start = nng_clock();
+ for (uint64_t i = 0; i < pa->count; i++) {
+ // Unfortunately we need to allocate messages dynamically as we
+ // go. The other option would be to allocate them all up front,
+ // but that could be a rather excessive amount of memory.
+ if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) {
+ die("Message alloc failed");
+ }
+ memcpy(nng_msg_body(msg), &i, sizeof(i));
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ die("Sendmsg: %s", nng_strerror(rv));
+ }
+ // It sure would be nice if we had a usec granularity option
+ // here.
+ if (pa->intvl > 0) {
+ usdelay((unsigned long long) pa->intvl);
+ }
+ }
+
+ end = nng_clock();
+
+ nng_msleep(1000); // drain the queue
+ nng_close(sock);
+
+ nng_mtx_lock(pa->mtx);
+ pa->beg = start;
+ pa->end = end;
+ nng_mtx_unlock(pa->mtx);
+}
+
+static void
+sub_client(void *arg)
+{
+ struct pubdrop_args *pa = arg;
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ unsigned long long recvs;
+ unsigned long long drops;
+ unsigned long long gaps;
+ unsigned long long errs;
+ unsigned long long expect;
+
+ if ((rv = nng_sub0_open(&sock)) != 0) {
+ die("Cannot open sub: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) {
+ die("Cannot listen: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+ if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) {
+ die("setopt: %s", nng_strerror(rv));
+ }
+
+ expect = 0;
+ recvs = drops = gaps = errs = 0;
+
+ while (expect < pa->count) {
+ uint64_t got;
+ if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
+ if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) {
+ // Closed without receiving the last message
+ drops += (pa->count - expect);
+ gaps++;
+ break;
+ }
+ printf("ERROR: %s\n", nng_strerror(rv));
+ errs++;
+ break;
+ }
+ recvs++;
+ memcpy(&got, nng_msg_body(msg), sizeof(got));
+ nng_msg_free(msg);
+ if (got != expect) {
+ gaps++;
+ if (got > expect) {
+ drops += (got - expect);
+ } else {
+ die("Misordered delivery");
+ }
+ }
+ expect = got + 1;
+ }
+
+ nng_mtx_lock(pa->mtx);
+ pa->drops += drops;
+ pa->errs += errs;
+ pa->recvs += recvs;
+ pa->gaps += gaps;
+ nng_mtx_unlock(pa->mtx);
+}
+
+static void
+do_pubdrop(int argc, char **argv)
+{
+ nng_thread ** thrs;
+ struct pubdrop_args pa;
+ int rv;
+ int nsubs;
+
+ if (argc != 5) {
+ die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> "
+ "<interval>");
+ }
+
+ memset(&pa, 0, sizeof(pa));
+ pa.addr = argv[0];
+ pa.msgsize = parse_int(argv[1], "message size");
+ pa.count = parse_int(argv[2], "count");
+ pa.intvl = parse_int(argv[4], "interval");
+ nsubs = parse_int(argv[3], "#subscribers");
+
+ if (pa.msgsize < sizeof(uint64_t)) {
+ die("Message size too small.");
+ }
+
+ thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1);
+ if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) ||
+ ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) {
+ die("Startup: %s\n", nng_strerror(rv));
+ };
+
+ if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) {
+ die("Cannot create pub thread: %s", nng_strerror(rv));
+ }
+
+ nng_msleep(100); // give time for listener to start...
+
+ for (int i = 0; i < nsubs; i++) {
+ if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) !=
+ 0) {
+ die("Cannot create sub thread: %s", nng_strerror(rv));
+ }
+ }
+
+ // Sleep a bit for conns to establish.
+ nng_msleep(2000);
+ nng_mtx_lock(pa.mtx);
+ pa.start = true;
+ nng_cv_wake(pa.cv);
+ nng_mtx_unlock(pa.mtx);
+
+ for (int i = 0; i < nsubs + 1; i++) {
+ nng_thread_destroy(thrs[i]);
+ }
+
+ nng_mtx_lock(pa.mtx);
+
+ unsigned long long expect = nsubs * pa.count;
+ unsigned long long missing = nsubs ? expect - pa.recvs : 0;
+ double dur = (pa.end - pa.beg) / 1000.0;
+
+ printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n",
+ pa.count, dur, pa.count / dur);
+ printf("Expected %llu messages total\n", expect);
+ printf("Received %llu messages total\n", pa.recvs);
+ printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur);
+ printf("Errors %llu total\n", pa.errs);
+ printf("Reported %llu dropped messages in %llu gaps\n", pa.drops,
+ pa.gaps);
+ printf("Dropped %llu messages (missing)\n", missing);
+ printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0);
+
+ nng_mtx_unlock(pa.mtx);
+}