diff options
| -rw-r--r-- | include/nng/nng.h | 5 | ||||
| -rw-r--r-- | include/nng/protocol/reqrep0/rep.h | 7 | ||||
| -rw-r--r-- | include/nng/protocol/reqrep0/req.h | 7 | ||||
| -rw-r--r-- | include/nng/protocol/survey0/respond.h | 7 | ||||
| -rw-r--r-- | include/nng/protocol/survey0/survey.h | 7 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 15 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep_test.c | 37 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 16 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req_test.c | 12 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 16 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep_test.c | 35 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xreq.c | 55 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xreq_test.c | 78 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 14 | ||||
| -rw-r--r-- | src/protocol/survey0/xrespond_test.c | 33 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey.c | 54 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey_test.c | 43 | ||||
| -rw-r--r-- | tests/testutil.c | 2 | ||||
| -rw-r--r-- | tests/testutil.h | 2 |
19 files changed, 256 insertions, 189 deletions
diff --git a/include/nng/nng.h b/include/nng/nng.h index a0e565d0..f1813cf1 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -62,6 +62,11 @@ extern "C" { // with other implementations. #define NNG_MAXADDRLEN (128) +// NNG_PROTOCOL_NUMBER is used by protocol headers to calculate their +// protocol number from a major and minor number. Applications should +// probably not need to use this. +#define NNG_PROTOCOL_NUMBER(maj, min) (((x)*16)+(y)) + // Types common to nng. // Identifiers are wrapped in a structure to improve compiler validation diff --git a/include/nng/protocol/reqrep0/rep.h b/include/nng/protocol/reqrep0/rep.h index 6322b5df..04fe18bf 100644 --- a/include/nng/protocol/reqrep0/rep.h +++ b/include/nng/protocol/reqrep0/rep.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -26,6 +26,11 @@ NNG_DECL int nng_rep0_open_raw(nng_socket *); #define nng_rep_open_raw nng_rep0_open_raw #endif +#define NNG_REP0_SELF 0x31 +#define NNG_REP0_PEER 0x30 +#define NNG_REP0_SELF_NAME "rep" +#define NNG_REP0_PEER_NAME "req" + #ifdef __cplusplus } #endif diff --git a/include/nng/protocol/reqrep0/req.h b/include/nng/protocol/reqrep0/req.h index 392c7932..3ed80216 100644 --- a/include/nng/protocol/reqrep0/req.h +++ b/include/nng/protocol/reqrep0/req.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -25,6 +25,11 @@ NNG_DECL int nng_req0_open_raw(nng_socket *); #define nng_req_open_raw nng_req0_open_raw #endif +#define NNG_REQ0_SELF 0x30 +#define NNG_REQ0_PEER 0x31 +#define NNG_REQ0_SELF_NAME "req" +#define NNG_REQ0_PEER_NAME "rep" + #define NNG_OPT_REQ_RESENDTIME "req:resend-time" #ifdef __cplusplus diff --git a/include/nng/protocol/survey0/respond.h b/include/nng/protocol/survey0/respond.h index b865b2ac..d7dab61d 100644 --- a/include/nng/protocol/survey0/respond.h +++ b/include/nng/protocol/survey0/respond.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -26,6 +26,11 @@ NNG_DECL int nng_respondent0_open_raw(nng_socket *); #define nng_respondent_open_raw nng_respondent0_open_raw #endif +#define NNG_RESPONDENT0_SELF 0x63 +#define NNG_RESPONDENT0_PEER 0x62 +#define NNG_RESPONDENT0_SELF_NAME "respondent" +#define NNG_RESPONDENT0_PEER_NAME "surveyor" + #ifdef __cplusplus } #endif diff --git a/include/nng/protocol/survey0/survey.h b/include/nng/protocol/survey0/survey.h index 37f76fbf..cea4d58b 100644 --- a/include/nng/protocol/survey0/survey.h +++ b/include/nng/protocol/survey0/survey.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -26,6 +26,11 @@ NNG_DECL int nng_surveyor0_open_raw(nng_socket *); #define nng_surveyor_open_raw nng_surveyor0_open_raw #endif +#define NNG_SURVEYOR0_SELF 0x62 +#define NNG_SURVEYOR0_PEER 0x63 +#define NNG_SURVEYOR0_SELF_NAME "surveyor" +#define NNG_SURVEYOR0_PEER_NAME "respondent" + #define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" #ifdef __cplusplus diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index b546044d..ef3f548a 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -8,7 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdlib.h> #include <string.h> #include "core/nng_impl.h" @@ -18,14 +17,6 @@ // request-reply pair. This is useful for building RPC servers, for // example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct rep0_pipe rep0_pipe; typedef struct rep0_sock rep0_sock; typedef struct rep0_ctx rep0_ctx; @@ -315,7 +306,7 @@ rep0_pipe_start(void *arg) rep0_sock *s = p->rep; int rv; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) { // Peer protocol mismatch. return (NNG_EPROTO); } @@ -691,8 +682,8 @@ static nni_proto_sock_ops rep0_sock_ops = { static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME }, + .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &rep0_sock_ops, .proto_pipe_ops = &rep0_pipe_ops, diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 0eb8b985..367ec66e 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - -void +static void test_rep_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_rep0_open(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_rep0_open(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REP0_SELF); + TEST_CHECK(p2 == NNG_REP0_PEER); + TEST_CHECK(strcmp(n1, NNG_REP0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REP0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } void diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index b5681688..f5d9bbba 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -8,22 +8,12 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdio.h> - #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct req0_pipe req0_pipe; typedef struct req0_sock req0_sock; typedef struct req0_ctx req0_ctx; @@ -200,7 +190,7 @@ req0_pipe_start(void *arg) req0_pipe *p = arg; req0_sock *s = p->req; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) { return (NNG_EPROTO); } @@ -851,8 +841,8 @@ static nni_proto_sock_ops req0_sock_ops = { static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME }, + .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &req0_sock_ops, .proto_pipe_ops = &req0_pipe_ops, diff --git a/src/protocol/reqrep0/req_test.c b/src/protocol/reqrep0/req_test.c index 8b57740d..75cff14a 100644 --- a/src/protocol/reqrep0/req_test.c +++ b/src/protocol/reqrep0/req_test.c @@ -17,10 +17,6 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_req_identity(void) { @@ -30,14 +26,14 @@ test_req_identity(void) TEST_NNG_PASS(nng_req0_open(&s)); TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p)); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 + TEST_CHECK(p == NNG_REQ0_SELF); TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p)); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 + TEST_CHECK(p == NNG_REQ0_PEER); // 49 TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n)); - TEST_CHECK(strcmp(n, "req") == 0); + TEST_CHECK(strcmp(n, NNG_REQ0_SELF_NAME) == 0); nng_strfree(n); TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n)); - TEST_CHECK(strcmp(n, "rep") == 0); + TEST_CHECK(strcmp(n, NNG_REQ0_PEER_NAME) == 0); nng_strfree(n); TEST_NNG_PASS(nng_close(s)); } diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index a036e6f6..e63cfe83 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -17,14 +17,6 @@ // request-reply pair. This is useful for building RPC servers, for // example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct xrep0_pipe xrep0_pipe; typedef struct xrep0_sock xrep0_sock; @@ -167,7 +159,7 @@ xrep0_pipe_start(void *arg) xrep0_sock *s = p->rep; int rv; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) { // Peer protocol mismatch. return (NNG_EPROTO); } @@ -303,7 +295,7 @@ xrep0_pipe_recv_cb(void *arg) // Move backtrace from body to header hops = 1; for (;;) { - bool end = 0; + bool end; uint8_t *body; if (hops > ttl) { // This isn't malformed, but it has gone through @@ -428,8 +420,8 @@ static nni_proto_sock_ops xrep0_sock_ops = { static nni_proto xrep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME }, + .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xrep0_sock_ops, .proto_pipe_ops = &xrep0_pipe_ops, diff --git a/src/protocol/reqrep0/xrep_test.c b/src/protocol/reqrep0/xrep_test.c index 958f3240..33d834df 100644 --- a/src/protocol/reqrep0/xrep_test.c +++ b/src/protocol/reqrep0/xrep_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xrep_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_rep0_open_raw(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_rep0_open_raw(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REP0_SELF); + TEST_CHECK(p2 == NNG_REP0_PEER); + TEST_CHECK(strcmp(n1, NNG_REP0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REP0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index ef24010e..4900fb06 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -11,19 +11,11 @@ #include <stdio.h> #include "core/nng_impl.h" -//#include "nng/protocol/reqrep0/req.h" +#include "nng/protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct xreq0_pipe xreq0_pipe; typedef struct xreq0_sock xreq0_sock; @@ -124,7 +116,7 @@ xreq0_pipe_start(void *arg) xreq0_pipe *p = arg; xreq0_sock *s = p->req; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) { return (NNG_EPROTO); } @@ -204,7 +196,7 @@ xreq0_recv_cb(void *arg) xreq0_pipe *p = arg; xreq0_sock *sock = p->req; nni_msg * msg; - uint32_t id; + bool end; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); @@ -214,20 +206,31 @@ xreq0_recv_cb(void *arg) msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - // We yank 4 bytes from front of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Peer gave us garbage, so kick it. - nni_msg_free(msg); - nni_pipe_close(p->pipe); - return; + end = false; + + while (!end) { + uint8_t *body; + + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->pipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nng_msg_header_append(msg, body, sizeof (uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->pipe); + return; + } + nni_msg_trim(msg, sizeof (uint32_t)); } - id = nni_msg_trim_u32(msg); - - // Since we got this from the transport, there had better be some - // room in the header for our stuff. - nni_msg_header_must_append_u32(msg, id); - nni_aio_set_msg(&p->aio_putq, msg); nni_msgq_aio_put(sock->urq, &p->aio_putq); } @@ -301,8 +304,8 @@ static nni_proto_sock_ops xreq0_sock_ops = { static nni_proto xreq0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME }, + .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xreq0_sock_ops, .proto_pipe_ops = &xreq0_pipe_ops, diff --git a/src/protocol/reqrep0/xreq_test.c b/src/protocol/reqrep0/xreq_test.c index 7f14f0cf..e474a0ac 100644 --- a/src/protocol/reqrep0/xreq_test.c +++ b/src/protocol/reqrep0/xreq_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xreq_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_req0_open_raw(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_req0_open_raw(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REQ0_SELF); + TEST_CHECK(p2 == NNG_REQ0_PEER); + TEST_CHECK(strcmp(n1, NNG_REQ0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REQ0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void @@ -222,6 +219,48 @@ test_xreq_recv_garbage(void) } static void +test_xreq_recv_header(void) +{ + nng_socket rep; + nng_socket req; + nng_msg * m; + nng_pipe p1, p2; + uint32_t id; + + TEST_NNG_PASS(nng_rep0_open_raw(&rep)); + TEST_NNG_PASS(nng_req0_open_raw(&req)); + TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + + TEST_NNG_PASS(testutil_marry_ex(req, rep, &p1, &p2)); + + // Simulate a few hops. + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2))); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u)); + + TEST_NNG_PASS(nng_sendmsg(rep, m, 0)); + + TEST_NNG_PASS(nng_recvmsg(req, &m, 0)); + TEST_CHECK(nng_msg_header_len(m) == 12); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x2); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x1); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x80000123u); + + nng_msg_free(m); + + TEST_NNG_PASS(nng_close(req)); + TEST_NNG_PASS(nng_close(rep)); +} + +static void test_xreq_close_during_recv(void) { nng_socket rep; @@ -326,6 +365,7 @@ TEST_LIST = { { "xreq validate peer", test_xreq_validate_peer }, { "xreq recv aio stopped", test_xreq_recv_aio_stopped }, { "xreq recv garbage", test_xreq_recv_garbage }, + { "xreq recv header", test_xreq_recv_header }, { "xreq close during recv", test_xreq_close_during_recv }, { "xreq close pipe during send", test_xreq_close_pipe_during_send }, { "xreq ttl option", test_xreq_ttl_option }, diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 35a14de7..9746ff45 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -19,14 +19,6 @@ // multiple use of queues for simplicity. Typically this is used in cases // where a few dozen extra microseconds does not matter. -#ifndef NNI_PROTO_SURVEYOR_V0 -#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) -#endif - -#ifndef NNI_PROTO_RESPONDENT_V0 -#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) -#endif - typedef struct surv0_pipe surv0_pipe; typedef struct surv0_sock surv0_sock; typedef struct surv0_ctx surv0_ctx; @@ -321,7 +313,7 @@ surv0_pipe_start(void *arg) surv0_pipe *p = arg; surv0_sock *s = p->sock; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) { + if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) { return (NNG_EPROTO); } @@ -582,8 +574,8 @@ static nni_proto_sock_ops surv0_sock_ops = { static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &surv0_sock_ops, .proto_pipe_ops = &surv0_pipe_ops, diff --git a/src/protocol/survey0/xrespond_test.c b/src/protocol/survey0/xrespond_test.c index 7b8196b6..eec5c4a6 100644 --- a/src/protocol/survey0/xrespond_test.c +++ b/src/protocol/survey0/xrespond_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xresp_identity(void) { nng_socket s; - int p; - char * n; + int p1, p2; + char * n1; + char * n2; TEST_NNG_PASS(nng_respondent0_open_raw(&s)); - TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p)); - TEST_CHECK(p == NNI_PROTO(6u, 3u)); - TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p)); - TEST_CHECK(p == NNI_PROTO(6u, 2u)); - TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n)); - TEST_CHECK(strcmp(n, "respondent") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "surveyor") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_RESPONDENT0_SELF); + TEST_CHECK(p2 == NNG_RESPONDENT0_PEER); + TEST_CHECK(strcmp(n1, NNG_RESPONDENT0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_RESPONDENT0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void @@ -222,7 +219,7 @@ test_xresp_close_pipe_during_send(void) TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p))); TEST_NNG_PASS( nng_msg_header_append_u32(m, (unsigned) i | 0x80000000u)); - // xsrep does not exert back-pressure + // protocol does not exert back-pressure TEST_NNG_PASS(nng_sendmsg(resp, m, 0)); } TEST_NNG_PASS(nng_pipe_close(p)); diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index 13976a85..7a5a5c1b 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -8,21 +8,12 @@ // found online at https://opensource.org/licenses/MIT. // - #include "core/nng_impl.h" #include "nng/protocol/survey0/survey.h" // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -#ifndef NNI_PROTO_SURVEYOR_V0 -#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) -#endif - -#ifndef NNI_PROTO_RESPONDENT_V0 -#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) -#endif - typedef struct xsurv0_pipe xsurv0_pipe; typedef struct xsurv0_sock xsurv0_sock; @@ -152,7 +143,7 @@ xsurv0_pipe_start(void *arg) xsurv0_pipe *p = arg; xsurv0_sock *s = p->psock; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) { + if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) { return (NNG_EPROTO); } @@ -236,6 +227,7 @@ xsurv0_recv_cb(void *arg) { xsurv0_pipe *p = arg; nni_msg * msg; + bool end; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->npipe); @@ -245,19 +237,31 @@ xsurv0_recv_cb(void *arg) msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + end = false; - // We yank 4 bytes of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Peer gave us garbage, so kick it. - nni_msg_free(msg); - nni_pipe_close(p->npipe); - return; - } + while (!end) { + uint8_t *body; - // This cannot fail because the header should be zero bytes with - // 32 bytes of room. - (void) nni_msg_header_append(msg, nni_msg_body(msg), 4); - (void) nni_msg_trim(msg, 4); + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nni_msg_header_append(msg, body, sizeof(uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->npipe); + return; + } + nni_msg_trim(msg, sizeof(uint32_t)); + } nni_aio_set_msg(&p->aio_putq, msg); nni_msgq_aio_put(p->psock->urq, &p->aio_putq); @@ -267,8 +271,8 @@ static int xsurv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { xsurv0_sock *s = arg; - int ttl; - int rv; + int ttl; + int rv; if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) { nni_atomic_set(&s->ttl, ttl); } @@ -371,8 +375,8 @@ static nni_proto_sock_ops xsurv0_sock_ops = { static nni_proto xsurv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME }, + .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xsurv0_sock_ops, .proto_pipe_ops = &xsurv0_pipe_ops, diff --git a/src/protocol/survey0/xsurvey_test.c b/src/protocol/survey0/xsurvey_test.c index 2c17525a..b0123145 100644 --- a/src/protocol/survey0/xsurvey_test.c +++ b/src/protocol/survey0/xsurvey_test.c @@ -222,6 +222,48 @@ test_xsurvey_recv_garbage(void) } static void +test_xsurvey_recv_header(void) +{ + nng_socket resp; + nng_socket surv; + nng_msg * m; + nng_pipe p1, p2; + uint32_t id; + + TEST_NNG_PASS(nng_respondent0_open_raw(&resp)); + TEST_NNG_PASS(nng_surveyor0_open_raw(&surv)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000)); + + TEST_NNG_PASS(testutil_marry_ex(surv, resp, &p1, &p2)); + + // Simulate a few hops. + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2))); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u)); + + TEST_NNG_PASS(nng_sendmsg(resp, m, 0)); + + TEST_NNG_PASS(nng_recvmsg(surv, &m, 0)); + TEST_CHECK(nng_msg_header_len(m) == 12); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x2); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x1); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x80000123u); + + nng_msg_free(m); + + TEST_NNG_PASS(nng_close(surv)); + TEST_NNG_PASS(nng_close(resp)); +} + +static void test_xsurvey_close_during_recv(void) { nng_socket resp; @@ -357,6 +399,7 @@ TEST_LIST = { { "xsurvey validate peer", test_xsurvey_validate_peer }, { "xsurvey recv aio stopped", test_xsurvey_recv_aio_stopped }, { "xsurvey recv garbage", test_xsurvey_recv_garbage }, + { "xsurvey recv header", test_xsurvey_recv_header }, { "xsurvey close during recv", test_xsurvey_close_during_recv }, { "xsurvey close pipe during send", test_xsurvey_close_pipe_during_send }, diff --git a/tests/testutil.c b/tests/testutil.c index 356eb333..6934d4ad 100644 --- a/tests/testutil.c +++ b/tests/testutil.c @@ -383,4 +383,4 @@ done: nng_mtx_free(note.mx); } return (rv); -}
\ No newline at end of file +} diff --git a/tests/testutil.h b/tests/testutil.h index f28fed5c..88d978de 100644 --- a/tests/testutil.h +++ b/tests/testutil.h @@ -87,7 +87,7 @@ extern int testutil_marry_ex(nng_socket, nng_socket, nng_pipe *, nng_pipe *); TEST_CHECK_(sz_ == strlen(string) + 1, "length %d want %d", \ sz_, strlen(string) + 1); \ buf_[sizeof(buf_) - 1] = '\0'; \ - TEST_CHECK_( \ + TEST_CHECK_( \ strcmp(string, buf_) == 0, "%s == %s", string, buf_); \ } while (0) |
