diff options
Diffstat (limited to 'src/sp/reconnect_stress_test.c')
| -rw-r--r-- | src/sp/reconnect_stress_test.c | 153 |
1 files changed, 77 insertions, 76 deletions
diff --git a/src/sp/reconnect_stress_test.c b/src/sp/reconnect_stress_test.c index f463e0e5..578e34bd 100644 --- a/src/sp/reconnect_stress_test.c +++ b/src/sp/reconnect_stress_test.c @@ -8,12 +8,12 @@ // #include <assert.h> -#include <nng/nng.h> #include <core/nng_impl.h> -#include <nng/protocol/survey0/survey.h> -#include <nng/protocol/survey0/respond.h> -#include <nng/protocol/reqrep0/req.h> +#include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> +#include <nng/protocol/reqrep0/req.h> +#include <nng/protocol/survey0/respond.h> +#include <nng/protocol/survey0/survey.h> #include <nuts.h> @@ -26,10 +26,10 @@ struct work; typedef void (*work_fn)(struct work *); struct work { - work_fn start; - nng_socket socket; - nng_aio * aio; - enum state state; + work_fn start; + nng_socket socket; + nng_aio *aio; + enum state state; nni_atomic_int received; }; @@ -40,10 +40,10 @@ fatal(const char *msg, int result) abort(); } -#define PASS(cond) \ - do { \ - int result_ = (cond); \ - if (result_ != 0) \ +#define PASS(cond) \ + do { \ + int result_ = (cond); \ + if (result_ != 0) \ fatal(#cond, result_); \ } while (0) @@ -76,7 +76,7 @@ work_listen(struct work *w, const char *url) } void -work_dial(struct work *w, const char * const * urls, size_t urls_size) +work_dial(struct work *w, const char *const *urls, size_t urls_size) { size_t i; @@ -87,7 +87,7 @@ work_dial(struct work *w, const char * const * urls, size_t urls_size) void close_work(struct work *w) { - nng_close(w->socket); + nng_socket_close(w->socket); nng_aio_wait(w->aio); nng_aio_free(w->aio); } @@ -101,39 +101,39 @@ ping_start(struct work *w) void ping_cb(void *arg) { - nng_msg *msg; - struct work *w = arg; - int result = nng_aio_result(w->aio); + nng_msg *msg; + struct work *w = arg; + int result = nng_aio_result(w->aio); if (result) switch (result) { - case NNG_ETIMEDOUT: - case NNG_ESTATE: - free_aio_msg(w); - ping_start(w); - return; - case NNG_ECANCELED: - case NNG_ECLOSED: - free_aio_msg(w); - return; - default: - fatal("ping_cb", result); + case NNG_ETIMEDOUT: + case NNG_ESTATE: + free_aio_msg(w); + ping_start(w); + return; + case NNG_ECANCELED: + case NNG_ECLOSED: + free_aio_msg(w); + return; + default: + fatal("ping_cb", result); } switch (w->state) { - case SEND: - w->state = RECV; - nng_recv_aio(w->socket, w->aio); - break; - case RECV: - msg = nng_aio_get_msg(w->aio); - assert(msg != NULL); - assert(nng_msg_len(msg) == 5); - assert(0 == strncmp(nng_msg_body(msg), "echo", 4)); - nng_msg_free(msg); - nni_atomic_inc(&w->received); - ping_start(w); - break; + case SEND: + w->state = RECV; + nng_recv_aio(w->socket, w->aio); + break; + case RECV: + msg = nng_aio_get_msg(w->aio); + assert(msg != NULL); + assert(nng_msg_len(msg) == 5); + assert(0 == strncmp(nng_msg_body(msg), "echo", 4)); + nng_msg_free(msg); + nni_atomic_inc(&w->received); + ping_start(w); + break; } } @@ -147,48 +147,49 @@ echo_start(struct work *w) void echo_cb(void *arg) { - nng_msg *msg; - struct work *w = arg; - int result = nng_aio_result(w->aio); + nng_msg *msg; + struct work *w = arg; + int result = nng_aio_result(w->aio); if (result) switch (result) { - case NNG_ECANCELED: - case NNG_ECLOSED: - free_aio_msg(w); - return; - default: - fatal("echo_cb", result); + case NNG_ECANCELED: + case NNG_ECLOSED: + free_aio_msg(w); + return; + default: + fatal("echo_cb", result); } switch (w->state) { - case RECV: - msg = nng_aio_get_msg(w->aio); - assert(msg != NULL); - assert(nng_msg_len(msg) == 5); - assert(0 == strncmp(nng_msg_body(msg), "ping", 4)); - nng_msg_free(msg); - nni_atomic_inc(&w->received); - work_send(w, "echo", 5); - break; - case SEND: - echo_start(w); - break; + case RECV: + msg = nng_aio_get_msg(w->aio); + assert(msg != NULL); + assert(nng_msg_len(msg) == 5); + assert(0 == strncmp(nng_msg_body(msg), "ping", 4)); + nng_msg_free(msg); + nni_atomic_inc(&w->received); + work_send(w, "echo", 5); + break; + case SEND: + echo_start(w); + break; } } -#define CLIENTS_COUNT 64 -#define SERVICES_COUNT 8 -#define CLIENT_RX_COUNT 100 -#define TEST_DURATION_MS 3000 -#define SURVEY_TIMEOUT_MS 100 +#define CLIENTS_COUNT 64 +#define SERVICES_COUNT 8 +#define CLIENT_RX_COUNT 100 +#define TEST_DURATION_MS 3000 +#define SURVEY_TIMEOUT_MS 100 void surveyor_open(struct work *w) { w->start = ping_start; NUTS_PASS(nng_surveyor_open(&w->socket)); - NUTS_PASS(nng_socket_set_ms(w->socket, NNG_OPT_SURVEYOR_SURVEYTIME, SURVEY_TIMEOUT_MS)); + NUTS_PASS(nng_socket_set_ms( + w->socket, NNG_OPT_SURVEYOR_SURVEYTIME, SURVEY_TIMEOUT_MS)); NUTS_PASS(nng_aio_alloc(&w->aio, ping_cb, w)); nni_atomic_init(&w->received); } @@ -223,14 +224,14 @@ rep_open(struct work *w) void run_test(work_fn open_service, work_fn open_client) { - int i; - nng_time stop_time; - struct work * service; - struct work * client; - struct work services[SERVICES_COUNT]; - struct work clients [CLIENTS_COUNT]; - - const char * service_urls[SERVICES_COUNT] = { + int i; + nng_time stop_time; + struct work *service; + struct work *client; + struct work services[SERVICES_COUNT]; + struct work clients[CLIENTS_COUNT]; + + const char *service_urls[SERVICES_COUNT] = { "inproc://stressA", "inproc://stressB", "inproc://stressC", |
