aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0')
-rw-r--r--src/protocol/reqrep0/rep.c73
-rw-r--r--src/protocol/reqrep0/xrep.c25
-rw-r--r--src/protocol/reqrep0/xreq.c20
3 files changed, 46 insertions, 72 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 4e20466b..385860cd 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -36,11 +36,9 @@ static void rep0_pipe_fini(void *);
struct rep0_ctx {
rep0_sock * sock;
- bool closed;
char * btrace;
size_t btrace_len;
size_t btrace_size;
- int ttl;
uint32_t pipe_id;
rep0_pipe * spipe; // send pipe
nni_aio * saio; // send aio
@@ -56,7 +54,6 @@ struct rep0_sock {
nni_idhash * pipes;
nni_list recvpipes; // list of pipes with data to receive
nni_list recvq;
- bool closed;
rep0_ctx * ctx;
nni_pollable *recvable;
nni_pollable *sendable;
@@ -82,15 +79,11 @@ rep0_ctx_close(void *arg)
nni_aio * aio;
nni_mtx_lock(&s->lk);
- ctx->closed = true;
if ((aio = ctx->saio) != NULL) {
- nni_msg * msg;
rep0_pipe *pipe = ctx->spipe;
ctx->saio = NULL;
ctx->spipe = NULL;
nni_list_remove(&pipe->sendq, ctx);
- msg = nni_aio_get_msg(aio);
- nni_msg_free(msg);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
if ((aio = ctx->raio) != NULL) {
@@ -191,53 +184,48 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_pollable_clear(s->sendable);
}
- if (ctx->closed) {
+ if (len == 0) {
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_aio_finish_error(aio, NNG_ESTATE);
return;
}
- if (len == 0) {
+ if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ESTATE);
+ nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) {
+ if (nni_idhash_find(s->pipes, p_id, (void **) &p) != 0) {
// Pipe is gone. Make this look like a good send to avoid
// disrupting the state machine. We don't care if the peer
// lost interest in our reply.
- nni_aio_set_msg(aio, NULL);
nni_mtx_unlock(&s->lk);
+ nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
return;
}
- if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
+ if (!p->busy) {
+ p->busy = true;
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
+
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
return;
}
- if (p->busy) {
- rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
- if (rv != 0) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
- return;
- }
- ctx->saio = aio;
- ctx->spipe = p;
- nni_list_append(&p->sendq, ctx);
+
+ rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
+ if (rv != 0) {
nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
return;
}
-
- p->busy = true;
- len = nni_msg_len(msg);
- nni_aio_set_msg(aio, NULL);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ ctx->saio = aio;
+ ctx->spipe = p;
+ nni_list_append(&p->sendq, ctx);
nni_mtx_unlock(&s->lk);
-
- nni_aio_finish(aio, 0, len);
}
static void
@@ -376,6 +364,7 @@ rep0_pipe_stop(void *arg)
aio = ctx->saio;
ctx->saio = NULL;
msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
}
@@ -384,12 +373,11 @@ rep0_pipe_stop(void *arg)
// accept a message and discard it.)
nni_pollable_raise(s->sendable);
}
+ nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
nni_mtx_unlock(&s->lk);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
-
- nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
}
static void
@@ -465,11 +453,6 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&s->lk);
- if (ctx->closed) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
if ((p = nni_list_first(&s->recvpipes)) == NULL) {
int rv;
rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx);
@@ -509,7 +492,6 @@ rep0_pipe_recv_cb(void *arg)
rep0_sock *s = p->rep;
rep0_ctx * ctx;
nni_msg * msg;
- int rv;
uint8_t * body;
nni_aio * aio;
size_t len;
@@ -527,7 +509,7 @@ rep0_pipe_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- int end = 0;
+ bool end = false;
if (hops > s->ttl) {
// This isn't malformed, but it has gone through
@@ -544,9 +526,8 @@ rep0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_header_append(msg, body, 4);
- if (rv != 0) {
+ end = ((body[0] & 0x80) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
// Out of memory, so drop it.
goto drop;
}
@@ -571,7 +552,6 @@ rep0_pipe_recv_cb(void *arg)
nni_list_remove(&s->recvq, ctx);
aio = ctx->raio;
ctx->raio = NULL;
- nni_aio_set_msg(aio, msg);
nni_aio_set_msg(p->aio_recv, NULL);
// schedule another receive
@@ -591,6 +571,7 @@ rep0_pipe_recv_cb(void *arg)
nni_mtx_unlock(&s->lk);
+ nni_aio_set_msg(aio, msg);
nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
return;
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index f7189453..4773677e 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -189,7 +189,9 @@ xrep0_pipe_stop(void *arg)
nni_aio_stop(p->aio_recv);
nni_aio_stop(p->aio_putq);
+ nni_mtx_lock(&s->lk);
nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
+ nni_mtx_unlock(&s->lk);
}
static void
@@ -200,7 +202,6 @@ xrep0_sock_getq_cb(void *arg)
nni_msg * msg;
uint32_t id;
xrep0_pipe *p;
- int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -229,12 +230,12 @@ xrep0_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
- if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
- rv = nni_msgq_tryput(p->sendq, msg);
- }
- if (rv != 0) {
+ nni_mtx_lock(&s->lk);
+ if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) ||
+ (nni_msgq_tryput(p->sendq, msg) != 0)) {
nni_msg_free(msg);
}
+ nni_mtx_unlock(&s->lk);
// Now look for another message on the upper write queue.
nni_msgq_aio_get(uwq, s->aio_getq);
@@ -277,8 +278,6 @@ xrep0_pipe_recv_cb(void *arg)
xrep0_pipe *p = arg;
xrep0_sock *s = p->rep;
nni_msg * msg;
- int rv;
- uint8_t * body;
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
@@ -292,8 +291,7 @@ xrep0_pipe_recv_cb(void *arg)
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
// Store the pipe id in the header, first thing.
- rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
- if (rv != 0) {
+ if (nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)) != 0) {
// Failure here causes us to drop the message.
goto drop;
}
@@ -301,7 +299,8 @@ xrep0_pipe_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- int end = 0;
+ bool end = 0;
+ uint8_t *body;
if (hops > s->ttl) {
// This isn't malformed, but it has gone through
// too many hops. Do not disconnect, because we
@@ -317,9 +316,8 @@ xrep0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_header_append(msg, body, 4);
- if (rv != 0) {
+ end = ((body[0] & 0x80) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
// Out of memory most likely, but keep going to
// avoid breaking things.
goto drop;
@@ -413,7 +411,6 @@ static nni_proto_sock_ops xrep0_sock_ops = {
.sock_open = xrep0_sock_open,
.sock_close = xrep0_sock_close,
.sock_options = xrep0_sock_options,
- .sock_filter = NULL, // No filtering for raw mode
.sock_send = xrep0_sock_send,
.sock_recv = xrep0_sock_recv,
};
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 5c1841b2..13ae7418 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -226,25 +226,21 @@ xreq0_recv_cb(void *arg)
// We yank 4 bytes from front of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
- // Malformed message.
- goto malformed;
+ // Peer gave us garbage, so kick it.
+ nni_msg_free(msg);
+ nni_pipe_stop(p->pipe);
+ return;
}
id = nni_msg_trim_u32(msg);
if (nni_msg_header_append_u32(msg, id) != 0) {
- // Arguably we could just discard and carry on. But
- // dropping the connection is probably more helpful since
- // it lets the other side see that a problem occurred.
- // Plus it gives us a chance to reclaim some memory.
- goto malformed;
+ // Probably ENOMEM, discard and carry on.
+ nni_msg_free(msg);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ return;
}
nni_aio_set_msg(p->aio_putq, msg);
nni_msgq_aio_put(sock->urq, p->aio_putq);
- return;
-
-malformed:
- nni_msg_free(msg);
- nni_pipe_stop(p->pipe);
}
static void