From 49708b26aa2e836a882343b72c288eae997e23d3 Mon Sep 17 00:00:00 2001
From: Garrett D'Amore
Date: Thu, 5 Jan 2017 14:30:05 -0800
Subject: Implement throughput performance tests.

The throughput performance tests "try" to avoid hitting the allocator,
but I think this actually causes other cache-related performance
problems, and the receive thread still has to perform a message
allocation, leading to really rotten performance. It's probably time
to think about a message pool.
---
 perf/CMakeLists.txt |   2 +
 perf/perf.c         | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 167 insertions(+)

(limited to 'perf')

diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt
index 44c4c607..07c6480b 100644
--- a/perf/CMakeLists.txt
+++ b/perf/CMakeLists.txt
@@ -41,3 +41,5 @@ endif ()
 
 add_nng_perf(remote_lat)
 add_nng_perf(local_lat)
+add_nng_perf(local_thr)
+add_nng_perf(remote_thr)
diff --git a/perf/perf.c b/perf/perf.c
index 169a2119..335e3a9d 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -23,8 +23,12 @@
 
 static void latency_client(const char *, int, int);
 static void latency_server(const char *, int, int);
+static void throughput_client(const char *, int, int);
+static void throughput_server(const char *, int, int);
 static void do_remote_lat(int argc, char **argv);
 static void do_local_lat(int argc, char **argv);
+static void do_remote_thr(int argc, char **argv);
+static void do_local_thr(int argc, char **argv);
 static void die(const char *, ...);
 
 // perf implements the same performance tests found in the standard
@@ -69,6 +73,13 @@ main(int argc, char **argv)
 	} else if ((strcmp(prog, "local_lat") == 0) ||
 	    (strcmp(prog, "latency_server") == 0)) {
 		do_local_lat(argc, argv);
+	} else if ((strcmp(prog, "local_thr") == 0) ||
+	    (strcmp(prog, "throughput_server") == 0)) {
+		do_local_thr(argc, argv);
+	} else if ((strcmp(prog, "remote_thr") == 0) ||
+	    (strcmp(prog, "throughput_client") == 0)) {
+		do_remote_thr(argc, argv);
+
 	} else {
 		die("Unknown program mode? Use -m <mode>.");
 	}
@@ -137,6 +148,40 @@ do_remote_lat(int argc, char **argv)
 }
 
 
+void
+do_local_thr(int argc, char **argv)
+{
+	long int msgsize;
+	long int trips;
+
+	if (argc != 3) {
+		die("Usage: local_thr <url> <msg-size> <count>");
+	}
+
+	msgsize = parse_int(argv[1], "message size");
+	trips = parse_int(argv[2], "count");
+
+	throughput_server(argv[0], msgsize, trips);
+}
+
+
+void
+do_remote_thr(int argc, char **argv)
+{
+	long int msgsize;
+	long int trips;
+
+	if (argc != 3) {
+		die("Usage: remote_thr <url> <msg-size> <count>");
+	}
+
+	msgsize = parse_int(argv[1], "message size");
+	trips = parse_int(argv[2], "count");
+
+	throughput_client(argv[0], msgsize, trips);
+}
+
+
 void
 latency_client(const char *addr, int msgsize, int trips)
 {
@@ -176,6 +221,7 @@ latency_client(const char *addr, int msgsize, int trips)
 
 	end = nni_clock();
 	nni_msg_free(msg);
+	nng_close(s);
 
 	total = (end - start) / 1.0;
 	latency = (total / (trips * 2));
@@ -222,4 +268,123 @@ latency_server(const char *addr, int msgsize, int trips)
 	// Wait a bit for things to drain... linger should do this.
 	// 100ms ought to be enough.
 	nni_usleep(100000);
+	nng_close(s);
+}
+
+
+// Our throughput story is quite a mess. Mostly I think because of the poor
+// caching and message reuse. We should probably implement a message pooling
+// API somewhere.
+
+void
+throughput_server(const char *addr, int msgsize, int count)
+{
+	nng_socket *s;
+	nng_msg **msgs;
+	int rv;
+	int i;
+	size_t len;
+	uint64_t start, end;
+	float msgpersec, mbps, total;
+
+	if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
+		die("nng_socket: %s", nng_strerror(rv));
+	}
+
+	// XXX: set no delay
+	// XXX: other options (TLS in the future?, Linger?)
+
+	if ((rv = nng_listen(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) {
+		die("nng_listen: %s", nng_strerror(rv));
+	}
+
+	// Preallocate holding area for messages. Otherwise we are
+	// benchmarking the allocator free(), which is less useful.
+	// Note that recycling messages would help a lot here.
+	if ((msgs = calloc(count, sizeof (nng_msg *))) == NULL) {
+		die("calloc: %s", strerror(errno));
+	}
+
+	// Receive first synchronization message.
+	if ((rv = nng_recvmsg(s, &msgs[0], 0)) != 0) {
+		die("nng_recvmsg: %s", nng_strerror(rv));
+	}
+	nni_msg_free(msgs[0]);
+	start = nni_clock();
+
+	for (i = 0; i < count; i++) {
+		if ((rv = nng_recvmsg(s, &msgs[i], 0)) != 0) {
+			die("nng_recvmsg: %s", nng_strerror(rv));
+		}
+		if (i == 0) {
+			start = nni_clock();
+		} else {
+			size_t len;
+			nng_msg_body(msgs[i], &len);
+			if (len != msgsize) {
+				die("wrong message size: %d != %d", len,
+				    msgsize);
+			}
+		}
+	}
+	end = nni_clock();
+	nng_close(s);
+	for (i = 0; i < count; i++) {
+		nng_msg_free(msgs[i]);
+	}
+	free(msgs);
+	total = (end - start) / 1.0;
+	msgpersec = count * 1000000 / total;
+	mbps = (count * 8.0 * msgsize);
+	mbps /= total;
+	printf("total time: %.3f [s]\n", total / 1000000.0);
+	printf("message size: %d [B]\n", msgsize);
+	printf("message count: %d\n", count);
+	printf("throughput: %.3f [msg/s]\n", msgpersec);
+	printf("throughput: %.3f [Mb/s]\n", mbps);
+}
+
+
+void
+throughput_client(const char *addr, int msgsize, int count)
+{
+	nng_socket *s;
+	nng_msg **msgs;
+	int rv;
+	int i;
+
+	// We send one extra zero length message to start the timer.
+	count++;
+
+	if ((rv = nng_open(&s, NNG_PROTO_PAIR)) != 0) {
+		die("nng_socket: %s", nng_strerror(rv));
+	}
+
+	// XXX: set no delay
+	// XXX: other options (TLS in the future?, Linger?)
+
+	if ((rv = nng_dial(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) {
+		die("nng_dial: %s", nng_strerror(rv));
+	}
+
+	// Preallocate the messages. Otherwise we are benchmarking the
+	// allocator, which is less useful.
+	if ((msgs = calloc(count, sizeof (nng_msg *))) == NULL) {
+		die("calloc: %s", strerror(errno));
+	}
+	for (i = 0; i < count; i++) {
+		if ((rv = nng_msg_alloc(&msgs[i], i ? msgsize : 0)) != 0) {
+			die("nng_msg_alloc: %s", nng_strerror(rv));
+		}
+	}
+	for (i = 0; i < count; i++) {
+		if ((rv = nng_sendmsg(s, msgs[i], 0)) != 0) {
+			die("nng_sendmsg: %s", nng_strerror(rv));
+		}
+	}
+
+	// Wait 100msec for pipes to drain.
+	nng_close(s);
+	nni_usleep(100000);
+	free(msgs);
 }
-- 
cgit v1.2.3-70-g09d2
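
Both the commit message and the comment above the throughput functions point at message
pooling as the likely fix for the allocation cost on the hot path. Below is a minimal
sketch of what such a pool could look like; it is not part of this commit, and the names
(msg_pool, msg_pool_get, msg_pool_put), the use of a plain pthread mutex instead of nng's
internal locking, and the "nng.h" include path are assumptions made purely for illustration.

// Illustrative only: a fixed-size free list of nng messages guarded by a mutex.
// All identifiers here are hypothetical; nng had no pooling API at this point.

#include <pthread.h>
#include <stdlib.h>

#include "nng.h"	// assumed to be the same header perf.c already uses

typedef struct {
	nng_msg       **idle;	// stack of messages waiting to be reused
	int             nidle;	// how many are currently idle
	int             cap;	// maximum number of pooled messages
	size_t          size;	// body size used when the pool must allocate
	pthread_mutex_t lock;
} msg_pool;

static int
msg_pool_init(msg_pool *p, int cap, size_t size)
{
	if ((p->idle = calloc(cap, sizeof (nng_msg *))) == NULL) {
		return (-1);
	}
	p->nidle = 0;
	p->cap   = cap;
	p->size  = size;
	pthread_mutex_init(&p->lock, NULL);
	return (0);
}

// Get a message: reuse one from the free list if possible, otherwise fall
// back to the allocator.  Reused messages keep their old body, which is fine
// here because every benchmark message has the same size.
static int
msg_pool_get(msg_pool *p, nng_msg **mp)
{
	pthread_mutex_lock(&p->lock);
	if (p->nidle > 0) {
		*mp = p->idle[--p->nidle];
		pthread_mutex_unlock(&p->lock);
		return (0);
	}
	pthread_mutex_unlock(&p->lock);
	return (nng_msg_alloc(mp, p->size));
}

// Return a message: keep it for reuse if there is room, otherwise free it.
static void
msg_pool_put(msg_pool *p, nng_msg *m)
{
	pthread_mutex_lock(&p->lock);
	if (p->nidle < p->cap) {
		p->idle[p->nidle++] = m;
		pthread_mutex_unlock(&p->lock);
		return;
	}
	pthread_mutex_unlock(&p->lock);
	nng_msg_free(m);
}

// Tear down the pool, releasing any messages still cached.
static void
msg_pool_fini(msg_pool *p)
{
	while (p->nidle > 0) {
		nng_msg_free(p->idle[--p->nidle]);
	}
	free(p->idle);
	pthread_mutex_destroy(&p->lock);
}

With something along these lines, throughput_client could draw its messages from the pool
and throughput_server could return each received message to it, so the steady state stops
paying for an allocation and a free per message instead of preallocating a large array up
front.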