diff options
Diffstat (limited to 'src/protocol/reqrep0/xreq.c')
| -rw-r--r-- | src/protocol/reqrep0/xreq.c | 55 |
1 files changed, 29 insertions, 26 deletions
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index ef24010e..4900fb06 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -11,19 +11,11 @@ #include <stdio.h> #include "core/nng_impl.h" -//#include "nng/protocol/reqrep0/req.h" +#include "nng/protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -#ifndef NNI_PROTO_REQ_V0 -#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) -#endif - -#ifndef NNI_PROTO_REP_V0 -#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) -#endif - typedef struct xreq0_pipe xreq0_pipe; typedef struct xreq0_sock xreq0_sock; @@ -124,7 +116,7 @@ xreq0_pipe_start(void *arg) xreq0_pipe *p = arg; xreq0_sock *s = p->req; - if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + if (nni_pipe_peer(p->pipe) != NNG_REQ0_PEER) { return (NNG_EPROTO); } @@ -204,7 +196,7 @@ xreq0_recv_cb(void *arg) xreq0_pipe *p = arg; xreq0_sock *sock = p->req; nni_msg * msg; - uint32_t id; + bool end; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); @@ -214,20 +206,31 @@ xreq0_recv_cb(void *arg) msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - // We yank 4 bytes from front of body, and move them to the header. - if (nni_msg_len(msg) < 4) { - // Peer gave us garbage, so kick it. - nni_msg_free(msg); - nni_pipe_close(p->pipe); - return; + end = false; + + while (!end) { + uint8_t *body; + + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_close(p->pipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80u) != 0); + + if (nng_msg_header_append(msg, body, sizeof (uint32_t)) != 0) { + // TODO: bump a no-memory stat + nni_msg_free(msg); + // Closing the pipe may release some memory. + // It at least gives an indication to the peer + // that we've lost the message. + nni_pipe_close(p->pipe); + return; + } + nni_msg_trim(msg, sizeof (uint32_t)); } - id = nni_msg_trim_u32(msg); - - // Since we got this from the transport, there had better be some - // room in the header for our stuff. - nni_msg_header_must_append_u32(msg, id); - nni_aio_set_msg(&p->aio_putq, msg); nni_msgq_aio_put(sock->urq, &p->aio_putq); } @@ -301,8 +304,8 @@ static nni_proto_sock_ops xreq0_sock_ops = { static nni_proto xreq0_proto = { .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_self = { NNG_REQ0_SELF, NNG_REQ0_SELF_NAME }, + .proto_peer = { NNG_REQ0_PEER, NNG_REQ0_PEER_NAME }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, .proto_sock_ops = &xreq0_sock_ops, .proto_pipe_ops = &xreq0_pipe_ops, |
