From 1a4e60f94dd4febffe3b651b5c800f1b9096f3d9 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 19 Jan 2020 12:08:08 -0800 Subject: fixes #1154 perf programs could use options to select different protocols This allows --reqrep0, --pair0, --pair1, and --bus0 to be used with the inproc_lat, and also inproc_thr (though only with pair options for that for now). It also introduced --url for both programs to support testing over different transports. Also, we no longer pass out the header for REQ reply -- that is an error, and led to some unfortunate failures when reusing the message. --- perf/perf.c | 207 +++++++++++++++++++++++++++++++++++++++------ src/protocol/reqrep0/req.c | 1 - 2 files changed, 183 insertions(+), 25 deletions(-) diff --git a/perf/perf.c b/perf/perf.c index 810fd151..6df2873c 100644 --- a/perf/perf.c +++ b/perf/perf.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. +// Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -16,26 +16,99 @@ #include #include +#include #include +static void die(const char *, ...); +static int +no_open(nng_socket *arg) +{ + (void) arg; + die("Protocol not supported in this build!"); + return (NNG_ENOTSUP); +} + +typedef int (*open_func)(nng_socket *); + +static open_func open_server = no_open; +static open_func open_client = no_open; + #if defined(NNG_HAVE_PAIR1) #include +#else +#define nng_pair1_open no_open +#endif -#elif defined(NNG_HAVE_PAIR0) +#if defined(NNG_HAVE_PAIR0) #include +#else +#define nng_pair0_open no_open +#endif +#if defined(NNG_HAVE_REQ0) +#include #else +#define nng_req0_open no_open +#endif -static void die(const char *, ...); +#if defined(NNG_HAVE_REP0) +#include +#else +#define nng_rep0_open no_open +#endif -static int -nng_pair_open(nng_socket *arg) -{ - (void) arg; - die("No pair protocol enabled in this build!"); - return (NNG_ENOTSUP); -} -#endif // NNG_ENABLE_PAIR +#if defined(NNG_HAVE_BUS0) +#include +#else +#define nng_bus0_open no_open +#endif + +#if defined(NNG_HAVE_PULL0) +#include +#else +#define nng_pull0_open no_open +#endif + +#if defined(NNG_HAVE_PUSH0) +#include +#else +#define nng_push0_open no_open +#endif + +#if defined(NNG_HAVE_PUB0) +#include +#else +#define nng_pub0_open no_open +#endif + +#if defined(NNG_HAVE_SUB0) +#include +#else +#define nng_sub0_open no_open +#endif + +enum options { + OPT_PAIR0 = 1, + OPT_PAIR1, + OPT_REQREP0, + OPT_PUBSUB0, + OPT_PIPELINE0, + OPT_SURVEY0, + OPT_BUS0, + OPT_URL, +}; + +// These are not universally supported by the variants yet. +static nng_optspec opts[] = { + { .o_name = "pair1", .o_val = OPT_PAIR1 }, + { .o_name = "pair0", .o_val = OPT_PAIR0 }, + { .o_name = "reqrep0", .o_val = OPT_REQREP0 }, + { .o_name = "bus0", .o_val = OPT_BUS0 }, + { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 }, + { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 }, + { .o_name = "url", .o_val = OPT_URL, .o_arg = true }, + { .o_name = NULL, .o_val = 0 }, +}; static void latency_client(const char *, size_t, int); static void latency_server(const char *, size_t, int); @@ -96,11 +169,23 @@ matches(const char *arg, const char *name) } } +const int PAIR0 = 0; +const int PAIR1 = 1; +const int REQREP = 2; + int main(int argc, char **argv) { char *prog; +#if defined(NNG_HAVE_PAIR1) + open_server = nng_pair1_open; + open_client = nng_pair1_open; +#elif defined(NNG_HAVE_PAIR0) + open_server = nng_pair0_open; + open_client = nng_pair0_open; +#endif + // Allow -m or whatever to override argv[0]. if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) { prog = argv[2]; @@ -242,12 +327,49 @@ do_inproc_lat(int argc, char **argv) nng_thread * thr; struct inproc_args ia; int rv; + int val; + int optidx; + char * arg; + char * addr; + + addr = "inproc://latency_test"; + + optidx = 0; + while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) == + 0) { + switch (val) { + case OPT_REQREP0: + open_client = nng_req0_open; + open_server = nng_rep0_open; + break; + case OPT_PAIR0: + open_client = nng_pair0_open; + open_server = nng_pair0_open; + break; + case OPT_PAIR1: + open_client = nng_pair1_open; + open_server = nng_pair1_open; + break; + case OPT_BUS0: + open_client = nng_bus0_open; + open_server = nng_bus0_open; + break; + + case OPT_URL: + addr = arg; + break; + default: + die("bad option"); + } + } + argc -= optidx; + argv += optidx; if (argc != 2) { die("Usage: inproc_lat "); } - ia.addr = "inproc://latency_test"; + ia.addr = addr; ia.msgsize = parse_int(argv[0], "message size"); ia.count = parse_int(argv[1], "count"); ia.func = latency_server; @@ -259,7 +381,7 @@ do_inproc_lat(int argc, char **argv) // Sleep a bit. nng_msleep(100); - latency_client("inproc://latency_test", ia.msgsize, ia.count); + latency_client(addr, ia.msgsize, ia.count); nng_thread_destroy(thr); } @@ -269,12 +391,49 @@ do_inproc_thr(int argc, char **argv) nng_thread * thr; struct inproc_args ia; int rv; + int optidx; + int val; + char * arg; + char * addr = "inproc://throughput-test"; + + optidx = 0; + while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) == + 0) { + switch (val) { +#if 0 + // For now these protocols simply do not work with + // throughput -- they don't work with backpressure properly. + // In the future we should support synchronizing in the same + // process, and alerting the sender both on completion of + // a single message, and on completion of all messages. + case OPT_REQREP0: + open_client = nng_req0_open; + open_server = nng_rep0_open; + break; +#endif + case OPT_PAIR0: + open_client = nng_pair0_open; + open_server = nng_pair0_open; + break; + case OPT_PAIR1: + open_client = nng_pair1_open; + open_server = nng_pair1_open; + break; + case OPT_URL: + addr = arg; + break; + default: + die("bad option"); + } + } + argc -= optidx; + argv += optidx; if (argc != 2) { die("Usage: inproc_thr "); } - ia.addr = "inproc://tput_test"; + ia.addr = addr; ia.msgsize = parse_int(argv[0], "message size"); ia.count = parse_int(argv[1], "count"); ia.func = throughput_server; @@ -286,7 +445,7 @@ do_inproc_thr(int argc, char **argv) // Sleep a bit. nng_msleep(100); - throughput_client("inproc://tput_test", ia.msgsize, ia.count); + throughput_client(addr, ia.msgsize, ia.count); nng_thread_destroy(thr); } @@ -300,8 +459,7 @@ latency_client(const char *addr, size_t msgsize, int trips) int i; float total; float latency; - - if ((rv = nng_pair_open(&s)) != 0) { + if ((rv = open_client(&s)) != 0) { die("nng_socket: %s", nng_strerror(rv)); } @@ -312,6 +470,8 @@ latency_client(const char *addr, size_t msgsize, int trips) die("nng_dial: %s", nng_strerror(rv)); } + nng_msleep(100); + if (nng_msg_alloc(&msg, msgsize) != 0) { die("nng_msg_alloc: %s", nng_strerror(rv)); } @@ -332,7 +492,7 @@ latency_client(const char *addr, size_t msgsize, int trips) nng_close(s); total = (float) ((end - start)) / 1000; - latency = ((float) ((total * 1000000)) / (float)(trips * 2)); + latency = ((float) ((total * 1000000)) / (float) (trips * 2)); printf("total time: %.3f [s]\n", total); printf("message size: %d [B]\n", (int) msgsize); printf("round trip count: %d\n", trips); @@ -347,7 +507,7 @@ latency_server(const char *addr, size_t msgsize, int trips) int rv; int i; - if ((rv = nng_pair_open(&s)) != 0) { + if ((rv = open_server(&s)) != 0) { die("nng_socket: %s", nng_strerror(rv)); } @@ -363,8 +523,7 @@ latency_server(const char *addr, size_t msgsize, int trips) die("nng_recvmsg: %s", nng_strerror(rv)); } if (nng_msg_len(msg) != msgsize) { - die("wrong message size: %lu != %lu", - nng_msg_len(msg), + die("wrong message size: %lu != %lu", nng_msg_len(msg), msgsize); } if ((rv = nng_sendmsg(s, msg, 0)) != 0) { @@ -419,8 +578,7 @@ throughput_server(const char *addr, size_t msgsize, int count) die("nng_recvmsg: %s", nng_strerror(rv)); } if (nng_msg_len(msg) != msgsize) { - die("wrong message size: %lu != %lu", - nng_msg_len(msg), + die("wrong message size: %lu != %lu", nng_msg_len(msg), msgsize); } nng_msg_free(msg); @@ -490,7 +648,8 @@ throughput_client(const char *addr, size_t msgsize, int count) } } - // Attempt to get the completion indication from the other side. + // Attempt to get the completion indication from the other + // side. if (nng_recvmsg(s, &msg, 0) == 0) { nng_msg_free(msg); } diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index f5d9bbba..1de93929 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -307,7 +307,6 @@ req0_recv_cb(void *arg) goto malformed; } id = nni_msg_trim_u32(msg); - nni_msg_header_must_append_u32(msg, id); // Schedule another receive while we are processing this. nni_mtx_lock(&s->mtx); -- cgit v1.2.3-70-g09d2