diff options
Diffstat (limited to 'src/protocol/survey0')
| -rw-r--r-- | src/protocol/survey0/survey.c | 14 | ||||
| -rw-r--r-- | src/protocol/survey0/xrespond_test.c | 33 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey.c | 54 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey_test.c | 43 |
4 files changed, 90 insertions, 54 deletions
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 35a14de7..9746ff45 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -19,14 +19,6 @@ // multiple use of queues for simplicity. Typically this is used in cases // where a few dozen extra microseconds does not matter. -#ifndef NNI_PROTO_SURVEYOR_V0 -#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) -#endif - -#ifndef NNI_PROTO_RESPONDENT_V0 -#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) -#endif - typedef struct surv0_pipe surv0_pipe; typedef struct surv0_sock surv0_sock; typedef struct surv0_ctx surv0_ctx; @@ -321,7 +313,7 @@ surv0_pipe_start(void *arg) surv0_pipe *p = arg; surv0_sock *s = p->sock; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) { + if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) { return (NNG_EPROTO); } @@ -582,8 +574,8 @@ static nni_proto_sock_ops surv0_sock_ops = { static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &surv0_sock_ops, .proto_pipe_ops = &surv0_pipe_ops, diff --git a/src/protocol/survey0/xrespond_test.c b/src/protocol/survey0/xrespond_test.c index 7b8196b6..eec5c4a6 100644 --- a/src/protocol/survey0/xrespond_test.c +++ b/src/protocol/survey0/xrespond_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xresp_identity(void) { nng_socket s; - int p; - char * n; + int p1, p2; + char * n1; + char * n2; TEST_NNG_PASS(nng_respondent0_open_raw(&s)); - TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p)); - TEST_CHECK(p == NNI_PROTO(6u, 3u)); - TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p)); - TEST_CHECK(p == NNI_PROTO(6u, 2u)); - TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n)); - TEST_CHECK(strcmp(n, "respondent") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "surveyor") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_RESPONDENT0_SELF); + TEST_CHECK(p2 == NNG_RESPONDENT0_PEER); + TEST_CHECK(strcmp(n1, NNG_RESPONDENT0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_RESPONDENT0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void @@ -222,7 +219,7 @@ test_xresp_close_pipe_during_send(void) TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p))); TEST_NNG_PASS( nng_msg_header_append_u32(m, (unsigned) i | 0x80000000u)); - // xsrep does not exert back-pressure + // protocol does not exert back-pressure TEST_NNG_PASS(nng_sendmsg(resp, m, 0)); } TEST_NNG_PASS(nng_pipe_close(p)); diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index 13976a85..7a5a5c1b 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -8,21 +8,12 @@ // found online at https://opensource.org/licenses/MIT. // - #include "core/nng_impl.h" #include "nng/protocol/survey0/survey.h" // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -#ifndef NNI_PROTO_SURVEYOR_V0 -#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) -#endif - -#ifndef NNI_PROTO_RESPONDENT_V0 -#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) -#endif - typedef struct xsurv0_pipe xsurv0_pipe; typedef struct xsurv0_sock xsurv0_sock; @@ -152,7 +143,7 @@ xsurv0_pipe_start(void *arg) xsurv0_pipe *p = arg; xsurv0_sock *s = p->psock; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) { + if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) { return (NNG_EPROTO); } @@ -236,6 +227,7 @@ xsurv0_recv_cb(void *arg) { xsurv0_pipe *p = arg; nni_msg * msg; + bool end; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->npipe); @@ -245,19 +237,31 @@ xsurv0_recv_cb(void *arg) msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + end = false; - // We yank 4 bytes of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Peer gave us garbage, so kick it. - nni_msg_free(msg); - nni_pipe_close(p->npipe); - return; - } + while (!end) { + uint8_t *body; - // This cannot fail because the header should be zero bytes with - // 32 bytes of room. - (void) nni_msg_header_append(msg, nni_msg_body(msg), 4); - (void) nni_msg_trim(msg, 4); + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nni_msg_header_append(msg, body, sizeof(uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->npipe); + return; + } + nni_msg_trim(msg, sizeof(uint32_t)); + } nni_aio_set_msg(&p->aio_putq, msg); nni_msgq_aio_put(p->psock->urq, &p->aio_putq); @@ -267,8 +271,8 @@ static int xsurv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { xsurv0_sock *s = arg; - int ttl; - int rv; + int ttl; + int rv; if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) { nni_atomic_set(&s->ttl, ttl); } @@ -371,8 +375,8 @@ static nni_proto_sock_ops xsurv0_sock_ops = { static nni_proto xsurv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xsurv0_sock_ops, .proto_pipe_ops = &xsurv0_pipe_ops, diff --git a/src/protocol/survey0/xsurvey_test.c b/src/protocol/survey0/xsurvey_test.c index 2c17525a..b0123145 100644 --- a/src/protocol/survey0/xsurvey_test.c +++ b/src/protocol/survey0/xsurvey_test.c @@ -222,6 +222,48 @@ test_xsurvey_recv_garbage(void) } static void +test_xsurvey_recv_header(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p1, p2; + uint32_t id; + + TEST_NNG_PASS(nng_respondent0_open_raw(&resp)); + TEST_NNG_PASS(nng_surveyor0_open_raw(&surv)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + TEST_NNG_PASS(testutil_marry_ex(surv, resp, &p1, &p2)); + + // Simulate a few hops. + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2))); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u)); + + TEST_NNG_PASS(nng_sendmsg(resp, m, 0)); + + TEST_NNG_PASS(nng_recvmsg(surv, &m, 0)); + TEST_CHECK(nng_msg_header_len(m) == 12); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x2); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x1); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x80000123u); + + nng_msg_free(m); + + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +static void test_xsurvey_close_during_recv(void) { nng_socket resp; @@ -357,6 +399,7 @@ TEST_LIST = { { "xsurvey validate peer", test_xsurvey_validate_peer }, { "xsurvey recv aio stopped", test_xsurvey_recv_aio_stopped }, { "xsurvey recv garbage", test_xsurvey_recv_garbage }, + { "xsurvey recv header", test_xsurvey_recv_header }, { "xsurvey close during recv", test_xsurvey_close_during_recv }, { "xsurvey close pipe during send", test_xsurvey_close_pipe_during_send }, |
