From 1cf3205d805be83e17ce096305d6eaef94e3a4d2 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 15 Jan 2020 22:16:09 -0800 Subject: RESPOND needs to use atomic for TTL. Also, add a test to cover the RESPOND protocol. This gets about 95% of the coverage. --- src/nng.c | 4 +- src/protocol/survey0/CMakeLists.txt | 1 + src/protocol/survey0/respond.c | 114 +++---- src/protocol/survey0/respond_test.c | 603 ++++++++++++++++++++++++++++++++++++ 4 files changed, 664 insertions(+), 58 deletions(-) create mode 100644 src/protocol/survey0/respond_test.c (limited to 'src') diff --git a/src/nng.c b/src/nng.c index b3179988..d758adab 100644 --- a/src/nng.c +++ b/src/nng.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. +// Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -43,7 +43,7 @@ nng_close(nng_socket s) } // No release -- close releases it. nni_sock_close(sock); - return (rv); + return (0); } int diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt index 57d236f2..eeec4ee1 100644 --- a/src/protocol/survey0/CMakeLists.txt +++ b/src/protocol/survey0/CMakeLists.txt @@ -23,4 +23,5 @@ nng_sources_if(NNG_PROTO_RESPONDENT0 respond.c xrespond.c) nng_headers_if(NNG_PROTO_RESPONDENT0 nng/protocol/survey0/respond.h) nng_defines_if(NNG_PROTO_RESPONDENT0 NNG_HAVE_RESPONDENT0) +nng_test(respond_test) nng_test(xsurvey_test) \ No newline at end of file diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 06010d99..201811de 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -48,14 +48,14 @@ struct resp0_ctx { // resp0_sock is our per-socket protocol private structure. struct resp0_sock { - nni_mtx mtx; - int ttl; - nni_idhash * pipes; - resp0_ctx ctx; - nni_list recvpipes; - nni_list recvq; - nni_pollable readable; - nni_pollable writable; + nni_mtx mtx; + nni_atomic_int ttl; + nni_idhash * pipes; + resp0_ctx ctx; + nni_list recvpipes; + nni_list recvq; + nni_pollable readable; + nni_pollable writable; }; // resp0_pipe is our per-pipe protocol private structure. @@ -65,8 +65,8 @@ struct resp0_pipe { bool busy; uint32_t id; nni_list sendq; // contexts waiting to send - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node rnode; // receivable linkage }; @@ -159,6 +159,11 @@ resp0_ctx_send(void *arg, nni_aio *aio) } nni_mtx_lock(&s->mtx); + if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } if ((len = ctx->btrace_len) == 0) { nni_mtx_unlock(&s->mtx); @@ -187,8 +192,8 @@ resp0_ctx_send(void *arg, nni_aio *aio) if (!p->busy) { p->busy = true; len = nni_msg_len(msg); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->npipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->npipe, &p->aio_send); nni_mtx_unlock(&s->mtx); nni_aio_set_msg(aio, NULL); @@ -196,12 +201,6 @@ resp0_ctx_send(void *arg, nni_aio *aio) return; } - if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; - } - ctx->saio = aio; ctx->spipe = p; nni_list_append(&p->sendq, ctx); @@ -237,7 +236,8 @@ resp0_sock_init(void *arg, nni_sock *nsock) NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode); NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode); - s->ttl = 8; // Per RFC + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); // Per RFC (void) resp0_ctx_init(&s->ctx, s); @@ -267,8 +267,8 @@ resp0_pipe_stop(void *arg) { resp0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -277,25 +277,21 @@ resp0_pipe_fini(void *arg) resp0_pipe *p = arg; nng_msg * msg; - if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) { - nni_aio_set_msg(p->aio_recv, NULL); + if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) { + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); } static int resp0_pipe_init(void *arg, nni_pipe *npipe, void *s) { resp0_pipe *p = arg; - int rv; - if (((rv = nni_aio_alloc(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { - resp0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p); + nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p); NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode); @@ -325,7 +321,7 @@ resp0_pipe_start(void *arg) return (rv); } - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); return (rv); } @@ -336,8 +332,8 @@ resp0_pipe_close(void *arg) resp0_sock *s = p->psock; resp0_ctx * ctx; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&s->mtx); while ((ctx = nni_list_first(&p->sendq)) != NULL) { @@ -370,9 +366,9 @@ resp0_pipe_send_cb(void *arg) nni_msg * msg; size_t len; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->npipe); return; } @@ -396,8 +392,8 @@ resp0_pipe_send_cb(void *arg) msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->npipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->npipe, &p->aio_send); nni_mtx_unlock(&s->mtx); @@ -452,13 +448,13 @@ resp0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_unlock(&s->mtx); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { nni_pollable_clear(&s->readable); } - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); len = nni_msg_header_len(msg); memcpy(ctx->btrace, nni_msg_header(msg), len); @@ -485,12 +481,12 @@ resp0_pipe_recv_cb(void *arg) int hops; size_t len; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->npipe); return; } - msg = nni_aio_get_msg(p->aio_recv); + msg = nni_aio_get_msg(&p->aio_recv); nni_msg_set_pipe(msg, p->id); // Move backtrace from body to header @@ -499,14 +495,14 @@ resp0_pipe_recv_cb(void *arg) bool end = 0; uint8_t *body; - if (hops > s->ttl) { + if (hops > nni_atomic_get(&s->ttl)) { goto drop; } hops++; if (nni_msg_len(msg) < 4) { // Peer is speaking garbage, kick it. nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_close(p->npipe); return; } @@ -535,10 +531,10 @@ resp0_pipe_recv_cb(void *arg) nni_list_remove(&s->recvq, ctx); aio = ctx->raio; ctx->raio = NULL; - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); // Start the next receive. - nni_pipe_recv(p->npipe, p->aio_recv); + nni_pipe_recv(p->npipe, &p->aio_recv); ctx->btrace_len = len; memcpy(ctx->btrace, nni_msg_header(msg), len); @@ -556,22 +552,28 @@ resp0_pipe_recv_cb(void *arg) drop: nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->npipe, p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->npipe, &p->aio_recv); } static int -resp0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +resp0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { resp0_sock *s = arg; - return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t)); + int ttl; + int rv; + + if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) { + nni_atomic_set(&s->ttl, ttl); + } + return (rv); } static int -resp0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +resp0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) { resp0_sock *s = arg; - return (nni_copyout_int(s->ttl, buf, szp, t)); + return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); } static int @@ -636,8 +638,8 @@ static nni_proto_ctx_ops resp0_ctx_ops = { static nni_option resp0_sock_options[] = { { .o_name = NNG_OPT_MAXTTL, - .o_get = resp0_sock_get_maxttl, - .o_set = resp0_sock_set_maxttl, + .o_get = resp0_sock_get_max_ttl, + .o_set = resp0_sock_set_max_ttl, }, { .o_name = NNG_OPT_RECVFD, diff --git a/src/protocol/survey0/respond_test.c b/src/protocol/survey0/respond_test.c new file mode 100644 index 00000000..3c211843 --- /dev/null +++ b/src/protocol/survey0/respond_test.c @@ -0,0 +1,603 @@ +// +// Copyright 2020 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include + +#include +#include +#include + +#include +#include + +#include +#include + +#ifndef NNI_PROTO +#define NNI_PROTO(x, y) (((x) << 4u) | (y)) +#endif + +void +test_resp_identity(void) +{ + nng_socket s; + int p; + char * n; + + TEST_CHECK(nng_respondent0_open(&s) == 0); + TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); + TEST_CHECK(p == NNI_PROTO(6u, 3u)); + TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); + TEST_CHECK(p == NNI_PROTO(6u, 2u)); + TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); + TEST_CHECK(strcmp(n, "respondent") == 0); + nng_strfree(n); + TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); + TEST_CHECK(strcmp(n, "surveyor") == 0); + nng_strfree(n); + TEST_CHECK(nng_close(s) == 0); +} + +void +test_resp_send_bad_state(void) +{ + nng_socket resp; + nng_msg * msg = NULL; + + TEST_CHECK(nng_respondent0_open(&resp) == 0); + TEST_CHECK(nng_msg_alloc(&msg, 0) == 0); + TEST_CHECK(nng_sendmsg(resp, msg, 0) == NNG_ESTATE); + nng_msg_free(msg); + TEST_CHECK(nng_close(resp) == 0); +} + +void +test_resp_poll_writeable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + + TEST_NNG_PASS(nng_surveyor0_open(&surv)); + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_getopt_int(resp, NNG_OPT_SENDFD, &fd)); + TEST_CHECK(fd >= 0); + + // Not writable before connect. + TEST_CHECK(testutil_pollfd(fd) == false); + + TEST_NNG_PASS(testutil_marry(surv, resp)); + + // Still not writable. + TEST_CHECK(testutil_pollfd(fd) == false); + + // If we get a job, *then* we become writable + TEST_NNG_SEND_STR(surv, "abc"); + TEST_NNG_RECV_STR(resp, "abc"); + TEST_CHECK(testutil_pollfd(fd) == true); + + // And is no longer writable once we send a message + TEST_NNG_SEND_STR(resp, "def"); + TEST_CHECK(testutil_pollfd(fd) == false); + // Even after receiving it + TEST_NNG_RECV_STR(surv, "def"); + TEST_CHECK(testutil_pollfd(fd) == false); + + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +void +test_resp_poll_readable(void) +{ + int fd; + nng_socket surv; + nng_socket resp; + nng_msg * msg; + + TEST_NNG_PASS(nng_surveyor0_open(&surv)); + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_getopt_int(resp, NNG_OPT_RECVFD, &fd)); + TEST_CHECK(fd >= 0); + + // Not readable if not connected! + TEST_CHECK(testutil_pollfd(fd) == false); + + // Even after connect (no message yet) + TEST_NNG_PASS(testutil_marry(surv, resp)); + TEST_CHECK(testutil_pollfd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + TEST_NNG_SEND_STR(surv, "abc"); + testutil_sleep(100); + + TEST_CHECK(testutil_pollfd(fd) == true); + + // and receiving makes it no longer ready + TEST_NNG_PASS(nng_recvmsg(resp, &msg, 0)); + nng_msg_free(msg); + TEST_CHECK(testutil_pollfd(fd) == false); + + // TODO verify unsolicited response + + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +void +test_resp_context_no_poll(void) +{ + int fd; + nng_socket resp; + nng_ctx ctx; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_ctx_open(&ctx, resp)); + TEST_NNG_FAIL( + nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + TEST_NNG_FAIL( + nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); + TEST_NNG_PASS(nng_ctx_close(ctx)); + TEST_NNG_PASS(nng_close(resp)); +} + +void +test_resp_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char addr[64]; + + testutil_scratch_addr("inproc", sizeof(addr), addr); + + TEST_NNG_PASS(nng_respondent0_open(&s1)); + TEST_NNG_PASS(nng_respondent0_open(&s2)); + + TEST_NNG_PASS(nng_listen(s1, addr, NULL, 0)); + TEST_NNG_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + testutil_sleep(100); + TEST_NNG_PASS(nng_stats_get(&stats)); + + TEST_CHECK(stats != NULL); + TEST_CHECK((reject = nng_stat_find_socket(stats, s1)) != NULL); + TEST_CHECK((reject = nng_stat_find(reject, "reject")) != NULL); + + TEST_CHECK(nng_stat_type(reject) == NNG_STAT_COUNTER); + TEST_CHECK(nng_stat_value(reject) > 0); + + TEST_NNG_PASS(nng_close(s1)); + TEST_NNG_PASS(nng_close(s2)); + nng_stats_free(stats); +} + +void +test_resp_double_recv(void) +{ + nng_socket s1; + nng_aio * aio1; + nng_aio * aio2; + + TEST_NNG_PASS(nng_respondent0_open(&s1)); + TEST_NNG_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&aio2, NULL, NULL)); + + nng_recv_aio(s1, aio1); + nng_recv_aio(s1, aio2); + + nng_aio_wait(aio2); + TEST_NNG_FAIL(nng_aio_result(aio2), NNG_ESTATE); + TEST_NNG_PASS(nng_close(s1)); + TEST_NNG_FAIL(nng_aio_result(aio1), NNG_ECLOSED); + nng_aio_free(aio1); + nng_aio_free(aio2); +} + +void +test_resp_close_pipe_before_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p; + nng_aio * aio1; + nng_msg * m; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_surveyor0_open(&surv)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + + TEST_NNG_PASS(testutil_marry(surv, resp)); + TEST_NNG_SEND_STR(surv, "test"); + + nng_recv_aio(resp, aio1); + nng_aio_wait(aio1); + TEST_NNG_PASS(nng_aio_result(aio1)); + TEST_CHECK((m = nng_aio_get_msg(aio1)) != NULL); + p = nng_msg_get_pipe(m); + TEST_NNG_PASS(nng_pipe_close(p)); + TEST_NNG_PASS(nng_sendmsg(resp, m, 0)); + + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); + nng_aio_free(aio1); +} + +void +test_resp_close_pipe_during_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p = NNG_PIPE_INITIALIZER; + nng_msg * m; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_surveyor0_open_raw(&surv)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 200)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 20)); + TEST_NNG_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 20)); + TEST_NNG_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 20)); + TEST_NNG_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + + TEST_NNG_PASS(testutil_marry(surv, resp)); + + for (int i = 0; i < 100; i++) { + int rv; + TEST_NNG_PASS(nng_msg_alloc(&m, 4)); + TEST_NNG_PASS( + nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + TEST_NNG_PASS(nng_sendmsg(surv, m, 0)); + TEST_NNG_PASS(nng_recvmsg(resp, &m, 0)); + p = nng_msg_get_pipe(m); + rv = nng_sendmsg(resp, m, 0); + if (rv == NNG_ETIMEDOUT) { + // Queue is backed up, senders are busy. + nng_msg_free(m); + break; + } + TEST_NNG_PASS(rv); + } + TEST_NNG_PASS(nng_pipe_close(p)); + + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +void +test_resp_ctx_recv_aio_stopped(void) +{ + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL)); + TEST_NNG_PASS(nng_ctx_open(&ctx, resp)); + + nng_aio_stop(aio); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + TEST_NNG_FAIL(nng_aio_result(aio), NNG_ECANCELED); + TEST_NNG_PASS(nng_ctx_close(ctx)); + TEST_NNG_PASS(nng_close(resp)); + nng_aio_free(aio); +} + +void +test_resp_close_pipe_context_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_pipe p = NNG_PIPE_INITIALIZER; + nng_msg * m; + nng_ctx ctx[10]; + nng_aio * aio[10]; + int i; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_surveyor0_open_raw(&surv)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 1)); + TEST_NNG_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 1)); + TEST_NNG_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 1)); + TEST_NNG_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + for (i = 0; i < 10; i++) { + TEST_NNG_PASS(nng_ctx_open(&ctx[i], resp)); + TEST_NNG_PASS(nng_aio_alloc(&aio[i], NULL, NULL)); + } + + TEST_NNG_PASS(testutil_marry(surv, resp)); + + for (i = 0; i < 10; i++) { + TEST_NNG_PASS(nng_msg_alloc(&m, 4)); + TEST_NNG_PASS( + nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + TEST_NNG_PASS(nng_sendmsg(surv, m, 0)); + nng_ctx_recv(ctx[i], aio[i]); + } + for (i = 0; i < 10; i++) { + nng_aio_wait(aio[i]); + TEST_NNG_PASS(nng_aio_result(aio[i])); + TEST_CHECK((m = nng_aio_get_msg(aio[i])) != NULL); + p = nng_msg_get_pipe(m); + nng_aio_set_msg(aio[i], m); + nng_ctx_send(ctx[i], aio[i]); + } + + // Note that SURVEYOR socket is not reading the results. + TEST_NNG_PASS(nng_pipe_close(p)); + + for (i = 0; i < 10; i++) { + int rv; + nng_aio_wait(aio[i]); + rv = nng_aio_result(aio[i]); + if (rv != 0) { + TEST_NNG_FAIL(rv, NNG_ECLOSED); + nng_msg_free(nng_aio_get_msg(aio[i])); + } + nng_aio_free(aio[i]); + TEST_NNG_PASS(nng_ctx_close(ctx[i])); + } + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +void +test_resp_close_context_send(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_ctx ctx[10]; + nng_aio * aio[10]; + int i; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_surveyor0_open_raw(&surv)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_int(resp, NNG_OPT_SENDBUF, 1)); + TEST_NNG_PASS(nng_setopt_int(resp, NNG_OPT_RECVBUF, 1)); + TEST_NNG_PASS(nng_setopt_int(surv, NNG_OPT_SENDBUF, 1)); + TEST_NNG_PASS(nng_setopt_int(surv, NNG_OPT_RECVBUF, 1)); + for (i = 0; i < 10; i++) { + TEST_NNG_PASS(nng_ctx_open(&ctx[i], resp)); + TEST_NNG_PASS(nng_aio_alloc(&aio[i], NULL, NULL)); + } + + TEST_NNG_PASS(testutil_marry(surv, resp)); + + for (i = 0; i < 10; i++) { + TEST_NNG_PASS(nng_msg_alloc(&m, 4)); + TEST_NNG_PASS( + nng_msg_append_u32(m, (unsigned) i | 0x80000000u)); + TEST_NNG_PASS(nng_sendmsg(surv, m, 0)); + nng_ctx_recv(ctx[i], aio[i]); + } + for (i = 0; i < 10; i++) { + nng_aio_wait(aio[i]); + TEST_NNG_PASS(nng_aio_result(aio[i])); + TEST_CHECK((m = nng_aio_get_msg(aio[i])) != NULL); + nng_aio_set_msg(aio[i], m); + nng_ctx_send(ctx[i], aio[i]); + } + + // Note that REQ socket is not reading the results. + for (i = 0; i < 10; i++) { + int rv; + TEST_NNG_PASS(nng_ctx_close(ctx[i])); + nng_aio_wait(aio[i]); + rv = nng_aio_result(aio[i]); + if (rv != 0) { + TEST_NNG_FAIL(rv, NNG_ECLOSED); + nng_msg_free(nng_aio_get_msg(aio[i])); + } + nng_aio_free(aio[i]); + } + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +static void +test_resp_ctx_recv_nonblock(void) +{ + nng_socket resp; + nng_ctx ctx; + nng_aio * aio; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_ctx_open(&ctx, resp)); + TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_recv(ctx, aio); + + nng_aio_wait(aio); + TEST_NNG_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + TEST_NNG_PASS(nng_close(resp)); + nng_aio_free(aio); +} + +static void +test_resp_ctx_send_nonblock(void) +{ + nng_socket resp; + nng_socket surv; + nng_ctx ctx; + nng_aio * aio; + nng_msg * msg; + + TEST_NNG_PASS(nng_surveyor0_open(&surv)); + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_ctx_open(&ctx, resp)); + TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL)); + TEST_NNG_PASS(testutil_marry(surv, resp)); + + TEST_NNG_SEND_STR(surv, "SEND"); + nng_ctx_recv(ctx, aio); + nng_aio_wait(aio); + TEST_NNG_PASS(nng_aio_result(aio)); + // message carries over + msg = nng_aio_get_msg(aio); + nng_aio_set_msg(aio, msg); + nng_aio_set_timeout(aio, 0); // Instant timeout + nng_ctx_send(ctx, aio); + + nng_aio_wait(aio); + TEST_NNG_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); + nng_aio_free(aio); + nng_msg_free(msg); +} + +void +test_resp_recv_garbage(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_surveyor0_open_raw(&surv)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 200)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + TEST_NNG_PASS(testutil_marry(surv, resp)); + + TEST_NNG_PASS(nng_msg_alloc(&m, 4)); + TEST_NNG_PASS(nng_msg_append_u32(m, 1u)); + TEST_NNG_PASS(nng_sendmsg(surv, m, 0)); + TEST_NNG_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +static void +test_resp_ttl_option(void) +{ + nng_socket resp; + int v; + bool b; + size_t sz; + const char *opt = NNG_OPT_MAXTTL; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + + TEST_NNG_PASS(nng_setopt_int(resp, opt, 1)); + TEST_NNG_FAIL(nng_setopt_int(resp, opt, 0), NNG_EINVAL); + TEST_NNG_FAIL(nng_setopt_int(resp, opt, -1), NNG_EINVAL); + TEST_NNG_FAIL(nng_setopt_int(resp, opt, 256), NNG_EINVAL); + TEST_NNG_PASS(nng_setopt_int(resp, opt, 3)); + TEST_NNG_PASS(nng_getopt_int(resp, opt, &v)); + TEST_CHECK(v == 3); + v = 0; + sz = sizeof(v); + TEST_NNG_PASS(nng_getopt(resp, opt, &v, &sz)); + TEST_CHECK(v == 3); + TEST_CHECK(sz == sizeof(v)); + + TEST_NNG_FAIL(nng_setopt(resp, opt, "", 1), NNG_EINVAL); + sz = 1; + TEST_NNG_FAIL(nng_getopt(resp, opt, &v, &sz), NNG_EINVAL); + TEST_NNG_FAIL(nng_setopt_bool(resp, opt, true), NNG_EBADTYPE); + TEST_NNG_FAIL(nng_getopt_bool(resp, opt, &b), NNG_EBADTYPE); + + TEST_NNG_PASS(nng_close(resp)); +} + +static void +test_resp_ttl_drop(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + + TEST_NNG_PASS(nng_respondent0_open(&resp)); + TEST_NNG_PASS(nng_surveyor0_open_raw(&surv)); + TEST_NNG_PASS(nng_setopt_int(resp, NNG_OPT_MAXTTL, 3)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_RECVTIMEO, 200)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + + TEST_NNG_PASS(testutil_marry(surv, resp)); + + // Send messages. Note that xrep implicitly adds a hop on receive. + + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_append_u32(m, 1u)); // 2 hops + TEST_NNG_PASS(nng_msg_append_u32(m, 0x80000001u)); + TEST_NNG_PASS(nng_msg_append(m, "PASS1", 6)); + TEST_NNG_PASS(nng_sendmsg(surv, m, 0)); + + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + TEST_NNG_PASS(nng_msg_append_u32(m, 2u)); + TEST_NNG_PASS(nng_msg_append_u32(m, 3u)); + TEST_NNG_PASS(nng_msg_append_u32(m, 0x80000002u)); + TEST_NNG_PASS(nng_msg_append(m, "FAIL2", 6)); + TEST_NNG_PASS(nng_sendmsg(surv, m, 0)); + + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_append_u32(m, 1u)); // 3 hops - passes + TEST_NNG_PASS(nng_msg_append_u32(m, 2u)); + TEST_NNG_PASS(nng_msg_append_u32(m, 0x80000003u)); + TEST_NNG_PASS(nng_msg_append(m, "PASS3", 6)); + TEST_NNG_PASS(nng_sendmsg(surv, m, 0)); + + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_append_u32(m, 1u)); // 4 hops -- discard! + TEST_NNG_PASS(nng_msg_append_u32(m, 2u)); + TEST_NNG_PASS(nng_msg_append_u32(m, 3u)); + TEST_NNG_PASS(nng_msg_append_u32(m, 0x80000003u)); + TEST_NNG_PASS(nng_msg_append(m, "FAIL4", 6)); + TEST_NNG_PASS(nng_sendmsg(surv, m, 0)); + + TEST_NNG_RECV_STR(resp, "PASS1"); + TEST_NNG_RECV_STR(resp, "PASS3"); + + TEST_NNG_FAIL(nng_recvmsg(resp, &m, 0), NNG_ETIMEDOUT); + + TEST_NNG_PASS(nng_close(resp)); + TEST_NNG_PASS(nng_close(surv)); +} + +TEST_LIST = { + { "respond identity", test_resp_identity }, + { "respond send bad state", test_resp_send_bad_state }, + { "respond poll readable", test_resp_poll_readable }, + { "respond poll writable", test_resp_poll_writeable }, + { "respond context does not poll", test_resp_context_no_poll }, + { "respond validate peer", test_resp_validate_peer }, + { "respond double recv", test_resp_double_recv }, + { "respond close pipe before send", test_resp_close_pipe_before_send }, + { "respond close pipe during send", test_resp_close_pipe_during_send }, + { "respond recv aio ctx stopped", test_resp_ctx_recv_aio_stopped }, + { "respond close pipe context send", test_resp_close_pipe_context_send }, + { "respond close context send", test_resp_close_context_send }, + { "respond context send nonblock", test_resp_ctx_send_nonblock }, + { "respond context recv nonblock", test_resp_ctx_recv_nonblock }, + { "respond recv garbage", test_resp_recv_garbage }, + { "respond ttl option", test_resp_ttl_option }, + { "respond ttl drop", test_resp_ttl_drop }, + { NULL, NULL }, +}; -- cgit v1.2.3-70-g09d2