diff options
Diffstat (limited to 'src/protocol/reqrep0/xrep.c')
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 25 |
1 files changed, 11 insertions, 14 deletions
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index f7189453..4773677e 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -189,7 +189,9 @@ xrep0_pipe_stop(void *arg) nni_aio_stop(p->aio_recv); nni_aio_stop(p->aio_putq); + nni_mtx_lock(&s->lk); nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_mtx_unlock(&s->lk); } static void @@ -200,7 +202,6 @@ xrep0_sock_getq_cb(void *arg) nni_msg * msg; uint32_t id; xrep0_pipe *p; - int rv; // This watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate @@ -229,12 +230,12 @@ xrep0_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { - rv = nni_msgq_tryput(p->sendq, msg); - } - if (rv != 0) { + nni_mtx_lock(&s->lk); + if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) || + (nni_msgq_tryput(p->sendq, msg) != 0)) { nni_msg_free(msg); } + nni_mtx_unlock(&s->lk); // Now look for another message on the upper write queue. nni_msgq_aio_get(uwq, s->aio_getq); @@ -277,8 +278,6 @@ xrep0_pipe_recv_cb(void *arg) xrep0_pipe *p = arg; xrep0_sock *s = p->rep; nni_msg * msg; - int rv; - uint8_t * body; int hops; if (nni_aio_result(p->aio_recv) != 0) { @@ -292,8 +291,7 @@ xrep0_pipe_recv_cb(void *arg) nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); // Store the pipe id in the header, first thing. - rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); - if (rv != 0) { + if (nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)) != 0) { // Failure here causes us to drop the message. goto drop; } @@ -301,7 +299,8 @@ xrep0_pipe_recv_cb(void *arg) // Move backtrace from body to header hops = 1; for (;;) { - int end = 0; + bool end = 0; + uint8_t *body; if (hops > s->ttl) { // This isn't malformed, but it has gone through // too many hops. Do not disconnect, because we @@ -317,9 +316,8 @@ xrep0_pipe_recv_cb(void *arg) return; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { + end = ((body[0] & 0x80) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { // Out of memory most likely, but keep going to // avoid breaking things. goto drop; @@ -413,7 +411,6 @@ static nni_proto_sock_ops xrep0_sock_ops = { .sock_open = xrep0_sock_open, .sock_close = xrep0_sock_close, .sock_options = xrep0_sock_options, - .sock_filter = NULL, // No filtering for raw mode .sock_send = xrep0_sock_send, .sock_recv = xrep0_sock_recv, }; |
