diff options
Diffstat (limited to 'src/protocol/pair')
| -rw-r--r-- | src/protocol/pair/pair_v0.c | 90 | ||||
| -rw-r--r-- | src/protocol/pair/pair_v1.c | 130 |
2 files changed, 110 insertions, 110 deletions
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index ef420051..486ce43b 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -43,10 +43,10 @@ struct pair0_sock { struct pair0_pipe { nni_pipe * npipe; pair0_sock *psock; - nni_aio aio_send; - nni_aio aio_recv; - nni_aio aio_getq; - nni_aio aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_getq; + nni_aio * aio_putq; }; static int @@ -76,18 +76,34 @@ pair0_sock_fini(void *arg) NNI_FREE_STRUCT(s); } +static void +pair0_pipe_fini(void *arg) +{ + pair0_pipe *p = arg; + + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + NNI_FREE_STRUCT(p); +} + static int pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) { pair0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - nni_aio_init(&p->aio_send, pair0_send_cb, p); - nni_aio_init(&p->aio_recv, pair0_recv_cb, p); - nni_aio_init(&p->aio_getq, pair0_getq_cb, p); - nni_aio_init(&p->aio_putq, pair0_putq_cb, p); + if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { + pair0_pipe_fini(p); + return (rv); + } p->npipe = npipe; p->psock = psock; @@ -95,18 +111,6 @@ pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (0); } -static void -pair0_pipe_fini(void *arg) -{ - pair0_pipe *p = arg; - - nni_aio_fini(&p->aio_send); - nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_aio_fini(&p->aio_getq); - NNI_FREE_STRUCT(p); -} - static int pair0_pipe_start(void *arg) { @@ -123,8 +127,8 @@ pair0_pipe_start(void *arg) // Schedule a getq on the upper, and a read from the pipe. // Each of these also sets up another hold on the pipe itself. - nni_msgq_aio_get(s->uwq, &p->aio_getq); - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->npipe, p->aio_recv); return (0); } @@ -135,10 +139,10 @@ pair0_pipe_stop(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - nni_aio_cancel(&p->aio_send, NNG_ECANCELED); - nni_aio_cancel(&p->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&p->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&p->aio_getq, NNG_ECANCELED); + nni_aio_cancel(p->aio_send, NNG_ECANCELED); + nni_aio_cancel(p->aio_recv, NNG_ECANCELED); + nni_aio_cancel(p->aio_putq, NNG_ECANCELED); + nni_aio_cancel(p->aio_getq, NNG_ECANCELED); nni_mtx_lock(&s->mtx); if (s->ppipe == p) { @@ -154,17 +158,17 @@ pair0_recv_cb(void *arg) pair0_sock *s = p->psock; nni_msg * msg; - if (nni_aio_result(&p->aio_recv) != 0) { + if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); return; } - msg = p->aio_recv.a_msg; - p->aio_putq.a_msg = msg; - p->aio_recv.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_putq, msg); + nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_msgq_aio_put(s->urq, &p->aio_putq); + nni_msgq_aio_put(s->urq, p->aio_putq); } static void @@ -172,13 +176,13 @@ pair0_putq_cb(void *arg) { pair0_pipe *p = arg; - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(p->aio_putq.a_msg); - p->aio_putq.a_msg = NULL; + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); nni_pipe_stop(p->npipe); return; } - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); } static void @@ -187,14 +191,14 @@ pair0_getq_cb(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - if (nni_aio_result(&p->aio_getq) != 0) { + if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); return; } - p->aio_send.a_msg = p->aio_getq.a_msg; - p->aio_getq.a_msg = NULL; - nni_pipe_send(p->npipe, &p->aio_send); + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + nni_pipe_send(p->npipe, p->aio_send); } static void @@ -203,14 +207,14 @@ pair0_send_cb(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - if (nni_aio_result(&p->aio_send) != 0) { - nni_msg_free(p->aio_send.a_msg); - p->aio_send.a_msg = NULL; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->npipe); return; } - nni_msgq_aio_get(s->uwq, &p->aio_getq); + nni_msgq_aio_get(s->uwq, p->aio_getq); } static void diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index a737402f..d6c8ea75 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -41,7 +41,7 @@ struct pair1_sock { nni_list plist; int started; int poly; - nni_aio aio_getq; + nni_aio * aio_getq; }; // pair1_pipe is our per-pipe protocol private structure. @@ -49,10 +49,10 @@ struct pair1_pipe { nni_pipe * npipe; pair1_sock * psock; nni_msgq * sendq; - nni_aio aio_send; - nni_aio aio_recv; - nni_aio aio_getq; - nni_aio aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_getq; + nni_aio * aio_putq; nni_list_node node; }; @@ -61,7 +61,7 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; - nni_aio_fini(&s->aio_getq); + nni_aio_fini(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); @@ -85,10 +85,10 @@ pair1_sock_init(void **sp, nni_sock *nsock) NNI_LIST_INIT(&s->plist, pair1_pipe, node); // Raw mode uses this. - nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s); nni_mtx_init(&s->mtx); - if ((rv = nni_option_register("polyamorous", &poly)) != 0) { + if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) || + ((rv = nni_option_register("polyamorous", &poly)) != 0)) { pair1_sock_fini(s); return (rv); } @@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (0); } +static void +pair1_pipe_fini(void *arg) +{ + pair1_pipe *p = arg; + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + static int pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) { @@ -113,14 +125,14 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) { - NNI_FREE_STRUCT(p); + if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || + ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { + pair1_pipe_fini(p); return (NNG_ENOMEM); } - nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p); - nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p); - nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p); - nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p); p->npipe = npipe; p->psock = psock; @@ -129,18 +141,6 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (rv); } -static void -pair1_pipe_fini(void *arg) -{ - pair1_pipe *p = arg; - nni_aio_fini(&p->aio_send); - nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_aio_fini(&p->aio_getq); - nni_msgq_fini(p->sendq); - NNI_FREE_STRUCT(p); -} - static int pair1_pipe_start(void *arg) { @@ -163,7 +163,7 @@ pair1_pipe_start(void *arg) } } else { if (!s->started) { - nni_msgq_aio_get(s->uwq, &s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); } } nni_list_append(&s->plist, p); @@ -171,16 +171,16 @@ pair1_pipe_start(void *arg) nni_mtx_unlock(&s->mtx); // Schedule a getq. In polyamorous mode we get on the per pipe - // sendq, as the socket distributes to us. In monogamous mode we - // bypass and get from the upper writeq directly (saving a set of - // context switches). + // sendq, as the socket distributes to us. In monogamous mode + // we bypass and get from the upper writeq directly (saving a + // set of context switches). if (s->poly) { - nni_msgq_aio_get(p->sendq, &p->aio_getq); + nni_msgq_aio_get(p->sendq, p->aio_getq); } else { - nni_msgq_aio_get(s->uwq, &p->aio_getq); + nni_msgq_aio_get(s->uwq, p->aio_getq); } // And the pipe read of course. - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); return (0); } @@ -197,10 +197,10 @@ pair1_pipe_stop(void *arg) nni_mtx_unlock(&s->mtx); nni_msgq_close(p->sendq); - nni_aio_cancel(&p->aio_send, NNG_ECANCELED); - nni_aio_cancel(&p->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&p->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&p->aio_getq, NNG_ECANCELED); + nni_aio_cancel(p->aio_send, NNG_ECANCELED); + nni_aio_cancel(p->aio_recv, NNG_ECANCELED); + nni_aio_cancel(p->aio_putq, NNG_ECANCELED); + nni_aio_cancel(p->aio_getq, NNG_ECANCELED); } static void @@ -213,13 +213,13 @@ pair1_pipe_recv_cb(void *arg) nni_pipe * npipe = p->npipe; int rv; - if (nni_aio_result(&p->aio_recv) != 0) { + if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); return; } - msg = p->aio_recv.a_msg; - p->aio_recv.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); // Store the pipe ID. nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); @@ -241,20 +241,20 @@ pair1_pipe_recv_cb(void *arg) // keep getting more. if (hdr > (unsigned) s->ttl) { nni_msg_free(msg); - nni_pipe_recv(npipe, &p->aio_recv); + nni_pipe_recv(npipe, p->aio_recv); return; } // Store the pipe id followed by the hop count. if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) { nni_msg_free(msg); - nni_pipe_recv(npipe, &p->aio_recv); + nni_pipe_recv(npipe, p->aio_recv); return; } // Send the message up. - p->aio_putq.a_msg = msg; - nni_msgq_aio_put(s->urq, &p->aio_putq); + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(s->urq, p->aio_putq); } static void @@ -265,13 +265,13 @@ pair1_sock_getq_cb(void *arg) nni_msg * msg; uint32_t id; - if (nni_aio_result(&s->aio_getq) != 0) { + if (nni_aio_result(s->aio_getq) != 0) { // Socket closing... return; } - msg = s->aio_getq.a_msg; - s->aio_getq.a_msg = NULL; + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); // By definition we are in polyamorous mode. NNI_ASSERT(s->poly); @@ -289,7 +289,7 @@ pair1_sock_getq_cb(void *arg) // Pipe not present! nni_mtx_unlock(&s->mtx); nni_msg_free(msg); - nni_msgq_aio_get(s->uwq, &s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); return; } @@ -302,7 +302,7 @@ pair1_sock_getq_cb(void *arg) } nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->uwq, &s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); } static void @@ -310,13 +310,13 @@ pair1_pipe_putq_cb(void *arg) { pair1_pipe *p = arg; - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(p->aio_putq.a_msg); - p->aio_putq.a_msg = NULL; + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); nni_pipe_stop(p->npipe); return; } - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); } static void @@ -327,13 +327,13 @@ pair1_pipe_getq_cb(void *arg) nni_msg * msg; uint32_t hops; - if (nni_aio_result(&p->aio_getq) != 0) { + if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); return; } - msg = p->aio_getq.a_msg; - p->aio_getq.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_getq); + nni_aio_set_msg(p->aio_getq, NULL); // Raw mode messages have the header already formed, with // a hop count. Cooked mode messages have no @@ -354,13 +354,13 @@ pair1_pipe_getq_cb(void *arg) goto badmsg; } - p->aio_send.a_msg = msg; - nni_pipe_send(p->npipe, &p->aio_send); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->npipe, p->aio_send); return; badmsg: nni_msg_free(msg); - nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, &p->aio_getq); + nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); } static void @@ -369,20 +369,16 @@ pair1_pipe_send_cb(void *arg) pair1_pipe *p = arg; pair1_sock *s = p->psock; - if (nni_aio_result(&p->aio_send) != 0) { - nni_msg_free(p->aio_send.a_msg); - p->aio_send.a_msg = NULL; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->npipe); return; } // In polyamorous mode, we want to get from the sendq; in // monogamous we get from upper writeq. - if (s->poly) { - nni_msgq_aio_get(p->sendq, &p->aio_getq); - } else { - nni_msgq_aio_get(s->uwq, &p->aio_getq); - } + nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); } static void |
