diff options
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/ipc/ipc.c | 133 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 163 |
2 files changed, 163 insertions, 133 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index b2c382fb..f10f80b4 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -39,9 +39,9 @@ struct nni_ipc_pipe { nni_aio *user_txaio; nni_aio *user_rxaio; nni_aio *user_negaio; - nni_aio txaio; - nni_aio rxaio; - nni_aio negaio; + nni_aio *txaio; + nni_aio *rxaio; + nni_aio *negaio; nni_msg *rxmsg; nni_mtx mtx; }; @@ -51,7 +51,7 @@ struct nni_ipc_ep { nni_plat_ipc_ep *iep; uint16_t proto; size_t rcvmax; - nni_aio aio; + nni_aio * aio; nni_aio * user_aio; nni_mtx mtx; }; @@ -94,13 +94,13 @@ nni_ipc_pipe_fini(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio_stop(&pipe->rxaio); - nni_aio_stop(&pipe->txaio); - nni_aio_stop(&pipe->negaio); + nni_aio_stop(pipe->rxaio); + nni_aio_stop(pipe->txaio); + nni_aio_stop(pipe->negaio); - nni_aio_fini(&pipe->rxaio); - nni_aio_fini(&pipe->txaio); - nni_aio_fini(&pipe->negaio); + nni_aio_fini(pipe->rxaio); + nni_aio_fini(pipe->txaio); + nni_aio_fini(pipe->negaio); if (pipe->ipp != NULL) { nni_plat_ipc_pipe_fini(pipe->ipp); } @@ -115,14 +115,18 @@ static int nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p); - nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p); - nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p); + if (((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) { + nni_ipc_pipe_fini(p); + return (rv); + } p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -146,7 +150,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv) pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->negaio, rv); + nni_aio_cancel(pipe->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -154,7 +158,7 @@ static void nni_ipc_pipe_nego_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio * aio = &pipe->negaio; + nni_aio * aio = pipe->negaio; int rv; nni_mtx_lock(&pipe->mtx); @@ -219,12 +223,13 @@ nni_ipc_pipe_send_cb(void *arg) return; } pipe->user_txaio = NULL; - if ((rv = nni_aio_result(&pipe->txaio)) != 0) { + if ((rv = nni_aio_result(pipe->txaio)) != 0) { len = 0; } else { - len = nni_msg_len(aio->a_msg); - nni_msg_free(aio->a_msg); - aio->a_msg = NULL; + nni_msg *msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_msg_free(msg); + nni_aio_set_msg(aio, NULL); } nni_aio_finish(aio, rv, len); nni_mtx_unlock(&pipe->mtx); @@ -245,7 +250,7 @@ nni_ipc_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(&pipe->rxaio)) != 0) { + if ((rv = nni_aio_result(pipe->rxaio)) != 0) { // Error on receive. This has to cause an error back // to the user. Also, if we had allocated an rxmsg, lets // toss it. @@ -264,6 +269,7 @@ nni_ipc_pipe_recv_cb(void *arg) // message to allocate and how much more to expect. if (pipe->rxmsg == NULL) { uint64_t len; + nni_aio *rxaio; // Check to make sure we got msg type 1. if (pipe->rxhead[0] != 1) { @@ -298,11 +304,12 @@ nni_ipc_pipe_recv_cb(void *arg) // Submit the rest of the data for a read -- we want to // read the entire message now. - pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); - pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); - pipe->rxaio.a_niov = 1; + rxaio = pipe->rxaio; + rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); + rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); + rxaio->a_niov = 1; - nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -329,7 +336,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv) pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->txaio, rv); + nni_aio_cancel(pipe->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -337,8 +344,9 @@ static void nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_msg * msg = aio->a_msg; + nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; + nni_aio * txaio; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -353,15 +361,16 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); - pipe->txaio.a_iov[0].iov_buf = pipe->txhead; - pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txhead); - pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); - pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); - pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); - pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); - pipe->txaio.a_niov = 3; + txaio = pipe->txaio; + txaio->a_iov[0].iov_buf = pipe->txhead; + txaio->a_iov[0].iov_len = sizeof(pipe->txhead); + txaio->a_iov[1].iov_buf = nni_msg_header(msg); + txaio->a_iov[1].iov_len = nni_msg_header_len(msg); + txaio->a_iov[2].iov_buf = nni_msg_body(msg); + txaio->a_iov[2].iov_len = nni_msg_len(msg); + txaio->a_niov = 3; - nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); + nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); } @@ -378,7 +387,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv) pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->rxaio, rv); + nni_aio_cancel(pipe->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -386,6 +395,7 @@ static void nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; + nni_aio * rxaio; nni_mtx_lock(&pipe->mtx); @@ -398,11 +408,12 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. - pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead; - pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxhead); - pipe->rxaio.a_niov = 1; + rxaio = pipe->rxaio; + rxaio->a_iov[0].iov_buf = pipe->rxhead; + rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead); + rxaio->a_niov = 1; - nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -411,6 +422,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; int rv; + nni_aio * negaio; nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; @@ -420,20 +432,21 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txhead[4], pipe->proto); NNI_PUT16(&pipe->txhead[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - pipe->negaio.a_niov = 1; - pipe->negaio.a_iov[0].iov_len = 8; - pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + negaio = pipe->negaio; + negaio->a_niov = 1; + negaio->a_iov[0].iov_len = 8; + negaio->a_iov[0].iov_buf = &pipe->txhead[0]; rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); if (rv != 0) { nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio); + nni_plat_ipc_pipe_send(pipe->ipp, negaio); nni_mtx_unlock(&pipe->mtx); } @@ -473,9 +486,9 @@ nni_ipc_ep_fini(void *arg) { nni_ipc_ep *ep = arg; - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); nni_plat_ipc_ep_fini(ep->iep); - nni_aio_fini(&ep->aio); + nni_aio_fini(ep->aio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } @@ -532,7 +545,7 @@ nni_ipc_ep_close(void *arg) nni_plat_ipc_ep_close(ep->iep); nni_mtx_unlock(&ep->mtx); - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); } static int @@ -554,19 +567,19 @@ nni_ipc_ep_finish(nni_ipc_ep *ep) int rv; nni_ipc_pipe *pipe = NULL; - if ((rv = nni_aio_result(&ep->aio)) != 0) { + if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(ep->aio.a_pipe != NULL); + NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe); + rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); done: - ep->aio.a_pipe = NULL; - aio = ep->user_aio; - ep->user_aio = NULL; + nni_aio_set_pipe(ep->aio, NULL); + aio = ep->user_aio; + ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { NNI_ASSERT(pipe != NULL); @@ -607,7 +620,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, rv); + nni_aio_cancel(ep->aio, rv); nni_aio_finish_error(aio, rv); } @@ -627,7 +640,7 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_ipc_ep_accept(ep->iep, &ep->aio); + nni_plat_ipc_ep_accept(ep->iep, ep->aio); nni_mtx_unlock(&ep->mtx); } @@ -648,7 +661,7 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_ipc_ep_connect(ep->iep, &ep->aio); + nni_plat_ipc_ep_connect(ep->iep, ep->aio); nni_mtx_unlock(&ep->mtx); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index f0f07592..759e20f5 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -38,9 +38,9 @@ struct nni_tcp_pipe { size_t gotrxhead; size_t wanttxhead; size_t wantrxhead; - nni_aio txaio; - nni_aio rxaio; - nni_aio negaio; + nni_aio *txaio; + nni_aio *rxaio; + nni_aio *negaio; nni_msg *rxmsg; nni_mtx mtx; }; @@ -52,7 +52,7 @@ struct nni_tcp_ep { size_t rcvmax; nni_duration linger; int ipv4only; - nni_aio aio; + nni_aio * aio; nni_aio * user_aio; nni_mtx mtx; }; @@ -98,13 +98,13 @@ nni_tcp_pipe_fini(void *arg) { nni_tcp_pipe *p = arg; - nni_aio_stop(&p->rxaio); - nni_aio_stop(&p->txaio); - nni_aio_stop(&p->negaio); + nni_aio_stop(p->rxaio); + nni_aio_stop(p->txaio); + nni_aio_stop(p->negaio); - nni_aio_fini(&p->rxaio); - nni_aio_fini(&p->txaio); - nni_aio_fini(&p->negaio); + nni_aio_fini(p->rxaio); + nni_aio_fini(p->txaio); + nni_aio_fini(p->negaio); if (p->tpp != NULL) { nni_plat_tcp_pipe_fini(p->tpp); } @@ -119,14 +119,18 @@ static int nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) { nni_tcp_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p); - nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p); - nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p); + if (((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) { + nni_tcp_pipe_fini(p); + return (rv); + } p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -150,7 +154,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv) p->user_negaio = NULL; nni_mtx_unlock(&p->mtx); - nni_aio_cancel(&p->negaio, rv); + nni_aio_cancel(p->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -158,7 +162,7 @@ static void nni_tcp_pipe_nego_cb(void *arg) { nni_tcp_pipe *p = arg; - nni_aio * aio = &p->negaio; + nni_aio * aio = p->negaio; int rv; nni_mtx_lock(&p->mtx); @@ -224,12 +228,12 @@ nni_tcp_pipe_send_cb(void *arg) } p->user_txaio = NULL; - if ((rv = nni_aio_result(&p->txaio)) != 0) { + if ((rv = nni_aio_result(p->txaio)) != 0) { len = 0; } else { len = nni_msg_len(aio->a_msg); - nni_msg_free(aio->a_msg); - aio->a_msg = NULL; + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); } nni_aio_finish(aio, 0, len); nni_mtx_unlock(&p->mtx); @@ -251,7 +255,7 @@ nni_tcp_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(&p->rxaio)) != 0) { + if ((rv = nni_aio_result(p->rxaio)) != 0) { // Error on receive. This has to cause an error back // to the user. Also, if we had allocated an rxmsg, lets // toss it. @@ -269,6 +273,7 @@ nni_tcp_pipe_recv_cb(void *arg) // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. if (p->rxmsg == NULL) { + nni_aio *rxaio; uint64_t len; // We should have gotten a message header. NNI_GET64(p->rxlen, len); @@ -291,11 +296,12 @@ nni_tcp_pipe_recv_cb(void *arg) // Submit the rest of the data for a read -- we want to // read the entire message now. - p->rxaio.a_iov[0].iov_buf = nni_msg_body(p->rxmsg); - p->rxaio.a_iov[0].iov_len = nni_msg_len(p->rxmsg); - p->rxaio.a_niov = 1; + rxaio = p->rxaio; + rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); + rxaio->a_iov[0].iov_len = nni_msg_len(p->rxmsg); + rxaio->a_niov = 1; - nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio); + nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -322,7 +328,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(&p->txaio, rv); + nni_aio_cancel(p->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -330,8 +336,9 @@ static void nni_tcp_pipe_send(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; - nni_msg * msg = aio->a_msg; + nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; + nni_aio * txaio; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -346,15 +353,16 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) NNI_PUT64(p->txlen, len); - p->txaio.a_iov[0].iov_buf = p->txlen; - p->txaio.a_iov[0].iov_len = sizeof(p->txlen); - p->txaio.a_iov[1].iov_buf = nni_msg_header(msg); - p->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); - p->txaio.a_iov[2].iov_buf = nni_msg_body(msg); - p->txaio.a_iov[2].iov_len = nni_msg_len(msg); - p->txaio.a_niov = 3; + txaio = p->txaio; + txaio->a_iov[0].iov_buf = p->txlen; + txaio->a_iov[0].iov_len = sizeof(p->txlen); + txaio->a_iov[1].iov_buf = nni_msg_header(msg); + txaio->a_iov[1].iov_len = nni_msg_header_len(msg); + txaio->a_iov[2].iov_buf = nni_msg_body(msg); + txaio->a_iov[2].iov_len = nni_msg_len(msg); + txaio->a_niov = 3; - nni_plat_tcp_pipe_send(p->tpp, &p->txaio); + nni_plat_tcp_pipe_send(p->tpp, txaio); nni_mtx_unlock(&p->mtx); } @@ -372,7 +380,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(&p->rxaio, rv); + nni_aio_cancel(p->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -380,6 +388,7 @@ static void nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; + nni_aio * rxaio; nni_mtx_lock(&p->mtx); @@ -392,11 +401,12 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(p->rxmsg == NULL); // Schedule a read of the TCP header. - p->rxaio.a_iov[0].iov_buf = p->rxlen; - p->rxaio.a_iov[0].iov_len = sizeof(p->rxlen); - p->rxaio.a_niov = 1; + rxaio = p->rxaio; + rxaio->a_iov[0].iov_buf = p->rxlen; + rxaio->a_iov[0].iov_len = sizeof(p->rxlen); + rxaio->a_niov = 1; - nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio); + nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); } @@ -525,6 +535,7 @@ static void nni_tcp_pipe_start(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; + nni_aio * negaio; nni_mtx_lock(&p->mtx); p->txlen[0] = 0; @@ -534,19 +545,20 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&p->txlen[4], p->proto); NNI_PUT16(&p->txlen[6], 0); - p->user_negaio = aio; - p->gotrxhead = 0; - p->gottxhead = 0; - p->wantrxhead = 8; - p->wanttxhead = 8; - p->negaio.a_niov = 1; - p->negaio.a_iov[0].iov_len = 8; - p->negaio.a_iov[0].iov_buf = &p->txlen[0]; + p->user_negaio = aio; + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + negaio = p->negaio; + negaio->a_niov = 1; + negaio->a_iov[0].iov_len = 8; + negaio->a_iov[0].iov_buf = &p->txlen[0]; if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) { nni_mtx_unlock(&p->mtx); return; } - nni_plat_tcp_pipe_send(p->tpp, &p->negaio); + nni_plat_tcp_pipe_send(p->tpp, negaio); nni_mtx_unlock(&p->mtx); } @@ -555,11 +567,11 @@ nni_tcp_ep_fini(void *arg) { nni_tcp_ep *ep = arg; - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); if (ep->tep != NULL) { nni_plat_tcp_ep_fini(ep->tep); } - nni_aio_fini(&ep->aio); + nni_aio_fini(ep->aio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } @@ -575,7 +587,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) char * lhost; char * lserv; nni_sockaddr rsa, lsa; - nni_aio aio; + nni_aio * aio; int passive; // Make a copy of the url (to allow for destructive operations) @@ -590,17 +602,19 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) } passive = (mode == NNI_EP_MODE_DIAL ? 0 : 1); - nni_aio_init(&aio, NULL, NULL); + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } // XXX: arguably we could defer this part to the point we do a bind // or connect! if ((rhost != NULL) || (rserv != NULL)) { - aio.a_addr = &rsa; - nni_plat_tcp_resolv( - rhost, rserv, NNG_AF_UNSPEC, passive, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) != 0) { + aio->a_addr = &rsa; + nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio); + nni_aio_wait(aio); + if ((rv = nni_aio_result(aio)) != 0) { + nni_aio_fini(aio); return (rv); } } else { @@ -608,16 +622,17 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) } if ((lhost != NULL) || (lserv != NULL)) { - aio.a_addr = &lsa; - nni_plat_tcp_resolv( - lhost, lserv, NNG_AF_UNSPEC, passive, &aio); - nni_aio_wait(&aio); - if ((rv = nni_aio_result(&aio)) != 0) { + aio->a_addr = &lsa; + nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio); + nni_aio_wait(aio); + if ((rv = nni_aio_result(aio)) != 0) { + nni_aio_fini(aio); return (rv); } } else { lsa.s_un.s_family = NNG_AF_UNSPEC; } + nni_aio_fini(aio); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -633,8 +648,10 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) } nni_mtx_init(&ep->mtx); - nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep); - + if ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) { + nni_tcp_ep_fini(ep); + return (rv); + } ep->proto = nni_sock_proto(sock); *epp = ep; @@ -650,7 +667,7 @@ nni_tcp_ep_close(void *arg) nni_plat_tcp_ep_close(ep->tep); nni_mtx_unlock(&ep->mtx); - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); } static int @@ -673,19 +690,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep) int rv; nni_tcp_pipe *pipe = NULL; - if ((rv = nni_aio_result(&ep->aio)) != 0) { + if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(ep->aio.a_pipe != NULL); + NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe); + rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); done: - ep->aio.a_pipe = NULL; - aio = ep->user_aio; - ep->user_aio = NULL; + nni_aio_set_pipe(ep->aio, NULL); + aio = ep->user_aio; + ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { nni_aio_finish_pipe(aio, pipe); @@ -723,7 +740,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, rv); + nni_aio_cancel(ep->aio, rv); nni_aio_finish_error(aio, rv); } @@ -743,7 +760,7 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_tcp_ep_accept(ep->tep, &ep->aio); + nni_plat_tcp_ep_accept(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); } @@ -764,7 +781,7 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_tcp_ep_connect(ep->tep, &ep->aio); + nni_plat_tcp_ep_connect(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); } |
