diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-20 20:52:32 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-24 15:06:33 -0700 |
| commit | fdefff742662ed4eb476bf19b9dda245f86bc406 (patch) | |
| tree | a4e132716debd64e434478f8814f368db052cbc6 /tests/surveyctx.c | |
| parent | e0b47b12d3d1462d07c5038e4f34f5282eeec675 (diff) | |
| download | nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.gz nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.bz2 nng-fdefff742662ed4eb476bf19b9dda245f86bc406.zip | |
fixes #342 Want Surveyor/Respondent context support
fixes #360 core should nng_aio_begin before nng_aio_finish_error
fixes #361 nng_send_aio should check for NULL message
fixes #362 nni_msgq does not signal pollable on certain events
This adds support for contexts for both sides of the surveyor pattern.
Prior to this commit, the raw mode was completely broken, and there
were numerous other bugs found and fixed. This integration includes
*much* deeper validation of this pattern.
Some changes to the core and other patterns have been made, where it
was obvioius that we could make such improvements. (The obviousness
stemming from the fact that RESPONDENT in particular is very closely
derived from REP.)
Diffstat (limited to 'tests/surveyctx.c')
| -rw-r--r-- | tests/surveyctx.c | 298 |
1 files changed, 298 insertions, 0 deletions
diff --git a/tests/surveyctx.c b/tests/surveyctx.c new file mode 100644 index 00000000..9ab2de40 --- /dev/null +++ b/tests/surveyctx.c @@ -0,0 +1,298 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +#include <string.h> + +static struct { + nng_aio *aio; + enum { START, SEND, RECV } state; + nng_socket s; + nng_msg * msg; + int cnt; +} resp_state; + +void +resp_cb(void) +{ + int rv; + + if (resp_state.state == START) { + resp_state.state = RECV; + nng_recv_aio(resp_state.s, resp_state.aio); + return; + } + if ((rv = nng_aio_result(resp_state.aio)) != 0) { + if (resp_state.msg != NULL) { + nng_msg_free(resp_state.msg); + resp_state.msg = NULL; + } + return; + } + switch (resp_state.state) { + case START: + break; + case RECV: + resp_state.msg = nng_aio_get_msg(resp_state.aio); + resp_state.state = SEND; + nng_aio_set_msg(resp_state.aio, resp_state.msg); + nng_send_aio(resp_state.s, resp_state.aio); + break; + case SEND: + resp_state.msg = NULL; + resp_state.state = RECV; + nng_aio_set_msg(resp_state.aio, NULL); + nng_recv_aio(resp_state.s, resp_state.aio); + resp_state.cnt++; + break; + } +} + +#define NCTX 10 + +void +markr(void *arg) +{ + *(bool *) arg = true; +} + +static void +marks(void *arg) +{ + *(bool *) arg = true; +} + +nng_ctx ctxs[NCTX]; +uint32_t recv_order[NCTX]; +nng_aio *saios[NCTX]; +nng_aio *raios[NCTX]; +bool recd[NCTX]; +bool sent[NCTX]; + +TestMain("Surveyor concurrent contexts", { + int rv; + const char *addr = "inproc://test"; + int i; + + memset(recv_order, 0, NCTX * sizeof(int)); + + atexit(nng_fini); + + Convey("We can use Surveyor contexts concurrently", { + nng_socket surv; + + So(nng_aio_alloc(&resp_state.aio, (void *) resp_cb, NULL) == + 0); + So(nng_respondent0_open(&resp_state.s) == 0); + So(nng_surveyor0_open(&surv) == 0); + + for (i = 0; i < NCTX; i++) { + sent[i] = recd[i] = false; + recv_order[i] = (uint32_t) i; + if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) { + break; + } + nng_aio_set_timeout(raios[i], 5000); + if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) { + break; + } + nng_aio_set_timeout(saios[i], 5000); + } + + // So(nng_setopt_int(resp_state.s, NNG_OPT_SENDBUF, NCTX) == + // 0); + So(i == NCTX); + for (i = 0; i < NCTX; i++) { + uint32_t tmp; + int ni = rand() % NCTX; // recv index + + tmp = recv_order[i]; + recv_order[i] = recv_order[ni]; + recv_order[ni] = tmp; + } + Reset({ + for (i = 0; i < NCTX; i++) { + nng_aio_free(saios[i]); + nng_aio_free(raios[i]); + } + nng_close(surv); + nng_close(resp_state.s); + nng_aio_free(resp_state.aio); + }); + + So(nng_listen(resp_state.s, addr, NULL, 0) == 0); + So(nng_dial(surv, addr, NULL, 0) == 0); + + nng_msleep(100); // let things establish. + + // Start the rep state machine going. + resp_cb(); + + for (i = 0; i < NCTX; i++) { + if ((rv = nng_ctx_open(&ctxs[i], surv)) != 0) { + break; + } + } + So(rv == 0); + So(i == NCTX); + + // Send messages + for (i = 0; i < NCTX; i++) { + nng_msg *m; + if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) { + Fail("msg alloc failed: %s", nng_strerror(rv)); + } + if ((rv = nng_msg_append_u32(m, i)) != 0) { + Fail("append failed: %s", nng_strerror(rv)); + } + nng_aio_set_msg(saios[i], m); + nng_ctx_send(ctxs[i], saios[i]); + } + So(rv == 0); + So(i == NCTX); + + for (i = 0; i < NCTX; i++) { + nng_aio_wait(saios[i]); + if ((rv = nng_aio_result(saios[i])) != 0) { + Fail("send failed: %s", nng_strerror(rv)); + So(false); + break; + } + } + for (i = 0; i < NCTX; i++) { + if (!sent[i]) { + Fail("Index %d (%d) not sent", i, i); + } + } + + So(rv == 0); + So(i == NCTX); + // Receive answers + for (i = 0; i < NCTX; i++) { + int ri = recv_order[i]; + nng_ctx_recv(ctxs[ri], raios[ri]); + } + + for (i = 0; i < NCTX; i++) { + nng_msg *msg; + uint32_t x; + + nng_aio_wait(raios[i]); + if ((rv = nng_aio_result(raios[i])) != 0) { + Fail("recv %d (%d) %d failed: %s", i, + recv_order[i], resp_state.cnt, + nng_strerror(rv)); + continue; + } + msg = nng_aio_get_msg(raios[i]); + if ((rv = nng_msg_chop_u32(msg, &x)) != 0) { + Fail("recv msg trim: %s", nng_strerror(rv)); + break; + } + if (x != (uint32_t) i) { + Fail("message body mismatch: %x %x\n", x, + (uint32_t) i); + break; + } + + nng_msg_free(msg); + } + for (i = 0; i < NCTX; i++) { + if (!recd[i]) { + Fail("Index %d (%d) not received", i, + recv_order[i]); + break; + } + } + + So(rv == 0); + So(i == NCTX); + }); + + Convey("Given a socket and a context", { + nng_socket surv; + nng_ctx ctx; + nng_aio * aio; + + So(nng_surveyor0_open(&surv) == 0); + So(nng_ctx_open(&ctx, surv) == 0); + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + nng_aio_set_timeout(aio, 1000); + + Reset({ nng_aio_free(aio); }); + + Convey("Recv on the context is ESTATE", { + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ESTATE); + }); + + Convey("Closing the socket aborts a context recv", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + nng_ctx_recv(ctx, aio); + nng_close(surv); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + }); + + Convey("Sending a null message fails", { + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_EINVAL); + }); + + Convey("Closing the context aborts a context send", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + nng_recv_aio(ctx, aio); + nng_ctx_close(ctx); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + nng_close(surv); + }); + + Convey("We can set separate survey times", { + nng_duration ms; + So(nng_setopt_ms( + surv, NNG_OPT_SURVEYOR_SURVEYTIME, 100) == 0); + So(nng_ctx_setopt_ms( + ctx, NNG_OPT_SURVEYOR_SURVEYTIME, 200) == 0); + So(nng_getopt_ms( + surv, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0); + So(ms == 100); + So(nng_ctx_getopt_ms( + ctx, NNG_OPT_SURVEYOR_SURVEYTIME, &ms) == 0); + So(ms == 200); + }); + }); + + Convey("Raw mode does not support contexts", { + nng_socket surv; + nng_ctx ctx; + So(nng_surveyor0_open_raw(&surv) == 0); + So(nng_ctx_open(&ctx, surv) == NNG_ENOTSUP); + nng_close(surv); + }); +}); |
