aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey0
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/survey0')
-rw-r--r--src/protocol/survey0/survey.c14
-rw-r--r--src/protocol/survey0/xrespond_test.c33
-rw-r--r--src/protocol/survey0/xsurvey.c54
-rw-r--r--src/protocol/survey0/xsurvey_test.c43
4 files changed, 90 insertions, 54 deletions
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 35a14de7..9746ff45 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -19,14 +19,6 @@
// multiple use of queues for simplicity. Typically this is used in cases
// where a few dozen extra microseconds does not matter.
-#ifndef NNI_PROTO_SURVEYOR_V0
-#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
-#endif
-
-#ifndef NNI_PROTO_RESPONDENT_V0
-#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
-#endif
-
typedef struct surv0_pipe surv0_pipe;
typedef struct surv0_sock surv0_sock;
typedef struct surv0_ctx surv0_ctx;
@@ -321,7 +313,7 @@ surv0_pipe_start(void *arg)
surv0_pipe *p = arg;
surv0_sock *s = p->sock;
- if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) {
+ if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) {
return (NNG_EPROTO);
}
@@ -582,8 +574,8 @@ static nni_proto_sock_ops surv0_sock_ops = {
static nni_proto surv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
- .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME },
+ .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &surv0_sock_ops,
.proto_pipe_ops = &surv0_pipe_ops,
diff --git a/src/protocol/survey0/xrespond_test.c b/src/protocol/survey0/xrespond_test.c
index 7b8196b6..eec5c4a6 100644
--- a/src/protocol/survey0/xrespond_test.c
+++ b/src/protocol/survey0/xrespond_test.c
@@ -16,29 +16,26 @@
#include <acutest.h>
#include <testutil.h>
-#ifndef NNI_PROTO
-#define NNI_PROTO(x, y) (((x) << 4u) | (y))
-#endif
-
static void
test_xresp_identity(void)
{
nng_socket s;
- int p;
- char * n;
+ int p1, p2;
+ char * n1;
+ char * n2;
TEST_NNG_PASS(nng_respondent0_open_raw(&s));
- TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p));
- TEST_CHECK(p == NNI_PROTO(6u, 3u));
- TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p));
- TEST_CHECK(p == NNI_PROTO(6u, 2u));
- TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n));
- TEST_CHECK(strcmp(n, "respondent") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0);
- TEST_CHECK(strcmp(n, "surveyor") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_close(s) == 0);
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1));
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2));
+ TEST_NNG_PASS(nng_close(s));
+ TEST_CHECK(p1 == NNG_RESPONDENT0_SELF);
+ TEST_CHECK(p2 == NNG_RESPONDENT0_PEER);
+ TEST_CHECK(strcmp(n1, NNG_RESPONDENT0_SELF_NAME) == 0);
+ TEST_CHECK(strcmp(n2, NNG_RESPONDENT0_PEER_NAME) == 0);
+ nng_strfree(n1);
+ nng_strfree(n2);
}
static void
@@ -222,7 +219,7 @@ test_xresp_close_pipe_during_send(void)
TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p)));
TEST_NNG_PASS(
nng_msg_header_append_u32(m, (unsigned) i | 0x80000000u));
- // xsrep does not exert back-pressure
+ // protocol does not exert back-pressure
TEST_NNG_PASS(nng_sendmsg(resp, m, 0));
}
TEST_NNG_PASS(nng_pipe_close(p));
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index 13976a85..7a5a5c1b 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -8,21 +8,12 @@
// found online at https://opensource.org/licenses/MIT.
//
-
#include "core/nng_impl.h"
#include "nng/protocol/survey0/survey.h"
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-#ifndef NNI_PROTO_SURVEYOR_V0
-#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
-#endif
-
-#ifndef NNI_PROTO_RESPONDENT_V0
-#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
-#endif
-
typedef struct xsurv0_pipe xsurv0_pipe;
typedef struct xsurv0_sock xsurv0_sock;
@@ -152,7 +143,7 @@ xsurv0_pipe_start(void *arg)
xsurv0_pipe *p = arg;
xsurv0_sock *s = p->psock;
- if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) {
+ if (nni_pipe_peer(p->npipe) != NNG_SURVEYOR0_PEER) {
return (NNG_EPROTO);
}
@@ -236,6 +227,7 @@ xsurv0_recv_cb(void *arg)
{
xsurv0_pipe *p = arg;
nni_msg * msg;
+ bool end;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->npipe);
@@ -245,19 +237,31 @@ xsurv0_recv_cb(void *arg)
msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
+ end = false;
- // We yank 4 bytes of body, and move them to the header.
- if (nni_msg_len(msg) < 4) {
- // Peer gave us garbage, so kick it.
- nni_msg_free(msg);
- nni_pipe_close(p->npipe);
- return;
- }
+ while (!end) {
+ uint8_t *body;
- // This cannot fail because the header should be zero bytes with
- // 32 bytes of room.
- (void) nni_msg_header_append(msg, nni_msg_body(msg), 4);
- (void) nni_msg_trim(msg, 4);
+ if (nni_msg_len(msg) < 4) {
+ // Peer gave us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_close(p->npipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = ((body[0] & 0x80u) != 0);
+
+ if (nni_msg_header_append(msg, body, sizeof(uint32_t)) != 0) {
+ // TODO: bump a no-memory stat
+ nni_msg_free(msg);
+ // Closing the pipe may release some memory.
+ // It at least gives an indication to the peer
+ // that we've lost the message.
+ nni_pipe_close(p->npipe);
+ return;
+ }
+ nni_msg_trim(msg, sizeof(uint32_t));
+ }
nni_aio_set_msg(&p->aio_putq, msg);
nni_msgq_aio_put(p->psock->urq, &p->aio_putq);
@@ -267,8 +271,8 @@ static int
xsurv0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
xsurv0_sock *s = arg;
- int ttl;
- int rv;
+ int ttl;
+ int rv;
if ((rv = nni_copyin_int(&ttl, buf, sz, 1, 255, t)) == 0) {
nni_atomic_set(&s->ttl, ttl);
}
@@ -371,8 +375,8 @@ static nni_proto_sock_ops xsurv0_sock_ops = {
static nni_proto xsurv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
- .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_self = { NNG_SURVEYOR0_SELF, NNG_SURVEYOR0_SELF_NAME },
+ .proto_peer = { NNG_SURVEYOR0_PEER, NNG_SURVEYOR0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
.proto_sock_ops = &xsurv0_sock_ops,
.proto_pipe_ops = &xsurv0_pipe_ops,
diff --git a/src/protocol/survey0/xsurvey_test.c b/src/protocol/survey0/xsurvey_test.c
index 2c17525a..b0123145 100644
--- a/src/protocol/survey0/xsurvey_test.c
+++ b/src/protocol/survey0/xsurvey_test.c
@@ -222,6 +222,48 @@ test_xsurvey_recv_garbage(void)
}
static void
+test_xsurvey_recv_header(void)
+{
+ nng_socket resp;
+ nng_socket surv;
+ nng_msg * m;
+ nng_pipe p1, p2;
+ uint32_t id;
+
+ TEST_NNG_PASS(nng_respondent0_open_raw(&resp));
+ TEST_NNG_PASS(nng_surveyor0_open_raw(&surv));
+ TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(surv, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(resp, NNG_OPT_SENDTIMEO, 1000));
+
+ TEST_NNG_PASS(testutil_marry_ex(surv, resp, &p1, &p2));
+
+ // Simulate a few hops.
+ TEST_NNG_PASS(nng_msg_alloc(&m, 0));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2)));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u));
+
+ TEST_NNG_PASS(nng_sendmsg(resp, m, 0));
+
+ TEST_NNG_PASS(nng_recvmsg(surv, &m, 0));
+ TEST_CHECK(nng_msg_header_len(m) == 12);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x2);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x1);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x80000123u);
+
+ nng_msg_free(m);
+
+ TEST_NNG_PASS(nng_close(surv));
+ TEST_NNG_PASS(nng_close(resp));
+}
+
+static void
test_xsurvey_close_during_recv(void)
{
nng_socket resp;
@@ -357,6 +399,7 @@ TEST_LIST = {
{ "xsurvey validate peer", test_xsurvey_validate_peer },
{ "xsurvey recv aio stopped", test_xsurvey_recv_aio_stopped },
{ "xsurvey recv garbage", test_xsurvey_recv_garbage },
+ { "xsurvey recv header", test_xsurvey_recv_header },
{ "xsurvey close during recv", test_xsurvey_close_during_recv },
{ "xsurvey close pipe during send",
test_xsurvey_close_pipe_during_send },