diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-18 14:35:36 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-18 19:50:47 -0800 |
| commit | 0242199ddc95e8a683304897a3e1bc26c7e74c0f (patch) | |
| tree | 6b60ee6b9d1d1988c28b42d4af33005a4b8a3ae1 /src/protocol | |
| parent | 10133ca9b5439e67b287703739e23e7d82fb76c4 (diff) | |
| download | nng-0242199ddc95e8a683304897a3e1bc26c7e74c0f.tar.gz nng-0242199ddc95e8a683304897a3e1bc26c7e74c0f.tar.bz2 nng-0242199ddc95e8a683304897a3e1bc26c7e74c0f.zip | |
fixes #1142 raw mode use of message headers is inconsistent
This correctly moves the entire protocol header for XREQ and XRESPONDENT
protocols to the message header (not the body). This is where it should
always have been. There is some small chance that applications which were
coded to parse the header from the body will break. We don't think there
are any such applications in use.
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 15 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep_test.c | 37 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 16 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req_test.c | 12 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 16 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep_test.c | 35 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xreq.c | 55 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xreq_test.c | 78 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 14 | ||||
| -rw-r--r-- | src/protocol/survey0/xrespond_test.c | 33 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey.c | 54 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey_test.c | 43 |
12 files changed, 225 insertions, 183 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index b546044d..ef3f548a 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -8,7 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdlib.h> #include <string.h> #include "core/nng_impl.h" @@ -18,14 +17,6 @@ // request-reply pair. This is useful for building RPC servers, for // example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct rep0_pipe rep0_pipe; typedef struct rep0_sock rep0_sock; typedef struct rep0_ctx rep0_ctx; @@ -315,7 +306,7 @@ rep0_pipe_start(void *arg) rep0_sock *s = p->rep; int rv; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) { // Peer protocol mismatch. return (NNG_EPROTO); } @@ -691,8 +682,8 @@ static nni_proto_sock_ops rep0_sock_ops = { static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME }, + .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &rep0_sock_ops, .proto_pipe_ops = &rep0_pipe_ops, diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 0eb8b985..367ec66e 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - -void +static void test_rep_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_rep0_open(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_rep0_open(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REP0_SELF); + TEST_CHECK(p2 == NNG_REP0_PEER); + TEST_CHECK(strcmp(n1, NNG_REP0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REP0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } void diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index b5681688..f5d9bbba 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -8,22 +8,12 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdio.h> - #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct req0_pipe req0_pipe; typedef struct req0_sock req0_sock; typedef struct req0_ctx req0_ctx; @@ -200,7 +190,7 @@ req0_pipe_start(void *arg) req0_pipe *p = arg; req0_sock *s = p->req; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) { return (NNG_EPROTO); } @@ -851,8 +841,8 @@ static nni_proto_sock_ops req0_sock_ops = { static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME }, + .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &req0_sock_ops, .proto_pipe_ops = &req0_pipe_ops, diff --git a/src/protocol/reqrep0/req_test.c b/src/protocol/reqrep0/req_test.c index 8b57740d..75cff14a 100644 --- a/src/protocol/reqrep0/req_test.c +++ b/src/protocol/reqrep0/req_test.c @@ -17,10 +17,6 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_req_identity(void) { @@ -30,14 +26,14 @@ test_req_identity(void) TEST_NNG_PASS(nng_req0_open(&s)); TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p)); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 + TEST_CHECK(p == NNG_REQ0_SELF); TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p)); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 + TEST_CHECK(p == NNG_REQ0_PEER); // 49 TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n)); - TEST_CHECK(strcmp(n, "req") == 0); + TEST_CHECK(strcmp(n, NNG_REQ0_SELF_NAME) == 0); nng_strfree(n); TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n)); - TEST_CHECK(strcmp(n, "rep") == 0); + TEST_CHECK(strcmp(n, NNG_REQ0_PEER_NAME) == 0); nng_strfree(n); TEST_NNG_PASS(nng_close(s)); } diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index a036e6f6..e63cfe83 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -17,14 +17,6 @@ // request-reply pair. This is useful for building RPC servers, for // example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct xrep0_pipe xrep0_pipe; typedef struct xrep0_sock xrep0_sock; @@ -167,7 +159,7 @@ xrep0_pipe_start(void *arg) xrep0_sock *s = p->rep; int rv; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) { // Peer protocol mismatch. return (NNG_EPROTO); } @@ -303,7 +295,7 @@ xrep0_pipe_recv_cb(void *arg) // Move backtrace from body to header hops = 1; for (;;) { - bool end = 0; + bool end; uint8_t *body; if (hops > ttl) { // This isn't malformed, but it has gone through @@ -428,8 +420,8 @@ static nni_proto_sock_ops xrep0_sock_ops = { static nni_proto xrep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME }, + .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xrep0_sock_ops, .proto_pipe_ops = &xrep0_pipe_ops, diff --git a/src/protocol/reqrep0/xrep_test.c b/src/protocol/reqrep0/xrep_test.c index 958f3240..33d834df 100644 --- a/src/protocol/reqrep0/xrep_test.c +++ b/src/protocol/reqrep0/xrep_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xrep_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_rep0_open_raw(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_rep0_open_raw(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REP0_SELF); + TEST_CHECK(p2 == NNG_REP0_PEER); + TEST_CHECK(strcmp(n1, NNG_REP0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REP0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index ef24010e..4900fb06 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -11,19 +11,11 @@ #include <stdio.h> #include "core/nng_impl.h" -//#include "nng/protocol/reqrep0/req.h" +#include "nng/protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct xreq0_pipe xreq0_pipe; typedef struct xreq0_sock xreq0_sock; @@ -124,7 +116,7 @@ xreq0_pipe_start(void *arg) xreq0_pipe *p = arg; xreq0_sock *s = p->req; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) { return (NNG_EPROTO); } @@ -204,7 +196,7 @@ xreq0_recv_cb(void *arg) xreq0_pipe *p = arg; xreq0_sock *sock = p->req; nni_msg * msg; - uint32_t id; + bool end; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); @@ -214,20 +206,31 @@ xreq0_recv_cb(void *arg) msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - // We yank 4 bytes from front of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Peer gave us garbage, so kick it. - nni_msg_free(msg); - nni_pipe_close(p->pipe); - return; + end = false; + + while (!end) { + uint8_t *body; + + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->pipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nng_msg_header_append(msg, body, sizeof (uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->pipe); + return; + } + nni_msg_trim(msg, sizeof (uint32_t)); } - id = nni_msg_trim_u32(msg); - - // Since we got this from the transport, there had better be some - // room in the header for our stuff. - nni_msg_header_must_append_u32(msg, id); - nni_aio_set_msg(&p->aio_putq, msg); nni_msgq_aio_put(sock->urq, &p->aio_putq); } @@ -301,8 +304,8 @@ static nni_proto_sock_ops xreq0_sock_ops = { static nni_proto xreq0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME }, + .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xreq0_sock_ops, .proto_pipe_ops = &xreq0_pipe_ops, diff --git a/src/protocol/reqrep0/xreq_test.c b/src/protocol/reqrep0/xreq_test.c index 7f14f0cf..e474a0ac 100644 --- a/src/protocol/reqrep0/xreq_test.c +++ b/src/protocol/reqrep0/xreq_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xreq_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_req0_open_raw(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_req0_open_raw(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REQ0_SELF); + TEST_CHECK(p2 == NNG_REQ0_PEER); + TEST_CHECK(strcmp(n1, NNG_REQ0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REQ0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void @@ -222,6 +219,48 @@ test_xreq_recv_garbage(void) } static void +test_xreq_recv_header(void) +{ + nng_socket rep; + nng_socket req; + nng_msg * m; + nng_pipe p1, p2; + uint32_t id; + + TEST_NNG_PASS(nng_rep0_open_raw(&rep)); + TEST_NNG_PASS(nng_req0_open_raw(&req)); + TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + + TEST_NNG_PASS(testutil_marry_ex(req, rep, &p1, &p2)); + + // Simulate a few hops. + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2))); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u)); + + TEST_NNG_PASS(nng_sendmsg(rep, m, 0)); + + TEST_NNG_PASS(nng_recvmsg(req, &m, 0)); + TEST_CHECK(nng_msg_header_len(m) == 12); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x2); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x1); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x80000123u); + + nng_msg_free(m); + + TEST_NNG_PASS(nng_close(req)); + TEST_NNG_PASS(nng_close(rep)); +} + +static void test_xreq_close_during_recv(void) { nng_socket rep; @@ -326,6 +365,7 @@ TEST_LIST = { { "xreq validate peer", test_xreq_validate_peer }, { "xreq recv aio stopped", test_xreq_recv_aio_stopped }, { "xreq recv garbage", test_xreq_recv_garbage }, + { "xreq recv header", test_xreq_recv_header }, { "xreq close during recv", test_xreq_close_during_recv }, { "xreq close pipe during send", test_xreq_close_pipe_during_send }, { "xreq ttl option", test_xreq_ttl_option }, diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 35a14de7..9746ff45 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -19,14 +19,6 @@ // multiple use of queues for simplicity. Typically this is used in cases // where a few dozen extra microseconds does not matter. -#ifndef NNI_PROTO_SURVEYOR_V0 -#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) -#endif - -#ifndef NNI_PROTO_RESPONDENT_V0 -#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) -#endif - typedef struct surv0_pipe surv0_pipe; typedef struct surv0_sock surv0_sock; typedef struct surv0_ctx surv0_ctx; @@ -321,7 +313,7 @@ surv0_pipe_start(void *arg) surv0_pipe *p = arg; surv0_sock *s = p->sock; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) { + if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) { return (NNG_EPROTO); } @@ -582,8 +574,8 @@ static nni_proto_sock_ops surv0_sock_ops = { static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &surv0_sock_ops, .proto_pipe_ops = &surv0_pipe_ops, diff --git a/src/protocol/survey0/xrespond_test.c b/src/protocol/survey0/xrespond_test.c index 7b8196b6..eec5c4a6 100644 --- a/src/protocol/survey0/xrespond_test.c +++ b/src/protocol/survey0/xrespond_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xresp_identity(void) { nng_socket s; - int p; - char * n; + int p1, p2; + char * n1; + char * n2; TEST_NNG_PASS(nng_respondent0_open_raw(&s)); - TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p)); - TEST_CHECK(p == NNI_PROTO(6u, 3u)); - TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p)); - TEST_CHECK(p == NNI_PROTO(6u, 2u)); - TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n)); - TEST_CHECK(strcmp(n, "respondent") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "surveyor") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_RESPONDENT0_SELF); + TEST_CHECK(p2 == NNG_RESPONDENT0_PEER); + TEST_CHECK(strcmp(n1, NNG_RESPONDENT0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_RESPONDENT0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void @@ -222,7 +219,7 @@ test_xresp_close_pipe_during_send(void) TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p))); TEST_NNG_PASS( nng_msg_header_append_u32(m, (unsigned) i | 0x80000000u)); - // xsrep does not exert back-pressure + // protocol does not exert back-pressure TEST_NNG_PASS(nng_sendmsg(resp, m, 0)); } TEST_NNG_PASS(nng_pipe_close(p)); diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index 13976a85..7a5a5c1b 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -8,21 +8,12 @@ // found online at https://opensource.org/licenses/MIT. // - #include "core/nng_impl.h" #include "nng/protocol/survey0/survey.h" // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -#ifndef NNI_PROTO_SURVEYOR_V0 -#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) -#endif - -#ifndef NNI_PROTO_RESPONDENT_V0 -#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) -#endif - typedef struct xsurv0_pipe xsurv0_pipe; typedef struct xsurv0_sock xsurv0_sock; @@ -152,7 +143,7 @@ xsurv0_pipe_start(void *arg) xsurv0_pipe *p = arg; xsurv0_sock *s = p->psock; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) { + if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) { return (NNG_EPROTO); } @@ -236,6 +227,7 @@ xsurv0_recv_cb(void *arg) { xsurv0_pipe *p = arg; nni_msg * msg; + bool end; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->npipe); @@ -245,19 +237,31 @@ xsurv0_recv_cb(void *arg) msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + end = false; - // We yank 4 bytes of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Peer gave us garbage, so kick it. - nni_msg_free(msg); - nni_pipe_close(p->npipe); - return; - } + while (!end) { + uint8_t *body; - // This cannot fail because the header should be zero bytes with - // 32 bytes of room. - (void) nni_msg_header_append(msg, nni_msg_body(msg), 4); - (void) nni_msg_trim(msg, 4); + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nni_msg_header_append(msg, body, sizeof(uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->npipe); + return; + } + nni_msg_trim(msg, sizeof(uint32_t)); + } nni_aio_set_msg(&p->aio_putq, msg); nni_msgq_aio_put(p->psock->urq, &p->aio_putq); @@ -267,8 +271,8 @@ static int xsurv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { xsurv0_sock *s = arg; - int ttl; - int rv; + int ttl; + int rv; if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) { nni_atomic_set(&s->ttl, ttl); } @@ -371,8 +375,8 @@ static nni_proto_sock_ops xsurv0_sock_ops = { static nni_proto xsurv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xsurv0_sock_ops, .proto_pipe_ops = &xsurv0_pipe_ops, diff --git a/src/protocol/survey0/xsurvey_test.c b/src/protocol/survey0/xsurvey_test.c index 2c17525a..b0123145 100644 --- a/src/protocol/survey0/xsurvey_test.c +++ b/src/protocol/survey0/xsurvey_test.c @@ -222,6 +222,48 @@ test_xsurvey_recv_garbage(void) } static void +test_xsurvey_recv_header(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p1, p2; + uint32_t id; + + TEST_NNG_PASS(nng_respondent0_open_raw(&resp)); + TEST_NNG_PASS(nng_surveyor0_open_raw(&surv)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + TEST_NNG_PASS(testutil_marry_ex(surv, resp, &p1, &p2)); + + // Simulate a few hops. + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2))); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u)); + + TEST_NNG_PASS(nng_sendmsg(resp, m, 0)); + + TEST_NNG_PASS(nng_recvmsg(surv, &m, 0)); + TEST_CHECK(nng_msg_header_len(m) == 12); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x2); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x1); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x80000123u); + + nng_msg_free(m); + + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +static void test_xsurvey_close_during_recv(void) { nng_socket resp; @@ -357,6 +399,7 @@ TEST_LIST = { { "xsurvey validate peer", test_xsurvey_validate_peer }, { "xsurvey recv aio stopped", test_xsurvey_recv_aio_stopped }, { "xsurvey recv garbage", test_xsurvey_recv_garbage }, + { "xsurvey recv header", test_xsurvey_recv_header }, { "xsurvey close during recv", test_xsurvey_close_during_recv }, { "xsurvey close pipe during send", test_xsurvey_close_pipe_during_send }, |
