diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-31 17:59:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-22 11:47:07 -0700 |
| commit | d72076207a2fad96ff014a81366868fb47a0ed1b (patch) | |
| tree | 5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/transport | |
| parent | 366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff) | |
| download | nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2 nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip | |
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them
abstractly in more places without inlining them. This will be used
for the ZeroTier transport to allow us to create operations consisting
of just the AIO. Furthermore, we provide accessors for some of the
aio members, in the hopes that we will be able to wrap these for
"safe" version of the AIO capability to export to applications, and
to protocol and transport implementors.
While here we cleaned up the protocol details to use consistently
shorter names (no nni_ prefix for static symbols needed), and we
also fixed a bug in the surveyor code.
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/ipc/ipc.c | 133 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 163 |
2 files changed, 163 insertions, 133 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index b2c382fb..f10f80b4 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -39,9 +39,9 @@ struct nni_ipc_pipe { nni_aio *user_txaio; nni_aio *user_rxaio; nni_aio *user_negaio; - nni_aio txaio; - nni_aio rxaio; - nni_aio negaio; + nni_aio *txaio; + nni_aio *rxaio; + nni_aio *negaio; nni_msg *rxmsg; nni_mtx mtx; }; @@ -51,7 +51,7 @@ struct nni_ipc_ep { nni_plat_ipc_ep *iep; uint16_t proto; size_t rcvmax; - nni_aio aio; + nni_aio * aio; nni_aio * user_aio; nni_mtx mtx; }; @@ -94,13 +94,13 @@ nni_ipc_pipe_fini(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio_stop(&pipe->rxaio); - nni_aio_stop(&pipe->txaio); - nni_aio_stop(&pipe->negaio); + nni_aio_stop(pipe->rxaio); + nni_aio_stop(pipe->txaio); + nni_aio_stop(pipe->negaio); - nni_aio_fini(&pipe->rxaio); - nni_aio_fini(&pipe->txaio); - nni_aio_fini(&pipe->negaio); + nni_aio_fini(pipe->rxaio); + nni_aio_fini(pipe->txaio); + nni_aio_fini(pipe->negaio); if (pipe->ipp != NULL) { nni_plat_ipc_pipe_fini(pipe->ipp); } @@ -115,14 +115,18 @@ static int nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p); - nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p); - nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p); + if (((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) { + nni_ipc_pipe_fini(p); + return (rv); + } p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -146,7 +150,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv) pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->negaio, rv); + nni_aio_cancel(pipe->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -154,7 +158,7 @@ static void nni_ipc_pipe_nego_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio * aio = &pipe->negaio; + nni_aio * aio = pipe->negaio; int rv; nni_mtx_lock(&pipe->mtx); @@ -219,12 +223,13 @@ nni_ipc_pipe_send_cb(void *arg) return; } pipe->user_txaio = NULL; - if ((rv = nni_aio_result(&pipe->txaio)) != 0) { + if ((rv = nni_aio_result(pipe->txaio)) != 0) { len = 0; } else { - len = nni_msg_len(aio->a_msg); - nni_msg_free(aio->a_msg); - aio->a_msg = NULL; + nni_msg *msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_msg_free(msg); + nni_aio_set_msg(aio, NULL); } nni_aio_finish(aio, rv, len); nni_mtx_unlock(&pipe->mtx); @@ -245,7 +250,7 @@ nni_ipc_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(&pipe->rxaio)) != 0) { + if ((rv = nni_aio_result(pipe->rxaio)) != 0) { // Error on receive. This has to cause an error back // to the user. Also, if we had allocated an rxmsg, lets // toss it. @@ -264,6 +269,7 @@ nni_ipc_pipe_recv_cb(void *arg) // message to allocate and how much more to expect. if (pipe->rxmsg == NULL) { uint64_t len; + nni_aio *rxaio; // Check to make sure we got msg type 1. if (pipe->rxhead[0] != 1) { @@ -298,11 +304,12 @@ nni_ipc_pipe_recv_cb(void *arg) // Submit the rest of the data for a read -- we want to // read the entire message now. - pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); - pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); - pipe->rxaio.a_niov = 1; + rxaio = pipe->rxaio; + rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); + rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); + rxaio->a_niov = 1; - nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -329,7 +336,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv) pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->txaio, rv); + nni_aio_cancel(pipe->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -337,8 +344,9 @@ static void nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_msg * msg = aio->a_msg; + nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; + nni_aio * txaio; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -353,15 +361,16 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); - pipe->txaio.a_iov[0].iov_buf = pipe->txhead; - pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txhead); - pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); - pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); - pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); - pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); - pipe->txaio.a_niov = 3; + txaio = pipe->txaio; + txaio->a_iov[0].iov_buf = pipe->txhead; + txaio->a_iov[0].iov_len = sizeof(pipe->txhead); + txaio->a_iov[1].iov_buf = nni_msg_header(msg); + txaio->a_iov[1].iov_len = nni_msg_header_len(msg); + txaio->a_iov[2].iov_buf = nni_msg_body(msg); + txaio->a_iov[2].iov_len = nni_msg_len(msg); + txaio->a_niov = 3; - nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); + nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); } @@ -378,7 +387,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv) pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->rxaio, rv); + nni_aio_cancel(pipe->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -386,6 +395,7 @@ static void nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; + nni_aio * rxaio; nni_mtx_lock(&pipe->mtx); @@ -398,11 +408,12 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. - pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead; - pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxhead); - pipe->rxaio.a_niov = 1; + rxaio = pipe->rxaio; + rxaio->a_iov[0].iov_buf = pipe->rxhead; + rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead); + rxaio->a_niov = 1; - nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -411,6 +422,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; int rv; + nni_aio * negaio; nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; @@ -420,20 +432,21 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txhead[4], pipe->proto); NNI_PUT16(&pipe->txhead[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - pipe->negaio.a_niov = 1; - pipe->negaio.a_iov[0].iov_len = 8; - pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + negaio = pipe->negaio; + negaio->a_niov = 1; + negaio->a_iov[0].iov_len = 8; + negaio->a_iov[0].iov_buf = &pipe->txhead[0]; rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); if (rv != 0) { nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio); + nni_plat_ipc_pipe_send(pipe->ipp, negaio); nni_mtx_unlock(&pipe->mtx); } @@ -473,9 +486,9 @@ nni_ipc_ep_fini(void *arg) { nni_ipc_ep *ep = arg; - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); nni_plat_ipc_ep_fini(ep->iep); - nni_aio_fini(&ep->aio); + nni_aio_fini(ep->aio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } @@ -532,7 +545,7 @@ nni_ipc_ep_close(void *arg) nni_plat_ipc_ep_close(ep->iep); nni_mtx_unlock(&ep->mtx); - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); } static int @@ -554,19 +567,19 @@ nni_ipc_ep_finish(nni_ipc_ep *ep) int rv; nni_ipc_pipe *pipe = NULL; - if ((rv = nni_aio_result(&ep->aio)) != 0) { + if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(ep->aio.a_pipe != NULL); + NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe); + rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); done: - ep->aio.a_pipe = NULL; - aio = ep->user_aio; - ep->user_aio = NULL; + nni_aio_set_pipe(ep->aio, NULL); + aio = ep->user_aio; + ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { NNI_ASSERT(pipe != NULL); @@ -607,7 +620,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, rv); + nni_aio_cancel(ep->aio, rv); nni_aio_finish_error(aio, rv); } @@ -627,7 +640,7 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_ipc_ep_accept(ep->iep, &ep->aio); + nni_plat_ipc_ep_accept(ep->iep, ep->aio); nni_mtx_unlock(&ep->mtx); } @@ -648,7 +661,7 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_ipc_ep_connect(ep->iep, &ep->aio); + nni_plat_ipc_ep_connect(ep->iep, ep->aio); nni_mtx_unlock(&ep->mtx); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index f0f07592..759e20f5 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -38,9 +38,9 @@ struct nni_tcp_pipe { size_t gotrxhead; size_t wanttxhead; size_t wantrxhead; - nni_aio txaio; - nni_aio rxaio; - nni_aio negaio; + nni_aio *txaio; + nni_aio *rxaio; + nni_aio *negaio; nni_msg *rxmsg; nni_mtx mtx; }; @@ -52,7 +52,7 @@ struct nni_tcp_ep { size_t rcvmax; nni_duration linger; int ipv4only; - nni_aio aio; + nni_aio * aio; nni_aio * user_aio; nni_mtx mtx; }; @@ -98,13 +98,13 @@ nni_tcp_pipe_fini(void *arg) { nni_tcp_pipe *p = arg; - nni_aio_stop(&p->rxaio); - nni_aio_stop(&p->txaio); - nni_aio_stop(&p->negaio); + nni_aio_stop(p->rxaio); + nni_aio_stop(p->txaio); + nni_aio_stop(p->negaio); - nni_aio_fini(&p->rxaio); - nni_aio_fini(&p->txaio); - nni_aio_fini(&p->negaio); + nni_aio_fini(p->rxaio); + nni_aio_fini(p->txaio); + nni_aio_fini(p->negaio); if (p->tpp != NULL) { nni_plat_tcp_pipe_fini(p->tpp); } @@ -119,14 +119,18 @@ static int nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) { nni_tcp_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p); - nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p); - nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p); + if (((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) { + nni_tcp_pipe_fini(p); + return (rv); + } p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -150,7 +154,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv) p->user_negaio = NULL; nni_mtx_unlock(&p->mtx); - nni_aio_cancel(&p->negaio, rv); + nni_aio_cancel(p->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -158,7 +162,7 @@ static void nni_tcp_pipe_nego_cb(void *arg) { nni_tcp_pipe *p = arg; - nni_aio * aio = &p->negaio; + nni_aio * aio = p->negaio; int rv; nni_mtx_lock(&p->mtx); @@ -224,12 +228,12 @@ nni_tcp_pipe_send_cb(void *arg) } p->user_txaio = NULL; - if ((rv = nni_aio_result(&p->txaio)) != 0) { + if ((rv = nni_aio_result(p->txaio)) != 0) { len = 0; } else { len = nni_msg_len(aio->a_msg); - nni_msg_free(aio->a_msg); - aio->a_msg = NULL; + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); } nni_aio_finish(aio, 0, len); nni_mtx_unlock(&p->mtx); @@ -251,7 +255,7 @@ nni_tcp_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(&p->rxaio)) != 0) { + if ((rv = nni_aio_result(p->rxaio)) != 0) { // Error on receive. This has to cause an error back // to the user. Also, if we had allocated an rxmsg, lets // toss it. @@ -269,6 +273,7 @@ nni_tcp_pipe_recv_cb(void *arg) // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. if (p->rxmsg == NULL) { + nni_aio *rxaio; uint64_t len; // We should have gotten a message header. NNI_GET64(p->rxlen, len); @@ -291,11 +296,12 @@ nni_tcp_pipe_recv_cb(void *arg) // Submit the rest of the data for a read -- we want to // read the entire message now. - p->rxaio.a_iov[0].iov_buf = nni_msg_body(p->rxmsg); - p->rxaio.a_iov[0].iov_len = nni_msg_len(p->rxmsg); - p->rxaio.a_niov = 1; + rxaio = p->rxaio; + rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); + rxaio->a_iov[0].iov_len = nni_msg_len(p->rxmsg); + rxaio->a_niov = 1; - nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio); + nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -322,7 +328,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(&p->txaio, rv); + nni_aio_cancel(p->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -330,8 +336,9 @@ static void nni_tcp_pipe_send(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; - nni_msg * msg = aio->a_msg; + nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; + nni_aio * txaio; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -346,15 +353,16 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) NNI_PUT64(p->txlen, len); - p->txaio.a_iov[0].iov_buf = p->txlen; - p->txaio.a_iov[0].iov_len = sizeof(p->txlen); - p->txaio.a_iov[1].iov_buf = nni_msg_header(msg); - p->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); - p->txaio.a_iov[2].iov_buf = nni_msg_body(msg); - p->txaio.a_iov[2].iov_len = nni_msg_len(msg); - p->txaio.a_niov = 3; + txaio = p->txaio; + txaio->a_iov[0].iov_buf = p->txlen; + txaio->a_iov[0].iov_len = sizeof(p->txlen); + txaio->a_iov[1].iov_buf = nni_msg_header(msg); + txaio->a_iov[1].iov_len = nni_msg_header_len(msg); + txaio->a_iov[2].iov_buf = nni_msg_body(msg); + txaio->a_iov[2].iov_len = nni_msg_len(msg); + txaio->a_niov = 3; - nni_plat_tcp_pipe_send(p->tpp, &p->txaio); + nni_plat_tcp_pipe_send(p->tpp, txaio); nni_mtx_unlock(&p->mtx); } @@ -372,7 +380,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(&p->rxaio, rv); + nni_aio_cancel(p->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -380,6 +388,7 @@ static void nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; + nni_aio * rxaio; nni_mtx_lock(&p->mtx); @@ -392,11 +401,12 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(p->rxmsg == NULL); // Schedule a read of the TCP header. - p->rxaio.a_iov[0].iov_buf = p->rxlen; - p->rxaio.a_iov[0].iov_len = sizeof(p->rxlen); - p->rxaio.a_niov = 1; + rxaio = p->rxaio; + rxaio->a_iov[0].iov_buf = p->rxlen; + rxaio->a_iov[0].iov_len = sizeof(p->rxlen); + rxaio->a_niov = 1; - nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio); + nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); } @@ -525,6 +535,7 @@ static void nni_tcp_pipe_start(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; + nni_aio * negaio; nni_mtx_lock(&p->mtx); p->txlen[0] = 0; @@ -534,19 +545,20 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&p->txlen[4], p->proto); NNI_PUT16(&p->txlen[6], 0); - p->user_negaio = aio; - p->gotrxhead = 0; - p->gottxhead = 0; - p->wantrxhead = 8; - p->wanttxhead = 8; - p->negaio.a_niov = 1; - p->negaio.a_iov[0].iov_len = 8; - p->negaio.a_iov[0].iov_buf = &p->txlen[0]; + p->user_negaio = aio; + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + negaio = p->negaio; + negaio->a_niov = 1; + negaio->a_iov[0].iov_len = 8; + negaio->a_iov[0].iov_buf = &p->txlen[0]; if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) { nni_mtx_unlock(&p->mtx); return; } - nni_plat_tcp_pipe_send(p->tpp, &p->negaio); + nni_plat_tcp_pipe_send(p->tpp, negaio); nni_mtx_unlock(&p->mtx); } @@ -555,11 +567,11 @@ nni_tcp_ep_fini(void *arg) { nni_tcp_ep *ep = arg; - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); if (ep->tep != NULL) { nni_plat_tcp_ep_fini(ep->tep); } - nni_aio_fini(&ep->aio); + nni_aio_fini(ep->aio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } @@ -575,7 +587,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) char * lhost; char * lserv; nni_sockaddr rsa, lsa; - nni_aio aio; + nni_aio * aio; int passive; // Make a copy of the url (to allow for destructive operations) @@ -590,17 +602,19 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) } passive = (mode == NNI_EP_MODE_DIAL ? 0 : 1); - nni_aio_init(&aio, NULL, NULL); + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } // XXX: arguably we could defer this part to the point we do a bind // or connect! if ((rhost != NULL) || (rserv != NULL)) { - aio.a_addr = &rsa; - nni_plat_tcp_resolv( - rhost, rserv, NNG_AF_UNSPEC, passive, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) != 0) { + aio->a_addr = &rsa; + nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio); + nni_aio_wait(aio); + if ((rv = nni_aio_result(aio)) != 0) { + nni_aio_fini(aio); return (rv); } } else { @@ -608,16 +622,17 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) } if ((lhost != NULL) || (lserv != NULL)) { - aio.a_addr = &lsa; - nni_plat_tcp_resolv( - lhost, lserv, NNG_AF_UNSPEC, passive, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) != 0) { + aio->a_addr = &lsa; + nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio); + nni_aio_wait(aio); + if ((rv = nni_aio_result(aio)) != 0) { + nni_aio_fini(aio); return (rv); } } else { lsa.s_un.s_family = NNG_AF_UNSPEC; } + nni_aio_fini(aio); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -633,8 +648,10 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) } nni_mtx_init(&ep->mtx); - nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep); - + if ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) { + nni_tcp_ep_fini(ep); + return (rv); + } ep->proto = nni_sock_proto(sock); *epp = ep; @@ -650,7 +667,7 @@ nni_tcp_ep_close(void *arg) nni_plat_tcp_ep_close(ep->tep); nni_mtx_unlock(&ep->mtx); - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); } static int @@ -673,19 +690,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep) int rv; nni_tcp_pipe *pipe = NULL; - if ((rv = nni_aio_result(&ep->aio)) != 0) { + if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(ep->aio.a_pipe != NULL); + NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe); + rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); done: - ep->aio.a_pipe = NULL; - aio = ep->user_aio; - ep->user_aio = NULL; + nni_aio_set_pipe(ep->aio, NULL); + aio = ep->user_aio; + ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { nni_aio_finish_pipe(aio, pipe); @@ -723,7 +740,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, rv); + nni_aio_cancel(ep->aio, rv); nni_aio_finish_error(aio, rv); } @@ -743,7 +760,7 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_tcp_ep_accept(ep->tep, &ep->aio); + nni_plat_tcp_ep_accept(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); } @@ -764,7 +781,7 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_tcp_ep_connect(ep->tep, &ep->aio); + nni_plat_tcp_ep_connect(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); } |
