aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0')
-rw-r--r--src/protocol/reqrep0/rep.c15
-rw-r--r--src/protocol/reqrep0/rep_test.c37
-rw-r--r--src/protocol/reqrep0/req.c16
-rw-r--r--src/protocol/reqrep0/req_test.c12
-rw-r--r--src/protocol/reqrep0/xrep.c16
-rw-r--r--src/protocol/reqrep0/xrep_test.c35
-rw-r--r--src/protocol/reqrep0/xreq.c55
-rw-r--r--src/protocol/reqrep0/xreq_test.c78
8 files changed, 135 insertions, 129 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index b546044d..ef3f548a 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -8,7 +8,6 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -18,14 +17,6 @@
// request-reply pair. This is useful for building RPC servers, for
// example.
-#ifndef NNI_PROTO_REQ_V0
-#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
-#endif
-
-#ifndef NNI_PROTO_REP_V0
-#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
-#endif
-
typedef struct rep0_pipe rep0_pipe;
typedef struct rep0_sock rep0_sock;
typedef struct rep0_ctx rep0_ctx;
@@ -315,7 +306,7 @@ rep0_pipe_start(void *arg)
rep0_sock *s = p->rep;
int rv;
- if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) {
+ if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) {
// Peer protocol mismatch.
return (NNG_EPROTO);
}
@@ -691,8 +682,8 @@ static nni_proto_sock_ops rep0_sock_ops = {
static nni_proto rep0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_REP_V0, "rep" },
- .proto_peer = { NNI_PROTO_REQ_V0, "req" },
+ .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME },
+ .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &rep0_sock_ops,
.proto_pipe_ops = &rep0_pipe_ops,
diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c
index 0eb8b985..367ec66e 100644
--- a/src/protocol/reqrep0/rep_test.c
+++ b/src/protocol/reqrep0/rep_test.c
@@ -16,29 +16,26 @@
#include <acutest.h>
#include <testutil.h>
-#ifndef NNI_PROTO
-#define NNI_PROTO(x, y) (((x) << 4u) | (y))
-#endif
-
-void
+static void
test_rep_identity(void)
{
nng_socket s;
- int p;
- char * n;
-
- TEST_CHECK(nng_rep0_open(&s) == 0);
- TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0);
- TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49
- TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0);
- TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48
- TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0);
- TEST_CHECK(strcmp(n, "rep") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0);
- TEST_CHECK(strcmp(n, "req") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_close(s) == 0);
+ int p1, p2;
+ char * n1;
+ char * n2;
+
+ TEST_NNG_PASS(nng_rep0_open(&s));
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1));
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2));
+ TEST_NNG_PASS(nng_close(s));
+ TEST_CHECK(p1 == NNG_REP0_SELF);
+ TEST_CHECK(p2 == NNG_REP0_PEER);
+ TEST_CHECK(strcmp(n1, NNG_REP0_SELF_NAME) == 0);
+ TEST_CHECK(strcmp(n2, NNG_REP0_PEER_NAME) == 0);
+ nng_strfree(n1);
+ nng_strfree(n2);
}
void
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index b5681688..f5d9bbba 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -8,22 +8,12 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <stdio.h>
-
#include "core/nng_impl.h"
#include "nng/protocol/reqrep0/req.h"
// Request protocol. The REQ protocol is the "request" side of a
// request-reply pair. This is useful for building RPC clients, for example.
-#ifndef NNI_PROTO_REQ_V0
-#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
-#endif
-
-#ifndef NNI_PROTO_REP_V0
-#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
-#endif
-
typedef struct req0_pipe req0_pipe;
typedef struct req0_sock req0_sock;
typedef struct req0_ctx req0_ctx;
@@ -200,7 +190,7 @@ req0_pipe_start(void *arg)
req0_pipe *p = arg;
req0_sock *s = p->req;
- if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) {
+ if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) {
return (NNG_EPROTO);
}
@@ -851,8 +841,8 @@ static nni_proto_sock_ops req0_sock_ops = {
static nni_proto req0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_REQ_V0, "req" },
- .proto_peer = { NNI_PROTO_REP_V0, "rep" },
+ .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME },
+ .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &req0_sock_ops,
.proto_pipe_ops = &req0_pipe_ops,
diff --git a/src/protocol/reqrep0/req_test.c b/src/protocol/reqrep0/req_test.c
index 8b57740d..75cff14a 100644
--- a/src/protocol/reqrep0/req_test.c
+++ b/src/protocol/reqrep0/req_test.c
@@ -17,10 +17,6 @@
#include <acutest.h>
#include <testutil.h>
-#ifndef NNI_PROTO
-#define NNI_PROTO(x, y) (((x) << 4u) | (y))
-#endif
-
static void
test_req_identity(void)
{
@@ -30,14 +26,14 @@ test_req_identity(void)
TEST_NNG_PASS(nng_req0_open(&s));
TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p));
- TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48
+ TEST_CHECK(p == NNG_REQ0_SELF);
TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p));
- TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49
+ TEST_CHECK(p == NNG_REQ0_PEER); // 49
TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n));
- TEST_CHECK(strcmp(n, "req") == 0);
+ TEST_CHECK(strcmp(n, NNG_REQ0_SELF_NAME) == 0);
nng_strfree(n);
TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n));
- TEST_CHECK(strcmp(n, "rep") == 0);
+ TEST_CHECK(strcmp(n, NNG_REQ0_PEER_NAME) == 0);
nng_strfree(n);
TEST_NNG_PASS(nng_close(s));
}
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index a036e6f6..e63cfe83 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -17,14 +17,6 @@
// request-reply pair. This is useful for building RPC servers, for
// example.
-#ifndef NNI_PROTO_REQ_V0
-#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
-#endif
-
-#ifndef NNI_PROTO_REP_V0
-#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
-#endif
-
typedef struct xrep0_pipe xrep0_pipe;
typedef struct xrep0_sock xrep0_sock;
@@ -167,7 +159,7 @@ xrep0_pipe_start(void *arg)
xrep0_sock *s = p->rep;
int rv;
- if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) {
+ if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) {
// Peer protocol mismatch.
return (NNG_EPROTO);
}
@@ -303,7 +295,7 @@ xrep0_pipe_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- bool end = 0;
+ bool end;
uint8_t *body;
if (hops > ttl) {
// This isn't malformed, but it has gone through
@@ -428,8 +420,8 @@ static nni_proto_sock_ops xrep0_sock_ops = {
static nni_proto xrep0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_REP_V0, "rep" },
- .proto_peer = { NNI_PROTO_REQ_V0, "req" },
+ .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME },
+ .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
.proto_sock_ops = &xrep0_sock_ops,
.proto_pipe_ops = &xrep0_pipe_ops,
diff --git a/src/protocol/reqrep0/xrep_test.c b/src/protocol/reqrep0/xrep_test.c
index 958f3240..33d834df 100644
--- a/src/protocol/reqrep0/xrep_test.c
+++ b/src/protocol/reqrep0/xrep_test.c
@@ -16,29 +16,26 @@
#include <acutest.h>
#include <testutil.h>
-#ifndef NNI_PROTO
-#define NNI_PROTO(x, y) (((x) << 4u) | (y))
-#endif
-
static void
test_xrep_identity(void)
{
nng_socket s;
- int p;
- char * n;
-
- TEST_CHECK(nng_rep0_open_raw(&s) == 0);
- TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0);
- TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49
- TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0);
- TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48
- TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0);
- TEST_CHECK(strcmp(n, "rep") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0);
- TEST_CHECK(strcmp(n, "req") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_close(s) == 0);
+ int p1, p2;
+ char * n1;
+ char * n2;
+
+ TEST_NNG_PASS(nng_rep0_open_raw(&s));
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1));
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2));
+ TEST_NNG_PASS(nng_close(s));
+ TEST_CHECK(p1 == NNG_REP0_SELF);
+ TEST_CHECK(p2 == NNG_REP0_PEER);
+ TEST_CHECK(strcmp(n1, NNG_REP0_SELF_NAME) == 0);
+ TEST_CHECK(strcmp(n2, NNG_REP0_PEER_NAME) == 0);
+ nng_strfree(n1);
+ nng_strfree(n2);
}
static void
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index ef24010e..4900fb06 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -11,19 +11,11 @@
#include <stdio.h>
#include "core/nng_impl.h"
-//#include "nng/protocol/reqrep0/req.h"
+#include "nng/protocol/reqrep0/req.h"
// Request protocol. The REQ protocol is the "request" side of a
// request-reply pair. This is useful for building RPC clients, for example.
-#ifndef NNI_PROTO_REQ_V0
-#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
-#endif
-
-#ifndef NNI_PROTO_REP_V0
-#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
-#endif
-
typedef struct xreq0_pipe xreq0_pipe;
typedef struct xreq0_sock xreq0_sock;
@@ -124,7 +116,7 @@ xreq0_pipe_start(void *arg)
xreq0_pipe *p = arg;
xreq0_sock *s = p->req;
- if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) {
+ if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) {
return (NNG_EPROTO);
}
@@ -204,7 +196,7 @@ xreq0_recv_cb(void *arg)
xreq0_pipe *p = arg;
xreq0_sock *sock = p->req;
nni_msg * msg;
- uint32_t id;
+ bool end;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
@@ -214,20 +206,31 @@ xreq0_recv_cb(void *arg)
msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
-
- // We yank 4 bytes from front of body, and move them to the header.
- if (nni_msg_len(msg) < 4) {
- // Peer gave us garbage, so kick it.
- nni_msg_free(msg);
- nni_pipe_close(p->pipe);
- return;
+ end = false;
+
+ while (!end) {
+ uint8_t *body;
+
+ if (nni_msg_len(msg) < 4) {
+ // Peer gave us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = ((body[0] & 0x80u) != 0);
+
+ if (nng_msg_header_append(msg, body, sizeof (uint32_t)) != 0) {
+ // TODO: bump a no-memory stat
+ nni_msg_free(msg);
+ // Closing the pipe may release some memory.
+ // It at least gives an indication to the peer
+ // that we've lost the message.
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ nni_msg_trim(msg, sizeof (uint32_t));
}
- id = nni_msg_trim_u32(msg);
-
- // Since we got this from the transport, there had better be some
- // room in the header for our stuff.
- nni_msg_header_must_append_u32(msg, id);
-
nni_aio_set_msg(&p->aio_putq, msg);
nni_msgq_aio_put(sock->urq, &p->aio_putq);
}
@@ -301,8 +304,8 @@ static nni_proto_sock_ops xreq0_sock_ops = {
static nni_proto xreq0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_REQ_V0, "req" },
- .proto_peer = { NNI_PROTO_REP_V0, "rep" },
+ .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME },
+ .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
.proto_sock_ops = &xreq0_sock_ops,
.proto_pipe_ops = &xreq0_pipe_ops,
diff --git a/src/protocol/reqrep0/xreq_test.c b/src/protocol/reqrep0/xreq_test.c
index 7f14f0cf..e474a0ac 100644
--- a/src/protocol/reqrep0/xreq_test.c
+++ b/src/protocol/reqrep0/xreq_test.c
@@ -16,29 +16,26 @@
#include <acutest.h>
#include <testutil.h>
-#ifndef NNI_PROTO
-#define NNI_PROTO(x, y) (((x) << 4u) | (y))
-#endif
-
static void
test_xreq_identity(void)
{
nng_socket s;
- int p;
- char * n;
-
- TEST_CHECK(nng_req0_open_raw(&s) == 0);
- TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0);
- TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48
- TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0);
- TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49
- TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0);
- TEST_CHECK(strcmp(n, "req") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0);
- TEST_CHECK(strcmp(n, "rep") == 0);
- nng_strfree(n);
- TEST_CHECK(nng_close(s) == 0);
+ int p1, p2;
+ char * n1;
+ char * n2;
+
+ TEST_NNG_PASS(nng_req0_open_raw(&s));
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1));
+ TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1));
+ TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2));
+ TEST_NNG_PASS(nng_close(s));
+ TEST_CHECK(p1 == NNG_REQ0_SELF);
+ TEST_CHECK(p2 == NNG_REQ0_PEER);
+ TEST_CHECK(strcmp(n1, NNG_REQ0_SELF_NAME) == 0);
+ TEST_CHECK(strcmp(n2, NNG_REQ0_PEER_NAME) == 0);
+ nng_strfree(n1);
+ nng_strfree(n2);
}
static void
@@ -222,6 +219,48 @@ test_xreq_recv_garbage(void)
}
static void
+test_xreq_recv_header(void)
+{
+ nng_socket rep;
+ nng_socket req;
+ nng_msg * m;
+ nng_pipe p1, p2;
+ uint32_t id;
+
+ TEST_NNG_PASS(nng_rep0_open_raw(&rep));
+ TEST_NNG_PASS(nng_req0_open_raw(&req));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+ TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000));
+
+ TEST_NNG_PASS(testutil_marry_ex(req, rep, &p1, &p2));
+
+ // Simulate a few hops.
+ TEST_NNG_PASS(nng_msg_alloc(&m, 0));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2)));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1));
+ TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u));
+
+ TEST_NNG_PASS(nng_sendmsg(rep, m, 0));
+
+ TEST_NNG_PASS(nng_recvmsg(req, &m, 0));
+ TEST_CHECK(nng_msg_header_len(m) == 12);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x2);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x1);
+ TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id));
+ TEST_CHECK(id == 0x80000123u);
+
+ nng_msg_free(m);
+
+ TEST_NNG_PASS(nng_close(req));
+ TEST_NNG_PASS(nng_close(rep));
+}
+
+static void
test_xreq_close_during_recv(void)
{
nng_socket rep;
@@ -326,6 +365,7 @@ TEST_LIST = {
{ "xreq validate peer", test_xreq_validate_peer },
{ "xreq recv aio stopped", test_xreq_recv_aio_stopped },
{ "xreq recv garbage", test_xreq_recv_garbage },
+ { "xreq recv header", test_xreq_recv_header },
{ "xreq close during recv", test_xreq_close_during_recv },
{ "xreq close pipe during send", test_xreq_close_pipe_during_send },
{ "xreq ttl option", test_xreq_ttl_option },