diff options
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 80 |
1 files changed, 38 insertions, 42 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index a715ab59..a29c3120 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -48,14 +48,14 @@ struct rep0_ctx { // rep0_sock is our per-socket protocol private structure. struct rep0_sock { - nni_mtx lk; - int ttl; - nni_idhash * pipes; - nni_list recvpipes; // list of pipes with data to receive - nni_list recvq; - rep0_ctx ctx; - nni_pollable readable; - nni_pollable writable; + nni_mtx lk; + int ttl; + nni_idhash * pipes; + nni_list recvpipes; // list of pipes with data to receive + nni_list recvq; + rep0_ctx ctx; + nni_pollable readable; + nni_pollable writable; }; // rep0_pipe is our per-pipe protocol private structure. @@ -63,8 +63,8 @@ struct rep0_pipe { nni_pipe * pipe; rep0_sock * rep; uint32_t id; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node rnode; // receivable list linkage nni_list sendq; // contexts waiting to send bool busy; @@ -193,8 +193,8 @@ rep0_ctx_send(void *arg, nni_aio *aio) if (!p->busy) { p->busy = true; len = nni_msg_len(msg); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&s->lk); nni_aio_set_msg(aio, NULL); @@ -273,8 +273,8 @@ rep0_pipe_stop(void *arg) { rep0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -283,26 +283,22 @@ rep0_pipe_fini(void *arg) rep0_pipe *p = arg; nng_msg * msg; - if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) { - nni_aio_set_msg(p->aio_recv, NULL); + if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) { + nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_free(msg); } - nni_aio_fini(p->aio_send); - nni_aio_fini(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); } static int rep0_pipe_init(void *arg, nni_pipe *pipe, void *s) { rep0_pipe *p = arg; - int rv; - if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) { - rep0_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p); NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode); @@ -329,7 +325,7 @@ rep0_pipe_start(void *arg) } // By definition, we have not received a request yet on this pipe, // so it cannot cause us to become writable. - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -340,8 +336,8 @@ rep0_pipe_close(void *arg) rep0_sock *s = p->rep; rep0_ctx * ctx; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&s->lk); if (nni_list_active(&s->recvpipes, p)) { @@ -380,9 +376,9 @@ rep0_pipe_send_cb(void *arg) nni_msg * msg; size_t len; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -406,8 +402,8 @@ rep0_pipe_send_cb(void *arg) msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); nni_aio_set_msg(aio, NULL); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); nni_mtx_unlock(&s->lk); @@ -462,13 +458,13 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_unlock(&s->lk); return; } - msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); + msg = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { nni_pollable_clear(&s->readable); } - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(&s->writable); } @@ -496,12 +492,12 @@ rep0_pipe_recv_cb(void *arg) size_t len; int hops; - if (nni_aio_result(p->aio_recv) != 0) { + if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - msg = nni_aio_get_msg(p->aio_recv); + msg = nni_aio_get_msg(&p->aio_recv); nni_msg_set_pipe(msg, p->id); @@ -521,7 +517,7 @@ rep0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_close(p->pipe); return; } @@ -552,13 +548,13 @@ rep0_pipe_recv_cb(void *arg) nni_list_remove(&s->recvq, ctx); aio = ctx->raio; ctx->raio = NULL; - nni_aio_set_msg(p->aio_recv, NULL); + nni_aio_set_msg(&p->aio_recv, NULL); if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(&s->writable); } // schedule another receive - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); ctx->btrace_len = len; memcpy(ctx->btrace, nni_msg_header(msg), len); @@ -573,8 +569,8 @@ rep0_pipe_recv_cb(void *arg) drop: nni_msg_free(msg); - nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_recv(p->pipe, p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->pipe, &p->aio_recv); } static int |
