diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-31 17:59:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-22 11:47:07 -0700 |
| commit | d72076207a2fad96ff014a81366868fb47a0ed1b (patch) | |
| tree | 5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/protocol/pair | |
| parent | 366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff) | |
| download | nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2 nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip | |
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them
abstractly in more places without inlining them. This will be used
for the ZeroTier transport to allow us to create operations consisting
of just the AIO. Furthermore, we provide accessors for some of the
aio members, in the hopes that we will be able to wrap these for
"safe" version of the AIO capability to export to applications, and
to protocol and transport implementors.
While here we cleaned up the protocol details to use consistently
shorter names (no nni_ prefix for static symbols needed), and we
also fixed a bug in the surveyor code.
Diffstat (limited to 'src/protocol/pair')
| -rw-r--r-- | src/protocol/pair/pair_v0.c | 90 | ||||
| -rw-r--r-- | src/protocol/pair/pair_v1.c | 130 |
2 files changed, 110 insertions, 110 deletions
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index ef420051..486ce43b 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -43,10 +43,10 @@ struct pair0_sock { struct pair0_pipe { nni_pipe * npipe; pair0_sock *psock; - nni_aio aio_send; - nni_aio aio_recv; - nni_aio aio_getq; - nni_aio aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_getq; + nni_aio * aio_putq; }; static int @@ -76,18 +76,34 @@ pair0_sock_fini(void *arg) NNI_FREE_STRUCT(s); } +static void +pair0_pipe_fini(void *arg) +{ + pair0_pipe *p = arg; + + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + NNI_FREE_STRUCT(p); +} + static int pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) { pair0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - nni_aio_init(&p->aio_send, pair0_send_cb, p); - nni_aio_init(&p->aio_recv, pair0_recv_cb, p); - nni_aio_init(&p->aio_getq, pair0_getq_cb, p); - nni_aio_init(&p->aio_putq, pair0_putq_cb, p); + if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { + pair0_pipe_fini(p); + return (rv); + } p->npipe = npipe; p->psock = psock; @@ -95,18 +111,6 @@ pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (0); } -static void -pair0_pipe_fini(void *arg) -{ - pair0_pipe *p = arg; - - nni_aio_fini(&p->aio_send); - nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_aio_fini(&p->aio_getq); - NNI_FREE_STRUCT(p); -} - static int pair0_pipe_start(void *arg) { @@ -123,8 +127,8 @@ pair0_pipe_start(void *arg) // Schedule a getq on the upper, and a read from the pipe. // Each of these also sets up another hold on the pipe itself. - nni_msgq_aio_get(s->uwq, &p->aio_getq); - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->npipe, p->aio_recv); return (0); } @@ -135,10 +139,10 @@ pair0_pipe_stop(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - nni_aio_cancel(&p->aio_send, NNG_ECANCELED); - nni_aio_cancel(&p->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&p->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&p->aio_getq, NNG_ECANCELED); + nni_aio_cancel(p->aio_send, NNG_ECANCELED); + nni_aio_cancel(p->aio_recv, NNG_ECANCELED); + nni_aio_cancel(p->aio_putq, NNG_ECANCELED); + nni_aio_cancel(p->aio_getq, NNG_ECANCELED); nni_mtx_lock(&s->mtx); if (s->ppipe == p) { @@ -154,17 +158,17 @@ pair0_recv_cb(void *arg) pair0_sock *s = p->psock; nni_msg * msg; - if (nni_aio_result(&p->aio_recv) != 0) { + if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); return; } - msg = p->aio_recv.a_msg; - p->aio_putq.a_msg = msg; - p->aio_recv.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_putq, msg); + nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_msgq_aio_put(s->urq, &p->aio_putq); + nni_msgq_aio_put(s->urq, p->aio_putq); } static void @@ -172,13 +176,13 @@ pair0_putq_cb(void *arg) { pair0_pipe *p = arg; - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(p->aio_putq.a_msg); - p->aio_putq.a_msg = NULL; + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); nni_pipe_stop(p->npipe); return; } - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); } static void @@ -187,14 +191,14 @@ pair0_getq_cb(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - if (nni_aio_result(&p->aio_getq) != 0) { + if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); return; } - p->aio_send.a_msg = p->aio_getq.a_msg; - p->aio_getq.a_msg = NULL; - nni_pipe_send(p->npipe, &p->aio_send); + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + nni_pipe_send(p->npipe, p->aio_send); } static void @@ -203,14 +207,14 @@ pair0_send_cb(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; - if (nni_aio_result(&p->aio_send) != 0) { - nni_msg_free(p->aio_send.a_msg); - p->aio_send.a_msg = NULL; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->npipe); return; } - nni_msgq_aio_get(s->uwq, &p->aio_getq); + nni_msgq_aio_get(s->uwq, p->aio_getq); } static void diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index a737402f..d6c8ea75 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -41,7 +41,7 @@ struct pair1_sock { nni_list plist; int started; int poly; - nni_aio aio_getq; + nni_aio * aio_getq; }; // pair1_pipe is our per-pipe protocol private structure. @@ -49,10 +49,10 @@ struct pair1_pipe { nni_pipe * npipe; pair1_sock * psock; nni_msgq * sendq; - nni_aio aio_send; - nni_aio aio_recv; - nni_aio aio_getq; - nni_aio aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_getq; + nni_aio * aio_putq; nni_list_node node; }; @@ -61,7 +61,7 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; - nni_aio_fini(&s->aio_getq); + nni_aio_fini(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); @@ -85,10 +85,10 @@ pair1_sock_init(void **sp, nni_sock *nsock) NNI_LIST_INIT(&s->plist, pair1_pipe, node); // Raw mode uses this. - nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s); nni_mtx_init(&s->mtx); - if ((rv = nni_option_register("polyamorous", &poly)) != 0) { + if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) || + ((rv = nni_option_register("polyamorous", &poly)) != 0)) { pair1_sock_fini(s); return (rv); } @@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (0); } +static void +pair1_pipe_fini(void *arg) +{ + pair1_pipe *p = arg; + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + static int pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) { @@ -113,14 +125,14 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) { - NNI_FREE_STRUCT(p); + if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || + ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { + pair1_pipe_fini(p); return (NNG_ENOMEM); } - nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p); - nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p); - nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p); - nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p); p->npipe = npipe; p->psock = psock; @@ -129,18 +141,6 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (rv); } -static void -pair1_pipe_fini(void *arg) -{ - pair1_pipe *p = arg; - nni_aio_fini(&p->aio_send); - nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_aio_fini(&p->aio_getq); - nni_msgq_fini(p->sendq); - NNI_FREE_STRUCT(p); -} - static int pair1_pipe_start(void *arg) { @@ -163,7 +163,7 @@ pair1_pipe_start(void *arg) } } else { if (!s->started) { - nni_msgq_aio_get(s->uwq, &s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); } } nni_list_append(&s->plist, p); @@ -171,16 +171,16 @@ pair1_pipe_start(void *arg) nni_mtx_unlock(&s->mtx); // Schedule a getq. In polyamorous mode we get on the per pipe - // sendq, as the socket distributes to us. In monogamous mode we - // bypass and get from the upper writeq directly (saving a set of - // context switches). + // sendq, as the socket distributes to us. In monogamous mode + // we bypass and get from the upper writeq directly (saving a + // set of context switches). if (s->poly) { - nni_msgq_aio_get(p->sendq, &p->aio_getq); + nni_msgq_aio_get(p->sendq, p->aio_getq); } else { - nni_msgq_aio_get(s->uwq, &p->aio_getq); + nni_msgq_aio_get(s->uwq, p->aio_getq); } // And the pipe read of course. - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); return (0); } @@ -197,10 +197,10 @@ pair1_pipe_stop(void *arg) nni_mtx_unlock(&s->mtx); nni_msgq_close(p->sendq); - nni_aio_cancel(&p->aio_send, NNG_ECANCELED); - nni_aio_cancel(&p->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&p->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&p->aio_getq, NNG_ECANCELED); + nni_aio_cancel(p->aio_send, NNG_ECANCELED); + nni_aio_cancel(p->aio_recv, NNG_ECANCELED); + nni_aio_cancel(p->aio_putq, NNG_ECANCELED); + nni_aio_cancel(p->aio_getq, NNG_ECANCELED); } static void @@ -213,13 +213,13 @@ pair1_pipe_recv_cb(void *arg) nni_pipe * npipe = p->npipe; int rv; - if (nni_aio_result(&p->aio_recv) != 0) { + if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); return; } - msg = p->aio_recv.a_msg; - p->aio_recv.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); // Store the pipe ID. nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); @@ -241,20 +241,20 @@ pair1_pipe_recv_cb(void *arg) // keep getting more. if (hdr > (unsigned) s->ttl) { nni_msg_free(msg); - nni_pipe_recv(npipe, &p->aio_recv); + nni_pipe_recv(npipe, p->aio_recv); return; } // Store the pipe id followed by the hop count. if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) { nni_msg_free(msg); - nni_pipe_recv(npipe, &p->aio_recv); + nni_pipe_recv(npipe, p->aio_recv); return; } // Send the message up. - p->aio_putq.a_msg = msg; - nni_msgq_aio_put(s->urq, &p->aio_putq); + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(s->urq, p->aio_putq); } static void @@ -265,13 +265,13 @@ pair1_sock_getq_cb(void *arg) nni_msg * msg; uint32_t id; - if (nni_aio_result(&s->aio_getq) != 0) { + if (nni_aio_result(s->aio_getq) != 0) { // Socket closing... return; } - msg = s->aio_getq.a_msg; - s->aio_getq.a_msg = NULL; + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); // By definition we are in polyamorous mode. NNI_ASSERT(s->poly); @@ -289,7 +289,7 @@ pair1_sock_getq_cb(void *arg) // Pipe not present! nni_mtx_unlock(&s->mtx); nni_msg_free(msg); - nni_msgq_aio_get(s->uwq, &s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); return; } @@ -302,7 +302,7 @@ pair1_sock_getq_cb(void *arg) } nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->uwq, &s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); } static void @@ -310,13 +310,13 @@ pair1_pipe_putq_cb(void *arg) { pair1_pipe *p = arg; - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(p->aio_putq.a_msg); - p->aio_putq.a_msg = NULL; + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); nni_pipe_stop(p->npipe); return; } - nni_pipe_recv(p->npipe, &p->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); } static void @@ -327,13 +327,13 @@ pair1_pipe_getq_cb(void *arg) nni_msg * msg; uint32_t hops; - if (nni_aio_result(&p->aio_getq) != 0) { + if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); return; } - msg = p->aio_getq.a_msg; - p->aio_getq.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_getq); + nni_aio_set_msg(p->aio_getq, NULL); // Raw mode messages have the header already formed, with // a hop count. Cooked mode messages have no @@ -354,13 +354,13 @@ pair1_pipe_getq_cb(void *arg) goto badmsg; } - p->aio_send.a_msg = msg; - nni_pipe_send(p->npipe, &p->aio_send); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->npipe, p->aio_send); return; badmsg: nni_msg_free(msg); - nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, &p->aio_getq); + nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); } static void @@ -369,20 +369,16 @@ pair1_pipe_send_cb(void *arg) pair1_pipe *p = arg; pair1_sock *s = p->psock; - if (nni_aio_result(&p->aio_send) != 0) { - nni_msg_free(p->aio_send.a_msg); - p->aio_send.a_msg = NULL; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->npipe); return; } // In polyamorous mode, we want to get from the sendq; in // monogamous we get from upper writeq. - if (s->poly) { - nni_msgq_aio_get(p->sendq, &p->aio_getq); - } else { - nni_msgq_aio_get(s->uwq, &p->aio_getq); - } + nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); } static void |
