aboutsummaryrefslogtreecommitdiff
path: root/tools/nngcat/nngcat.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-07 14:58:07 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-07 17:39:03 -0700
commite89202c83138bbc6bad1d5c5dcf55e00c0ee1800 (patch)
tree58f3e2bc5e824c34a6954dc6276bd77725794b69 /tools/nngcat/nngcat.c
parentd066d6d4307371f3bea1134a694dba18c381f564 (diff)
downloadnng-e89202c83138bbc6bad1d5c5dcf55e00c0ee1800.tar.gz
nng-e89202c83138bbc6bad1d5c5dcf55e00c0ee1800.tar.bz2
nng-e89202c83138bbc6bad1d5c5dcf55e00c0ee1800.zip
fixes #413 desire --count option for nngcat
fixes #249 nngcat needs test cases fixes #416 transports do not permit unlimited message size with 0 fixes #417 nngcat truncates input files to 4k fixes #348 nngcat should have switch to adjust maximum receive size
Diffstat (limited to 'tools/nngcat/nngcat.c')
-rw-r--r--tools/nngcat/nngcat.c119
1 files changed, 89 insertions, 30 deletions
diff --git a/tools/nngcat/nngcat.c b/tools/nngcat/nngcat.c
index c8a73859..132599e4 100644
--- a/tools/nngcat/nngcat.c
+++ b/tools/nngcat/nngcat.c
@@ -53,6 +53,8 @@ size_t keylen = 0;
void * certfile = NULL;
size_t certlen = 0;
const char * zthome = NULL;
+int count = 0;
+int recvmaxsz = -1;
// Options, must start at 1 because zero is sentinel.
enum options {
@@ -78,6 +80,7 @@ enum options {
OPT_SUBSCRIBE,
OPT_INTERVAL,
OPT_DELAY,
+ OPT_COUNT,
OPT_FORMAT,
OPT_RAW,
OPT_ASCII,
@@ -99,6 +102,7 @@ enum options {
OPT_KEYFILE,
OPT_CERTFILE,
OPT_VERSION,
+ OPT_RECVMAXSZ,
OPT_ZTHOME,
};
@@ -132,6 +136,18 @@ static nng_optspec opts[] = {
{ .o_name = "compat", .o_val = OPT_COMPAT },
{ .o_name = "async", .o_val = OPT_ASYNC },
{
+ .o_name = "recv-maxsz",
+ .o_short = 'Z',
+ .o_val = OPT_RECVMAXSZ,
+ .o_arg = true,
+ },
+ {
+ .o_name = "count",
+ .o_short = 'C',
+ .o_val = OPT_COUNT,
+ .o_arg = true,
+ },
+ {
.o_name = "delay",
.o_short = 'd',
.o_val = OPT_DELAY,
@@ -238,8 +254,18 @@ help(void)
printf(" --subscribe <topic> (only with --sub protocol)\n");
printf(" --silent (or alias -q)\n");
printf(" --verbose (or alias -v)\n");
+ printf(" --count <num> (or alias -C <num>)\n");
+ printf(" --delay <secs> (or alias -d <secs>)\n");
+ printf(" --interval <secs> (or alias -i <secs>)\n");
+ printf(" --recv-timeout <secs>\n");
+ printf(" --send-timeout <secs>\n");
+ printf(" --recv-maxsz <size> (or alias -Z <size>)\n");
printf(" --compat\n");
printf(" --async\n");
+ printf(" --insecure (or alias -k)\n");
+ printf(" --cacert <file>\n");
+ printf(" --cert <file> (or alias -E <file>)\n");
+ printf(" --key <file>\n");
printf(" --zt-home <path>\n");
printf("\n<src> may be one of:\n");
printf(" --file <file> (or alias -F <file>)\n");
@@ -279,38 +305,24 @@ static void
loadfile(const char *path, void **datap, size_t *lenp)
{
FILE * f;
- size_t cap;
char * data;
size_t len;
+
if ((f = fopen(path, "r")) == NULL) {
fatal("Cannot open file %s: %s", path, strerror(errno));
}
- cap = 4096;
- len = 0;
- if ((data = malloc(cap)) == NULL) {
+ if (fseek(f, 0, SEEK_END) != 0) {
+ fatal("Cannot seek to end of file: %s", strerror(errno));
+ }
+ len = ftell(f);
+ (void) fseek(f, 0, SEEK_SET);
+ if ((data = malloc(len + 1)) == NULL) {
fatal("Out of memory.");
}
- data[0] = '\0';
- for (;;) {
- size_t n;
- // Read until end of file, reallocating as needed.
- if (len == (cap - 1)) {
- void *old = data;
- cap *= 2;
- if ((data = realloc(old, cap)) == NULL) {
- fatal("Out of memory");
- }
- }
- n = fread(data + len, 1, cap - len, f);
- if (n == 0) {
- if (ferror(f)) {
- fatal("Read file %s failed: %s", path,
- strerror(errno));
- }
- break;
- }
- len += n;
- data[len] = '\0';
+ data[len] = '\0';
+
+ if (fread(data, 1, len, f) != len) {
+ fatal("Read file %s failed: %s", path, strerror(errno));
}
fclose(f);
*datap = data;
@@ -462,11 +474,16 @@ printmsg(char *buf, size_t len)
void
recvloop(nng_socket sock)
{
+ int iters = 0;
for (;;) {
int rv;
nng_msg *msg;
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
rv = nng_recvmsg(sock, &msg, 0);
+ iters++;
switch (rv) {
case NNG_ETIMEDOUT:
case NNG_ESTATE:
@@ -486,6 +503,7 @@ recvloop(nng_socket sock)
void
resploop(nng_socket sock)
{
+ int iters = 0;
for (;;) {
int rv;
nng_msg *msg;
@@ -502,12 +520,21 @@ resploop(nng_socket sock)
if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
fatal("Send error: %s", nng_strerror(rv));
}
+
+ iters++;
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
}
+
+ nng_msleep(200);
}
void
sendloop(nng_socket sock)
{
+ int iters = 0;
+
if (data == NULL) {
fatal("No data to send (specify with --data or --file)");
}
@@ -533,10 +560,13 @@ sendloop(nng_socket sock)
end = nng_clock();
delta = (nng_duration)(end - start);
+ iters++;
// By default, we don't loop.
- if (interval < 0) {
+ if (((interval < 0) && (count == 0)) ||
+ ((count > 0) && (iters >= count))) {
break;
}
+
// We sleep, but we account for time spent, so that our
// interval appears more or less constant. Of course
// if we took more than the interval here, then we skip
@@ -545,11 +575,15 @@ sendloop(nng_socket sock)
nng_msleep(interval - delta);
}
}
+ // Wait a bit to give queues a chance to drain.
+ nng_msleep(200);
}
void
sendrecv(nng_socket sock)
{
+ int iters = 0;
+
if (data == NULL) {
fatal("No data to send (specify with --data or --file)");
}
@@ -575,7 +609,7 @@ sendrecv(nng_socket sock)
if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
fatal("Send error: %s", nng_strerror(rv));
}
- if (interval < 0) {
+ if ((interval < 0) && (count == 0)) {
// Only one iteration through.
recvloop(sock);
break;
@@ -583,7 +617,8 @@ sendrecv(nng_socket sock)
// We would like to use recvloop, but we need to reset
// our timeout each time, as the timer counts down
- // towards zero.
+ // towards zero. Furthermore, with survey, we don't
+ // want to increment the iteration count.
for (;;) {
delta = (nng_duration)(nng_clock() - start);
@@ -620,6 +655,11 @@ sendrecv(nng_socket sock)
end = nng_clock();
delta = (nng_duration)(end - start);
+ iters++;
+ if ((count > 0) && (iters >= count)) {
+ break;
+ }
+
// We sleep, but we account for time spent, so that our
// interval appears more or less constant. Of course
// if we took more than the interval here, then we skip
@@ -687,6 +727,9 @@ main(int ac, const char **av)
snprintf(scratch, sizeof(scratch), "ipc:///%s", arg);
addrend = addaddr(addrend, val, scratch);
break;
+ case OPT_COUNT:
+ count = intarg(arg, 0x7fffffff);
+ break;
case OPT_SUBSCRIBE:
topicend = addtopic(topicend, arg);
break;
@@ -706,6 +749,12 @@ main(int ac, const char **av)
case OPT_RCV_TIMEO:
recvtimeo = intarg(arg, 86400) * 1000; // max 1 day
break;
+ case OPT_RECVMAXSZ:
+ recvmaxsz = intarg(arg, 0x7fffffff);
+ if (recvmaxsz == 0) {
+ recvmaxsz = 0x7fffffff;
+ }
+ break;
case OPT_COMPAT:
compat = 1;
break;
@@ -815,7 +864,11 @@ main(int ac, const char **av)
if (compat) {
if (async != 0) {
- fatal("Option --async and --compat are "
+ fatal("Options --async and --compat are "
+ "incompatible.");
+ }
+ if (count != 0) {
+ fatal("Options --count and --compat are "
"incompatible.");
}
if (proto == OPT_PAIR) {
@@ -990,6 +1043,12 @@ main(int ac, const char **av)
fatal("Unable to set send timeout: %s", nng_strerror(rv));
}
+ if ((recvmaxsz >= 0) &&
+ ((rv = nng_setopt_size(sock, NNG_OPT_RECVMAXSZ, recvmaxsz)) !=
+ 0)) {
+ fatal("Unable to set max receive size: %s", nng_strerror(rv));
+ }
+
for (struct addr *a = addrs; a != NULL; a = a->next) {
char * act;
nng_listener l;
@@ -1109,7 +1168,7 @@ main(int ac, const char **av)
sendrecv(sock);
break;
default:
- fatal("Protocol handling unimplmented.");
+ fatal("Protocol handling unimplemented.");
}
exit(0);