diff options
Diffstat (limited to 'src/tools')
| -rw-r--r-- | src/tools/CMakeLists.txt | 13 | ||||
| -rw-r--r-- | src/tools/nngcat/CMakeLists.txt | 41 | ||||
| -rw-r--r-- | src/tools/nngcat/nngcat.c | 1217 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_ambiguous_test.sh | 31 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_async_test.sh | 32 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_dup_proto_test.sh | 23 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_help_test.sh | 32 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_incompat_test.sh | 73 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_need_proto_test.sh | 23 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_pubsub_test.sh | 45 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_recvmaxsz_test.sh | 46 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_stdin_pipe_test.sh | 44 | ||||
| -rwxr-xr-x | src/tools/nngcat/nngcat_unlimited_test.sh | 46 | ||||
| -rw-r--r-- | src/tools/perf/CMakeLists.txt | 35 | ||||
| -rw-r--r-- | src/tools/perf/perf.c | 662 | ||||
| -rw-r--r-- | src/tools/perf/pubdrop.c | 325 |
16 files changed, 2688 insertions, 0 deletions
diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt new file mode 100644 index 00000000..0487d9ec --- /dev/null +++ b/src/tools/CMakeLists.txt @@ -0,0 +1,13 @@ +# +# Copyright 2021 Staysail Systems, Inc. <info@staystail.tech> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +nng_directory(tools) + +add_subdirectory(nngcat) +add_subdirectory(perf) diff --git a/src/tools/nngcat/CMakeLists.txt b/src/tools/nngcat/CMakeLists.txt new file mode 100644 index 00000000..d522a045 --- /dev/null +++ b/src/tools/nngcat/CMakeLists.txt @@ -0,0 +1,41 @@ +# +# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +nng_directory(nngcat) + +if (NNG_ENABLE_NNGCAT) + add_executable(nngcat nngcat.c) + target_include_directories(nngcat PUBLIC ${PROJECT_SOURCE_DIR}/src) + target_link_libraries(nngcat nng nng_private) + install(TARGETS nngcat RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + COMPONENT Tools) + + if (NNG_TESTS AND CMAKE_SYSTEM_NAME MATCHES "Linux") + include(FindUnixCommands) + endif () + # TODO: This should be refactored to use a test driver. + # We only run the tests on Linux for now, because the Darwin CI/CD is too brittle. + if (NNG_TESTS AND BASH AND CMAKE_SYSTEM_NAME MATCHES "Linux") + macro(add_nngcat_test NAME TIMEOUT) + add_test(NAME nng.${NAME} COMMAND ${BASH} ${CMAKE_CURRENT_SOURCE_DIR}/${NAME}_test.sh $<TARGET_FILE:nngcat>) + set_tests_properties(nng.${NAME} PROPERTIES TIMEOUT ${TIMEOUT}) + endmacro() + add_nngcat_test(nngcat_async 10) + add_nngcat_test(nngcat_ambiguous 10) + add_nngcat_test(nngcat_need_proto 10) + add_nngcat_test(nngcat_dup_proto 10) + add_nngcat_test(nngcat_help 10) + add_nngcat_test(nngcat_incompat 10) + add_nngcat_test(nngcat_pubsub 20) + add_nngcat_test(nngcat_recvmaxsz 20) + add_nngcat_test(nngcat_unlimited 20) + add_nngcat_test(nngcat_stdin_pipe 20) + endif () +endif () diff --git a/src/tools/nngcat/nngcat.c b/src/tools/nngcat/nngcat.c new file mode 100644 index 00000000..9e106069 --- /dev/null +++ b/src/tools/nngcat/nngcat.c @@ -0,0 +1,1217 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2020 Lager Data, Inc. <support@lagerdata.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <errno.h> +#include <stdarg.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <nng/nng.h> +#include <nng/protocol/bus0/bus.h> +#include <nng/protocol/pair0/pair.h> +#include <nng/protocol/pair1/pair.h> +#include <nng/protocol/pipeline0/pull.h> +#include <nng/protocol/pipeline0/push.h> +#include <nng/protocol/pubsub0/pub.h> +#include <nng/protocol/pubsub0/sub.h> +#include <nng/protocol/reqrep0/rep.h> +#include <nng/protocol/reqrep0/req.h> +#include <nng/protocol/survey0/respond.h> +#include <nng/protocol/survey0/survey.h> +#include <nng/supplemental/tls/tls.h> +#include <nng/supplemental/util/options.h> +#include <nng/supplemental/util/platform.h> +#include <nng/transport/zerotier/zerotier.h> + +// Globals. We need this to avoid passing around everything. +int format = 0; +int proto = 0; +int verbose = 0; +int delay = 0; +nng_duration interval = NNG_DURATION_INFINITE; +nng_duration sendtimeo = NNG_DURATION_INFINITE; +nng_duration recvtimeo = NNG_DURATION_INFINITE; +void * data = NULL; +size_t datalen = 0; +int compat = 0; +int async = 0; +int insecure = 0; +void * cacert = NULL; +size_t cacertlen = 0; +void * keyfile = NULL; +size_t keylen = 0; +void * certfile = NULL; +size_t certlen = 0; +const char * zthome = NULL; +int count = 0; +int recvmaxsz = -1; + +// Options, must start at 1 because zero is sentinel. +enum options { + OPT_HELP = 1, + OPT_VERBOSE, + OPT_SILENT, + OPT_REP0, + OPT_REQ0, + OPT_PUSH0, + OPT_PULL0, + OPT_PUB0, + OPT_SUB0, + OPT_SURVEY0, + OPT_RESPOND0, + OPT_PAIR0, + OPT_PAIR1, + OPT_PAIR, + OPT_BUS0, + OPT_DIAL, + OPT_LISTEN, + OPT_DATA, + OPT_FILE, + OPT_SUBSCRIBE, + OPT_INTERVAL, + OPT_DELAY, + OPT_COUNT, + OPT_FORMAT, + OPT_RAW, + OPT_ASCII, + OPT_QUOTED, + OPT_MSGPACK, + OPT_HEX, + OPT_BLANK, // no printing, not an actual option. + OPT_COMPAT, + OPT_ASYNC, + OPT_RCV_TIMEO, + OPT_SND_TIMEO, + OPT_SOCK_NAME, + OPT_LISTEN_IPC, + OPT_DIAL_IPC, + OPT_LISTEN_LOCAL, + OPT_DIAL_LOCAL, + OPT_INSECURE, + OPT_CACERT, + OPT_KEYFILE, + OPT_CERTFILE, + OPT_VERSION, + OPT_RECVMAXSZ, + OPT_ZTHOME, +}; + +static nng_optspec opts[] = { + { .o_name = "help", .o_short = 'h', .o_val = OPT_HELP }, + { .o_name = "verbose", .o_short = 'v', .o_val = OPT_VERBOSE }, + { .o_name = "silent", .o_short = 'q', .o_val = OPT_SILENT }, + { .o_name = "req0", .o_val = OPT_REQ0 }, + { .o_name = "rep0", .o_val = OPT_REP0 }, + { .o_name = "push0", .o_val = OPT_PUSH0 }, + { .o_name = "pull0", .o_val = OPT_PULL0 }, + { .o_name = "pub0", .o_val = OPT_PUB0 }, + { .o_name = "sub", .o_val = OPT_SUB0 }, // explicit for alias + { .o_name = "sub0", .o_val = OPT_SUB0 }, + { .o_name = "surveyor0", .o_val = OPT_SURVEY0 }, + { .o_name = "respondent0", .o_val = OPT_RESPOND0 }, + { .o_name = "pair", .o_val = OPT_PAIR }, + { .o_name = "pair0", .o_val = OPT_PAIR0 }, + { .o_name = "pair1", .o_val = OPT_PAIR1 }, + { .o_name = "bus0", .o_val = OPT_BUS0 }, + { .o_name = "dial", .o_val = OPT_DIAL, .o_arg = true }, + { .o_name = "listen", .o_val = OPT_LISTEN, .o_arg = true }, + { .o_name = "data", .o_short = 'D', .o_val = OPT_DATA, .o_arg = true }, + { .o_name = "file", .o_short = 'F', .o_val = OPT_FILE, .o_arg = true }, + { .o_name = "subscribe", .o_val = OPT_SUBSCRIBE, .o_arg = true }, + { .o_name = "format", .o_val = OPT_FORMAT, .o_arg = true }, + { .o_name = "ascii", .o_short = 'A', .o_val = OPT_ASCII }, + { .o_name = "quoted", .o_short = 'Q', .o_val = OPT_QUOTED }, + { .o_name = "raw", .o_val = OPT_RAW }, + { .o_name = "hex", .o_val = OPT_HEX }, + { .o_name = "compat", .o_val = OPT_COMPAT }, + { .o_name = "async", .o_val = OPT_ASYNC }, + { .o_name = "msgpack", .o_val = OPT_MSGPACK }, + + { + .o_name = "recv-maxsz", + .o_short = 'Z', + .o_val = OPT_RECVMAXSZ, + .o_arg = true, + }, + { + .o_name = "count", + .o_short = 'C', + .o_val = OPT_COUNT, + .o_arg = true, + }, + { + .o_name = "delay", + .o_short = 'd', + .o_val = OPT_DELAY, + .o_arg = true, + }, + { + .o_name = "interval", + .o_short = 'i', + .o_val = OPT_INTERVAL, + .o_arg = true, + }, + { .o_name = "recv-timeout", .o_val = OPT_RCV_TIMEO, .o_arg = true }, + { .o_name = "send-timeout", .o_val = OPT_SND_TIMEO, .o_arg = true }, + { .o_name = "socket-name", .o_val = OPT_SOCK_NAME, .o_arg = true }, + + // Legacy compatibility with nanocat. + { .o_name = "bind", .o_val = OPT_LISTEN, .o_arg = true }, + { .o_name = "connect", .o_val = OPT_DIAL, .o_arg = true }, + { + .o_name = "bind-ipc", + .o_short = 'X', + .o_val = OPT_LISTEN_IPC, + .o_arg = true, + }, + { + .o_name = "connect-ipc", + .o_short = 'x', + .o_val = OPT_DIAL_IPC, + .o_arg = true, + }, + { + .o_name = "bind-local", + .o_short = 'L', + .o_val = OPT_LISTEN_LOCAL, + .o_arg = true, + }, + { + .o_name = "connect-local", + .o_short = 'l', + .o_val = OPT_DIAL_LOCAL, + .o_arg = true, + }, + { .o_name = "insecure", .o_short = 'k', .o_val = OPT_INSECURE }, + { .o_name = "cacert", .o_val = OPT_CACERT, .o_arg = true }, + { .o_name = "key", .o_val = OPT_KEYFILE, .o_arg = true }, + { + .o_name = "cert", + .o_short = 'E', + .o_val = OPT_CERTFILE, + .o_arg = true, + }, + { + .o_name = "zt-home", + .o_val = OPT_ZTHOME, + .o_arg = true, + }, + { .o_name = "version", .o_short = 'V', .o_val = OPT_VERSION }, + + // Sentinel. + { .o_name = NULL, .o_val = 0 }, +}; + +static void +fatal(const char *msg, ...) +{ + va_list ap; + va_start(ap, msg); + vfprintf(stderr, msg, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); +} + +static void +help(void) +{ + printf( + "Usage: nngcat <proto> <addr>... [<fmt>] [<opts>...] [<src>]\n\n"); + printf("<proto> must be one of:\n"); + printf(" --req0 (or alias --req)\n"); + printf(" --rep0 (or alias --rep)\n"); + printf(" --pub0 (or alias --pub)\n"); + printf(" --sub0 (or alias --sub)\n"); + printf(" --push0 (or alias --push)\n"); + printf(" --pull0 (or alias --pull)\n"); + printf(" --surveyor0 (or alias --surveyor)\n"); + printf(" --respondent0 (or alias --respondent)\n"); + printf(" --pair0\n"); + printf(" --pair1\n"); + printf(" --pair (alias for either pair0 or pair1)\n"); + printf("\n<addr> must be one or more of:\n"); + printf(" --dial <url> (or alias --connect <url>)\n"); + printf(" --listen <url> (or alias --bind <url>)\n"); + printf(" --bind-local <port> (or alias -L <port>)\n"); + printf(" --connect-local <port> (or alias -l <port>)\n"); + printf(" --bind-ipc <path> (or alias -X <path>)\n"); + printf(" --connect-ipc <path> (or alias -x <path>)\n"); + printf("\n<fmt> may be one or none of:\n"); + printf(" --format <no|ascii|hex|msgpack|quoted|raw>\n"); + printf(" --ascii (or alias -A)\n"); + printf(" --quoted (or alias -Q)\n"); + printf(" --hex\n"); + printf(" --msgpack\n"); + printf(" --raw\n"); + printf("\n<opts> may be any of:\n"); + printf(" --subscribe <topic> (only with --sub protocol)\n"); + printf(" --silent (or alias -q)\n"); + printf(" --verbose (or alias -v)\n"); + printf(" --count <num> (or alias -C <num>)\n"); + printf(" --delay <secs> (or alias -d <secs>)\n"); + printf(" --interval <secs> (or alias -i <secs>)\n"); + printf(" --recv-timeout <secs>\n"); + printf(" --send-timeout <secs>\n"); + printf(" --recv-maxsz <size> (or alias -Z <size>)\n"); + printf(" --compat\n"); + printf(" --async\n"); + printf(" --insecure (or alias -k)\n"); + printf(" --cacert <file>\n"); + printf(" --cert <file> (or alias -E <file>)\n"); + printf(" --key <file>\n"); + printf(" --zt-home <path>\n"); + printf("\n<src> may be one of:\n"); + printf(" --file <file> (or alias -F <file>). " + "Use - for standard input.\n"); + printf(" --data <data> (or alias -D <data>)\n"); + exit(1); +} + +static int +intarg(const char *val, int maxv) +{ + int v = 0; + + if (val[0] == '\0') { + fatal("Empty integer argument."); + } + while (*val != '\0') { + if (!isdigit(*val)) { + fatal("Integer argument expected."); + } + v *= 10; + v += ((*val) - '0'); + val++; + if (v > maxv) { + fatal("Integer argument too large."); + } + } + if (v < 0) { + fatal("Integer argument overflow."); + } + return (v); +} + +// This reads a file into memory. Care is taken to ensure that +// the buffer is one byte larger and contains a terminating +// NUL. (Useful for key files and such.) +static void +loadfile(const char *path, void **datap, size_t *lenp) +{ + FILE * f; + size_t total_read = 0; + size_t allocation_size = BUFSIZ; + char * fdata; + char * realloc_result; + + if (strcmp(path, "-") == 0) { + f = stdin; + } else { + if ((f = fopen(path, "rb")) == NULL) { + fatal( + "Cannot open file %s: %s", path, strerror(errno)); + } + } + + if ((fdata = malloc(allocation_size + 1)) == NULL) { + fatal("Out of memory."); + } + + while (1) { + total_read += fread( + fdata + total_read, 1, allocation_size - total_read, f); + if (ferror(f)) { + if (errno == EINTR) { + continue; + } + fatal( + "Read from %s failed: %s", path, strerror(errno)); + } + if (feof(f)) { + break; + } + if (total_read == allocation_size) { + if (allocation_size > SIZE_MAX / 2) { + fatal("Out of memory."); + } + allocation_size *= 2; + if ((realloc_result = realloc( + fdata, allocation_size + 1)) == NULL) { + free(fdata); + fatal("Out of memory."); + } + fdata = realloc_result; + } + } + if (f != stdin) { + fclose(f); + } + fdata[total_read] = '\0'; + *datap = fdata; + *lenp = total_read; +} + +static void +configtls(nng_tls_config *tls) +{ + int rv = 0; + if (insecure) { + rv = nng_tls_config_auth_mode(tls, NNG_TLS_AUTH_MODE_NONE); + } + if ((rv == 0) && (certfile != NULL)) { + keyfile = keyfile ? keyfile : certfile; + rv = nng_tls_config_own_cert(tls, certfile, keyfile, NULL); + } + if ((rv == 0) && (cacert != NULL)) { + rv = nng_tls_config_ca_chain(tls, cacert, NULL); + } + if (rv != 0) { + fatal("Unable to configure TLS: %s", nng_strerror(rv)); + } +} + +struct addr { + struct addr *next; + int mode; + char * val; +}; + +struct addr ** +addaddr(struct addr **endp, int mode, const char *a) +{ + struct addr *na; + + if (((na = malloc(sizeof(*na))) == NULL) || + ((na->val = malloc(strlen(a) + 1)) == NULL)) { + fatal("Out of memory."); + } + na->mode = mode; + memcpy(na->val, a, strlen(a) + 1); + na->next = NULL; + *endp = na; + return (&na->next); +} + +struct topic { + struct topic *next; + char * val; +}; + +struct topic ** +addtopic(struct topic **endp, const char *s) +{ + struct topic *t; + + if (((t = malloc(sizeof(*t))) == NULL) || + ((t->val = malloc(strlen(s) + 1)) == NULL)) { + fatal("Out of memory."); + } + memcpy(t->val, s, strlen(s) + 1); + t->next = NULL; + *endp = t; + return (&t->next); +} + +void +printmsg(char *buf, size_t len) +{ + switch (format) { + case OPT_BLANK: // Suppress contents. + return; + case OPT_RAW: // Just emit raw contents. + fwrite(buf, 1, len, stdout); + break; + case OPT_ASCII: // ASCII, but non-ASCII substituted with '.' + for (size_t i = 0; i < len; i++) { + if (isprint(buf[i])) { + putchar(buf[i]); + } else { + putchar('.'); + } + } + break; + case OPT_QUOTED: // C style quoted strings. + putchar('"'); + for (size_t i = 0; i < len; i++) { + switch (buf[i]) { + case '\n': + putchar('\\'); + putchar('n'); + break; + case '\r': + putchar('\\'); + putchar('r'); + break; + case '\t': + putchar('\\'); + putchar('t'); + break; + case '"': + case '\\': + putchar('\\'); + putchar(buf[i]); + break; + default: + if (isprint(buf[i])) { + fputc(buf[i], stdout); + } else { + printf("\\x%02x", (uint8_t) buf[i]); + } + } + } + putchar('"'); + putchar('\n'); + break; + case OPT_MSGPACK: // MsgPack, we just encode prefix + len, then + // raw. + if (len < 256) { + putchar('\xc4'); + putchar(len & 0xff); + } else if (len < 65536) { + putchar('\xc5'); + putchar((len >> 8) & 0xff); + putchar(len & 0xff); + } else { + putchar('\xc6'); + putchar((len >> 24) & 0xff); + putchar((len >> 16) & 0xff); + putchar((len >> 8) & 0xff); + putchar(len & 0xff); + } + fwrite(buf, 1, len, stdout); + break; + case OPT_HEX: // Hex, quoted C string encoded with hex + // literals. + putchar('"'); + for (size_t i = 0; i < len; i++) { + printf("\\x%02x", (uint8_t) buf[i]); + } + putchar('"'); + putchar('\n'); + break; + } + fflush(stdout); +} + +void +recvloop(nng_socket sock) +{ + int iters = 0; + for (;;) { + int rv; + nng_msg *msg; + + if ((count > 0) && (iters >= count)) { + break; + } + rv = nng_recvmsg(sock, &msg, 0); + iters++; + switch (rv) { + case NNG_ETIMEDOUT: + case NNG_ESTATE: + // Either a regular timeout, or we reached the + // end of an event like a survey completing. + return; + case 0: + printmsg(nng_msg_body(msg), nng_msg_len(msg)); + nng_msg_free(msg); + continue; + default: + fatal("Receive error: %s", nng_strerror(rv)); + } + } +} + +void +resploop(nng_socket sock) +{ + int iters = 0; + for (;;) { + int rv; + nng_msg *msg; + + rv = nng_recvmsg(sock, &msg, 0); + if (rv != 0) { + fatal("Receive error: %s", nng_strerror(rv)); + } + printmsg(nng_msg_body(msg), nng_msg_len(msg)); + nng_msg_clear(msg); + if ((rv = nng_msg_append(msg, data, datalen)) != 0) { + fatal(nng_strerror(rv)); + } + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + fatal("Send error: %s", nng_strerror(rv)); + } + + iters++; + if ((count > 0) && (iters >= count)) { + break; + } + } + + nng_msleep(200); +} + +void +sendloop(nng_socket sock) +{ + int iters = 0; + + if (data == NULL) { + fatal("No data to send (specify with --data or --file)"); + } + if (delay > 0) { + nng_msleep(delay); + } + + for (;;) { + int rv; + nng_msg * msg; + nng_time start; + nng_time end; + nng_duration delta; + + start = nng_clock(); + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append(msg, data, datalen)) != 0)) { + fatal(nng_strerror(rv)); + } + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + fatal("Send error: %s", nng_strerror(rv)); + } + end = nng_clock(); + delta = (nng_duration)(end - start); + + iters++; + // By default, we don't loop. + if (((interval < 0) && (count == 0)) || + ((count > 0) && (iters >= count))) { + break; + } + + // We sleep, but we account for time spent, so that our + // interval appears more or less constant. Of course + // if we took more than the interval here, then we skip + // the sleep altogether. + if ((delta >= 0) && (delta < interval)) { + nng_msleep(interval - delta); + } + } + // Wait a bit to give queues a chance to drain. + nng_msleep(200); +} + +void +sendrecv(nng_socket sock) +{ + int iters = 0; + + if (data == NULL) { + fatal("No data to send (specify with --data or --file)"); + } + if (delay > 0) { + nng_msleep(delay); + } + + // We start by sending a message, then we receive iteratively + // but we set a concrete timeout if interval is set, to ensure + // that we exit the receive loop, and can continue. + for (;;) { + int rv; + nng_msg * msg; + nng_time start; + nng_time end; + nng_duration delta; + + start = nng_clock(); + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append(msg, data, datalen)) != 0)) { + fatal(nng_strerror(rv)); + } + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + fatal("Send error: %s", nng_strerror(rv)); + } + if ((interval < 0) && (count == 0)) { + // Only one iteration through. + recvloop(sock); + break; + } + + // We would like to use recvloop, but we need to reset + // our timeout each time, as the timer counts down + // towards zero. Furthermore, with survey, we don't + // want to increment the iteration count. + + for (;;) { + delta = (nng_duration)(nng_clock() - start); + + nng_duration expire = interval - delta; + if ((recvtimeo >= 0) && (expire > recvtimeo)) { + expire = recvtimeo; + } + rv = + nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, expire); + if (rv != 0) { + fatal("Cannot set recv timeout: %s", + nng_strerror(rv)); + } + + rv = nng_recvmsg(sock, &msg, 0); + switch (rv) { + case 0: + printmsg(nng_msg_body(msg), nng_msg_len(msg)); + nng_msg_free(msg); + continue; + case NNG_ETIMEDOUT: + case NNG_ESTATE: + // We're done receiving + break; + default: + fatal("Cannot receive: %s", nng_strerror(rv)); + break; + } + + // We're done receiving, break out. + break; + } + + end = nng_clock(); + delta = (nng_duration)(end - start); + + iters++; + if ((count > 0) && (iters >= count)) { + break; + } + + // We sleep, but we account for time spent, so that our + // interval appears more or less constant. Of course + // if we took more than the interval here, then we skip + // the sleep altogether. + if ((delta >= 0) && (delta < interval)) { + nng_msleep(interval - delta); + } + } +} + +int +main(int ac, char **av) +{ + int idx; + char * arg; + int val; + int rv; + char scratch[512]; + struct addr * addrs = NULL; + struct addr ** addrend; + struct topic * topics = NULL; + struct topic **topicend; + nng_socket sock; + int port; + + idx = 1; + addrend = &addrs; + topicend = &topics; + + while ((rv = nng_opts_parse(ac, av, opts, &val, &arg, &idx)) == 0) { + switch (val) { + case OPT_HELP: + help(); + break; + case OPT_REQ0: + case OPT_REP0: + case OPT_SUB0: + case OPT_PUB0: + case OPT_BUS0: + case OPT_SURVEY0: + case OPT_RESPOND0: + case OPT_PAIR0: + case OPT_PAIR1: + case OPT_PUSH0: + case OPT_PULL0: + if (proto != 0) { + fatal("Only one protocol may be " + "specified."); + } + proto = val; + break; + case OPT_DIAL: + case OPT_LISTEN: + addrend = addaddr(addrend, val, arg); + break; + case OPT_DIAL_LOCAL: + case OPT_LISTEN_LOCAL: + port = intarg(arg, 65536); + snprintf(scratch, sizeof(scratch), + "tcp://127.0.0.1:%d", port); + addrend = addaddr(addrend, val, scratch); + break; + case OPT_DIAL_IPC: + case OPT_LISTEN_IPC: + snprintf(scratch, sizeof(scratch), "ipc:///%s", arg); + addrend = addaddr(addrend, val, scratch); + break; + case OPT_COUNT: + count = intarg(arg, 0x7fffffff); + break; + case OPT_SUBSCRIBE: + topicend = addtopic(topicend, arg); + break; + case OPT_VERBOSE: + case OPT_SILENT: + verbose = val; + break; + case OPT_DELAY: + delay = intarg(arg, 86400) * 1000; // max 1 day + break; + case OPT_INTERVAL: + interval = intarg(arg, 86400) * 1000; // max 1 day + break; + case OPT_SND_TIMEO: + sendtimeo = intarg(arg, 86400) * 1000; // max 1 day + break; + case OPT_RCV_TIMEO: + recvtimeo = intarg(arg, 86400) * 1000; // max 1 day + break; + case OPT_RECVMAXSZ: + recvmaxsz = intarg(arg, 0x7fffffff); + if (recvmaxsz == 0) { + recvmaxsz = 0x7fffffff; + } + break; + case OPT_COMPAT: + compat = 1; + break; + case OPT_ASYNC: + async = NNG_FLAG_NONBLOCK; + break; + case OPT_ASCII: + case OPT_RAW: + case OPT_QUOTED: + case OPT_MSGPACK: + case OPT_HEX: + if (format != 0) { + fatal("Format may be specified only " + "once."); + } + format = val; + break; + case OPT_FORMAT: + if (format != 0) { + fatal("Format may be specified only " + "once."); + } + if (strcmp(arg, "no") == 0) { + format = OPT_BLANK; + } else if (strcmp(arg, "ascii") == 0) { + format = OPT_ASCII; + } else if (strcmp(arg, "hex") == 0) { + format = OPT_HEX; + } else if (strcmp(arg, "quoted") == 0) { + format = OPT_QUOTED; + } else if (strcmp(arg, "raw") == 0) { + format = OPT_RAW; + } else if (strcmp(arg, "msgpack") == 0) { + format = OPT_MSGPACK; + } else { + fatal("Invalid format specified."); + } + break; + case OPT_FILE: + if (data != NULL) { + fatal("Data (--file, --data) may be " + "specified " + "only once."); + } + loadfile(arg, &data, &datalen); + break; + case OPT_DATA: + if (data != NULL) { + fatal("Data (--file, --data) may be " + "specified " + "only once."); + } + if ((data = malloc(strlen(arg) + 1)) == NULL) { + fatal("Out of memory."); + } + memcpy(data, arg, strlen(arg) + 1); + datalen = strlen(arg); + break; + case OPT_CACERT: + if (cacert != NULL) { + fatal("CA Certificate (--cacert) may be " + "specified only once."); + } + loadfile(arg, &cacert, &cacertlen); + break; + case OPT_KEYFILE: + if (keyfile != NULL) { + fatal( + "Key (--key) may be specified only once."); + } + loadfile(arg, &keyfile, &keylen); + break; + case OPT_CERTFILE: + if (certfile != NULL) { + fatal("Cert (--cert) may be specified " + "only " + "once."); + } + loadfile(arg, &certfile, &certlen); + break; + case OPT_ZTHOME: + zthome = arg; + break; + case OPT_INSECURE: + insecure = 1; + break; + case OPT_VERSION: + printf("%s\n", nng_version()); + exit(0); + } + } + switch (rv) { + case NNG_EINVAL: + fatal("Option %s is invalid.", av[idx]); + break; + case NNG_EAMBIGUOUS: + fatal("Option %s is ambiguous (specify in full).", av[idx]); + break; + case NNG_ENOARG: + fatal("Option %s requires argument.", av[idx]); + break; + default: + break; + } + + if (addrs == NULL) { + fatal("No address specified."); + } + + if (compat) { + if (async != 0) { + fatal("Options --async and --compat are " + "incompatible."); + } + if (count != 0) { + fatal("Options --count and --compat are " + "incompatible."); + } + if (proto == OPT_PAIR) { + proto = OPT_PAIR0; + } + if (proto == OPT_PAIR1) { + fatal("Protocol does not support --compat."); + } + async = NNG_FLAG_NONBLOCK; + } else { + if (proto == OPT_PAIR) { + proto = OPT_PAIR1; + } + } + if (proto == OPT_SUB0) { + if (topics == NULL) { + (void) addtopic(topicend, ""); // subscribe to all + } + } else { + if (topics != NULL) { + fatal("Protocol does not support --subscribe."); + } + } + + switch (proto) { + case OPT_PULL0: + case OPT_SUB0: + if (data != NULL) { + fatal("Protocol does not support --file or " + "--data."); + } + if (interval >= 0) { + fatal("Protocol does not support --interval."); + } + break; + case OPT_PUSH0: + case OPT_PUB0: + if (format != 0) { + fatal("Protocol does not support --format " + "options."); + } + if (data == NULL) { + fatal("Protocol requires either --file or " + "--data."); + } + break; + case OPT_SURVEY0: + case OPT_REQ0: + if (data == NULL) { + fatal("Protocol requires either --file or " + "--data."); + } + break; + case OPT_REP0: + case OPT_RESPOND0: + if (interval >= 0) { + fatal("Protocol does not support --interval."); + } + break; + case OPT_PAIR0: + case OPT_PAIR1: + case OPT_BUS0: + if ((interval >= 0) && (data == NULL)) { + fatal("Option --interval requires either " + "--file or --data."); + } + break; + default: + // Will be caught in next switch statement. + break; + } + + switch (proto) { + case OPT_REQ0: +#ifdef NNG_HAVE_REQ0 + rv = nng_req0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_REP0: +#ifdef NNG_HAVE_REP0 + rv = nng_rep0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_SUB0: +#ifdef NNG_HAVE_SUB0 + rv = nng_sub0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_PUB0: +#ifdef NNG_HAVE_PUB0 + rv = nng_pub0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_PAIR0: +#ifdef NNG_HAVE_PAIR0 + rv = nng_pair0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_PAIR1: +#ifdef NNG_HAVE_PAIR1 + rv = nng_pair1_open(&sock); +#else + fatal("Protocol not supported"); +#endif + break; + case OPT_BUS0: +#ifdef NNG_HAVE_BUS0 + rv = nng_bus0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_PUSH0: +#ifdef NNG_HAVE_PUSH0 + rv = nng_push0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_PULL0: +#ifdef NNG_HAVE_PULL0 + rv = nng_pull0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_SURVEY0: +#ifdef NNG_HAVE_SURVEYOR0 + rv = nng_surveyor0_open(&sock); +#else + fatal("Protocol not supported."); +#endif + break; + case OPT_RESPOND0: +#ifdef NNG_HAVE_RESPONDENT0 + rv = nng_respondent0_open(&sock); +#else + fatal("Protocol not supported"); +#endif + break; + case 0: + default: + fatal("No protocol specified."); + break; + } + if (rv != 0) { + fatal("Unable to open socket: %s", nng_strerror(rv)); + } + + for (struct topic *t = topics; t != NULL; t = t->next) { + rv = nng_socket_set( + sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val)); + if (rv != 0) { + fatal("Unable to subscribe to topic %s: %s", t->val, + nng_strerror(rv)); + } + } + + if ((sendtimeo > 0) && + ((rv = nng_socket_set_ms(sock, NNG_OPT_SENDTIMEO, sendtimeo)) != + 0)) { + fatal("Unable to set send timeout: %s", nng_strerror(rv)); + } + if ((recvtimeo > 0) && + ((rv = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, recvtimeo)) != + 0)) { + fatal("Unable to set send timeout: %s", nng_strerror(rv)); + } + + if ((recvmaxsz >= 0) && + ((rv = nng_socket_set_size(sock, NNG_OPT_RECVMAXSZ, recvmaxsz)) != + 0)) { + fatal("Unable to set max receive size: %s", nng_strerror(rv)); + } + + for (struct addr *a = addrs; a != NULL; a = a->next) { + char * act; + nng_listener l; + nng_dialer d; + nng_tls_config *tls; + switch (a->mode) { + case OPT_DIAL: + case OPT_DIAL_IPC: + case OPT_DIAL_LOCAL: + rv = nng_dialer_create(&d, sock, a->val); + if (rv != 0) { + fatal("Unable to create dialer for %s: %s", + a->val, nng_strerror(rv)); + } + rv = nng_dialer_getopt_ptr( + d, NNG_OPT_TLS_CONFIG, (void **) &tls); + if (rv == 0) { + configtls(tls); + } else if (rv != NNG_ENOTSUP) { + fatal("Unable to get TLS config: %s", + nng_strerror(rv)); + } + if (zthome != NULL) { + rv = nng_dialer_set(d, NNG_OPT_ZT_HOME, + zthome, strlen(zthome) + 1); + if ((rv != 0) && (rv != NNG_ENOTSUP)) { + fatal("Unable to set ZT home: %s", + nng_strerror(rv)); + } + } + rv = nng_dialer_start(d, async); + act = "dial"; + if ((rv == 0) && (verbose == OPT_VERBOSE)) { + char ustr[256]; + size_t sz; + sz = sizeof(ustr); + if (nng_dialer_getopt( + d, NNG_OPT_URL, ustr, &sz) == 0) { + printf("Connected to: %s\n", ustr); + } + } + break; + case OPT_LISTEN: + case OPT_LISTEN_IPC: + case OPT_LISTEN_LOCAL: + rv = nng_listener_create(&l, sock, a->val); + if (rv != 0) { + fatal("Unable to create listener for %s: %s", + a->val, nng_strerror(rv)); + } + rv = nng_listener_getopt_ptr( + l, NNG_OPT_TLS_CONFIG, (void **) &tls); + if (rv == 0) { + configtls(tls); + } else if (rv != NNG_ENOTSUP) { + fatal("Unable to get TLS config: %s", + nng_strerror(rv)); + } + if (zthome != NULL) { + rv = nng_listener_set(l, NNG_OPT_ZT_HOME, + zthome, strlen(zthome) + 1); + if ((rv != 0) && (rv != NNG_ENOTSUP)) { + fatal("Unable to set ZT home: %s", + nng_strerror(rv)); + } + } + rv = nng_listener_start(l, async); + act = "listen"; + if ((rv == 0) && (verbose == OPT_VERBOSE)) { + char ustr[256]; + size_t sz; + sz = sizeof(ustr); + if (nng_listener_getopt( + l, NNG_OPT_URL, ustr, &sz) == 0) { + printf("Listening at: %s\n", ustr); + } + } + break; + default: + fatal("Invalid address mode! (Bug!)"); + } + + if (rv != 0) { + fatal("Unable to %s on %s: %s", act, a->val, + nng_strerror(rv)); + } + } + + switch (proto) { + case OPT_SUB0: + case OPT_PULL0: + recvloop(sock); + break; + case OPT_REP0: + case OPT_RESPOND0: + if (data == NULL) { + recvloop(sock); + } else { + resploop(sock); + } + break; + case OPT_PUSH0: + case OPT_PUB0: + sendloop(sock); + break; + case OPT_BUS0: + case OPT_PAIR0: + case OPT_PAIR1: + if (data != NULL) { + sendrecv(sock); + } else { + recvloop(sock); + } + break; + case OPT_REQ0: + case OPT_SURVEY0: + sendrecv(sock); + break; + default: + fatal("Protocol handling unimplemented."); + } + + exit(0); +} diff --git a/src/tools/nngcat/nngcat_ambiguous_test.sh b/src/tools/nngcat/nngcat_ambiguous_test.sh new file mode 100755 index 00000000..414b6d19 --- /dev/null +++ b/src/tools/nngcat/nngcat_ambiguous_test.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} +CMD="${NNGCAT} --re --dial=tcp://127.0.0.1:27272" + +echo -n "Verify ambiguous options fail: " +if ${CMD} >/dev/null 2>&1 +then + echo "Failed: ambigous accepted" + exit 1 +fi +x=$(${CMD} 2>&1) +if [[ ${x} =~ "ambiguous" ]] +then + echo "pass" + exit 0 +fi + +echo "Failed: error did not match" +exit 1 diff --git a/src/tools/nngcat/nngcat_async_test.sh b/src/tools/nngcat/nngcat_async_test.sh new file mode 100755 index 00000000..2b03e522 --- /dev/null +++ b/src/tools/nngcat/nngcat_async_test.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} +ADDR="ipc:///tmp/nngcat_async_test" + +echo -n "Verify async connect: " + +${NNGCAT} --async -d 1 --connect ${ADDR} --req0 -D "ping" & + + +answer=$( ${NNGCAT} --rep0 --recv-timeout=3 --listen ${ADDR} -D "pong" --ascii 2>/dev/null ) + +if [[ ${answer} == "ping" ]] +then + echo "pass" + exit 0 +fi + +echo "Failed: req did not match" +echo "RES: $answer" +exit 1 diff --git a/src/tools/nngcat/nngcat_dup_proto_test.sh b/src/tools/nngcat/nngcat_dup_proto_test.sh new file mode 100755 index 00000000..1513d01c --- /dev/null +++ b/src/tools/nngcat/nngcat_dup_proto_test.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} + +echo -n "Verify only a single protocol is allowed: " +if ${NNGCAT} --pub0 --sub0 --dial=tcp://127.0.0.1:8989 >/dev/null 2>&1 +then + echo "Failed: duplicate protocols accepted" + exit 1 +fi +echo "pass" +exit 0 diff --git a/src/tools/nngcat/nngcat_help_test.sh b/src/tools/nngcat/nngcat_help_test.sh new file mode 100755 index 00000000..95ed9e3e --- /dev/null +++ b/src/tools/nngcat/nngcat_help_test.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} + +echo -n "Verify nngcat help: " +if ${NNGCAT} --help >/dev/null 2>&1 +then + echo "Failed: help didn't return 1" + exit 1 +fi +x=$(${NNGCAT} --help 2>&1) +if [[ ${x} =~ "Usage:" ]] +then + echo "pass" + exit 0 +fi + +echo "Failed: usage did not match" +echo "Output:" +echo "$x" +exit 1 diff --git a/src/tools/nngcat/nngcat_incompat_test.sh b/src/tools/nngcat/nngcat_incompat_test.sh new file mode 100755 index 00000000..128b57ba --- /dev/null +++ b/src/tools/nngcat/nngcat_incompat_test.sh @@ -0,0 +1,73 @@ +#!/bin/sh + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} + +echo "Verify incompatible options: " + +# Just bind something to this so other ones connect +${NNGCAT} --pull0 --ascii -X /tmp/bogusipc & +pid=$! + +trap "kill $pid && wait $pid 2>/dev/null" 0 + +echo -n " --subscribe doesn't work with non-sub" +if ${NNGCAT} --req0 -x /tmp/bogusipc --subscribe=oops >/dev/null 2>&1 +then + echo "fail" + exit 1 +fi +echo "pass" + +echo -n " --interval doesn't work with recv only: " +if ${NNGCAT} --interval 1 --pull -x /tmp/bogusipc >/dev/null 2>&1 +then + echo "fail" + exit 1 +fi +echo "pass" + +echo -n " --pair1 doesn't work with --compat: " +if ${NNGCAT} --compat --pair1 -x /tmp/bogusipc >/dev/null 2>&1 +then + echo "fail" + exit 1 +fi +echo "pass" + +echo -n " --count doesn't work with --compat: " +if ${NNGCAT} --compat --count=1 --pair0 -x /tmp/bogusipc >/dev/null 2>&1 +then + echo "fail" + exit 1 +fi +echo "pass" + +echo -n " --count fails with non-integer: " +if ${NNGCAT} --count=xyz --pair0 -x /tmp/bogusipc >/dev/null 2>&1 +then + echo "fail" + exit 1 +fi +echo "pass" + +echo -n " --file fails with non-existing file: " +if ${NNGCAT} --async --file=/nosuchfilehere --push0 -x /tmp/bogusipc >/dev/null 2>&1 +then + echo "fail" + exit 1 +fi +echo "pass" + +echo "PASS." +exit 0 diff --git a/src/tools/nngcat/nngcat_need_proto_test.sh b/src/tools/nngcat/nngcat_need_proto_test.sh new file mode 100755 index 00000000..d6733cad --- /dev/null +++ b/src/tools/nngcat/nngcat_need_proto_test.sh @@ -0,0 +1,23 @@ +#!/bin/sh + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} + +echo -n "Verify a protocol is needed: " +if ${NNGCAT} --dial=tcp://127.0.0.1:8989 >/dev/null 2>&1 +then + echo "Failed: protocol should be required" + exit 1 +fi +echo "pass" +exit 0 diff --git a/src/tools/nngcat/nngcat_pubsub_test.sh b/src/tools/nngcat/nngcat_pubsub_test.sh new file mode 100755 index 00000000..b9ba90ed --- /dev/null +++ b/src/tools/nngcat/nngcat_pubsub_test.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} +ADDR="ipc:///tmp/nngcat_pub_sub_test" +OUTPUT=/tmp/nngcat_pubsub_test.$$.out + +echo -n "Verify pub sub: " + +trap "rm $OUTPUT" 0 + +${NNGCAT} --listen ${ADDR} --count=3 --recv-timeout=20 --sub0 --subscribe=one --subscribe=two --quoted > $OUTPUT 2>/dev/null & +sleep 1 +# for speed of execution, run these in the background, they should be ignored +${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "xyz" & +${NNGCAT} -d 1 --connect ${ADDR} --pub0 -D "none swam" & +# these we care about, due to ordering (checksum) so run them serially +${NNGCAT} -d 1 --connect ${ADDR} --pub0 -D "one flew" +${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "twofer test" +${NNGCAT} -d 1 --connect ${ADDR} --pub0 --data "one more" + +wait $bgid 2>/dev/null + +sum=$(cksum ${OUTPUT}) +sum=${sum%% *} +if [[ ${sum} == 3929078614 ]] +then + echo "pass" + exit 0 +fi +echo "FAIL: Checksum failed (Wanted 3929078614 got ${sum})" +echo "OUTPUT:" +cat ${OUTPUT} + +exit 1 diff --git a/src/tools/nngcat/nngcat_recvmaxsz_test.sh b/src/tools/nngcat/nngcat_recvmaxsz_test.sh new file mode 100755 index 00000000..b5d4ff4a --- /dev/null +++ b/src/tools/nngcat/nngcat_recvmaxsz_test.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} +ADDR="ipc:///tmp/nngcat_recvmaxsz_test" +OUTPUT=/tmp/nngcat_recvmaxsz_test.$$.out + +echo -n "Verify maximum receive size: " + +trap "rm $OUTPUT" 0 + +${NNGCAT} --listen ${ADDR} --count=3 --recv-maxsz=5 --pull0 --quoted > $OUTPUT 2>/dev/null & +sleep 1 +# for speed of execution, run these in the background, they should be ignored +${NNGCAT} --connect ${ADDR} --push0 --data "one" +${NNGCAT} --connect ${ADDR} --push0 --data "55555" +${NNGCAT} --connect ${ADDR} --push0 --data "666666" +${NNGCAT} --connect ${ADDR} --push0 --data "7777777" +${NNGCAT} --connect ${ADDR} --push0 --data "88888" + +wait $bgid 2>/dev/null + +sum=$(cksum ${OUTPUT}) +sum=${sum%% *} + +# This matches 3 lines of "one", "55555", "88888". +if [[ ${sum} == 4122906158 ]] +then + echo "pass" + exit 0 +fi +echo "FAIL: Checksum failed (Wanted 3929078614 got ${sum})" +echo "OUTPUT:" +cat ${OUTPUT} + +exit 1 diff --git a/src/tools/nngcat/nngcat_stdin_pipe_test.sh b/src/tools/nngcat/nngcat_stdin_pipe_test.sh new file mode 100755 index 00000000..5fec0ab7 --- /dev/null +++ b/src/tools/nngcat/nngcat_stdin_pipe_test.sh @@ -0,0 +1,44 @@ +#!/bin/bash + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# Copyright 2020 Lager Data, Inc. <support@lagerdata.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} +ADDR="ipc:///tmp/nngcat_stdin_pipe_test" +OUTPUT=/tmp/nngcat_stdin_pipe_test.$$.out + +echo -n "Verify reading from stdin pipe: " + +trap "rm $OUTPUT" 0 + +${NNGCAT} --listen ${ADDR} --count=1 --recv-timeout=3 --recv-maxsz=0 --pull0 --raw > $OUTPUT 2>/dev/null & +bgid=$! + +sleep 1 +# for speed of execution, run these in the background, they should be ignored +echo "hello world" | ${NNGCAT} --connect ${ADDR} --delay=1 --push0 --file - +wait "$bgid" 2>/dev/null + +sum=$(cksum ${OUTPUT}) +sum=${sum%% *} + +# This matches "hello world\n" since echo adds a trailing newline +if [[ ${sum} == 3733384285 ]] +then + echo "pass" + exit 0 +fi +echo "FAIL: Checksum failed (Wanted 3733384285 got ${sum})" +echo "OUTPUT:" +ls -la ${OUTPUT} + +exit 1 diff --git a/src/tools/nngcat/nngcat_unlimited_test.sh b/src/tools/nngcat/nngcat_unlimited_test.sh new file mode 100755 index 00000000..0486b9b1 --- /dev/null +++ b/src/tools/nngcat/nngcat_unlimited_test.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +NNGCAT=${NNGCAT:=$1} +NNGCAT=${NNGCAT:-./nngcat} +ADDR="ipc:///tmp/nngcat_unlimited_test" +INPUT=/tmp/nngcat_unlimited_test.$$.in +OUTPUT=/tmp/nngcat_unlimited_test.$$.out + +echo -n "Verify unlimited receive size: " + +trap "rm $OUTPUT $INPUT" 0 + +# 4 MB +dd if=/dev/urandom of=${INPUT} bs=1024 count=4096 >/dev/null 2>&1 +goodsum=$(cksum ${INPUT}) +goodsum=${goodsum%% *} + +${NNGCAT} --listen ${ADDR} --count=1 --recv-maxsz=0 --pull0 --raw > $OUTPUT 2>/dev/null & +sleep 1 +# for speed of execution, run these in the background, they should be ignored +${NNGCAT} --connect ${ADDR} --delay=1 --push0 --file ${INPUT} +wait $bgid 2>/dev/null + +sum=$(cksum ${OUTPUT}) +sum=${sum%% *} + +if [[ ${sum} == ${goodsum} ]] +then + echo "pass" + exit 0 +fi +echo "FAIL: Checksum failed (Wanted ${goodsum} got ${sum})" +echo "OUTPUT:" +ls -la ${OUTPUT} + +exit 1 diff --git a/src/tools/perf/CMakeLists.txt b/src/tools/perf/CMakeLists.txt new file mode 100644 index 00000000..135544bb --- /dev/null +++ b/src/tools/perf/CMakeLists.txt @@ -0,0 +1,35 @@ +# +# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Build performance tests. + +nng_directory(perf) + +if (NNG_TESTS) + macro (add_nng_perf NAME) + add_executable (${NAME} perf.c) + target_link_libraries (${NAME} nng nng_private) + endmacro (add_nng_perf) + + add_nng_perf(remote_lat) + add_nng_perf(local_lat) + add_nng_perf(local_thr) + add_nng_perf(remote_thr) + add_nng_perf(inproc_thr) + add_nng_perf(inproc_lat) + + add_test (NAME nng.inproc_lat COMMAND inproc_lat 64 10000) + set_tests_properties (nng.inproc_lat PROPERTIES TIMEOUT 30) + + add_test (NAME nng.inproc_thr COMMAND inproc_thr 1400 10000) + set_tests_properties (nng.inproc_thr PROPERTIES TIMEOUT 30) + + add_executable (pubdrop pubdrop.c) + target_link_libraries(pubdrop nng nng_private) +endif () diff --git a/src/tools/perf/perf.c b/src/tools/perf/perf.c new file mode 100644 index 00000000..accac621 --- /dev/null +++ b/src/tools/perf/perf.c @@ -0,0 +1,662 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <stdarg.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <nng/nng.h> +#include <nng/supplemental/util/options.h> +#include <nng/supplemental/util/platform.h> + +static void die(const char *, ...); +static int +no_open(nng_socket *arg) +{ + (void) arg; + die("Protocol not supported in this build!"); + return (NNG_ENOTSUP); +} + +typedef int (*open_func)(nng_socket *); + +static open_func open_server = no_open; +static open_func open_client = no_open; + +#if defined(NNG_HAVE_PAIR1) +#include <nng/protocol/pair1/pair.h> +#else +#define nng_pair1_open no_open +#endif + +#if defined(NNG_HAVE_PAIR0) +#include <nng/protocol/pair0/pair.h> +#else +#define nng_pair0_open no_open +#endif + +#if defined(NNG_HAVE_REQ0) +#include <nng/protocol/reqrep0/req.h> +#else +#define nng_req0_open no_open +#endif + +#if defined(NNG_HAVE_REP0) +#include <nng/protocol/reqrep0/rep.h> +#else +#define nng_rep0_open no_open +#endif + +#if defined(NNG_HAVE_BUS0) +#include <nng/protocol/bus0/bus.h> +#else +#define nng_bus0_open no_open +#endif + +#if defined(NNG_HAVE_PULL0) +#include <nng/protocol/pipeline0/pull.h> +#else +#define nng_pull0_open no_open +#endif + +#if defined(NNG_HAVE_PUSH0) +#include <nng/protocol/pipeline0/push.h> +#else +#define nng_push0_open no_open +#endif + +#if defined(NNG_HAVE_PUB0) +#include <nng/protocol/pubsub0/pub.h> +#else +#define nng_pub0_open no_open +#endif + +#if defined(NNG_HAVE_SUB0) +#include <nng/protocol/pubsub0/sub.h> +#else +#define nng_sub0_open no_open +#endif + +enum options { + OPT_PAIR0 = 1, + OPT_PAIR1, + OPT_REQREP0, + OPT_PUBSUB0, + OPT_PIPELINE0, + OPT_SURVEY0, + OPT_BUS0, + OPT_URL, +}; + +// These are not universally supported by the variants yet. +static nng_optspec opts[] = { + { .o_name = "pair1", .o_val = OPT_PAIR1 }, + { .o_name = "pair0", .o_val = OPT_PAIR0 }, + { .o_name = "reqrep0", .o_val = OPT_REQREP0 }, + { .o_name = "bus0", .o_val = OPT_BUS0 }, + { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 }, + { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 }, + { .o_name = "url", .o_val = OPT_URL, .o_arg = true }, + { .o_name = NULL, .o_val = 0 }, +}; + +static void latency_client(const char *, size_t, int); +static void latency_server(const char *, size_t, int); +static void throughput_client(const char *, size_t, int); +static void throughput_server(const char *, size_t, int); +static void do_remote_lat(int argc, char **argv); +static void do_local_lat(int argc, char **argv); +static void do_remote_thr(int argc, char **argv); +static void do_local_thr(int argc, char **argv); +static void do_inproc_thr(int argc, char **argv); +static void do_inproc_lat(int argc, char **argv); +static void die(const char *, ...); + +// perf implements the same performance tests found in the standard +// nanomsg & mangos performance tests. As with mangos, the decision +// about which test to run is determined by the program name (ARGV[0}]) +// that it is run under. +// +// Options are: +// +// - remote_lat - remote latency side (client, aka latency_client) +// - local_lat - local latency side (server, aka latency_server) +// - local_thr - local throughput side +// - remote_thr - remote throughput side +// - inproc_lat - inproc latency +// - inproc_thr - inproc throughput +// + +bool +matches(const char *arg, const char *name) +{ + const char *ptr = arg; + const char *x; + + while (((x = strchr(ptr, '/')) != NULL) || + ((x = strchr(ptr, '\\')) != NULL) || + ((x = strchr(ptr, ':')) != NULL)) { + ptr = x + 1; + } + for (;;) { + if (*name == '\0') { + break; + } + if (tolower(*ptr) != *name) { + return (false); + } + ptr++; + name++; + } + + switch (*ptr) { + case '\0': + /* FALLTHROUGH*/ + case '.': // extension; ignore it. + return (true); + default: // some other trailing bit. + return (false); + } +} + +const int PAIR0 = 0; +const int PAIR1 = 1; +const int REQREP = 2; + +int +main(int argc, char **argv) +{ + char *prog; + +#if defined(NNG_HAVE_PAIR1) + open_server = nng_pair1_open; + open_client = nng_pair1_open; +#elif defined(NNG_HAVE_PAIR0) + open_server = nng_pair0_open; + open_client = nng_pair0_open; +#endif + + // Allow -m <remote_lat> or whatever to override argv[0]. + if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) { + prog = argv[2]; + argv += 3; + argc -= 3; + } else { + prog = argv[0]; + argc--; + argv++; + } + if (matches(prog, "remote_lat") || matches(prog, "latency_client")) { + do_remote_lat(argc, argv); + } else if (matches(prog, "local_lat") || + matches(prog, "latency_server")) { + do_local_lat(argc, argv); + } else if (matches(prog, "local_thr") || + matches(prog, "throughput_server")) { + do_local_thr(argc, argv); + } else if (matches(prog, "remote_thr") || + matches(prog, "throughput_client")) { + do_remote_thr(argc, argv); + } else if (matches(prog, "inproc_thr")) { + do_inproc_thr(argc, argv); + } else if (matches(prog, "inproc_lat")) { + do_inproc_lat(argc, argv); + } else { + die("Unknown program mode? Use -m <mode>."); + } +} + +static void +die(const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(2); +} + +static int +parse_int(const char *arg, const char *what) +{ + long val; + char *eptr; + + val = strtol(arg, &eptr, 10); + // Must be a positive number less than around a billion. + if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) { + die("Invalid %s", what); + } + return ((int) val); +} + +void +do_local_lat(int argc, char **argv) +{ + long int msgsize; + long int trips; + + if (argc != 3) { + die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>"); + } + + msgsize = parse_int(argv[1], "message size"); + trips = parse_int(argv[2], "round-trips"); + + latency_server(argv[0], msgsize, trips); +} + +void +do_remote_lat(int argc, char **argv) +{ + int msgsize; + int trips; + + if (argc != 3) { + die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>"); + } + + msgsize = parse_int(argv[1], "message size"); + trips = parse_int(argv[2], "round-trips"); + + latency_client(argv[0], msgsize, trips); +} + +void +do_local_thr(int argc, char **argv) +{ + int msgsize; + int trips; + + if (argc != 3) { + die("Usage: local_thr <listen-addr> <msg-size> <count>"); + } + + msgsize = parse_int(argv[1], "message size"); + trips = parse_int(argv[2], "count"); + + throughput_server(argv[0], msgsize, trips); +} + +void +do_remote_thr(int argc, char **argv) +{ + int msgsize; + int trips; + + if (argc != 3) { + die("Usage: remote_thr <connect-to> <msg-size> <count>"); + } + + msgsize = parse_int(argv[1], "message size"); + trips = parse_int(argv[2], "count"); + + throughput_client(argv[0], msgsize, trips); +} + +struct inproc_args { + int count; + int msgsize; + const char *addr; + void (*func)(const char *, size_t, int); +}; + +static void +do_inproc(void *args) +{ + struct inproc_args *ia = args; + + ia->func(ia->addr, ia->msgsize, ia->count); +} + +void +do_inproc_lat(int argc, char **argv) +{ + nng_thread * thr; + struct inproc_args ia; + int rv; + int val; + int optidx; + char * arg; + char * addr; + + addr = "inproc://latency_test"; + + optidx = 0; + while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) == + 0) { + switch (val) { + case OPT_REQREP0: + open_client = nng_req0_open; + open_server = nng_rep0_open; + break; + case OPT_PAIR0: + open_client = nng_pair0_open; + open_server = nng_pair0_open; + break; + case OPT_PAIR1: + open_client = nng_pair1_open; + open_server = nng_pair1_open; + break; + case OPT_BUS0: + open_client = nng_bus0_open; + open_server = nng_bus0_open; + break; + + case OPT_URL: + addr = arg; + break; + default: + die("bad option"); + } + } + argc -= optidx; + argv += optidx; + + if (argc != 2) { + die("Usage: inproc_lat <msg-size> <count>"); + } + + ia.addr = addr; + ia.msgsize = parse_int(argv[0], "message size"); + ia.count = parse_int(argv[1], "count"); + ia.func = latency_server; + + if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) { + die("Cannot create thread: %s", nng_strerror(rv)); + } + + // Sleep a bit. + nng_msleep(100); + + latency_client(addr, ia.msgsize, ia.count); + nng_thread_destroy(thr); +} + +void +do_inproc_thr(int argc, char **argv) +{ + nng_thread * thr; + struct inproc_args ia; + int rv; + int optidx; + int val; + char * arg; + char * addr = "inproc://throughput-test"; + + optidx = 0; + while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) == + 0) { + switch (val) { +#if 0 + // For now these protocols simply do not work with + // throughput -- they don't work with backpressure properly. + // In the future we should support synchronizing in the same + // process, and alerting the sender both on completion of + // a single message, and on completion of all messages. + case OPT_REQREP0: + open_client = nng_req0_open; + open_server = nng_rep0_open; + break; +#endif + case OPT_PAIR0: + open_client = nng_pair0_open; + open_server = nng_pair0_open; + break; + case OPT_PAIR1: + open_client = nng_pair1_open; + open_server = nng_pair1_open; + break; + case OPT_PIPELINE0: + open_client = nng_pull0_open; + open_server = nng_push0_open; + break; + case OPT_URL: + addr = arg; + break; + default: + die("bad option"); + } + } + argc -= optidx; + argv += optidx; + + if (argc != 2) { + die("Usage: inproc_thr <msg-size> <count>"); + } + + ia.addr = addr; + ia.msgsize = parse_int(argv[0], "message size"); + ia.count = parse_int(argv[1], "count"); + ia.func = throughput_server; + + if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) { + die("Cannot create thread: %s", nng_strerror(rv)); + } + + // Sleep a bit. + nng_msleep(100); + + throughput_client(addr, ia.msgsize, ia.count); + nng_thread_destroy(thr); +} + +void +latency_client(const char *addr, size_t msgsize, int trips) +{ + nng_socket s; + nng_msg * msg; + nng_time start, end; + int rv; + int i; + float total; + float latency; + if ((rv = open_client(&s)) != 0) { + die("nng_socket: %s", nng_strerror(rv)); + } + + // XXX: set no delay + // XXX: other options (TLS in the future?, Linger?) + + if ((rv = nng_dial(s, addr, NULL, 0)) != 0) { + die("nng_dial: %s", nng_strerror(rv)); + } + + nng_msleep(100); + + if (nng_msg_alloc(&msg, msgsize) != 0) { + die("nng_msg_alloc: %s", nng_strerror(rv)); + } + + start = nng_clock(); + for (i = 0; i < trips; i++) { + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + die("nng_sendmsg: %s", nng_strerror(rv)); + } + + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + die("nng_recvmsg: %s", nng_strerror(rv)); + } + } + end = nng_clock(); + + nng_msg_free(msg); + nng_close(s); + + total = (float) ((end - start)) / 1000; + latency = ((float) ((total * 1000000)) / (float) (trips * 2)); + printf("total time: %.3f [s]\n", total); + printf("message size: %d [B]\n", (int) msgsize); + printf("round trip count: %d\n", trips); + printf("average latency: %.3f [us]\n", latency); +} + +void +latency_server(const char *addr, size_t msgsize, int trips) +{ + nng_socket s; + nng_msg * msg; + int rv; + int i; + + if ((rv = open_server(&s)) != 0) { + die("nng_socket: %s", nng_strerror(rv)); + } + + // XXX: set no delay + // XXX: other options (TLS in the future?, Linger?) + + if ((rv = nng_listen(s, addr, NULL, 0)) != 0) { + die("nng_listen: %s", nng_strerror(rv)); + } + + for (i = 0; i < trips; i++) { + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + die("nng_recvmsg: %s", nng_strerror(rv)); + } + if (nng_msg_len(msg) != msgsize) { + die("wrong message size: %lu != %lu", nng_msg_len(msg), + msgsize); + } + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + die("nng_sendmsg: %s", nng_strerror(rv)); + } + } + + // Wait a bit for things to drain... linger should do this. + // 100ms ought to be enough. + nng_msleep(100); + nng_close(s); +} + +// Our throughput story is quite a mess. Mostly I think because of the poor +// caching and message reuse. We should probably implement a message pooling +// API somewhere. + +void +throughput_server(const char *addr, size_t msgsize, int count) +{ + nng_socket s; + nng_msg * msg; + int rv; + int i; + uint64_t start, end; + float msgpersec, mbps, total; + + if ((rv = nng_pair_open(&s)) != 0) { + die("nng_socket: %s", nng_strerror(rv)); + } + rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128); + if (rv != 0) { + die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv)); + } + + // XXX: set no delay + // XXX: other options (TLS in the future?, Linger?) + + if ((rv = nng_listen(s, addr, NULL, 0)) != 0) { + die("nng_listen: %s", nng_strerror(rv)); + } + + // Receive first synchronization message. + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + die("nng_recvmsg: %s", nng_strerror(rv)); + } + nng_msg_free(msg); + start = nng_clock(); + + for (i = 0; i < count; i++) { + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + die("nng_recvmsg: %s", nng_strerror(rv)); + } + if (nng_msg_len(msg) != msgsize) { + die("wrong message size: %lu != %lu", nng_msg_len(msg), + msgsize); + } + nng_msg_free(msg); + } + end = nng_clock(); + // Send a synchronization message (empty) to the other side, + // and wait a bit to make sure it goes out the wire. + nng_send(s, "", 0, 0); + nng_msleep(200); + nng_close(s); + total = (float) ((end - start)) / 1000; + msgpersec = (float) (count) / total; + mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024); + printf("total time: %.3f [s]\n", total); + printf("message size: %d [B]\n", (int) msgsize); + printf("message count: %d\n", count); + printf("throughput: %.f [msg/s]\n", msgpersec); + printf("throughput: %.3f [Mb/s]\n", mbps); +} + +void +throughput_client(const char *addr, size_t msgsize, int count) +{ + nng_socket s; + nng_msg * msg; + int rv; + int i; + + // We send one extra zero length message to start the timer. + count++; + + if ((rv = nng_pair_open(&s)) != 0) { + die("nng_socket: %s", nng_strerror(rv)); + } + + // XXX: set no delay + // XXX: other options (TLS in the future?, Linger?) + + rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128); + if (rv != 0) { + die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv)); + } + + rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000); + if (rv != 0) { + die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv)); + } + + if ((rv = nng_dial(s, addr, NULL, 0)) != 0) { + die("nng_dial: %s", nng_strerror(rv)); + } + + if ((rv = nng_msg_alloc(&msg, 0)) != 0) { + die("nng_msg_alloc: %s", nng_strerror(rv)); + } + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + die("nng_sendmsg: %s", nng_strerror(rv)); + } + + for (i = 0; i < count; i++) { + if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) { + die("nng_msg_alloc: %s", nng_strerror(rv)); + } + + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + die("nng_sendmsg: %s", nng_strerror(rv)); + } + } + + // Attempt to get the completion indication from the other + // side. + if (nng_recvmsg(s, &msg, 0) == 0) { + nng_msg_free(msg); + } + + nng_close(s); +} diff --git a/src/tools/perf/pubdrop.c b/src/tools/perf/pubdrop.c new file mode 100644 index 00000000..be51bb2f --- /dev/null +++ b/src/tools/perf/pubdrop.c @@ -0,0 +1,325 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <stdarg.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <nng/nng.h> +#include <nng/supplemental/util/platform.h> + +// pubdrop - this is a simple testing utility that lets us measure PUB/SUB +// performance, including dropped messages, delivery across multiple threads, +// etc. It actually uses a wild card subscription for now. + +#if defined(NNG_HAVE_PUB0) && defined(NNG_HAVE_SUB0) +#include <nng/protocol/pubsub0/pub.h> +#include <nng/protocol/pubsub0/sub.h> + +#else + +static void die(const char *, ...); + +static int +nng_pub0_open(nng_socket *arg) +{ + (void) arg; + die("Pub protocol enabled in this build!"); + return (NNG_ENOTSUP); +} + +static int +nng_sub0_open(nng_socket *arg) +{ + (void) arg; + die("Sub protocol enabled in this build!"); + return (NNG_ENOTSUP); +} + +#endif // NNG_HAVE_PUB0.... + +static void die(const char *, ...); +static void do_pubdrop(int argc, char **argv); +static uint64_t nperusec; +static volatile int x; + +void +work(void) +{ + x = rand(); +} + +void +usdelay(unsigned long long nusec) +{ + nusec *= nperusec; + while (nusec > 0) { + work(); + nusec--; + } +} + +int +main(int argc, char **argv) +{ + argc--; + argv++; + + // We calculate a delay factor to roughly delay 1 usec. We don't + // need this to be perfect, just reproducible on the same host. + unsigned long cnt = 1000000; + + nng_time beg = nng_clock(); + for (unsigned long i = 0; i < cnt; i++) { + work(); + } + nng_time end = nng_clock(); + nperusec = cnt / (1000 * (end - beg)); + + do_pubdrop(argc, argv); +} + +static void +die(const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(2); +} + +static int +parse_int(const char *arg, const char *what) +{ + long val; + char *eptr; + + val = strtol(arg, &eptr, 10); + // Must be a postive number less than around a billion. + if ((val < 0) || (val > (1 << 30)) || (*eptr != 0) || (eptr == arg)) { + die("Invalid %s", what); + } + return ((int) val); +} + +struct pubdrop_args { + const char * addr; + bool start; + unsigned long long msgsize; + unsigned long long count; + unsigned long long intvl; + unsigned long long drops; + unsigned long long gaps; + unsigned long long errs; + unsigned long long recvs; + nng_time beg; + nng_time end; + nng_mtx * mtx; + nng_cv * cv; +}; + +static void +pub_server(void *arg) +{ + struct pubdrop_args *pa = arg; + nng_socket sock; + int rv; + nng_msg * msg; + nng_time start; + nng_time end; + + if ((rv = nng_pub0_open(&sock)) != 0) { + die("Cannot open sub: %s", nng_strerror(rv)); + } + if ((rv = nng_listen(sock, pa->addr, NULL, 0)) != 0) { + die("Cannot listen: %s", nng_strerror(rv)); + } + + nng_mtx_lock(pa->mtx); + while (!pa->start) { + nng_cv_wait(pa->cv); + } + nng_mtx_unlock(pa->mtx); + + start = nng_clock(); + for (uint64_t i = 0; i < pa->count; i++) { + // Unfortunately we need to allocate messages dynamically as we + // go. The other option would be to allocate them all up front, + // but that could be a rather excessive amount of memory. + if ((rv = nng_msg_alloc(&msg, (size_t) pa->msgsize)) != 0) { + die("Message alloc failed"); + } + memcpy(nng_msg_body(msg), &i, sizeof(i)); + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + die("Sendmsg: %s", nng_strerror(rv)); + } + // It sure would be nice if we had a usec granularity option + // here. + if (pa->intvl > 0) { + usdelay((unsigned long long) pa->intvl); + } + } + + end = nng_clock(); + + nng_msleep(1000); // drain the queue + nng_close(sock); + + nng_mtx_lock(pa->mtx); + pa->beg = start; + pa->end = end; + nng_mtx_unlock(pa->mtx); +} + +static void +sub_client(void *arg) +{ + struct pubdrop_args *pa = arg; + nng_socket sock; + int rv; + nng_msg * msg; + unsigned long long recvs; + unsigned long long drops; + unsigned long long gaps; + unsigned long long errs; + unsigned long long expect; + + if ((rv = nng_sub0_open(&sock)) != 0) { + die("Cannot open sub: %s", nng_strerror(rv)); + } + if ((rv = nng_dial(sock, pa->addr, NULL, 0)) != 0) { + die("Cannot listen: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt_ms(sock, NNG_OPT_RECONNMINT, 51)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 10000)) != 0) { + die("setopt: %s", nng_strerror(rv)); + } + + expect = 0; + recvs = drops = gaps = errs = 0; + + while (expect < pa->count) { + uint64_t got; + if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) { + if ((rv == NNG_ECLOSED) || (rv == NNG_ETIMEDOUT)) { + // Closed without receiving the last message + drops += (pa->count - expect); + gaps++; + break; + } + printf("ERROR: %s\n", nng_strerror(rv)); + errs++; + break; + } + recvs++; + memcpy(&got, nng_msg_body(msg), sizeof(got)); + nng_msg_free(msg); + if (got != expect) { + gaps++; + if (got > expect) { + drops += (got - expect); + } else { + die("Misordered delivery"); + } + } + expect = got + 1; + } + + nng_mtx_lock(pa->mtx); + pa->drops += drops; + pa->errs += errs; + pa->recvs += recvs; + pa->gaps += gaps; + nng_mtx_unlock(pa->mtx); +} + +static void +do_pubdrop(int argc, char **argv) +{ + nng_thread ** thrs; + struct pubdrop_args pa; + int rv; + int nsubs; + + if (argc != 5) { + die("Usage: pubdrop <url> <msg-size> <msg-count> <num-subs> " + "<interval>"); + } + + memset(&pa, 0, sizeof(pa)); + pa.addr = argv[0]; + pa.msgsize = parse_int(argv[1], "message size"); + pa.count = parse_int(argv[2], "count"); + pa.intvl = parse_int(argv[4], "interval"); + nsubs = parse_int(argv[3], "#subscribers"); + + if (pa.msgsize < sizeof(uint64_t)) { + die("Message size too small."); + } + + thrs = calloc(sizeof(nng_thread *), (size_t) pa.count + 1); + if (((rv = nng_mtx_alloc(&pa.mtx)) != 0) || + ((nng_cv_alloc(&pa.cv, pa.mtx)) != 0)) { + die("Startup: %s\n", nng_strerror(rv)); + }; + + if ((rv = nng_thread_create(&thrs[0], pub_server, &pa)) != 0) { + die("Cannot create pub thread: %s", nng_strerror(rv)); + } + + nng_msleep(100); // give time for listener to start... + + for (int i = 0; i < nsubs; i++) { + if ((rv = nng_thread_create(&thrs[i + 1], sub_client, &pa)) != + 0) { + die("Cannot create sub thread: %s", nng_strerror(rv)); + } + } + + // Sleep a bit for conns to establish. + nng_msleep(2000); + nng_mtx_lock(pa.mtx); + pa.start = true; + nng_cv_wake(pa.cv); + nng_mtx_unlock(pa.mtx); + + for (int i = 0; i < nsubs + 1; i++) { + nng_thread_destroy(thrs[i]); + } + + nng_mtx_lock(pa.mtx); + + unsigned long long expect = nsubs * pa.count; + unsigned long long missing = nsubs ? expect - pa.recvs : 0; + double dur = (pa.end - pa.beg) / 1000.0; + + printf("Sub Sent %llu messages in %.3f sec (%.2f msgs/sec)\n", + pa.count, dur, pa.count / dur); + printf("Expected %llu messages total\n", expect); + printf("Received %llu messages total\n", pa.recvs); + printf("Effective rate %.2f msgs/sec\n", pa.recvs / dur); + printf("Errors %llu total\n", pa.errs); + printf("Reported %llu dropped messages in %llu gaps\n", pa.drops, + pa.gaps); + printf("Dropped %llu messages (missing)\n", missing); + printf("Drop rate %.2f%%\n", expect ? 100.0 * missing / expect : 0); + + nng_mtx_unlock(pa.mtx); +} |
