diff options
Diffstat (limited to 'src/protocol/reqrep0')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 15 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep_test.c | 37 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 16 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req_test.c | 12 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 16 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep_test.c | 35 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xreq.c | 55 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xreq_test.c | 78 |
8 files changed, 135 insertions, 129 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index b546044d..ef3f548a 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -8,7 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdlib.h> #include <string.h> #include "core/nng_impl.h" @@ -18,14 +17,6 @@ // request-reply pair. This is useful for building RPC servers, for // example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct rep0_pipe rep0_pipe; typedef struct rep0_sock rep0_sock; typedef struct rep0_ctx rep0_ctx; @@ -315,7 +306,7 @@ rep0_pipe_start(void *arg) rep0_sock *s = p->rep; int rv; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) { // Peer protocol mismatch. return (NNG_EPROTO); } @@ -691,8 +682,8 @@ static nni_proto_sock_ops rep0_sock_ops = { static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME }, + .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &rep0_sock_ops, .proto_pipe_ops = &rep0_pipe_ops, diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 0eb8b985..367ec66e 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - -void +static void test_rep_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_rep0_open(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_rep0_open(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REP0_SELF); + TEST_CHECK(p2 == NNG_REP0_PEER); + TEST_CHECK(strcmp(n1, NNG_REP0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REP0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } void diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index b5681688..f5d9bbba 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -8,22 +8,12 @@ // found online at https://opensource.org/licenses/MIT. // -#include <stdio.h> - #include "core/nng_impl.h" #include "nng/protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct req0_pipe req0_pipe; typedef struct req0_sock req0_sock; typedef struct req0_ctx req0_ctx; @@ -200,7 +190,7 @@ req0_pipe_start(void *arg) req0_pipe *p = arg; req0_sock *s = p->req; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) { return (NNG_EPROTO); } @@ -851,8 +841,8 @@ static nni_proto_sock_ops req0_sock_ops = { static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME }, + .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &req0_sock_ops, .proto_pipe_ops = &req0_pipe_ops, diff --git a/src/protocol/reqrep0/req_test.c b/src/protocol/reqrep0/req_test.c index 8b57740d..75cff14a 100644 --- a/src/protocol/reqrep0/req_test.c +++ b/src/protocol/reqrep0/req_test.c @@ -17,10 +17,6 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_req_identity(void) { @@ -30,14 +26,14 @@ test_req_identity(void) TEST_NNG_PASS(nng_req0_open(&s)); TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p)); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 + TEST_CHECK(p == NNG_REQ0_SELF); TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p)); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 + TEST_CHECK(p == NNG_REQ0_PEER); // 49 TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n)); - TEST_CHECK(strcmp(n, "req") == 0); + TEST_CHECK(strcmp(n, NNG_REQ0_SELF_NAME) == 0); nng_strfree(n); TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n)); - TEST_CHECK(strcmp(n, "rep") == 0); + TEST_CHECK(strcmp(n, NNG_REQ0_PEER_NAME) == 0); nng_strfree(n); TEST_NNG_PASS(nng_close(s)); } diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index a036e6f6..e63cfe83 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -17,14 +17,6 @@ // request-reply pair. This is useful for building RPC servers, for // example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct xrep0_pipe xrep0_pipe; typedef struct xrep0_sock xrep0_sock; @@ -167,7 +159,7 @@ xrep0_pipe_start(void *arg) xrep0_sock *s = p->rep; int rv; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REP0_PEER) { // Peer protocol mismatch. return (NNG_EPROTO); } @@ -303,7 +295,7 @@ xrep0_pipe_recv_cb(void *arg) // Move backtrace from body to header hops = 1; for (;;) { - bool end = 0; + bool end; uint8_t *body; if (hops > ttl) { // This isn't malformed, but it has gone through @@ -428,8 +420,8 @@ static nni_proto_sock_ops xrep0_sock_ops = { static nni_proto xrep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_self = { NNG_REP0_SELF, NNG_REP0_SELF_NAME }, + .proto_peer = { NNG_REP0_PEER, NNG_REP0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xrep0_sock_ops, .proto_pipe_ops = &xrep0_pipe_ops, diff --git a/src/protocol/reqrep0/xrep_test.c b/src/protocol/reqrep0/xrep_test.c index 958f3240..33d834df 100644 --- a/src/protocol/reqrep0/xrep_test.c +++ b/src/protocol/reqrep0/xrep_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xrep_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_rep0_open_raw(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_rep0_open_raw(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REP0_SELF); + TEST_CHECK(p2 == NNG_REP0_PEER); + TEST_CHECK(strcmp(n1, NNG_REP0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REP0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index ef24010e..4900fb06 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -11,19 +11,11 @@ #include <stdio.h> #include "core/nng_impl.h" -//#include "nng/protocol/reqrep0/req.h" +#include "nng/protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct xreq0_pipe xreq0_pipe; typedef struct xreq0_sock xreq0_sock; @@ -124,7 +116,7 @@ xreq0_pipe_start(void *arg) xreq0_pipe *p = arg; xreq0_sock *s = p->req; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) { return (NNG_EPROTO); } @@ -204,7 +196,7 @@ xreq0_recv_cb(void *arg) xreq0_pipe *p = arg; xreq0_sock *sock = p->req; nni_msg * msg; - uint32_t id; + bool end; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); @@ -214,20 +206,31 @@ xreq0_recv_cb(void *arg) msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - // We yank 4 bytes from front of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Peer gave us garbage, so kick it. - nni_msg_free(msg); - nni_pipe_close(p->pipe); - return; + end = false; + + while (!end) { + uint8_t *body; + + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->pipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nng_msg_header_append(msg, body, sizeof (uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->pipe); + return; + } + nni_msg_trim(msg, sizeof (uint32_t)); } - id = nni_msg_trim_u32(msg); - - // Since we got this from the transport, there had better be some - // room in the header for our stuff. - nni_msg_header_must_append_u32(msg, id); - nni_aio_set_msg(&p->aio_putq, msg); nni_msgq_aio_put(sock->urq, &p->aio_putq); } @@ -301,8 +304,8 @@ static nni_proto_sock_ops xreq0_sock_ops = { static nni_proto xreq0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME }, + .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xreq0_sock_ops, .proto_pipe_ops = &xreq0_pipe_ops, diff --git a/src/protocol/reqrep0/xreq_test.c b/src/protocol/reqrep0/xreq_test.c index 7f14f0cf..e474a0ac 100644 --- a/src/protocol/reqrep0/xreq_test.c +++ b/src/protocol/reqrep0/xreq_test.c @@ -16,29 +16,26 @@ #include <acutest.h> #include <testutil.h> -#ifndef NNI_PROTO -#define NNI_PROTO(x, y) (((x) << 4u) | (y)) -#endif - static void test_xreq_identity(void) { nng_socket s; - int p; - char * n; - - TEST_CHECK(nng_req0_open_raw(&s) == 0); - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PROTO, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 0u)); // 48 - TEST_CHECK(nng_getopt_int(s, NNG_OPT_PEER, &p) == 0); - TEST_CHECK(p == NNI_PROTO(3u, 1u)); // 49 - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PROTONAME, &n) == 0); - TEST_CHECK(strcmp(n, "req") == 0); - nng_strfree(n); - TEST_CHECK(nng_getopt_string(s, NNG_OPT_PEERNAME, &n) == 0); - TEST_CHECK(strcmp(n, "rep") == 0); - nng_strfree(n); - TEST_CHECK(nng_close(s) == 0); + int p1, p2; + char * n1; + char * n2; + + TEST_NNG_PASS(nng_req0_open_raw(&s)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PROTO, &p1)); + TEST_NNG_PASS(nng_getopt_int(s, NNG_OPT_PEER, &p2)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PROTONAME, &n1)); + TEST_NNG_PASS(nng_getopt_string(s, NNG_OPT_PEERNAME, &n2)); + TEST_NNG_PASS(nng_close(s)); + TEST_CHECK(p1 == NNG_REQ0_SELF); + TEST_CHECK(p2 == NNG_REQ0_PEER); + TEST_CHECK(strcmp(n1, NNG_REQ0_SELF_NAME) == 0); + TEST_CHECK(strcmp(n2, NNG_REQ0_PEER_NAME) == 0); + nng_strfree(n1); + nng_strfree(n2); } static void @@ -222,6 +219,48 @@ test_xreq_recv_garbage(void) } static void +test_xreq_recv_header(void) +{ + nng_socket rep; + nng_socket req; + nng_msg * m; + nng_pipe p1, p2; + uint32_t id; + + TEST_NNG_PASS(nng_rep0_open_raw(&rep)); + TEST_NNG_PASS(nng_req0_open_raw(&req)); + TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + + TEST_NNG_PASS(testutil_marry_ex(req, rep, &p1, &p2)); + + // Simulate a few hops. + TEST_NNG_PASS(nng_msg_alloc(&m, 0)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, nng_pipe_id(p2))); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x2)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x1)); + TEST_NNG_PASS(nng_msg_header_append_u32(m, 0x80000123u)); + + TEST_NNG_PASS(nng_sendmsg(rep, m, 0)); + + TEST_NNG_PASS(nng_recvmsg(req, &m, 0)); + TEST_CHECK(nng_msg_header_len(m) == 12); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x2); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x1); + TEST_NNG_PASS(nng_msg_header_trim_u32(m, &id)); + TEST_CHECK(id == 0x80000123u); + + nng_msg_free(m); + + TEST_NNG_PASS(nng_close(req)); + TEST_NNG_PASS(nng_close(rep)); +} + +static void test_xreq_close_during_recv(void) { nng_socket rep; @@ -326,6 +365,7 @@ TEST_LIST = { { "xreq validate peer", test_xreq_validate_peer }, { "xreq recv aio stopped", test_xreq_recv_aio_stopped }, { "xreq recv garbage", test_xreq_recv_garbage }, + { "xreq recv header", test_xreq_recv_header }, { "xreq close during recv", test_xreq_close_during_recv }, { "xreq close pipe during send", test_xreq_close_pipe_during_send }, { "xreq ttl option", test_xreq_ttl_option }, |
