summaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0/xrep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0/xrep.c')
-rw-r--r--src/protocol/reqrep0/xrep.c25
1 files changed, 11 insertions, 14 deletions
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index f7189453..4773677e 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -189,7 +189,9 @@ xrep0_pipe_stop(void *arg)
nni_aio_stop(p->aio_recv);
nni_aio_stop(p->aio_putq);
+ nni_mtx_lock(&s->lk);
nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
+ nni_mtx_unlock(&s->lk);
}
static void
@@ -200,7 +202,6 @@ xrep0_sock_getq_cb(void *arg)
nni_msg * msg;
uint32_t id;
xrep0_pipe *p;
- int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -229,12 +230,12 @@ xrep0_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
- if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
- rv = nni_msgq_tryput(p->sendq, msg);
- }
- if (rv != 0) {
+ nni_mtx_lock(&s->lk);
+ if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) ||
+ (nni_msgq_tryput(p->sendq, msg) != 0)) {
nni_msg_free(msg);
}
+ nni_mtx_unlock(&s->lk);
// Now look for another message on the upper write queue.
nni_msgq_aio_get(uwq, s->aio_getq);
@@ -277,8 +278,6 @@ xrep0_pipe_recv_cb(void *arg)
xrep0_pipe *p = arg;
xrep0_sock *s = p->rep;
nni_msg * msg;
- int rv;
- uint8_t * body;
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
@@ -292,8 +291,7 @@ xrep0_pipe_recv_cb(void *arg)
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// Store the pipe id in the header, first thing.
- rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
- if (rv != 0) {
+ if (nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)) != 0) {
// Failure here causes us to drop the message.
goto drop;
}
@@ -301,7 +299,8 @@ xrep0_pipe_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- int end = 0;
+ bool end = 0;
+ uint8_t *body;
if (hops > s->ttl) {
// This isn't malformed, but it has gone through
// too many hops. Do not disconnect, because we
@@ -317,9 +316,8 @@ xrep0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_header_append(msg, body, 4);
- if (rv != 0) {
+ end = ((body[0] & 0x80) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
// Out of memory most likely, but keep going to
// avoid breaking things.
goto drop;
@@ -413,7 +411,6 @@ static nni_proto_sock_ops xrep0_sock_ops = {
.sock_open = xrep0_sock_open,
.sock_close = xrep0_sock_close,
.sock_options = xrep0_sock_options,
- .sock_filter = NULL, // No filtering for raw mode
.sock_send = xrep0_sock_send,
.sock_recv = xrep0_sock_recv,
};