aboutsummaryrefslogtreecommitdiff
path: root/src/sp/reconnect_stress_test.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sp/reconnect_stress_test.c')
-rw-r--r--src/sp/reconnect_stress_test.c153
1 files changed, 77 insertions, 76 deletions
diff --git a/src/sp/reconnect_stress_test.c b/src/sp/reconnect_stress_test.c
index f463e0e5..578e34bd 100644
--- a/src/sp/reconnect_stress_test.c
+++ b/src/sp/reconnect_stress_test.c
@@ -8,12 +8,12 @@
//
#include <assert.h>
-#include <nng/nng.h>
#include <core/nng_impl.h>
-#include <nng/protocol/survey0/survey.h>
-#include <nng/protocol/survey0/respond.h>
-#include <nng/protocol/reqrep0/req.h>
+#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/protocol/survey0/respond.h>
+#include <nng/protocol/survey0/survey.h>
#include <nuts.h>
@@ -26,10 +26,10 @@ struct work;
typedef void (*work_fn)(struct work *);
struct work {
- work_fn start;
- nng_socket socket;
- nng_aio * aio;
- enum state state;
+ work_fn start;
+ nng_socket socket;
+ nng_aio *aio;
+ enum state state;
nni_atomic_int received;
};
@@ -40,10 +40,10 @@ fatal(const char *msg, int result)
abort();
}
-#define PASS(cond) \
- do { \
- int result_ = (cond); \
- if (result_ != 0) \
+#define PASS(cond) \
+ do { \
+ int result_ = (cond); \
+ if (result_ != 0) \
fatal(#cond, result_); \
} while (0)
@@ -76,7 +76,7 @@ work_listen(struct work *w, const char *url)
}
void
-work_dial(struct work *w, const char * const * urls, size_t urls_size)
+work_dial(struct work *w, const char *const *urls, size_t urls_size)
{
size_t i;
@@ -87,7 +87,7 @@ work_dial(struct work *w, const char * const * urls, size_t urls_size)
void
close_work(struct work *w)
{
- nng_close(w->socket);
+ nng_socket_close(w->socket);
nng_aio_wait(w->aio);
nng_aio_free(w->aio);
}
@@ -101,39 +101,39 @@ ping_start(struct work *w)
void
ping_cb(void *arg)
{
- nng_msg *msg;
- struct work *w = arg;
- int result = nng_aio_result(w->aio);
+ nng_msg *msg;
+ struct work *w = arg;
+ int result = nng_aio_result(w->aio);
if (result)
switch (result) {
- case NNG_ETIMEDOUT:
- case NNG_ESTATE:
- free_aio_msg(w);
- ping_start(w);
- return;
- case NNG_ECANCELED:
- case NNG_ECLOSED:
- free_aio_msg(w);
- return;
- default:
- fatal("ping_cb", result);
+ case NNG_ETIMEDOUT:
+ case NNG_ESTATE:
+ free_aio_msg(w);
+ ping_start(w);
+ return;
+ case NNG_ECANCELED:
+ case NNG_ECLOSED:
+ free_aio_msg(w);
+ return;
+ default:
+ fatal("ping_cb", result);
}
switch (w->state) {
- case SEND:
- w->state = RECV;
- nng_recv_aio(w->socket, w->aio);
- break;
- case RECV:
- msg = nng_aio_get_msg(w->aio);
- assert(msg != NULL);
- assert(nng_msg_len(msg) == 5);
- assert(0 == strncmp(nng_msg_body(msg), "echo", 4));
- nng_msg_free(msg);
- nni_atomic_inc(&w->received);
- ping_start(w);
- break;
+ case SEND:
+ w->state = RECV;
+ nng_recv_aio(w->socket, w->aio);
+ break;
+ case RECV:
+ msg = nng_aio_get_msg(w->aio);
+ assert(msg != NULL);
+ assert(nng_msg_len(msg) == 5);
+ assert(0 == strncmp(nng_msg_body(msg), "echo", 4));
+ nng_msg_free(msg);
+ nni_atomic_inc(&w->received);
+ ping_start(w);
+ break;
}
}
@@ -147,48 +147,49 @@ echo_start(struct work *w)
void
echo_cb(void *arg)
{
- nng_msg *msg;
- struct work *w = arg;
- int result = nng_aio_result(w->aio);
+ nng_msg *msg;
+ struct work *w = arg;
+ int result = nng_aio_result(w->aio);
if (result)
switch (result) {
- case NNG_ECANCELED:
- case NNG_ECLOSED:
- free_aio_msg(w);
- return;
- default:
- fatal("echo_cb", result);
+ case NNG_ECANCELED:
+ case NNG_ECLOSED:
+ free_aio_msg(w);
+ return;
+ default:
+ fatal("echo_cb", result);
}
switch (w->state) {
- case RECV:
- msg = nng_aio_get_msg(w->aio);
- assert(msg != NULL);
- assert(nng_msg_len(msg) == 5);
- assert(0 == strncmp(nng_msg_body(msg), "ping", 4));
- nng_msg_free(msg);
- nni_atomic_inc(&w->received);
- work_send(w, "echo", 5);
- break;
- case SEND:
- echo_start(w);
- break;
+ case RECV:
+ msg = nng_aio_get_msg(w->aio);
+ assert(msg != NULL);
+ assert(nng_msg_len(msg) == 5);
+ assert(0 == strncmp(nng_msg_body(msg), "ping", 4));
+ nng_msg_free(msg);
+ nni_atomic_inc(&w->received);
+ work_send(w, "echo", 5);
+ break;
+ case SEND:
+ echo_start(w);
+ break;
}
}
-#define CLIENTS_COUNT 64
-#define SERVICES_COUNT 8
-#define CLIENT_RX_COUNT 100
-#define TEST_DURATION_MS 3000
-#define SURVEY_TIMEOUT_MS 100
+#define CLIENTS_COUNT 64
+#define SERVICES_COUNT 8
+#define CLIENT_RX_COUNT 100
+#define TEST_DURATION_MS 3000
+#define SURVEY_TIMEOUT_MS 100
void
surveyor_open(struct work *w)
{
w->start = ping_start;
NUTS_PASS(nng_surveyor_open(&w->socket));
- NUTS_PASS(nng_socket_set_ms(w->socket, NNG_OPT_SURVEYOR_SURVEYTIME, SURVEY_TIMEOUT_MS));
+ NUTS_PASS(nng_socket_set_ms(
+ w->socket, NNG_OPT_SURVEYOR_SURVEYTIME, SURVEY_TIMEOUT_MS));
NUTS_PASS(nng_aio_alloc(&w->aio, ping_cb, w));
nni_atomic_init(&w->received);
}
@@ -223,14 +224,14 @@ rep_open(struct work *w)
void
run_test(work_fn open_service, work_fn open_client)
{
- int i;
- nng_time stop_time;
- struct work * service;
- struct work * client;
- struct work services[SERVICES_COUNT];
- struct work clients [CLIENTS_COUNT];
-
- const char * service_urls[SERVICES_COUNT] = {
+ int i;
+ nng_time stop_time;
+ struct work *service;
+ struct work *client;
+ struct work services[SERVICES_COUNT];
+ struct work clients[CLIENTS_COUNT];
+
+ const char *service_urls[SERVICES_COUNT] = {
"inproc://stressA",
"inproc://stressB",
"inproc://stressC",