diff options
Diffstat (limited to 'tests/surveyctx.c')
| -rw-r--r-- | tests/surveyctx.c | 298 |
1 files changed, 298 insertions, 0 deletions
diff --git a/tests/surveyctx.c b/tests/surveyctx.c new file mode 100644 index 00000000..9ab2de40 --- /dev/null +++ b/tests/surveyctx.c @@ -0,0 +1,298 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +#include <string.h> + +static struct { + nng_aio *aio; + enum { START, SEND, RECV } state; + nng_socket s; + nng_msg * msg; + int cnt; +} resp_state; + +void +resp_cb(void) +{ + int rv; + + if (resp_state.state == START) { + resp_state.state = RECV; + nng_recv_aio(resp_state.s, resp_state.aio); + return; + } + if ((rv = nng_aio_result(resp_state.aio)) != 0) { + if (resp_state.msg != NULL) { + nng_msg_free(resp_state.msg); + resp_state.msg = NULL; + } + return; + } + switch (resp_state.state) { + case START: + break; + case RECV: + resp_state.msg = nng_aio_get_msg(resp_state.aio); + resp_state.state = SEND; + nng_aio_set_msg(resp_state.aio, resp_state.msg); + nng_send_aio(resp_state.s, resp_state.aio); + break; + case SEND: + resp_state.msg = NULL; + resp_state.state = RECV; + nng_aio_set_msg(resp_state.aio, NULL); + nng_recv_aio(resp_state.s, resp_state.aio); + resp_state.cnt++; + break; + } +} + +#define NCTX 10 + +void +markr(void *arg) +{ + *(bool *) arg = true; +} + +static void +marks(void *arg) +{ + *(bool *) arg = true; +} + +nng_ctx ctxs[NCTX]; +uint32_t recv_order[NCTX]; +nng_aio *saios[NCTX]; +nng_aio *raios[NCTX]; +bool recd[NCTX]; +bool sent[NCTX]; + +TestMain("Surveyor concurrent contexts", { + int rv; + const char *addr = "inproc://test"; + int i; + + memset(recv_order, 0, NCTX * sizeof(int)); + + atexit(nng_fini); + + Convey("We can use Surveyor contexts concurrently", { + nng_socket surv; + + So(nng_aio_alloc(&resp_state.aio, (void *) resp_cb, NULL) == + 0); + So(nng_respondent0_open(&resp_state.s) == 0); + So(nng_surveyor0_open(&surv) == 0); + + for (i = 0; i < NCTX; i++) { + sent[i] = recd[i] = false; + recv_order[i] = (uint32_t) i; + if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) { + break; + } + nng_aio_set_timeout(raios[i], 5000); + if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) { + break; + } + nng_aio_set_timeout(saios[i], 5000); + } + + // So(nng_setopt_int(resp_state.s, NNG_OPT_SENDBUF, NCTX) == + // 0); + So(i == NCTX); + for (i = 0; i < NCTX; i++) { + uint32_t tmp; + int ni = rand() % NCTX; // recv index + + tmp = recv_order[i]; + recv_order[i] = recv_order[ni]; + recv_order[ni] = tmp; + } + Reset({ + for (i = 0; i < NCTX; i++) { + nng_aio_free(saios[i]); + nng_aio_free(raios[i]); + } + nng_close(surv); + nng_close(resp_state.s); + nng_aio_free(resp_state.aio); + }); + + So(nng_listen(resp_state.s, addr, NULL, 0) == 0); + So(nng_dial(surv, addr, NULL, 0) == 0); + + nng_msleep(100); // let things establish. + + // Start the rep state machine going. + resp_cb(); + + for (i = 0; i < NCTX; i++) { + if ((rv = nng_ctx_open(&ctxs[i], surv)) != 0) { + break; + } + } + So(rv == 0); + So(i == NCTX); + + // Send messages + for (i = 0; i < NCTX; i++) { + nng_msg *m; + if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) { + Fail("msg alloc failed: %s", nng_strerror(rv)); + } + if ((rv = nng_msg_append_u32(m, i)) != 0) { + Fail("append failed: %s", nng_strerror(rv)); + } + nng_aio_set_msg(saios[i], m); + nng_ctx_send(ctxs[i], saios[i]); + } + So(rv == 0); + So(i == NCTX); + + for (i = 0; i < NCTX; i++) { + nng_aio_wait(saios[i]); + if ((rv = nng_aio_result(saios[i])) != 0) { + Fail("send failed: %s", nng_strerror(rv)); + So(false); + break; + } + } + for (i = 0; i < NCTX; i++) { + if (!sent[i]) { + Fail("Index %d (%d) not sent", i, i); + } + } + + So(rv == 0); + So(i == NCTX); + // Receive answers + for (i = 0; i < NCTX; i++) { + int ri = recv_order[i]; + nng_ctx_recv(ctxs[ri], raios[ri]); + } + + for (i = 0; i < NCTX; i++) { + nng_msg *msg; + uint32_t x; + + nng_aio_wait(raios[i]); + if ((rv = nng_aio_result(raios[i])) != 0) { + Fail("recv %d (%d) %d failed: %s", i, + recv_order[i], resp_state.cnt, + nng_strerror(rv)); + continue; + } + msg = nng_aio_get_msg(raios[i]); + if ((rv = nng_msg_chop_u32(msg, &x)) != 0) { + Fail("recv msg trim: %s", nng_strerror(rv)); + break; + } + if (x != (uint32_t) i) { + Fail("message body mismatch: %x %x\n", x, + (uint32_t) i); + break; + } + + nng_msg_free(msg); + } + for (i = 0; i < NCTX; i++) { + if (!recd[i]) { + Fail("Index %d (%d) not received", i, + recv_order[i]); + break; + } + } + + So(rv == 0); + So(i == NCTX); + }); + + Convey("Given a socket and a context", { + nng_socket surv; + nng_ctx ctx; + nng_aio * aio; + + So(nng_surveyor0_open(&surv) == 0); + So(nng_ctx_open(&ctx, surv) == 0); + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + nng_aio_set_timeout(aio, 1000); + + Reset({ nng_aio_free(aio); }); + + Convey("Recv on the context is ESTATE", { + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ESTATE); + }); + + Convey("Closing the socket aborts a context recv", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + nng_ctx_recv(ctx, aio); + nng_close(surv); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + }); + + Convey("Sending a null message fails", { + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_EINVAL); + }); + + Convey("Closing the context aborts a context send", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + nng_recv_aio(ctx, aio); + nng_ctx_close(ctx); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + nng_close(surv); + }); + + Convey("We can set separate survey times", { + nng_duration ms; + So(nng_setopt_ms( + surv, NNG_OPT_SURVEYOR_SURVEYTIME, 100) == 0); + So(nng_ctx_setopt_ms( + ctx, NNG_OPT_SURVEYOR_SURVEYTIME, 200) == 0); + So(nng_getopt_ms( + surv, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0); + So(ms == 100); + So(nng_ctx_getopt_ms( + ctx, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0); + So(ms == 200); + }); + }); + + Convey("Raw mode does not support contexts", { + nng_socket surv; + nng_ctx ctx; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_ctx_open(&ctx, surv) == NNG_ENOTSUP); + nng_close(surv); + }); +}); |
