aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey0
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-18 14:35:36 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-18 19:50:47 -0800
commit0242199ddc95e8a683304897a3e1bc26c7e74c0f (patch)
tree6b60ee6b9d1d1988c28b42d4af33005a4b8a3ae1 /src/protocol/survey0
parent10133ca9b5439e67b287703739e23e7d82fb76c4 (diff)
downloadnng-0242199ddc95e8a683304897a3e1bc26c7e74c0f.tar.gz
nng-0242199ddc95e8a683304897a3e1bc26c7e74c0f.tar.bz2
nng-0242199ddc95e8a683304897a3e1bc26c7e74c0f.zip
fixes #1142 raw mode use of message headers is inconsistent
This correctly moves the entire protocol header for XREQ and XRESPONDENT protocols to the message header (not the body). This is where it should always have been. There is some small chance that applications which were coded to parse the header from the body will break. We don't think there are any such applications in use.
Diffstat (limited to 'src/protocol/survey0')
-rw-r--r--src/protocol/survey0/survey.c14
-rw-r--r--src/protocol/survey0/xrespond_test.c33
-rw-r--r--src/protocol/survey0/xsurvey.c54
-rw-r--r--src/protocol/survey0/xsurvey_test.c43
4 files changed, 90 insertions, 54 deletions
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 35a14de7..9746ff45 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -19,14 +19,6 @@
// multiple use of queues for simplicity. Typically this is used in cases
// where a few dozen extra microseconds does not matter.
-#ifndef NNI_PROTO_SURVEYOR_V0
-#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
-#endif
-
-#ifndef NNI_PROTO_RESPONDENT_V0
-#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
-#endif
-
typedef struct surv0_pipe surv0_pipe;
typedef struct surv0_sock surv0_sock;
typedef struct surv0_ctx surv0_ctx;
@@ -321,7 +313,7 @@ surv0_pipe_start(void *arg)
surv0_pipe *p = arg;
surv0_sock *s = p->sock;
- if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) {
+ if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) {
return (NNG_EPROTO);
}
@@ -582,8 +574,8 @@ static nni_proto_sock_ops surv0_sock_ops = {
static nni_proto surv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
- .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME },
+ .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &surv0_sock_ops,
.proto_pipe_ops = &surv0_pipe_ops,
diff --git a/src/protocol/survey0/xrespond_test.c b/src/protocol/survey0/xrespond_test.c
index 7b8196b6..eec5c4a6 100644
--- a/src/protocol/survey0/xrespond_test.c
+++ b/src/protocol/survey0/xrespond_test.c
@@ -16,29 +16,26 @@
#include <acutest.h>
#include <testutil.h>
-#ifndef NNI_PROTO
-#define NNI_PROTO(x, y) (((x) << 4u) | (y))
-#endif
-
static void
test_xresp_identity(void)
{
nng_socket s;
- int p;
- char * n;
+ int p1, p2;
+ char * n1;
+ char * n2;
TEST_NNG_PASS(nng_respondent0_open_raw(&s));
- TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p));
- TEST_CHECK(p == NNI_PROTO(6u, 3u));
- TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p));
- TEST_CHECK(p == NNI_PROTO(6u, 2u));
- TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n));
- TEST_CHECK(strcmp(n, "respondent") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0);
- TEST_CHECK(strcmp(n, "surveyor") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_close(s) == 0);
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1));
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2));
+ TEST_NNG_PASS(nng_close(s));
+ TEST_CHECK(p1 == NNG_RESPONDENT0_SELF);
+ TEST_CHECK(p2 == NNG_RESPONDENT0_PEER);
+ TEST_CHECK(strcmp(n1, NNG_RESPONDENT0_SELF_NAME) == 0);
+ TEST_CHECK(strcmp(n2, NNG_RESPONDENT0_PEER_NAME) == 0);
+ nng_strfree(n1);
+ nng_strfree(n2);
}
static void
@@ -222,7 +219,7 @@ test_xresp_close_pipe_during_send(void)
TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p)));
TEST_NNG_PASS(
nng_msg_header_append_u32(m, (unsigned) i | 0x80000000u));
- // xsrep does not exert back-pressure
+ // protocol does not exert back-pressure
TEST_NNG_PASS(nng_sendmsg(resp, m, 0));
}
TEST_NNG_PASS(nng_pipe_close(p));
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index 13976a85..7a5a5c1b 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -8,21 +8,12 @@
// found online at https://opensource.org/licenses/MIT.
//
-
#include "core/nng_impl.h"
#include "nng/protocol/survey0/survey.h"
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-#ifndef NNI_PROTO_SURVEYOR_V0
-#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
-#endif
-
-#ifndef NNI_PROTO_RESPONDENT_V0
-#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
-#endif
-
typedef struct xsurv0_pipe xsurv0_pipe;
typedef struct xsurv0_sock xsurv0_sock;
@@ -152,7 +143,7 @@ xsurv0_pipe_start(void *arg)
xsurv0_pipe *p = arg;
xsurv0_sock *s = p->psock;
- if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) {
+ if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) {
return (NNG_EPROTO);
}
@@ -236,6 +227,7 @@ xsurv0_recv_cb(void *arg)
{
xsurv0_pipe *p = arg;
nni_msg * msg;
+ bool end;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->npipe);
@@ -245,19 +237,31 @@ xsurv0_recv_cb(void *arg)
msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
+ end = false;
- // We yank 4 bytes of body, and move them to the header.
- if (nni_msg_len(msg) < 4) {
- // Peer gave us garbage, so kick it.
- nni_msg_free(msg);
- nni_pipe_close(p->npipe);
- return;
- }
+ while (!end) {
+ uint8_t *body;
- // This cannot fail because the header should be zero bytes with
- // 32 bytes of room.
- (void) nni_msg_header_append(msg, nni_msg_body(msg), 4);
- (void) nni_msg_trim(msg, 4);
+ if (nni_msg_len(msg) < 4) {
+ // Peer gave us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_close(p->npipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = ((body[0] & 0x80u) != 0);
+
+ if (nni_msg_header_append(msg, body, sizeof(uint32_t)) != 0) {
+ // TODO: bump a no-memory stat
+ nni_msg_free(msg);
+ // Closing the pipe may release some memory.
+ // It at least gives an indication to the peer
+ // that we've lost the message.
+ nni_pipe_close(p->npipe);
+ return;
+ }
+ nni_msg_trim(msg, sizeof(uint32_t));
+ }
nni_aio_set_msg(&p->aio_putq, msg);
nni_msgq_aio_put(p->psock->urq, &p->aio_putq);
@@ -267,8 +271,8 @@ static int
xsurv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
xsurv0_sock *s = arg;
- int ttl;
- int rv;
+ int ttl;
+ int rv;
if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) {
nni_atomic_set(&s->ttl, ttl);
}
@@ -371,8 +375,8 @@ static nni_proto_sock_ops xsurv0_sock_ops = {
static nni_proto xsurv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
- .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME },
+ .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
.proto_sock_ops = &xsurv0_sock_ops,
.proto_pipe_ops = &xsurv0_pipe_ops,
diff --git a/src/protocol/survey0/xsurvey_test.c b/src/protocol/survey0/xsurvey_test.c
index 2c17525a..b0123145 100644
--- a/src/protocol/survey0/xsurvey_test.c
+++ b/src/protocol/survey0/xsurvey_test.c
@@ -222,6 +222,48 @@ test_xsurvey_recv_garbage(void)
}
static void
+test_xsurvey_recv_header(void)
+{
+ nng_socket resp;
+ nng_socket surv;
+ nng_msg * m;
+ nng_pipe p1, p2;
+ uint32_t id;
+
+ TEST_NNG_PASS(nng_respondent0_open_raw(&resp));
+ TEST_NNG_PASS(nng_surveyor0_open_raw(&surv));
+ TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000));
+
+ TEST_NNG_PASS(testutil_marry_ex(surv, resp, &p1, &p2));
+
+ // Simulate a few hops.
+ TEST_NNG_PASS(nng_msg_alloc(&m, 0));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2)));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u));
+
+ TEST_NNG_PASS(nng_sendmsg(resp, m, 0));
+
+ TEST_NNG_PASS(nng_recvmsg(surv, &m, 0));
+ TEST_CHECK(nng_msg_header_len(m) == 12);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x2);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x1);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x80000123u);
+
+ nng_msg_free(m);
+
+ TEST_NNG_PASS(nng_close(surv));
+ TEST_NNG_PASS(nng_close(resp));
+}
+
+static void
test_xsurvey_close_during_recv(void)
{
nng_socket resp;
@@ -357,6 +399,7 @@ TEST_LIST = {
{ "xsurvey validate peer", test_xsurvey_validate_peer },
{ "xsurvey recv aio stopped", test_xsurvey_recv_aio_stopped },
{ "xsurvey recv garbage", test_xsurvey_recv_garbage },
+ { "xsurvey recv header", test_xsurvey_recv_header },
{ "xsurvey close during recv", test_xsurvey_close_during_recv },
{ "xsurvey close pipe during send",
test_xsurvey_close_pipe_during_send },