diff options
Diffstat (limited to 'src/protocol/pair/pair_v0.c')
| -rw-r--r-- | src/protocol/pair/pair_v0.c | 90 |
1 files changed, 47 insertions, 43 deletions
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index ef420051..486ce43b 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -43,10 +43,10 @@ struct pair0_sock { struct pair0_pipe { nni_pipe * npipe; pair0_sock *psock; - nni_aio aio_send; - nni_aio aio_recv; - nni_aio aio_getq; - nni_aio aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_getq; + nni_aio * aio_putq; }; static int @@ -76,18 +76,34 @@ pair0_sock_fini(void *arg) NNI_FREE_STRUCT(s); } +static void +pair0_pipe_fini(void *arg) +{ + pair0_pipe *p = arg; + + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + NNI_FREE_STRUCT(p); +} + static int pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) { pair0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - nni_aio_init(&p->aio_send, pair0_send_cb, p); - nni_aio_init(&p->aio_recv, pair0_recv_cb, p); - nni_aio_init(&p->aio_getq, pair0_getq_cb, p); - nni_aio_init(&p->aio_putq, pair0_putq_cb, p); + if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { + pair0_pipe_fini(p); + return (rv); + } p->npipe = npipe; p->psock = psock; @@ -95,18 +111,6 @@ pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (0); } -static void -pair0_pipe_fini(void *arg) -{ - pair0_pipe *p = arg; - - nni_aio_fini(&p->aio_send); - nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_aio_fini(&p->aio_getq); - NNI_FREE_STRUCT(p); -} - static int pair0_pipe_start(void *arg) { @@ -123,8 +127,8 @@ pair0_pipe_start(void *arg) // Schedule a getq on the upper, and a read from the pipe. // Each of these also sets up another hold on the pipe itself. - nni_msgq_aio_get(s->uwq, &p->aio_getq); - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->npipe, p->aio_recv); return (0); } @@ -135,10 +139,10 @@ pair0_pipe_stop(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - nni_aio_cancel(&p->aio_send, NNG_ECANCELED); - nni_aio_cancel(&p->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&p->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&p->aio_getq, NNG_ECANCELED); + nni_aio_cancel(p->aio_send, NNG_ECANCELED); + nni_aio_cancel(p->aio_recv, NNG_ECANCELED); + nni_aio_cancel(p->aio_putq, NNG_ECANCELED); + nni_aio_cancel(p->aio_getq, NNG_ECANCELED); nni_mtx_lock(&s->mtx); if (s->ppipe == p) { @@ -154,17 +158,17 @@ pair0_recv_cb(void *arg) pair0_sock *s = p->psock; nni_msg * msg; - if (nni_aio_result(&p->aio_recv) != 0) { + if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); return; } - msg = p->aio_recv.a_msg; - p->aio_putq.a_msg = msg; - p->aio_recv.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_putq, msg); + nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_msgq_aio_put(s->urq, &p->aio_putq); + nni_msgq_aio_put(s->urq, p->aio_putq); } static void @@ -172,13 +176,13 @@ pair0_putq_cb(void *arg) { pair0_pipe *p = arg; - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(p->aio_putq.a_msg); - p->aio_putq.a_msg = NULL; + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); nni_pipe_stop(p->npipe); return; } - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); } static void @@ -187,14 +191,14 @@ pair0_getq_cb(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - if (nni_aio_result(&p->aio_getq) != 0) { + if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); return; } - p->aio_send.a_msg = p->aio_getq.a_msg; - p->aio_getq.a_msg = NULL; - nni_pipe_send(p->npipe, &p->aio_send); + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + nni_pipe_send(p->npipe, p->aio_send); } static void @@ -203,14 +207,14 @@ pair0_send_cb(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - if (nni_aio_result(&p->aio_send) != 0) { - nni_msg_free(p->aio_send.a_msg); - p->aio_send.a_msg = NULL; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->npipe); return; } - nni_msgq_aio_get(s->uwq, &p->aio_getq); + nni_msgq_aio_get(s->uwq, p->aio_getq); } static void |
