aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0/rep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
-rw-r--r--src/protocol/reqrep0/rep.c80
1 files changed, 38 insertions, 42 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index a715ab59..a29c3120 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -48,14 +48,14 @@ struct rep0_ctx {
// rep0_sock is our per-socket protocol private structure.
struct rep0_sock {
- nni_mtx lk;
- int ttl;
- nni_idhash * pipes;
- nni_list recvpipes; // list of pipes with data to receive
- nni_list recvq;
- rep0_ctx ctx;
- nni_pollable readable;
- nni_pollable writable;
+ nni_mtx lk;
+ int ttl;
+ nni_idhash * pipes;
+ nni_list recvpipes; // list of pipes with data to receive
+ nni_list recvq;
+ rep0_ctx ctx;
+ nni_pollable readable;
+ nni_pollable writable;
};
// rep0_pipe is our per-pipe protocol private structure.
@@ -63,8 +63,8 @@ struct rep0_pipe {
nni_pipe * pipe;
rep0_sock * rep;
uint32_t id;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
nni_list_node rnode; // receivable list linkage
nni_list sendq; // contexts waiting to send
bool busy;
@@ -193,8 +193,8 @@ rep0_ctx_send(void *arg, nni_aio *aio)
if (!p->busy) {
p->busy = true;
len = nni_msg_len(msg);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
nni_mtx_unlock(&s->lk);
nni_aio_set_msg(aio, NULL);
@@ -273,8 +273,8 @@ rep0_pipe_stop(void *arg)
{
rep0_pipe *p = arg;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -283,26 +283,22 @@ rep0_pipe_fini(void *arg)
rep0_pipe *p = arg;
nng_msg * msg;
- if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) {
- nni_aio_set_msg(p->aio_recv, NULL);
+ if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) {
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_free(msg);
}
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
}
static int
rep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
rep0_pipe *p = arg;
- int rv;
- if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) {
- rep0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p);
NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode);
@@ -329,7 +325,7 @@ rep0_pipe_start(void *arg)
}
// By definition, we have not received a request yet on this pipe,
// so it cannot cause us to become writable.
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -340,8 +336,8 @@ rep0_pipe_close(void *arg)
rep0_sock *s = p->rep;
rep0_ctx * ctx;
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
nni_mtx_lock(&s->lk);
if (nni_list_active(&s->recvpipes, p)) {
@@ -380,9 +376,9 @@ rep0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -406,8 +402,8 @@ rep0_pipe_send_cb(void *arg)
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
nni_mtx_unlock(&s->lk);
@@ -462,13 +458,13 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_unlock(&s->lk);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_list_remove(&s->recvpipes, p);
if (nni_list_empty(&s->recvpipes)) {
nni_pollable_clear(&s->readable);
}
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
if ((ctx == &s->ctx) && !p->busy) {
nni_pollable_raise(&s->writable);
}
@@ -496,12 +492,12 @@ rep0_pipe_recv_cb(void *arg)
size_t len;
int hops;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
+ msg = nni_aio_get_msg(&p->aio_recv);
nni_msg_set_pipe(msg, p->id);
@@ -521,7 +517,7 @@ rep0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -552,13 +548,13 @@ rep0_pipe_recv_cb(void *arg)
nni_list_remove(&s->recvq, ctx);
aio = ctx->raio;
ctx->raio = NULL;
- nni_aio_set_msg(p->aio_recv, NULL);
+ nni_aio_set_msg(&p->aio_recv, NULL);
if ((ctx == &s->ctx) && !p->busy) {
nni_pollable_raise(&s->writable);
}
// schedule another receive
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
ctx->btrace_len = len;
memcpy(ctx->btrace, nni_msg_header(msg), len);
@@ -573,8 +569,8 @@ rep0_pipe_recv_cb(void *arg)
drop:
nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static int