aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0/xreq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0/xreq.c')
-rw-r--r--src/protocol/reqrep0/xreq.c55
1 files changed, 29 insertions, 26 deletions
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index ef24010e..4900fb06 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -11,19 +11,11 @@
#include <stdio.h>
#include "core/nng_impl.h"
-//#include "nng/protocol/reqrep0/req.h"
+#include "nng/protocol/reqrep0/req.h"
// Request protocol. The REQ protocol is the "request" side of a
// request-reply pair. This is useful for building RPC clients, for example.
-#ifndef NNI_PROTO_REQ_V0
-#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
-#endif
-
-#ifndef NNI_PROTO_REP_V0
-#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
-#endif
-
typedef struct xreq0_pipe xreq0_pipe;
typedef struct xreq0_sock xreq0_sock;
@@ -124,7 +116,7 @@ xreq0_pipe_start(void *arg)
xreq0_pipe *p = arg;
xreq0_sock *s = p->req;
- if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) {
+ if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) {
return (NNG_EPROTO);
}
@@ -204,7 +196,7 @@ xreq0_recv_cb(void *arg)
xreq0_pipe *p = arg;
xreq0_sock *sock = p->req;
nni_msg * msg;
- uint32_t id;
+ bool end;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
@@ -214,20 +206,31 @@ xreq0_recv_cb(void *arg)
msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
-
- // We yank 4 bytes from front of body, and move them to the header.
- if (nni_msg_len(msg) < 4) {
- // Peer gave us garbage, so kick it.
- nni_msg_free(msg);
- nni_pipe_close(p->pipe);
- return;
+ end = false;
+
+ while (!end) {
+ uint8_t *body;
+
+ if (nni_msg_len(msg) < 4) {
+ // Peer gave us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = ((body[0] & 0x80u) != 0);
+
+ if (nng_msg_header_append(msg, body, sizeof (uint32_t)) != 0) {
+ // TODO: bump a no-memory stat
+ nni_msg_free(msg);
+ // Closing the pipe may release some memory.
+ // It at least gives an indication to the peer
+ // that we've lost the message.
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ nni_msg_trim(msg, sizeof (uint32_t));
}
- id = nni_msg_trim_u32(msg);
-
- // Since we got this from the transport, there had better be some
- // room in the header for our stuff.
- nni_msg_header_must_append_u32(msg, id);
-
nni_aio_set_msg(&p->aio_putq, msg);
nni_msgq_aio_put(sock->urq, &p->aio_putq);
}
@@ -301,8 +304,8 @@ static nni_proto_sock_ops xreq0_sock_ops = {
static nni_proto xreq0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_REQ_V0, "req" },
- .proto_peer = { NNI_PROTO_REP_V0, "rep" },
+ .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME },
+ .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
.proto_sock_ops = &xreq0_sock_ops,
.proto_pipe_ops = &xreq0_pipe_ops,