aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0/rep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
-rw-r--r--src/protocol/reqrep0/rep.c73
1 files changed, 27 insertions, 46 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 4e20466b..385860cd 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -36,11 +36,9 @@ static void rep0_pipe_fini(void *);
struct rep0_ctx {
rep0_sock * sock;
- bool closed;
char * btrace;
size_t btrace_len;
size_t btrace_size;
- int ttl;
uint32_t pipe_id;
rep0_pipe * spipe; // send pipe
nni_aio * saio; // send aio
@@ -56,7 +54,6 @@ struct rep0_sock {
nni_idhash * pipes;
nni_list recvpipes; // list of pipes with data to receive
nni_list recvq;
- bool closed;
rep0_ctx * ctx;
nni_pollable *recvable;
nni_pollable *sendable;
@@ -82,15 +79,11 @@ rep0_ctx_close(void *arg)
nni_aio * aio;
nni_mtx_lock(&s->lk);
- ctx->closed = true;
if ((aio = ctx->saio) != NULL) {
- nni_msg * msg;
rep0_pipe *pipe = ctx->spipe;
ctx->saio = NULL;
ctx->spipe = NULL;
nni_list_remove(&pipe->sendq, ctx);
- msg = nni_aio_get_msg(aio);
- nni_msg_free(msg);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
if ((aio = ctx->raio) != NULL) {
@@ -191,53 +184,48 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_pollable_clear(s->sendable);
}
- if (ctx->closed) {
+ if (len == 0) {
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_aio_finish_error(aio, NNG_ESTATE);
return;
}
- if (len == 0) {
+ if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ESTATE);
+ nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) {
+ if (nni_idhash_find(s->pipes, p_id, (void **) &p) != 0) {
// Pipe is gone. Make this look like a good send to avoid
// disrupting the state machine. We don't care if the peer
// lost interest in our reply.
- nni_aio_set_msg(aio, NULL);
nni_mtx_unlock(&s->lk);
+ nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
return;
}
- if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
+ if (!p->busy) {
+ p->busy = true;
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
+
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
return;
}
- if (p->busy) {
- rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
- if (rv != 0) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
- return;
- }
- ctx->saio = aio;
- ctx->spipe = p;
- nni_list_append(&p->sendq, ctx);
+
+ rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
+ if (rv != 0) {
nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
return;
}
-
- p->busy = true;
- len = nni_msg_len(msg);
- nni_aio_set_msg(aio, NULL);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ ctx->saio = aio;
+ ctx->spipe = p;
+ nni_list_append(&p->sendq, ctx);
nni_mtx_unlock(&s->lk);
-
- nni_aio_finish(aio, 0, len);
}
static void
@@ -376,6 +364,7 @@ rep0_pipe_stop(void *arg)
aio = ctx->saio;
ctx->saio = NULL;
msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
}
@@ -384,12 +373,11 @@ rep0_pipe_stop(void *arg)
// accept a message and discard it.)
nni_pollable_raise(s->sendable);
}
+ nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
nni_mtx_unlock(&s->lk);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
-
- nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
}
static void
@@ -465,11 +453,6 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&s->lk);
- if (ctx->closed) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
if ((p = nni_list_first(&s->recvpipes)) == NULL) {
int rv;
rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx);
@@ -509,7 +492,6 @@ rep0_pipe_recv_cb(void *arg)
rep0_sock *s = p->rep;
rep0_ctx * ctx;
nni_msg * msg;
- int rv;
uint8_t * body;
nni_aio * aio;
size_t len;
@@ -527,7 +509,7 @@ rep0_pipe_recv_cb(void *arg)
// Move backtrace from body to header
hops = 1;
for (;;) {
- int end = 0;
+ bool end = false;
if (hops > s->ttl) {
// This isn't malformed, but it has gone through
@@ -544,9 +526,8 @@ rep0_pipe_recv_cb(void *arg)
return;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_header_append(msg, body, 4);
- if (rv != 0) {
+ end = ((body[0] & 0x80) != 0);
+ if (nni_msg_header_append(msg, body, 4) != 0) {
// Out of memory, so drop it.
goto drop;
}
@@ -571,7 +552,6 @@ rep0_pipe_recv_cb(void *arg)
nni_list_remove(&s->recvq, ctx);
aio = ctx->raio;
ctx->raio = NULL;
- nni_aio_set_msg(aio, msg);
nni_aio_set_msg(p->aio_recv, NULL);
// schedule another receive
@@ -591,6 +571,7 @@ rep0_pipe_recv_cb(void *arg)
nni_mtx_unlock(&s->lk);
+ nni_aio_set_msg(aio, msg);
nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
return;