summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-02-27 14:47:13 -0800
committerGarrett D'Amore <garrett@damore.org>2018-02-28 07:11:41 -0800
commit65e01d3c7917983d5066335cffb84c93494f1b14 (patch)
tree618260276445adfd4c9d279d59248d780f93d23a
parentfeeec674529cf563f8507c307ae833dc93044664 (diff)
downloadnng-65e01d3c7917983d5066335cffb84c93494f1b14.tar.gz
nng-65e01d3c7917983d5066335cffb84c93494f1b14.tar.bz2
nng-65e01d3c7917983d5066335cffb84c93494f1b14.zip
fixes #240 nngcat is MIA
This is intended to provide compatibility with, and has been tested against, legacy nanocat. There are a few differences though. At this time support for the alias names (where argv[0] is set to something like nngreq or somesuch) is missing. By default this library operations without NNG_FLAG_NONBLOCK on dial and listen, so that failures here are immediately diagnosable. (This behavior can be changed with the --async flag.) By default --pair means PAIRv1, but you can specify --pair0 or --pair1 explicitly. (There is also a --compat mode, and in that mode --pair means PAIRv0. The --compat mode also turns on NNG_FLAG_NONBLOCK by default.) The "quoted" mode also quotes tabs. (Legacy nanocat did not.) It is possible to connect to *multiple* peers by using the --dial or --listen (or similar) options multiple times. Shorthands can be used for long options that are not ambiguous. For example, --surv can be used to mean surveyor, but --re is invalid because it can mean req, rep, or respondent. We assume you have a reasonable standard C environment. This won't work in embedded environments without support for FILE *. TLS options are missing but to be added soon. A man page is still to be written.
-rw-r--r--CMakeLists.txt56
-rw-r--r--src/nng.c2
-rw-r--r--src/nng.h2
-rw-r--r--src/protocol/pair0/pair.c1
-rw-r--r--src/protocol/pipeline0/pull.c1
-rw-r--r--src/protocol/pipeline0/push.c1
-rw-r--r--src/supplemental/util/options.c4
-rw-r--r--tools/nngcat/CMakeLists.txt16
-rw-r--r--tools/nngcat/nngcat.c912
9 files changed, 942 insertions, 53 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ece149a9..77b00f80 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -91,7 +91,7 @@ endif()
option (NNG_ENABLE_DOC "Enable building documentation." ON)
option (NNG_TESTS "Build and run tests" ON)
-option (NNG_TOOLS "Build extra tools" OFF)
+option (NNG_TOOLS "Build extra tools" ON)
option (NNG_ENABLE_NNGCAT "Enable building nngcat utility." ${NNG_TOOLS})
option (NNG_ENABLE_COVERAGE "Enable coverage reporting." OFF)
# Enable access to private APIs for our own use.
@@ -404,8 +404,7 @@ endif()
# Build the tools
if (NNG_ENABLE_NNGCAT)
- add_executable (nanocat tools/nngcat.c tools/options.c)
- target_link_libraries (nanocat ${PROJECT_NAME})
+ add_subdirectory (tools/nngcat)
endif ()
if (NNG_ENABLE_DOC)
@@ -456,59 +455,14 @@ if (NNG_ENABLE_DOC)
endmacro (add_libnng_man)
- if (NNG_ENABLE_NNGCAT)
- add_libnng_man (nngcat 1)
- endif ()
-
#add_libnng_man (nn_errno 3)
- #add_libnng_man (nn_strerror 3)
- #add_libnng_man (nn_symbol 3)
- #add_libnng_man (nn_symbol_info 3)
- #add_libnng_man (nn_allocmsg 3)
- #add_libnng_man (nn_reallocmsg 3)
- #add_libnng_man (nn_freemsg 3)
- #add_libnng_man (nn_socket 3)
- #add_libnng_man (nn_close 3)
- #add_libnng_man (nn_get_statistic 3)
- #add_libnng_man (nn_getsockopt 3)
- #add_libnng_man (nn_setsockopt 3)
- #add_libnng_man (nn_bind 3)
- #add_libnng_man (nn_connect 3)
- #add_libnng_man (nn_shutdown 3)
- #add_libnng_man (nn_send 3)
- #add_libnng_man (nn_recv 3)
- #add_libnng_man (nn_sendmsg 3)
- #add_libnng_man (nn_recvmsg 3)
- #add_libnng_man (nn_device 3)
- #add_libnng_man (nn_cmsg 3)
- #add_libnng_man (nn_poll 3)
- #add_libnng_man (nn_term 3)
#add_libnng_man (nanomsg 7)
- #add_libnng_man (nn_pair 7)
- #add_libnng_man (nn_reqrep 7)
- #add_libnng_man (nn_pubsub 7)
- #add_libnng_man (nn_survey 7)
- #add_libnng_man (nn_pipeline 7)
- #add_libnng_man (nn_bus 7)
- #add_libnng_man (nn_inproc 7)
- #add_libnng_man (nn_ipc 7)
- #add_libnng_man (nn_tcp 7)
- #add_libnng_man (nn_ws 7)
- #add_libnng_man (nn_env 7)
-
- add_custom_target (man ALL DEPENDS ${NNG_MANS})
- add_custom_target (html ALL DEPENDS ${NNG_HTMLS})
-
-endif ()
-#install (TARGETS LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
-#install (TARGETS ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
-#install (FILES src/nng.h DESTINATION include/nng)
+ #add_custom_target (man ALL DEPENDS ${NNG_MANS})
+ #add_custom_target (html ALL DEPENDS ${NNG_HTMLS})
-if (NNG_ENABLE_NNGCAT)
- install (TARGETS nngcat RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
-endif()
+endif ()
set (CPACK_PACKAGE_NAME ${PROJECT_NAME})
set (CPACK_PACKAGE_VERSION ${NNG_PACKAGE_VERSION})
diff --git a/src/nng.c b/src/nng.c
index b906431b..c2e3161d 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -732,6 +732,8 @@ static const struct {
{ NNG_EWRITEONLY, "Write only resource" },
{ NNG_ECRYPTO, "Cryptographic error" },
{ NNG_EPEERAUTH, "Peer could not be authenticated" },
+ { NNG_ENOARG, "Option requires argument" },
+ { NNG_EAMBIGUOUS, "Ambiguous option" },
{ NNG_EINTERNAL, "Internal error detected" },
{ 0, NULL },
// clang-format on
diff --git a/src/nng.h b/src/nng.h
index 779612c7..b3acc86c 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -590,6 +590,8 @@ enum nng_errno_enum {
NNG_EWRITEONLY = 25,
NNG_ECRYPTO = 26,
NNG_EPEERAUTH = 27,
+ NNG_ENOARG = 28,
+ NNG_EAMBIGUOUS = 29,
NNG_EINTERNAL = 1000,
NNG_ESYSERR = 0x10000000,
NNG_ETRANERR = 0x20000000,
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index 10498c0a..d3591df9 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -12,6 +12,7 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/pair0/pair.h"
// Pair protocol. The PAIR protocol is a simple 1:1 messaging pattern.
// While a peer is connected to the server, all other peer connection
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index 76c680c1..ccc1ef40 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -12,6 +12,7 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/pipeline0/pull.h"
// Pull protocol. The PULL protocol is the "read" side of a pipeline.
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c
index 1e6cf30e..e77bfe99 100644
--- a/src/protocol/pipeline0/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -12,6 +12,7 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/pipeline0/push.h"
// Push protocol. The PUSH protocol is the "write" side of a pipeline.
// Push distributes fairly, or tries to, by giving messages in round-robin
diff --git a/src/supplemental/util/options.c b/src/supplemental/util/options.c
index d8f78deb..a8cc969d 100644
--- a/src/supplemental/util/options.c
+++ b/src/supplemental/util/options.c
@@ -90,7 +90,7 @@ nng_opts_parse(int argc, const char **argv, const nng_optspec *opts, int *val,
break;
default:
// Ambiguous (not match)
- return (NNG_EINVAL);
+ return (NNG_EAMBIGUOUS);
break;
}
@@ -113,7 +113,7 @@ nng_opts_parse(int argc, const char **argv, const nng_optspec *opts, int *val,
} else {
i++;
if (i >= argc) {
- return (NNG_EINVAL);
+ return (NNG_ENOARG);
}
*optarg = argv[i];
}
diff --git a/tools/nngcat/CMakeLists.txt b/tools/nngcat/CMakeLists.txt
new file mode 100644
index 00000000..3864c8e4
--- /dev/null
+++ b/tools/nngcat/CMakeLists.txt
@@ -0,0 +1,16 @@
+#
+# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2018 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+if (NNG_ENABLE_NNGCAT)
+ add_executable (nngcat nngcat.c)
+ target_include_directories (nngcat PUBLIC ${PROJECT_SOURCE_DIR}/src)
+ target_link_libraries (nngcat ${PROJECT_NAME})
+ install (TARGETS nngcat RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
+endif()
diff --git a/tools/nngcat/nngcat.c b/tools/nngcat/nngcat.c
new file mode 100644
index 00000000..8c8ce34f
--- /dev/null
+++ b/tools/nngcat/nngcat.c
@@ -0,0 +1,912 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "nng.h"
+#include "protocol/bus0/bus.h"
+#include "protocol/pair0/pair.h"
+#include "protocol/pair1/pair.h"
+#include "protocol/pipeline0/pull.h"
+#include "protocol/pipeline0/push.h"
+#include "protocol/pubsub0/pub.h"
+#include "protocol/pubsub0/sub.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
+#include "supplemental/util/options.h"
+#include "supplemental/util/platform.h"
+
+// Globals. We need this to avoid passing around everything.
+int format = 0;
+int proto = 0;
+int verbose = 0;
+int delay = 0;
+nng_duration interval = NNG_DURATION_INFINITE;
+nng_duration sendtimeo = NNG_DURATION_INFINITE;
+nng_duration recvtimeo = NNG_DURATION_INFINITE;
+void * data = NULL;
+size_t datalen = 0;
+size_t datacap = 0;
+int compat = 0;
+int async = 0;
+
+// Options, must start at 1 because zero is sentinel.
+enum options {
+ OPT_HELP = 1,
+ OPT_VERBOSE,
+ OPT_SILENT,
+ OPT_REP0,
+ OPT_REQ0,
+ OPT_PUSH0,
+ OPT_PULL0,
+ OPT_PUB0,
+ OPT_SUB0,
+ OPT_SURVEY0,
+ OPT_RESPOND0,
+ OPT_PAIR0,
+ OPT_PAIR1,
+ OPT_PAIR,
+ OPT_BUS0,
+ OPT_DIAL,
+ OPT_LISTEN,
+ OPT_DATA,
+ OPT_FILE,
+ OPT_SUBSCRIBE,
+ OPT_INTERVAL,
+ OPT_DELAY,
+ OPT_FORMAT,
+ OPT_RAW,
+ OPT_ASCII,
+ OPT_QUOTED,
+ OPT_MSGPACK,
+ OPT_HEX,
+ OPT_BLANK, // no printing, not an actual option.
+ OPT_COMPAT,
+ OPT_ASYNC,
+ OPT_RCV_TIMEO,
+ OPT_SND_TIMEO,
+ OPT_SOCK_NAME,
+ OPT_LISTEN_IPC,
+ OPT_DIAL_IPC,
+ OPT_LISTEN_LOCAL,
+ OPT_DIAL_LOCAL,
+};
+
+static nng_optspec opts[] = {
+ { .o_name = "help", .o_short = 'h', .o_val = OPT_HELP },
+ { .o_name = "verbose", .o_short = 'v', .o_val = OPT_VERBOSE },
+ { .o_name = "silent", .o_short = 'q', .o_val = OPT_SILENT },
+ { .o_name = "req0", .o_val = OPT_REQ0 },
+ { .o_name = "rep0", .o_val = OPT_REP0 },
+ { .o_name = "push0", .o_val = OPT_PUSH0 },
+ { .o_name = "pull0", .o_val = OPT_PULL0 },
+ { .o_name = "pub0", .o_val = OPT_PUB0 },
+ { .o_name = "sub", .o_val = OPT_SUB0 }, // explicit for alias
+ { .o_name = "sub0", .o_val = OPT_SUB0 },
+ { .o_name = "surveyor0", .o_val = OPT_SURVEY0 },
+ { .o_name = "respondent0", .o_val = OPT_RESPOND0 },
+ { .o_name = "pair", .o_val = OPT_PAIR },
+ { .o_name = "pair0", .o_val = OPT_PAIR0 },
+ { .o_name = "pair1", .o_val = OPT_PAIR1 },
+ { .o_name = "bus0", .o_val = OPT_BUS0 },
+ { .o_name = "dial", .o_val = OPT_DIAL, .o_arg = true },
+ { .o_name = "listen", .o_val = OPT_DIAL, .o_arg = true },
+ { .o_name = "data", .o_short = 'D', .o_val = OPT_DATA, .o_arg = true },
+ { .o_name = "file", .o_short = 'F', .o_val = OPT_FILE, .o_arg = true },
+ { .o_name = "subscribe", .o_val = OPT_SUBSCRIBE, .o_arg = true },
+ { .o_name = "format", .o_val = OPT_FORMAT, .o_arg = true },
+ { .o_name = "ascii", .o_short = 'A', .o_val = OPT_ASCII },
+ { .o_name = "quoted", .o_short = 'Q', .o_val = OPT_QUOTED },
+ { .o_name = "raw", .o_val = OPT_RAW },
+ { .o_name = "hex", .o_val = OPT_HEX },
+ { .o_name = "compat", .o_val = OPT_COMPAT },
+ { .o_name = "async", .o_val = OPT_ASYNC },
+ {
+ .o_name = "delay",
+ .o_short = 'd',
+ .o_val = OPT_DELAY,
+ .o_arg = true,
+ },
+ {
+ .o_name = "interval",
+ .o_short = 'i',
+ .o_val = OPT_INTERVAL,
+ .o_arg = true,
+ },
+ { .o_name = "recv-timeout", .o_val = OPT_RCV_TIMEO, .o_arg = true },
+ { .o_name = "send-timeout", .o_val = OPT_SND_TIMEO, .o_arg = true },
+ { .o_name = "socket-name", .o_val = OPT_SOCK_NAME, .o_arg = true },
+
+ // Legacy compatibility with nanocat.
+ { .o_name = "bind", .o_val = OPT_LISTEN, .o_arg = true },
+ { .o_name = "connect", .o_val = OPT_DIAL, .o_arg = true },
+ {
+ .o_name = "bind-ipc",
+ .o_short = 'X',
+ .o_val = OPT_LISTEN_IPC,
+ .o_arg = true,
+ },
+ {
+ .o_name = "connect-ipc",
+ .o_short = 'x',
+ .o_val = OPT_DIAL_IPC,
+ .o_arg = true,
+ },
+ {
+ .o_name = "bind-local",
+ .o_short = 'L',
+ .o_val = OPT_LISTEN_LOCAL,
+ .o_arg = true,
+ },
+ {
+ .o_name = "connect-local",
+ .o_short = 'l',
+ .o_val = OPT_DIAL_LOCAL,
+ .o_arg = true,
+ },
+
+ // Sentinel.
+ { .o_name = NULL, .o_val = 0 },
+};
+
+static void
+fatal(const char *msg, ...)
+{
+ va_list ap;
+ va_start(ap, msg);
+ vfprintf(stderr, msg, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+static void
+help(void)
+{
+ printf(
+ "Usage: nngcat <proto> <addr>... [<fmt>] [<opts>...] [<src>]\n\n");
+ printf("<proto> must be one of:\n");
+ printf(" --req0 (or alias --req)\n");
+ printf(" --rep0 (or alias --rep)\n");
+ printf(" --pub0 (or alias --pub)\n");
+ printf(" --sub0 (or alias --sub)\n");
+ printf(" --push0 (or alias --push)\n");
+ printf(" --pull0 (or alias --pull)\n");
+ printf(" --surveyor0 (or alias --surveyor)\n");
+ printf(" --respondent0 (or alias --respondent)\n");
+ printf(" --pair0\n");
+ printf(" --pair1\n");
+ printf(" --pair (alias for either pair0 or pair1)\n");
+ printf("\n<addr> must be one or more of:\n");
+ printf(" --dial <url> (or alias --connect <url>)\n");
+ printf(" --listen <url> (or alias --bind <url>)\n");
+ printf(" --bind-local <port> (or alias -L <port>)\n");
+ printf(" --connect-local <port> (or alias -l <port>)\n");
+ printf(" --bind-ipc <path> (or alias -X <path>)\n");
+ printf(" --connect-ipc <path> (or alias -x <path>)\n");
+ printf("\n<fmt> may be one or none of:\n");
+ printf(" --format <no|ascii|hex|msgpack|quoted|raw>\n");
+ printf(" --ascii (or alias -A)\n");
+ printf(" --quoted (or alias -Q)\n");
+ printf(" --hex\n");
+ printf(" --msgpack\n");
+ printf(" --raw\n");
+ printf("\n<opts> may be any of:\n");
+ printf(" --subscribe <topic> (only with --sub protocol)\n");
+ printf(" --silent (or alias -q)\n");
+ printf(" --verbose (or alias -v)\n");
+ printf(" --compat\n");
+ printf(" --async\n");
+ printf("\n<src> may be one of:\n");
+ printf(" --file <file> (or alias -F <file>)\n");
+ printf(" --data <data> (or alias -D <data>)\n");
+ exit(1);
+}
+
+static int
+intarg(const char *val, int maxv)
+{
+ int v = 0;
+
+ if (val[0] == '\0') {
+ fatal("Empty integer argument.");
+ }
+ while (*val != '\0') {
+ if (!isdigit(*val)) {
+ fatal("Integer argument expected.");
+ }
+ v *= 10;
+ v += ((*val) - '0');
+ val++;
+ if (v > maxv) {
+ fatal("Integer argument too large.");
+ }
+ }
+ if (v < 0) {
+ fatal("Integer argument overflow.");
+ }
+ return (v);
+}
+
+struct addr {
+ struct addr *next;
+ int mode;
+ char * val;
+};
+
+struct addr **
+addaddr(struct addr **endp, int mode, const char *a)
+{
+ struct addr *na;
+
+ if (((na = malloc(sizeof(*na))) == NULL) ||
+ ((na->val = malloc(strlen(a) + 1)) == NULL)) {
+ fatal("Out of memory.");
+ }
+ na->mode = mode;
+ memcpy(na->val, a, strlen(a) + 1);
+ na->next = NULL;
+ *endp = na;
+ return (&na->next);
+}
+
+struct topic {
+ struct topic *next;
+ char * val;
+};
+
+struct topic **
+addtopic(struct topic **endp, const char *s)
+{
+ struct topic *t;
+
+ if (((t = malloc(sizeof(*t))) == NULL) ||
+ ((t->val = malloc(strlen(s) + 1)) == NULL)) {
+ fatal("Out of memory.");
+ }
+ memcpy(t->val, s, strlen(s) + 1);
+ t->next = NULL;
+ *endp = t;
+ return (&t->next);
+}
+
+void
+printmsg(char *buf, size_t len)
+{
+ switch (format) {
+ case OPT_BLANK: // Suppress contents.
+ return;
+ case OPT_RAW: // Just emit raw contents.
+ fwrite(buf, 1, len, stdout);
+ break;
+ case OPT_ASCII: // ASCII, but non-ASCII substituted with '.'
+ for (size_t i = 0; i < len; i++) {
+ if (isprint(buf[i])) {
+ putchar(buf[i]);
+ } else {
+ putchar('.');
+ }
+ }
+ break;
+ case OPT_QUOTED: // C style quoted strings.
+ putchar('"');
+ for (size_t i = 0; i < len; i++) {
+ switch (buf[i]) {
+ case '\n':
+ putchar('\\');
+ putchar('n');
+ break;
+ case '\r':
+ putchar('\\');
+ putchar('r');
+ break;
+ case '\t':
+ putchar('\\');
+ putchar('t');
+ break;
+ case '"':
+ case '\\':
+ putchar('\\');
+ putchar(buf[i]);
+ break;
+ default:
+ if (isprint(buf[i])) {
+ fputc(buf[i], stdout);
+ } else {
+ printf("\\x%02x", (uint8_t) buf[i]);
+ }
+ }
+ }
+ putchar('"');
+ putchar('\n');
+ break;
+ case OPT_MSGPACK: // MsgPack, we just encode prefix + len, then raw.
+ if (len < 256) {
+ putchar('\xc4');
+ putchar(len & 0xff);
+ } else if (len < 65536) {
+ putchar('\xc5');
+ putchar((len >> 8) & 0xff);
+ putchar(len & 0xff);
+ } else {
+ putchar('\xc6');
+ putchar((len >> 24) & 0xff);
+ putchar((len >> 16) & 0xff);
+ putchar((len >> 8) & 0xff);
+ putchar(len & 0xff);
+ }
+ fwrite(buf, 1, len, stdout);
+ break;
+ case OPT_HEX: // Hex, quoted C string encoded with hex literals.
+ putchar('"');
+ for (size_t i = 0; i < len; i++) {
+ printf("\\x%02x", (uint8_t) buf[i]);
+ }
+ putchar('"');
+ putchar('\n');
+ break;
+ }
+ fflush(stdout);
+}
+
+void
+recvloop(nng_socket sock)
+{
+ for (;;) {
+ int rv;
+ nng_msg *msg;
+
+ rv = nng_recvmsg(sock, &msg, 0);
+ switch (rv) {
+ case NNG_ETIMEDOUT:
+ case NNG_ESTATE:
+ // Either a regular timeout, or we reached the end
+ // of an event like a survey completing.
+ return;
+ case 0:
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_free(msg);
+ continue;
+ default:
+ fatal("Receive error: %s", nng_strerror(rv));
+ }
+ }
+}
+
+void
+resploop(nng_socket sock)
+{
+ for (;;) {
+ int rv;
+ nng_msg *msg;
+
+ rv = nng_recvmsg(sock, &msg, 0);
+ if (rv != 0) {
+ fatal("Receive error: %s", nng_strerror(rv));
+ }
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_clear(msg);
+ if ((rv = nng_msg_append(msg, data, datalen)) != 0) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+ }
+}
+
+void
+sendloop(nng_socket sock)
+{
+ if (data == NULL) {
+ fatal("No data to send (specify with --data or --file)");
+ }
+ if (delay > 0) {
+ nng_msleep(delay);
+ }
+
+ for (;;) {
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ nng_duration delta;
+
+ start = nng_clock();
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, data, datalen)) != 0)) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+ end = nng_clock();
+ delta = (nng_duration)(end - start);
+
+ // By default, we don't loop.
+ if (interval < 0) {
+ break;
+ }
+ // We sleep, but we account for time spent, so that our
+ // interval appears more or less constant. Of course if we
+ // took more than the interval here, then we skip the sleep
+ // altogether.
+ if ((delta >= 0) && (delta < interval)) {
+ nng_msleep(interval - delta);
+ }
+ }
+}
+
+void
+sendrecv(nng_socket sock)
+{
+ nng_duration iv = 0;
+ if (data == NULL) {
+ fatal("No data to send (specify with --data or --file)");
+ }
+ if (delay > 0) {
+ nng_msleep(delay);
+ }
+ if (interval > 0) {
+ iv = interval;
+ }
+
+ // We start by sending a message, then we receive iteratively
+ // but we set a concrete timeout if interval is set, to ensure
+ // that we exit the receive loop, and can continue.
+ for (;;) {
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ nng_duration delta;
+
+ start = nng_clock();
+ if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
+ ((rv = nng_msg_append(msg, data, datalen)) != 0)) {
+ fatal(nng_strerror(rv));
+ }
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("Send error: %s", nng_strerror(rv));
+ }
+ if (interval < 0) {
+ // Only one iteration through.
+ recvloop(sock);
+ break;
+ }
+
+ // We would like to use recvloop, but we need to reset our
+ // timeout each time, as the timer counts down towards zero.
+
+ for (;;) {
+ delta = (nng_duration)(nng_clock() - start);
+
+ nng_duration expire = interval - delta;
+ if ((recvtimeo >= 0) && (expire > recvtimeo)) {
+ expire = recvtimeo;
+ }
+ rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, expire);
+ if (rv != 0) {
+ fatal("Cannot set recv timeout: %s",
+ nng_strerror(rv));
+ }
+
+ rv = nng_recvmsg(sock, &msg, 0);
+ switch (rv) {
+ case 0:
+ printmsg(nng_msg_body(msg), nng_msg_len(msg));
+ nng_msg_free(msg);
+ continue;
+ case NNG_ETIMEDOUT:
+ case NNG_ESTATE:
+ // We're done receiving
+ break;
+ default:
+ fatal("Cannot receive: %s", nng_strerror(rv));
+ break;
+ }
+
+ // We're done receiving, break out.
+ break;
+ }
+
+ end = nng_clock();
+ delta = (nng_duration)(end - start);
+
+ // We sleep, but we account for time spent, so that our
+ // interval appears more or less constant. Of course if we
+ // took more than the interval here, then we skip the sleep
+ // altogether.
+ if ((delta >= 0) && (delta < interval)) {
+ nng_msleep(interval - delta);
+ }
+ }
+}
+
+int
+main(int ac, const char **av)
+{
+ int idx;
+ const char * arg;
+ int val;
+ int rv;
+ char scratch[512];
+ struct addr * addrs = NULL;
+ struct addr ** addrend;
+ struct topic * topics = NULL;
+ struct topic **topicend;
+ nng_socket sock;
+ int port;
+ FILE * f;
+
+ idx = 1;
+ addrend = &addrs;
+ topicend = &topics;
+
+ while ((rv = nng_opts_parse(ac, av, opts, &val, &arg, &idx)) == 0) {
+ switch (val) {
+ case OPT_HELP:
+ help();
+ break;
+ case OPT_REQ0:
+ case OPT_REP0:
+ case OPT_SUB0:
+ case OPT_PUB0:
+ case OPT_BUS0:
+ case OPT_SURVEY0:
+ case OPT_RESPOND0:
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ case OPT_PUSH0:
+ case OPT_PULL0:
+ if (proto != 0) {
+ fatal("Only one protocol may be "
+ "specified.");
+ }
+ proto = val;
+ break;
+ case OPT_DIAL:
+ case OPT_LISTEN:
+ addrend = addaddr(addrend, val, arg);
+ break;
+ case OPT_DIAL_LOCAL:
+ case OPT_LISTEN_LOCAL:
+ port = intarg(arg, 65536);
+ snprintf(scratch, sizeof(scratch),
+ "tcp://127.0.0.1:%d", port);
+ addrend = addaddr(addrend, val, scratch);
+ break;
+ case OPT_DIAL_IPC:
+ case OPT_LISTEN_IPC:
+ snprintf(scratch, sizeof(scratch), "ipc:///%s", arg);
+ addrend = addaddr(addrend, val, scratch);
+ break;
+ case OPT_SUBSCRIBE:
+ topicend = addtopic(topicend, arg);
+ break;
+ case OPT_VERBOSE:
+ case OPT_SILENT:
+ verbose = val;
+ break;
+ case OPT_DELAY:
+ delay = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_INTERVAL:
+ interval = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_SND_TIMEO:
+ sendtimeo = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_RCV_TIMEO:
+ recvtimeo = intarg(arg, 86400) * 1000; // max 1 day
+ break;
+ case OPT_COMPAT:
+ compat = 1;
+ break;
+ case OPT_ASYNC:
+ async = NNG_FLAG_NONBLOCK;
+ break;
+ case OPT_ASCII:
+ case OPT_RAW:
+ case OPT_QUOTED:
+ case OPT_MSGPACK:
+ case OPT_HEX:
+ if (format != 0) {
+ fatal("Format may be specified only "
+ "once.");
+ }
+ format = val;
+ break;
+ case OPT_FORMAT:
+ if (format != 0) {
+ fatal("Format may be specified only "
+ "once.");
+ }
+ if (strcmp(arg, "no") == 0) {
+ format = OPT_BLANK;
+ } else if (strcmp(arg, "ascii") == 0) {
+ format = OPT_ASCII;
+ } else if (strcmp(arg, "hex") == 0) {
+ format = OPT_HEX;
+ } else if (strcmp(arg, "quoted") == 0) {
+ format = OPT_QUOTED;
+ } else if (strcmp(arg, "raw") == 0) {
+ format = OPT_RAW;
+ } else if (strcmp(arg, "msgpack") == 0) {
+ format = OPT_MSGPACK;
+ } else {
+ fatal("Invalid format specified.");
+ }
+ break;
+ case OPT_FILE:
+ if (data != NULL) {
+ fatal("Data (--file, --data) may be specified "
+ "only once.");
+ }
+ if ((f = fopen(arg, "r")) == NULL) {
+ fatal("Cannot open file %s: %s", arg,
+ strerror(errno));
+ }
+ for (;;) {
+ size_t n;
+ // Read until end of file, reallocating as
+ // needed.
+ if (datacap == 0) {
+ data = malloc(4096);
+ datacap = 4096;
+ } else if (datacap == datalen) {
+ void *odata = data;
+ datacap *= 2;
+ data = realloc(odata, datacap);
+ if (data == NULL) {
+ free(odata);
+ }
+ }
+ if (data == NULL) {
+ fatal("Out of memory.");
+ }
+ n = fread((char *) data + datalen, 1,
+ datacap - datalen, f);
+ if (n == 0) {
+ if (ferror(f)) {
+ fatal(
+ "Read file %s failed: %s",
+ arg, strerror(errno));
+ }
+ break;
+ }
+ }
+ fclose(f);
+ break;
+ case OPT_DATA:
+ if (data != NULL) {
+ fatal("Data (--file, --data) may be specified "
+ "only once.");
+ }
+ if ((data = malloc(strlen(arg) + 1)) == NULL) {
+ fatal("Out of memory.");
+ }
+ memcpy(data, arg, strlen(arg) + 1);
+ datacap = strlen(arg) + 1;
+ datalen = strlen(arg);
+ break;
+ }
+ }
+ switch (rv) {
+ case NNG_EINVAL:
+ fatal("Option %s is invalid.", av[idx]);
+ break;
+ case NNG_EAMBIGUOUS:
+ fatal("Option %s is ambiguous (specify in full).", av[idx]);
+ break;
+ case NNG_ENOARG:
+ fatal("Option %s requires argument.", av[idx]);
+ break;
+ }
+
+ if (addrs == NULL) {
+ fatal("No address specified.");
+ }
+
+ if (compat) {
+ if (async != 0) {
+ fatal("Option --async and --compat are incompatible.");
+ }
+ if (proto == OPT_PAIR) {
+ proto = OPT_PAIR0;
+ }
+ if (proto == OPT_PAIR1) {
+ fatal("Protocol does not support --compat.");
+ }
+ async = NNG_FLAG_NONBLOCK;
+ } else {
+ if (proto == OPT_PAIR) {
+ proto = OPT_PAIR1;
+ }
+ }
+ if (proto == OPT_SUB0) {
+ if (topics == NULL) {
+ topicend = addtopic(topicend, ""); // subscribe to all
+ }
+ } else {
+ if (topics != NULL) {
+ fatal("Protocol does not support --subscribe.");
+ }
+ }
+
+ switch (proto) {
+ case OPT_PULL0:
+ case OPT_SUB0:
+ if (data != NULL) {
+ fatal("Protocol does not support --file or "
+ "--data.");
+ }
+ if (interval >= 0) {
+ fatal("Protocol does not support --interval.");
+ }
+ break;
+ case OPT_PUSH0:
+ case OPT_SURVEY0:
+ case OPT_PUB0:
+ case OPT_REQ0:
+ if (data == NULL) {
+ fatal("Protocol requires either --file or "
+ "--data.");
+ }
+ break;
+ case OPT_REP0:
+ case OPT_RESPOND0:
+ if (interval >= 0) {
+ fatal("Protocol does not support --interval.");
+ }
+ break;
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ case OPT_BUS0:
+ if ((interval >= 0) && (data == NULL)) {
+ fatal("Option --interval requires either "
+ "--file or --data.");
+ }
+ break;
+ }
+
+ switch (proto) {
+ case OPT_REQ0:
+ rv = nng_req0_open(&sock);
+ break;
+ case OPT_REP0:
+ rv = nng_rep0_open(&sock);
+ break;
+ case OPT_SUB0:
+ rv = nng_sub0_open(&sock);
+ break;
+ case OPT_PUB0:
+ rv = nng_pub0_open(&sock);
+ break;
+ case OPT_PAIR0:
+ rv = nng_pair0_open(&sock);
+ break;
+ case OPT_PAIR1:
+ rv = nng_pair1_open(&sock);
+ break;
+ case OPT_BUS0:
+ rv = nng_bus0_open(&sock);
+ break;
+ case OPT_PUSH0:
+ rv = nng_push0_open(&sock);
+ break;
+ case OPT_PULL0:
+ rv = nng_pull0_open(&sock);
+ break;
+ case OPT_SURVEY0:
+ rv = nng_surveyor0_open(&sock);
+ break;
+ case OPT_RESPOND0:
+ rv = nng_respondent0_open(&sock);
+ break;
+ case 0:
+ default:
+ fatal("No protocol specified.");
+ break;
+ }
+ if (rv != 0) {
+ fatal("Unable to open socket: %s", nng_strerror(rv));
+ }
+
+ for (struct topic *t = topics; t != NULL; t = t->next) {
+ rv = nng_setopt(
+ sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val));
+ if (rv != 0) {
+ fatal("Unable to subscribe to topic %s: %s", t->val,
+ nng_strerror(rv));
+ }
+ }
+
+ if ((sendtimeo > 0) &&
+ ((rv = nng_setopt_ms(sock, NNG_OPT_SENDTIMEO, sendtimeo)) != 0)) {
+ fatal("Unable to set send timeout: %s", nng_strerror(rv));
+ }
+ if ((recvtimeo > 0) &&
+ ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, recvtimeo)) != 0)) {
+ fatal("Unable to set send timeout: %s", nng_strerror(rv));
+ }
+
+ // XXX: TBD: This is where we should add other socket options,
+ // like TLS configuration, timeouts, etc.
+
+ for (struct addr *a = addrs; a != NULL; a = a->next) {
+ char *act;
+ switch (a->mode) {
+ case OPT_DIAL:
+ case OPT_DIAL_IPC:
+ case OPT_DIAL_LOCAL:
+ rv = nng_dial(sock, a->val, NULL, async);
+ act = "dial";
+ break;
+ case OPT_LISTEN:
+ case OPT_LISTEN_IPC:
+ case OPT_LISTEN_LOCAL:
+ rv = nng_listen(sock, a->val, NULL, async);
+ act = "listen";
+ break;
+ default:
+ fatal("Invalid address mode! (Bug!)");
+ }
+
+ if (rv != 0) {
+ fatal("Unable to %s on %s: %s", act, a->val,
+ nng_strerror(rv));
+ }
+ }
+
+ switch (proto) {
+ case OPT_SUB0:
+ case OPT_PULL0:
+ recvloop(sock);
+ break;
+ case OPT_REP0:
+ case OPT_RESPOND0:
+ if (data == NULL) {
+ recvloop(sock);
+ } else {
+ resploop(sock);
+ }
+ break;
+ case OPT_PUSH0:
+ case OPT_PUB0:
+ sendloop(sock);
+ break;
+ case OPT_BUS0:
+ case OPT_PAIR0:
+ case OPT_PAIR1:
+ if (data != NULL) {
+ sendrecv(sock);
+ } else {
+ recvloop(sock);
+ }
+ break;
+ case OPT_REQ0:
+ case OPT_SURVEY0:
+ sendrecv(sock);
+ break;
+ default:
+ fatal("Protocol handling unimplmented.");
+ }
+
+ exit(0);
+}