aboutsummaryrefslogtreecommitdiff
path: root/perf
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-19 12:08:08 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-19 19:34:57 -0800
commit1a4e60f94dd4febffe3b651b5c800f1b9096f3d9 (patch)
tree79c47f1b53df85eab3ba456066a80b0af38eafcf /perf
parent15f5a7d8cee6416bf4748d15f97ac59c13c2ac75 (diff)
downloadnng-1a4e60f94dd4febffe3b651b5c800f1b9096f3d9.tar.gz
nng-1a4e60f94dd4febffe3b651b5c800f1b9096f3d9.tar.bz2
nng-1a4e60f94dd4febffe3b651b5c800f1b9096f3d9.zip
fixes #1154 perf programs could use options to select different protocols
This allows --reqrep0, --pair0, --pair1, and --bus0 to be used with the inproc_lat, and also inproc_thr (though only with pair options for that for now). It also introduced --url for both programs to support testing over different transports. Also, we no longer pass out the header for REQ reply -- that is an error, and led to some unfortunate failures when reusing the message.
Diffstat (limited to 'perf')
-rw-r--r--perf/perf.c207
1 files changed, 183 insertions, 24 deletions
diff --git a/perf/perf.c b/perf/perf.c
index 810fd151..6df2873c 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -16,26 +16,99 @@
#include <string.h>
#include <nng/nng.h>
+#include <nng/supplemental/util/options.h>
#include <nng/supplemental/util/platform.h>
+static void die(const char *, ...);
+static int
+no_open(nng_socket *arg)
+{
+ (void) arg;
+ die("Protocol not supported in this build!");
+ return (NNG_ENOTSUP);
+}
+
+typedef int (*open_func)(nng_socket *);
+
+static open_func open_server = no_open;
+static open_func open_client = no_open;
+
#if defined(NNG_HAVE_PAIR1)
#include <nng/protocol/pair1/pair.h>
+#else
+#define nng_pair1_open no_open
+#endif
-#elif defined(NNG_HAVE_PAIR0)
+#if defined(NNG_HAVE_PAIR0)
#include <nng/protocol/pair0/pair.h>
+#else
+#define nng_pair0_open no_open
+#endif
+#if defined(NNG_HAVE_REQ0)
+#include <nng/protocol/reqrep0/req.h>
#else
+#define nng_req0_open no_open
+#endif
-static void die(const char *, ...);
+#if defined(NNG_HAVE_REP0)
+#include <nng/protocol/reqrep0/rep.h>
+#else
+#define nng_rep0_open no_open
+#endif
-static int
-nng_pair_open(nng_socket *arg)
-{
- (void) arg;
- die("No pair protocol enabled in this build!");
- return (NNG_ENOTSUP);
-}
-#endif // NNG_ENABLE_PAIR
+#if defined(NNG_HAVE_BUS0)
+#include <nng/protocol/bus0/bus.h>
+#else
+#define nng_bus0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PULL0)
+#include <nng/protocol/pipeline0/pull.h>
+#else
+#define nng_pull0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUSH0)
+#include <nng/protocol/pipeline0/push.h>
+#else
+#define nng_push0_open no_open
+#endif
+
+#if defined(NNG_HAVE_PUB0)
+#include <nng/protocol/pubsub0/pub.h>
+#else
+#define nng_pub0_open no_open
+#endif
+
+#if defined(NNG_HAVE_SUB0)
+#include <nng/protocol/pubsub0/sub.h>
+#else
+#define nng_sub0_open no_open
+#endif
+
+enum options {
+ OPT_PAIR0 = 1,
+ OPT_PAIR1,
+ OPT_REQREP0,
+ OPT_PUBSUB0,
+ OPT_PIPELINE0,
+ OPT_SURVEY0,
+ OPT_BUS0,
+ OPT_URL,
+};
+
+// These are not universally supported by the variants yet.
+static nng_optspec opts[] = {
+ { .o_name = "pair1", .o_val = OPT_PAIR1 },
+ { .o_name = "pair0", .o_val = OPT_PAIR0 },
+ { .o_name = "reqrep0", .o_val = OPT_REQREP0 },
+ { .o_name = "bus0", .o_val = OPT_BUS0 },
+ { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
+ { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
+ { .o_name = "url", .o_val = OPT_URL, .o_arg = true },
+ { .o_name = NULL, .o_val = 0 },
+};
static void latency_client(const char *, size_t, int);
static void latency_server(const char *, size_t, int);
@@ -96,11 +169,23 @@ matches(const char *arg, const char *name)
}
}
+const int PAIR0 = 0;
+const int PAIR1 = 1;
+const int REQREP = 2;
+
int
main(int argc, char **argv)
{
char *prog;
+#if defined(NNG_HAVE_PAIR1)
+ open_server = nng_pair1_open;
+ open_client = nng_pair1_open;
+#elif defined(NNG_HAVE_PAIR0)
+ open_server = nng_pair0_open;
+ open_client = nng_pair0_open;
+#endif
+
// Allow -m <remote_lat> or whatever to override argv[0].
if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
prog = argv[2];
@@ -242,12 +327,49 @@ do_inproc_lat(int argc, char **argv)
nng_thread * thr;
struct inproc_args ia;
int rv;
+ int val;
+ int optidx;
+ char * arg;
+ char * addr;
+
+ addr = "inproc://latency_test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_BUS0:
+ open_client = nng_bus0_open;
+ open_server = nng_bus0_open;
+ break;
+
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
if (argc != 2) {
die("Usage: inproc_lat <msg-size> <count>");
}
- ia.addr = "inproc://latency_test";
+ ia.addr = addr;
ia.msgsize = parse_int(argv[0], "message size");
ia.count = parse_int(argv[1], "count");
ia.func = latency_server;
@@ -259,7 +381,7 @@ do_inproc_lat(int argc, char **argv)
// Sleep a bit.
nng_msleep(100);
- latency_client("inproc://latency_test", ia.msgsize, ia.count);
+ latency_client(addr, ia.msgsize, ia.count);
nng_thread_destroy(thr);
}
@@ -269,12 +391,49 @@ do_inproc_thr(int argc, char **argv)
nng_thread * thr;
struct inproc_args ia;
int rv;
+ int optidx;
+ int val;
+ char * arg;
+ char * addr = "inproc://throughput-test";
+
+ optidx = 0;
+ while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
+ 0) {
+ switch (val) {
+#if 0
+ // For now these protocols simply do not work with
+ // throughput -- they don't work with backpressure properly.
+ // In the future we should support synchronizing in the same
+ // process, and alerting the sender both on completion of
+ // a single message, and on completion of all messages.
+ case OPT_REQREP0:
+ open_client = nng_req0_open;
+ open_server = nng_rep0_open;
+ break;
+#endif
+ case OPT_PAIR0:
+ open_client = nng_pair0_open;
+ open_server = nng_pair0_open;
+ break;
+ case OPT_PAIR1:
+ open_client = nng_pair1_open;
+ open_server = nng_pair1_open;
+ break;
+ case OPT_URL:
+ addr = arg;
+ break;
+ default:
+ die("bad option");
+ }
+ }
+ argc -= optidx;
+ argv += optidx;
if (argc != 2) {
die("Usage: inproc_thr <msg-size> <count>");
}
- ia.addr = "inproc://tput_test";
+ ia.addr = addr;
ia.msgsize = parse_int(argv[0], "message size");
ia.count = parse_int(argv[1], "count");
ia.func = throughput_server;
@@ -286,7 +445,7 @@ do_inproc_thr(int argc, char **argv)
// Sleep a bit.
nng_msleep(100);
- throughput_client("inproc://tput_test", ia.msgsize, ia.count);
+ throughput_client(addr, ia.msgsize, ia.count);
nng_thread_destroy(thr);
}
@@ -300,8 +459,7 @@ latency_client(const char *addr, size_t msgsize, int trips)
int i;
float total;
float latency;
-
- if ((rv = nng_pair_open(&s)) != 0) {
+ if ((rv = open_client(&s)) != 0) {
die("nng_socket: %s", nng_strerror(rv));
}
@@ -312,6 +470,8 @@ latency_client(const char *addr, size_t msgsize, int trips)
die("nng_dial: %s", nng_strerror(rv));
}
+ nng_msleep(100);
+
if (nng_msg_alloc(&msg, msgsize) != 0) {
die("nng_msg_alloc: %s", nng_strerror(rv));
}
@@ -332,7 +492,7 @@ latency_client(const char *addr, size_t msgsize, int trips)
nng_close(s);
total = (float) ((end - start)) / 1000;
- latency = ((float) ((total * 1000000)) / (float)(trips * 2));
+ latency = ((float) ((total * 1000000)) / (float) (trips * 2));
printf("total time: %.3f [s]\n", total);
printf("message size: %d [B]\n", (int) msgsize);
printf("round trip count: %d\n", trips);
@@ -347,7 +507,7 @@ latency_server(const char *addr, size_t msgsize, int trips)
int rv;
int i;
- if ((rv = nng_pair_open(&s)) != 0) {
+ if ((rv = open_server(&s)) != 0) {
die("nng_socket: %s", nng_strerror(rv));
}
@@ -363,8 +523,7 @@ latency_server(const char *addr, size_t msgsize, int trips)
die("nng_recvmsg: %s", nng_strerror(rv));
}
if (nng_msg_len(msg) != msgsize) {
- die("wrong message size: %lu != %lu",
- nng_msg_len(msg),
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
msgsize);
}
if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
@@ -419,8 +578,7 @@ throughput_server(const char *addr, size_t msgsize, int count)
die("nng_recvmsg: %s", nng_strerror(rv));
}
if (nng_msg_len(msg) != msgsize) {
- die("wrong message size: %lu != %lu",
- nng_msg_len(msg),
+ die("wrong message size: %lu != %lu", nng_msg_len(msg),
msgsize);
}
nng_msg_free(msg);
@@ -490,7 +648,8 @@ throughput_client(const char *addr, size_t msgsize, int count)
}
}
- // Attempt to get the completion indication from the other side.
+ // Attempt to get the completion indication from the other
+ // side.
if (nng_recvmsg(s, &msg, 0) == 0) {
nng_msg_free(msg);
}