diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-02-27 14:47:13 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-02-28 07:11:41 -0800 |
| commit | 65e01d3c7917983d5066335cffb84c93494f1b14 (patch) | |
| tree | 618260276445adfd4c9d279d59248d780f93d23a | |
| parent | feeec674529cf563f8507c307ae833dc93044664 (diff) | |
| download | nng-65e01d3c7917983d5066335cffb84c93494f1b14.tar.gz nng-65e01d3c7917983d5066335cffb84c93494f1b14.tar.bz2 nng-65e01d3c7917983d5066335cffb84c93494f1b14.zip | |
fixes #240 nngcat is MIA
This is intended to provide compatibility with, and has been tested
against, legacy nanocat. There are a few differences though.
At this time support for the alias names (where argv[0] is set to
something like nngreq or somesuch) is missing.
By default this library operations without NNG_FLAG_NONBLOCK on
dial and listen, so that failures here are immediately diagnosable.
(This behavior can be changed with the --async flag.)
By default --pair means PAIRv1, but you can specify --pair0
or --pair1 explicitly. (There is also a --compat mode, and in
that mode --pair means PAIRv0. The --compat mode also turns on
NNG_FLAG_NONBLOCK by default.)
The "quoted" mode also quotes tabs. (Legacy nanocat did not.)
It is possible to connect to *multiple* peers by using the --dial
or --listen (or similar) options multiple times.
Shorthands can be used for long options that are not ambiguous. For
example, --surv can be used to mean surveyor, but --re is invalid because
it can mean req, rep, or respondent.
We assume you have a reasonable standard C environment. This won't work
in embedded environments without support for FILE *.
TLS options are missing but to be added soon.
A man page is still to be written.
| -rw-r--r-- | CMakeLists.txt | 56 | ||||
| -rw-r--r-- | src/nng.c | 2 | ||||
| -rw-r--r-- | src/nng.h | 2 | ||||
| -rw-r--r-- | src/protocol/pair0/pair.c | 1 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.c | 1 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push.c | 1 | ||||
| -rw-r--r-- | src/supplemental/util/options.c | 4 | ||||
| -rw-r--r-- | tools/nngcat/CMakeLists.txt | 16 | ||||
| -rw-r--r-- | tools/nngcat/nngcat.c | 912 |
9 files changed, 942 insertions, 53 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index ece149a9..77b00f80 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,7 +91,7 @@ endif() option (NNG_ENABLE_DOC "Enable building documentation." ON) option (NNG_TESTS "Build and run tests" ON) -option (NNG_TOOLS "Build extra tools" OFF) +option (NNG_TOOLS "Build extra tools" ON) option (NNG_ENABLE_NNGCAT "Enable building nngcat utility." ${NNG_TOOLS}) option (NNG_ENABLE_COVERAGE "Enable coverage reporting." OFF) # Enable access to private APIs for our own use. @@ -404,8 +404,7 @@ endif() # Build the tools if (NNG_ENABLE_NNGCAT) - add_executable (nanocat tools/nngcat.c tools/options.c) - target_link_libraries (nanocat ${PROJECT_NAME}) + add_subdirectory (tools/nngcat) endif () if (NNG_ENABLE_DOC) @@ -456,59 +455,14 @@ if (NNG_ENABLE_DOC) endmacro (add_libnng_man) - if (NNG_ENABLE_NNGCAT) - add_libnng_man (nngcat 1) - endif () - #add_libnng_man (nn_errno 3) - #add_libnng_man (nn_strerror 3) - #add_libnng_man (nn_symbol 3) - #add_libnng_man (nn_symbol_info 3) - #add_libnng_man (nn_allocmsg 3) - #add_libnng_man (nn_reallocmsg 3) - #add_libnng_man (nn_freemsg 3) - #add_libnng_man (nn_socket 3) - #add_libnng_man (nn_close 3) - #add_libnng_man (nn_get_statistic 3) - #add_libnng_man (nn_getsockopt 3) - #add_libnng_man (nn_setsockopt 3) - #add_libnng_man (nn_bind 3) - #add_libnng_man (nn_connect 3) - #add_libnng_man (nn_shutdown 3) - #add_libnng_man (nn_send 3) - #add_libnng_man (nn_recv 3) - #add_libnng_man (nn_sendmsg 3) - #add_libnng_man (nn_recvmsg 3) - #add_libnng_man (nn_device 3) - #add_libnng_man (nn_cmsg 3) - #add_libnng_man (nn_poll 3) - #add_libnng_man (nn_term 3) #add_libnng_man (nanomsg 7) - #add_libnng_man (nn_pair 7) - #add_libnng_man (nn_reqrep 7) - #add_libnng_man (nn_pubsub 7) - #add_libnng_man (nn_survey 7) - #add_libnng_man (nn_pipeline 7) - #add_libnng_man (nn_bus 7) - #add_libnng_man (nn_inproc 7) - #add_libnng_man (nn_ipc 7) - #add_libnng_man (nn_tcp 7) - #add_libnng_man (nn_ws 7) - #add_libnng_man (nn_env 7) - - add_custom_target (man ALL DEPENDS ${NNG_MANS}) - add_custom_target (html ALL DEPENDS ${NNG_HTMLS}) - -endif () -#install (TARGETS LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}) -#install (TARGETS ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) -#install (FILES src/nng.h DESTINATION include/nng) + #add_custom_target (man ALL DEPENDS ${NNG_MANS}) + #add_custom_target (html ALL DEPENDS ${NNG_HTMLS}) -if (NNG_ENABLE_NNGCAT) - install (TARGETS nngcat RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) -endif() +endif () set (CPACK_PACKAGE_NAME ${PROJECT_NAME}) set (CPACK_PACKAGE_VERSION ${NNG_PACKAGE_VERSION}) @@ -732,6 +732,8 @@ static const struct { { NNG_EWRITEONLY, "Write only resource" }, { NNG_ECRYPTO, "Cryptographic error" }, { NNG_EPEERAUTH, "Peer could not be authenticated" }, + { NNG_ENOARG, "Option requires argument" }, + { NNG_EAMBIGUOUS, "Ambiguous option" }, { NNG_EINTERNAL, "Internal error detected" }, { 0, NULL }, // clang-format on @@ -590,6 +590,8 @@ enum nng_errno_enum { NNG_EWRITEONLY = 25, NNG_ECRYPTO = 26, NNG_EPEERAUTH = 27, + NNG_ENOARG = 28, + NNG_EAMBIGUOUS = 29, NNG_EINTERNAL = 1000, NNG_ESYSERR = 0x10000000, NNG_ETRANERR = 0x20000000, diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index 10498c0a..d3591df9 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -12,6 +12,7 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/pair0/pair.h" // Pair protocol. The PAIR protocol is a simple 1:1 messaging pattern. // While a peer is connected to the server, all other peer connection diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 76c680c1..ccc1ef40 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -12,6 +12,7 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/pipeline0/pull.h" // Pull protocol. The PULL protocol is the "read" side of a pipeline. diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 1e6cf30e..e77bfe99 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -12,6 +12,7 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/pipeline0/push.h" // Push protocol. The PUSH protocol is the "write" side of a pipeline. // Push distributes fairly, or tries to, by giving messages in round-robin diff --git a/src/supplemental/util/options.c b/src/supplemental/util/options.c index d8f78deb..a8cc969d 100644 --- a/src/supplemental/util/options.c +++ b/src/supplemental/util/options.c @@ -90,7 +90,7 @@ nng_opts_parse(int argc, const char **argv, const nng_optspec *opts, int *val, break; default: // Ambiguous (not match) - return (NNG_EINVAL); + return (NNG_EAMBIGUOUS); break; } @@ -113,7 +113,7 @@ nng_opts_parse(int argc, const char **argv, const nng_optspec *opts, int *val, } else { i++; if (i >= argc) { - return (NNG_EINVAL); + return (NNG_ENOARG); } *optarg = argv[i]; } diff --git a/tools/nngcat/CMakeLists.txt b/tools/nngcat/CMakeLists.txt new file mode 100644 index 00000000..3864c8e4 --- /dev/null +++ b/tools/nngcat/CMakeLists.txt @@ -0,0 +1,16 @@ +# +# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +if (NNG_ENABLE_NNGCAT) + add_executable (nngcat nngcat.c) + target_include_directories (nngcat PUBLIC ${PROJECT_SOURCE_DIR}/src) + target_link_libraries (nngcat ${PROJECT_NAME}) + install (TARGETS nngcat RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) +endif() diff --git a/tools/nngcat/nngcat.c b/tools/nngcat/nngcat.c new file mode 100644 index 00000000..8c8ce34f --- /dev/null +++ b/tools/nngcat/nngcat.c @@ -0,0 +1,912 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <errno.h> +#include <stdarg.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "nng.h" +#include "protocol/bus0/bus.h" +#include "protocol/pair0/pair.h" +#include "protocol/pair1/pair.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "protocol/pubsub0/pub.h" +#include "protocol/pubsub0/sub.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" +#include "supplemental/util/options.h" +#include "supplemental/util/platform.h" + +// Globals. We need this to avoid passing around everything. +int format = 0; +int proto = 0; +int verbose = 0; +int delay = 0; +nng_duration interval = NNG_DURATION_INFINITE; +nng_duration sendtimeo = NNG_DURATION_INFINITE; +nng_duration recvtimeo = NNG_DURATION_INFINITE; +void * data = NULL; +size_t datalen = 0; +size_t datacap = 0; +int compat = 0; +int async = 0; + +// Options, must start at 1 because zero is sentinel. +enum options { + OPT_HELP = 1, + OPT_VERBOSE, + OPT_SILENT, + OPT_REP0, + OPT_REQ0, + OPT_PUSH0, + OPT_PULL0, + OPT_PUB0, + OPT_SUB0, + OPT_SURVEY0, + OPT_RESPOND0, + OPT_PAIR0, + OPT_PAIR1, + OPT_PAIR, + OPT_BUS0, + OPT_DIAL, + OPT_LISTEN, + OPT_DATA, + OPT_FILE, + OPT_SUBSCRIBE, + OPT_INTERVAL, + OPT_DELAY, + OPT_FORMAT, + OPT_RAW, + OPT_ASCII, + OPT_QUOTED, + OPT_MSGPACK, + OPT_HEX, + OPT_BLANK, // no printing, not an actual option. + OPT_COMPAT, + OPT_ASYNC, + OPT_RCV_TIMEO, + OPT_SND_TIMEO, + OPT_SOCK_NAME, + OPT_LISTEN_IPC, + OPT_DIAL_IPC, + OPT_LISTEN_LOCAL, + OPT_DIAL_LOCAL, +}; + +static nng_optspec opts[] = { + { .o_name = "help", .o_short = 'h', .o_val = OPT_HELP }, + { .o_name = "verbose", .o_short = 'v', .o_val = OPT_VERBOSE }, + { .o_name = "silent", .o_short = 'q', .o_val = OPT_SILENT }, + { .o_name = "req0", .o_val = OPT_REQ0 }, + { .o_name = "rep0", .o_val = OPT_REP0 }, + { .o_name = "push0", .o_val = OPT_PUSH0 }, + { .o_name = "pull0", .o_val = OPT_PULL0 }, + { .o_name = "pub0", .o_val = OPT_PUB0 }, + { .o_name = "sub", .o_val = OPT_SUB0 }, // explicit for alias + { .o_name = "sub0", .o_val = OPT_SUB0 }, + { .o_name = "surveyor0", .o_val = OPT_SURVEY0 }, + { .o_name = "respondent0", .o_val = OPT_RESPOND0 }, + { .o_name = "pair", .o_val = OPT_PAIR }, + { .o_name = "pair0", .o_val = OPT_PAIR0 }, + { .o_name = "pair1", .o_val = OPT_PAIR1 }, + { .o_name = "bus0", .o_val = OPT_BUS0 }, + { .o_name = "dial", .o_val = OPT_DIAL, .o_arg = true }, + { .o_name = "listen", .o_val = OPT_DIAL, .o_arg = true }, + { .o_name = "data", .o_short = 'D', .o_val = OPT_DATA, .o_arg = true }, + { .o_name = "file", .o_short = 'F', .o_val = OPT_FILE, .o_arg = true }, + { .o_name = "subscribe", .o_val = OPT_SUBSCRIBE, .o_arg = true }, + { .o_name = "format", .o_val = OPT_FORMAT, .o_arg = true }, + { .o_name = "ascii", .o_short = 'A', .o_val = OPT_ASCII }, + { .o_name = "quoted", .o_short = 'Q', .o_val = OPT_QUOTED }, + { .o_name = "raw", .o_val = OPT_RAW }, + { .o_name = "hex", .o_val = OPT_HEX }, + { .o_name = "compat", .o_val = OPT_COMPAT }, + { .o_name = "async", .o_val = OPT_ASYNC }, + { + .o_name = "delay", + .o_short = 'd', + .o_val = OPT_DELAY, + .o_arg = true, + }, + { + .o_name = "interval", + .o_short = 'i', + .o_val = OPT_INTERVAL, + .o_arg = true, + }, + { .o_name = "recv-timeout", .o_val = OPT_RCV_TIMEO, .o_arg = true }, + { .o_name = "send-timeout", .o_val = OPT_SND_TIMEO, .o_arg = true }, + { .o_name = "socket-name", .o_val = OPT_SOCK_NAME, .o_arg = true }, + + // Legacy compatibility with nanocat. + { .o_name = "bind", .o_val = OPT_LISTEN, .o_arg = true }, + { .o_name = "connect", .o_val = OPT_DIAL, .o_arg = true }, + { + .o_name = "bind-ipc", + .o_short = 'X', + .o_val = OPT_LISTEN_IPC, + .o_arg = true, + }, + { + .o_name = "connect-ipc", + .o_short = 'x', + .o_val = OPT_DIAL_IPC, + .o_arg = true, + }, + { + .o_name = "bind-local", + .o_short = 'L', + .o_val = OPT_LISTEN_LOCAL, + .o_arg = true, + }, + { + .o_name = "connect-local", + .o_short = 'l', + .o_val = OPT_DIAL_LOCAL, + .o_arg = true, + }, + + // Sentinel. + { .o_name = NULL, .o_val = 0 }, +}; + +static void +fatal(const char *msg, ...) +{ + va_list ap; + va_start(ap, msg); + vfprintf(stderr, msg, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); +} + +static void +help(void) +{ + printf( + "Usage: nngcat <proto> <addr>... [<fmt>] [<opts>...] [<src>]\n\n"); + printf("<proto> must be one of:\n"); + printf(" --req0 (or alias --req)\n"); + printf(" --rep0 (or alias --rep)\n"); + printf(" --pub0 (or alias --pub)\n"); + printf(" --sub0 (or alias --sub)\n"); + printf(" --push0 (or alias --push)\n"); + printf(" --pull0 (or alias --pull)\n"); + printf(" --surveyor0 (or alias --surveyor)\n"); + printf(" --respondent0 (or alias --respondent)\n"); + printf(" --pair0\n"); + printf(" --pair1\n"); + printf(" --pair (alias for either pair0 or pair1)\n"); + printf("\n<addr> must be one or more of:\n"); + printf(" --dial <url> (or alias --connect <url>)\n"); + printf(" --listen <url> (or alias --bind <url>)\n"); + printf(" --bind-local <port> (or alias -L <port>)\n"); + printf(" --connect-local <port> (or alias -l <port>)\n"); + printf(" --bind-ipc <path> (or alias -X <path>)\n"); + printf(" --connect-ipc <path> (or alias -x <path>)\n"); + printf("\n<fmt> may be one or none of:\n"); + printf(" --format <no|ascii|hex|msgpack|quoted|raw>\n"); + printf(" --ascii (or alias -A)\n"); + printf(" --quoted (or alias -Q)\n"); + printf(" --hex\n"); + printf(" --msgpack\n"); + printf(" --raw\n"); + printf("\n<opts> may be any of:\n"); + printf(" --subscribe <topic> (only with --sub protocol)\n"); + printf(" --silent (or alias -q)\n"); + printf(" --verbose (or alias -v)\n"); + printf(" --compat\n"); + printf(" --async\n"); + printf("\n<src> may be one of:\n"); + printf(" --file <file> (or alias -F <file>)\n"); + printf(" --data <data> (or alias -D <data>)\n"); + exit(1); +} + +static int +intarg(const char *val, int maxv) +{ + int v = 0; + + if (val[0] == '\0') { + fatal("Empty integer argument."); + } + while (*val != '\0') { + if (!isdigit(*val)) { + fatal("Integer argument expected."); + } + v *= 10; + v += ((*val) - '0'); + val++; + if (v > maxv) { + fatal("Integer argument too large."); + } + } + if (v < 0) { + fatal("Integer argument overflow."); + } + return (v); +} + +struct addr { + struct addr *next; + int mode; + char * val; +}; + +struct addr ** +addaddr(struct addr **endp, int mode, const char *a) +{ + struct addr *na; + + if (((na = malloc(sizeof(*na))) == NULL) || + ((na->val = malloc(strlen(a) + 1)) == NULL)) { + fatal("Out of memory."); + } + na->mode = mode; + memcpy(na->val, a, strlen(a) + 1); + na->next = NULL; + *endp = na; + return (&na->next); +} + +struct topic { + struct topic *next; + char * val; +}; + +struct topic ** +addtopic(struct topic **endp, const char *s) +{ + struct topic *t; + + if (((t = malloc(sizeof(*t))) == NULL) || + ((t->val = malloc(strlen(s) + 1)) == NULL)) { + fatal("Out of memory."); + } + memcpy(t->val, s, strlen(s) + 1); + t->next = NULL; + *endp = t; + return (&t->next); +} + +void +printmsg(char *buf, size_t len) +{ + switch (format) { + case OPT_BLANK: // Suppress contents. + return; + case OPT_RAW: // Just emit raw contents. + fwrite(buf, 1, len, stdout); + break; + case OPT_ASCII: // ASCII, but non-ASCII substituted with '.' + for (size_t i = 0; i < len; i++) { + if (isprint(buf[i])) { + putchar(buf[i]); + } else { + putchar('.'); + } + } + break; + case OPT_QUOTED: // C style quoted strings. + putchar('"'); + for (size_t i = 0; i < len; i++) { + switch (buf[i]) { + case '\n': + putchar('\\'); + putchar('n'); + break; + case '\r': + putchar('\\'); + putchar('r'); + break; + case '\t': + putchar('\\'); + putchar('t'); + break; + case '"': + case '\\': + putchar('\\'); + putchar(buf[i]); + break; + default: + if (isprint(buf[i])) { + fputc(buf[i], stdout); + } else { + printf("\\x%02x", (uint8_t) buf[i]); + } + } + } + putchar('"'); + putchar('\n'); + break; + case OPT_MSGPACK: // MsgPack, we just encode prefix + len, then raw. + if (len < 256) { + putchar('\xc4'); + putchar(len & 0xff); + } else if (len < 65536) { + putchar('\xc5'); + putchar((len >> 8) & 0xff); + putchar(len & 0xff); + } else { + putchar('\xc6'); + putchar((len >> 24) & 0xff); + putchar((len >> 16) & 0xff); + putchar((len >> 8) & 0xff); + putchar(len & 0xff); + } + fwrite(buf, 1, len, stdout); + break; + case OPT_HEX: // Hex, quoted C string encoded with hex literals. + putchar('"'); + for (size_t i = 0; i < len; i++) { + printf("\\x%02x", (uint8_t) buf[i]); + } + putchar('"'); + putchar('\n'); + break; + } + fflush(stdout); +} + +void +recvloop(nng_socket sock) +{ + for (;;) { + int rv; + nng_msg *msg; + + rv = nng_recvmsg(sock, &msg, 0); + switch (rv) { + case NNG_ETIMEDOUT: + case NNG_ESTATE: + // Either a regular timeout, or we reached the end + // of an event like a survey completing. + return; + case 0: + printmsg(nng_msg_body(msg), nng_msg_len(msg)); + nng_msg_free(msg); + continue; + default: + fatal("Receive error: %s", nng_strerror(rv)); + } + } +} + +void +resploop(nng_socket sock) +{ + for (;;) { + int rv; + nng_msg *msg; + + rv = nng_recvmsg(sock, &msg, 0); + if (rv != 0) { + fatal("Receive error: %s", nng_strerror(rv)); + } + printmsg(nng_msg_body(msg), nng_msg_len(msg)); + nng_msg_clear(msg); + if ((rv = nng_msg_append(msg, data, datalen)) != 0) { + fatal(nng_strerror(rv)); + } + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + fatal("Send error: %s", nng_strerror(rv)); + } + } +} + +void +sendloop(nng_socket sock) +{ + if (data == NULL) { + fatal("No data to send (specify with --data or --file)"); + } + if (delay > 0) { + nng_msleep(delay); + } + + for (;;) { + int rv; + nng_msg * msg; + nng_time start; + nng_time end; + nng_duration delta; + + start = nng_clock(); + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append(msg, data, datalen)) != 0)) { + fatal(nng_strerror(rv)); + } + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + fatal("Send error: %s", nng_strerror(rv)); + } + end = nng_clock(); + delta = (nng_duration)(end - start); + + // By default, we don't loop. + if (interval < 0) { + break; + } + // We sleep, but we account for time spent, so that our + // interval appears more or less constant. Of course if we + // took more than the interval here, then we skip the sleep + // altogether. + if ((delta >= 0) && (delta < interval)) { + nng_msleep(interval - delta); + } + } +} + +void +sendrecv(nng_socket sock) +{ + nng_duration iv = 0; + if (data == NULL) { + fatal("No data to send (specify with --data or --file)"); + } + if (delay > 0) { + nng_msleep(delay); + } + if (interval > 0) { + iv = interval; + } + + // We start by sending a message, then we receive iteratively + // but we set a concrete timeout if interval is set, to ensure + // that we exit the receive loop, and can continue. + for (;;) { + int rv; + nng_msg * msg; + nng_time start; + nng_time end; + nng_duration delta; + + start = nng_clock(); + if (((rv = nng_msg_alloc(&msg, 0)) != 0) || + ((rv = nng_msg_append(msg, data, datalen)) != 0)) { + fatal(nng_strerror(rv)); + } + if ((rv = nng_sendmsg(sock, msg, 0)) != 0) { + fatal("Send error: %s", nng_strerror(rv)); + } + if (interval < 0) { + // Only one iteration through. + recvloop(sock); + break; + } + + // We would like to use recvloop, but we need to reset our + // timeout each time, as the timer counts down towards zero. + + for (;;) { + delta = (nng_duration)(nng_clock() - start); + + nng_duration expire = interval - delta; + if ((recvtimeo >= 0) && (expire > recvtimeo)) { + expire = recvtimeo; + } + rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, expire); + if (rv != 0) { + fatal("Cannot set recv timeout: %s", + nng_strerror(rv)); + } + + rv = nng_recvmsg(sock, &msg, 0); + switch (rv) { + case 0: + printmsg(nng_msg_body(msg), nng_msg_len(msg)); + nng_msg_free(msg); + continue; + case NNG_ETIMEDOUT: + case NNG_ESTATE: + // We're done receiving + break; + default: + fatal("Cannot receive: %s", nng_strerror(rv)); + break; + } + + // We're done receiving, break out. + break; + } + + end = nng_clock(); + delta = (nng_duration)(end - start); + + // We sleep, but we account for time spent, so that our + // interval appears more or less constant. Of course if we + // took more than the interval here, then we skip the sleep + // altogether. + if ((delta >= 0) && (delta < interval)) { + nng_msleep(interval - delta); + } + } +} + +int +main(int ac, const char **av) +{ + int idx; + const char * arg; + int val; + int rv; + char scratch[512]; + struct addr * addrs = NULL; + struct addr ** addrend; + struct topic * topics = NULL; + struct topic **topicend; + nng_socket sock; + int port; + FILE * f; + + idx = 1; + addrend = &addrs; + topicend = &topics; + + while ((rv = nng_opts_parse(ac, av, opts, &val, &arg, &idx)) == 0) { + switch (val) { + case OPT_HELP: + help(); + break; + case OPT_REQ0: + case OPT_REP0: + case OPT_SUB0: + case OPT_PUB0: + case OPT_BUS0: + case OPT_SURVEY0: + case OPT_RESPOND0: + case OPT_PAIR0: + case OPT_PAIR1: + case OPT_PUSH0: + case OPT_PULL0: + if (proto != 0) { + fatal("Only one protocol may be " + "specified."); + } + proto = val; + break; + case OPT_DIAL: + case OPT_LISTEN: + addrend = addaddr(addrend, val, arg); + break; + case OPT_DIAL_LOCAL: + case OPT_LISTEN_LOCAL: + port = intarg(arg, 65536); + snprintf(scratch, sizeof(scratch), + "tcp://127.0.0.1:%d", port); + addrend = addaddr(addrend, val, scratch); + break; + case OPT_DIAL_IPC: + case OPT_LISTEN_IPC: + snprintf(scratch, sizeof(scratch), "ipc:///%s", arg); + addrend = addaddr(addrend, val, scratch); + break; + case OPT_SUBSCRIBE: + topicend = addtopic(topicend, arg); + break; + case OPT_VERBOSE: + case OPT_SILENT: + verbose = val; + break; + case OPT_DELAY: + delay = intarg(arg, 86400) * 1000; // max 1 day + break; + case OPT_INTERVAL: + interval = intarg(arg, 86400) * 1000; // max 1 day + break; + case OPT_SND_TIMEO: + sendtimeo = intarg(arg, 86400) * 1000; // max 1 day + break; + case OPT_RCV_TIMEO: + recvtimeo = intarg(arg, 86400) * 1000; // max 1 day + break; + case OPT_COMPAT: + compat = 1; + break; + case OPT_ASYNC: + async = NNG_FLAG_NONBLOCK; + break; + case OPT_ASCII: + case OPT_RAW: + case OPT_QUOTED: + case OPT_MSGPACK: + case OPT_HEX: + if (format != 0) { + fatal("Format may be specified only " + "once."); + } + format = val; + break; + case OPT_FORMAT: + if (format != 0) { + fatal("Format may be specified only " + "once."); + } + if (strcmp(arg, "no") == 0) { + format = OPT_BLANK; + } else if (strcmp(arg, "ascii") == 0) { + format = OPT_ASCII; + } else if (strcmp(arg, "hex") == 0) { + format = OPT_HEX; + } else if (strcmp(arg, "quoted") == 0) { + format = OPT_QUOTED; + } else if (strcmp(arg, "raw") == 0) { + format = OPT_RAW; + } else if (strcmp(arg, "msgpack") == 0) { + format = OPT_MSGPACK; + } else { + fatal("Invalid format specified."); + } + break; + case OPT_FILE: + if (data != NULL) { + fatal("Data (--file, --data) may be specified " + "only once."); + } + if ((f = fopen(arg, "r")) == NULL) { + fatal("Cannot open file %s: %s", arg, + strerror(errno)); + } + for (;;) { + size_t n; + // Read until end of file, reallocating as + // needed. + if (datacap == 0) { + data = malloc(4096); + datacap = 4096; + } else if (datacap == datalen) { + void *odata = data; + datacap *= 2; + data = realloc(odata, datacap); + if (data == NULL) { + free(odata); + } + } + if (data == NULL) { + fatal("Out of memory."); + } + n = fread((char *) data + datalen, 1, + datacap - datalen, f); + if (n == 0) { + if (ferror(f)) { + fatal( + "Read file %s failed: %s", + arg, strerror(errno)); + } + break; + } + } + fclose(f); + break; + case OPT_DATA: + if (data != NULL) { + fatal("Data (--file, --data) may be specified " + "only once."); + } + if ((data = malloc(strlen(arg) + 1)) == NULL) { + fatal("Out of memory."); + } + memcpy(data, arg, strlen(arg) + 1); + datacap = strlen(arg) + 1; + datalen = strlen(arg); + break; + } + } + switch (rv) { + case NNG_EINVAL: + fatal("Option %s is invalid.", av[idx]); + break; + case NNG_EAMBIGUOUS: + fatal("Option %s is ambiguous (specify in full).", av[idx]); + break; + case NNG_ENOARG: + fatal("Option %s requires argument.", av[idx]); + break; + } + + if (addrs == NULL) { + fatal("No address specified."); + } + + if (compat) { + if (async != 0) { + fatal("Option --async and --compat are incompatible."); + } + if (proto == OPT_PAIR) { + proto = OPT_PAIR0; + } + if (proto == OPT_PAIR1) { + fatal("Protocol does not support --compat."); + } + async = NNG_FLAG_NONBLOCK; + } else { + if (proto == OPT_PAIR) { + proto = OPT_PAIR1; + } + } + if (proto == OPT_SUB0) { + if (topics == NULL) { + topicend = addtopic(topicend, ""); // subscribe to all + } + } else { + if (topics != NULL) { + fatal("Protocol does not support --subscribe."); + } + } + + switch (proto) { + case OPT_PULL0: + case OPT_SUB0: + if (data != NULL) { + fatal("Protocol does not support --file or " + "--data."); + } + if (interval >= 0) { + fatal("Protocol does not support --interval."); + } + break; + case OPT_PUSH0: + case OPT_SURVEY0: + case OPT_PUB0: + case OPT_REQ0: + if (data == NULL) { + fatal("Protocol requires either --file or " + "--data."); + } + break; + case OPT_REP0: + case OPT_RESPOND0: + if (interval >= 0) { + fatal("Protocol does not support --interval."); + } + break; + case OPT_PAIR0: + case OPT_PAIR1: + case OPT_BUS0: + if ((interval >= 0) && (data == NULL)) { + fatal("Option --interval requires either " + "--file or --data."); + } + break; + } + + switch (proto) { + case OPT_REQ0: + rv = nng_req0_open(&sock); + break; + case OPT_REP0: + rv = nng_rep0_open(&sock); + break; + case OPT_SUB0: + rv = nng_sub0_open(&sock); + break; + case OPT_PUB0: + rv = nng_pub0_open(&sock); + break; + case OPT_PAIR0: + rv = nng_pair0_open(&sock); + break; + case OPT_PAIR1: + rv = nng_pair1_open(&sock); + break; + case OPT_BUS0: + rv = nng_bus0_open(&sock); + break; + case OPT_PUSH0: + rv = nng_push0_open(&sock); + break; + case OPT_PULL0: + rv = nng_pull0_open(&sock); + break; + case OPT_SURVEY0: + rv = nng_surveyor0_open(&sock); + break; + case OPT_RESPOND0: + rv = nng_respondent0_open(&sock); + break; + case 0: + default: + fatal("No protocol specified."); + break; + } + if (rv != 0) { + fatal("Unable to open socket: %s", nng_strerror(rv)); + } + + for (struct topic *t = topics; t != NULL; t = t->next) { + rv = nng_setopt( + sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val)); + if (rv != 0) { + fatal("Unable to subscribe to topic %s: %s", t->val, + nng_strerror(rv)); + } + } + + if ((sendtimeo > 0) && + ((rv = nng_setopt_ms(sock, NNG_OPT_SENDTIMEO, sendtimeo)) != 0)) { + fatal("Unable to set send timeout: %s", nng_strerror(rv)); + } + if ((recvtimeo > 0) && + ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, recvtimeo)) != 0)) { + fatal("Unable to set send timeout: %s", nng_strerror(rv)); + } + + // XXX: TBD: This is where we should add other socket options, + // like TLS configuration, timeouts, etc. + + for (struct addr *a = addrs; a != NULL; a = a->next) { + char *act; + switch (a->mode) { + case OPT_DIAL: + case OPT_DIAL_IPC: + case OPT_DIAL_LOCAL: + rv = nng_dial(sock, a->val, NULL, async); + act = "dial"; + break; + case OPT_LISTEN: + case OPT_LISTEN_IPC: + case OPT_LISTEN_LOCAL: + rv = nng_listen(sock, a->val, NULL, async); + act = "listen"; + break; + default: + fatal("Invalid address mode! (Bug!)"); + } + + if (rv != 0) { + fatal("Unable to %s on %s: %s", act, a->val, + nng_strerror(rv)); + } + } + + switch (proto) { + case OPT_SUB0: + case OPT_PULL0: + recvloop(sock); + break; + case OPT_REP0: + case OPT_RESPOND0: + if (data == NULL) { + recvloop(sock); + } else { + resploop(sock); + } + break; + case OPT_PUSH0: + case OPT_PUB0: + sendloop(sock); + break; + case OPT_BUS0: + case OPT_PAIR0: + case OPT_PAIR1: + if (data != NULL) { + sendrecv(sock); + } else { + recvloop(sock); + } + break; + case OPT_REQ0: + case OPT_SURVEY0: + sendrecv(sock); + break; + default: + fatal("Protocol handling unimplmented."); + } + + exit(0); +} |
