aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--perf/CMakeLists.txt2
-rw-r--r--perf/perf.c165
2 files changed, 167 insertions, 0 deletions
diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt
index 44c4c607..07c6480b 100644
--- a/perf/CMakeLists.txt
+++ b/perf/CMakeLists.txt
@@ -41,3 +41,5 @@ endif ()
add_nng_perf(remote_lat)
add_nng_perf(local_lat)
+add_nng_perf(local_thr)
+add_nng_perf(remote_thr)
diff --git a/perf/perf.c b/perf/perf.c
index 169a2119..335e3a9d 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -23,8 +23,12 @@
static void latency_client(const char *, int, int);
static void latency_server(const char *, int, int);
+static void throughput_client(const char *, int, int);
+static void throughput_server(const char *, int, int);
static void do_remote_lat(int argc, char **argv);
static void do_local_lat(int argc, char **argv);
+static void do_remote_thr(int argc, char **argv);
+static void do_local_thr(int argc, char **argv);
static void die(const char *, ...);
// perf implements the same performance tests found in the standard
@@ -69,6 +73,13 @@ main(int argc, char **argv)
} else if ((strcmp(prog, "local_lat") == 0) ||
(strcmp(prog, "latency_server") == 0)) {
do_local_lat(argc, argv);
+ } else if ((strcmp(prog, "local_thr") == 0) ||
+ (strcmp(prog, "throughput_server") == 0)) {
+ do_local_thr(argc, argv);
+ } else if ((strcmp(prog, "remote_thr") == 0) ||
+ (strcmp(prog, "throughput_client") == 0)) {
+ do_remote_thr(argc, argv);
+
} else {
die("Unknown program mode? Use -m <mode>.");
}
@@ -138,6 +149,40 @@ do_remote_lat(int argc, char **argv)
+// do_local_thr handles the local_thr (throughput_server) program mode.
+// It requires exactly three arguments: argv[0] is the address to listen
+// on, argv[1] the message size, and argv[2] the message count.  Anything
+// else is reported through die() with a usage message.
void
+do_local_thr(int argc, char **argv)
+{
+	if (argc != 3) {
+		die("Usage: local_thr <listen-addr> <msg-size> <count>");
+	}
+
+	// The size is parsed before the count, in argument order.
+	long int size  = parse_int(argv[1], "message size");
+	long int nmsgs = parse_int(argv[2], "count");
+
+	throughput_server(argv[0], size, nmsgs);
+}
+
+
+// do_remote_thr handles the remote_thr (throughput_client) program mode.
+// It requires exactly three arguments: argv[0] is the address to connect
+// to, argv[1] the message size, and argv[2] the message count.  Anything
+// else is reported through die() with a usage message.
+void
+do_remote_thr(int argc, char **argv)
+{
+	if (argc != 3) {
+		die("Usage: remote_thr <connect-to> <msg-size> <count>");
+	}
+
+	// The size is parsed before the count, in argument order.
+	long int size  = parse_int(argv[1], "message size");
+	long int nmsgs = parse_int(argv[2], "count");
+
+	throughput_client(argv[0], size, nmsgs);
+}
+
+
+void
latency_client(const char *addr, int msgsize, int trips)
{
nng_socket *s;
@@ -176,6 +221,7 @@ latency_client(const char *addr, int msgsize, int trips)
end = nni_clock();
nni_msg_free(msg);
+ nng_close(s);
total = (end - start) / 1.0;
latency = (total / (trips * 2));
@@ -222,4 +268,123 @@ latency_server(const char *addr, int msgsize, int trips)
// Wait a bit for things to drain... linger should do this.
// 100ms ought to be enough.
nni_usleep(100000);
+ nng_close(s);
+}
+
+
+// Our throughput story is quite a mess, mostly (I think) because of poor
+// caching and message reuse.  We should probably implement a message
+// pooling API somewhere.
+
+// throughput_server is the receiving (local_thr) side of the throughput
+// test.  It listens on addr with a PAIR socket, waits for one
+// synchronization message, then receives count messages, each of which
+// must carry a body of exactly msgsize bytes.  When done it prints the
+// elapsed time, message size, message count, and the throughput in
+// messages per second and megabits per second.  Any failure is reported
+// through die().
+void
+throughput_server(const char *addr, int msgsize, int count)
+{
+	nng_socket *s;
+	nng_msg *sync;
+	nng_msg **msgs;
+	int rv;
+	int i;
+	uint64_t start, end;
+	double msgpersec, mbps, total;
+
+	// A non-positive count would make the holding area empty (or the
+	// allocation size nonsensical), so reject it up front.
+	if (count < 1) {
+		die("message count must be at least 1");
+	}
+
+	if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
+		die("nng_socket: %s", nng_strerror(rv));
+	}
+
+	// XXX: set no delay
+	// XXX: other options (TLS in the future?, Linger?)
+
+	if ((rv = nng_listen(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) {
+		die("nng_listen: %s", nng_strerror(rv));
+	}
+
+	// Preallocate a holding area for the received messages, so that
+	// freeing them is not part of what we measure.
+	// Note that recycling messages would help a lot here.
+	if ((msgs = calloc((size_t) count, sizeof(nng_msg *))) == NULL) {
+		die("calloc: %s", strerror(errno));
+	}
+
+	// Receive the synchronization message.  It gets its own pointer
+	// rather than borrowing a slot in msgs[].  The timer starts here
+	// and is not restarted, so the interval covers all count messages.
+	if ((rv = nng_recvmsg(s, &sync, 0)) != 0) {
+		die("nng_recvmsg: %s", nng_strerror(rv));
+	}
+	nng_msg_free(sync);
+	start = nni_clock();
+
+	for (i = 0; i < count; i++) {
+		size_t len;
+
+		if ((rv = nng_recvmsg(s, &msgs[i], 0)) != 0) {
+			die("nng_recvmsg: %s", nng_strerror(rv));
+		}
+		// Every data message, including the first, must have the
+		// expected body length.
+		nng_msg_body(msgs[i], &len);
+		if (len != (size_t) msgsize) {
+			die("wrong message size: %zu != %d", len, msgsize);
+		}
+	}
+	end = nni_clock();
+	nng_close(s);
+
+	for (i = 0; i < count; i++) {
+		nng_msg_free(msgs[i]);
+	}
+	free(msgs);
+
+	// The arithmetic below treats the nni_clock() difference as
+	// microseconds (it is divided by 1000000.0 to print seconds).
+	// Everything is done in double: count * 1000000 overflows int for
+	// quite ordinary message counts.
+	total     = (double) (end - start);
+	msgpersec = count * 1000000.0 / total;
+	mbps      = (count * 8.0 * msgsize) / total;
+
+	printf("total time: %.3f [s]\n", total / 1000000.0);
+	printf("message size: %d [B]\n", msgsize);
+	printf("message count: %d\n", count);
+	printf("throughput: %.3f [msg/s]\n", msgpersec);
+	printf("throughput: %.3f [Mb/s]\n", mbps);
+}
+
+
+// throughput_client is the sending (remote_thr) side of the throughput
+// test.  It dials addr with a PAIR socket, preallocates count + 1
+// messages (a zero-length one followed by count messages of msgsize
+// bytes), sends them all, waits briefly, and closes the socket.  Any
+// failure is reported through die().
+void
+throughput_client(const char *addr, int msgsize, int count)
+{
+	nng_socket *s;
+	nng_msg **msgs;
+	int rv;
+	int i;
+
+	// We send one extra zero length message to start the timer.
+	count++;
+
+	if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
+		die("nng_socket: %s", nng_strerror(rv));
+	}
+
+	// XXX: set no delay
+	// XXX: other options (TLS in the future?, Linger?)
+
+	if ((rv = nng_dial(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) {
+		die("nng_dial: %s", nng_strerror(rv));
+	}
+
+	// Preallocate the messages.  Otherwise we are benchmarking the
+	// allocator, which is less useful.
+	if ((msgs = calloc(count, sizeof(nng_msg *))) == NULL) {
+		die("calloc: %s", strerror(errno));
+	}
+	for (i = 0; i < count; i++) {
+		// Message 0 is the empty synchronization message.
+		if ((rv = nng_msg_alloc(&msgs[i], i ? msgsize : 0)) != 0) {
+			die("nng_msg_alloc: %s", nng_strerror(rv));
+		}
+	}
+	for (i = 0; i < count; i++) {
+		if ((rv = nng_sendmsg(s, msgs[i], 0)) != 0) {
+			die("nng_sendmsg: %s", nng_strerror(rv));
+		}
+	}
+
+	// Wait 100msec for pipes to drain, and only then close the socket
+	// (same order as latency_server).  Sleeping after the close would
+	// give queued messages no extra time to go out.
+	nni_usleep(100000);
+	nng_close(s);
+
+	// Only the pointer array is released here; the messages themselves
+	// are not freed by this function (presumably nng_sendmsg takes
+	// ownership on success -- TODO confirm).
+	free(msgs);
}