diff options
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 133 |
1 files changed, 73 insertions, 60 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index b2c382fb..f10f80b4 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -39,9 +39,9 @@ struct nni_ipc_pipe { nni_aio *user_txaio; nni_aio *user_rxaio; nni_aio *user_negaio; - nni_aio txaio; - nni_aio rxaio; - nni_aio negaio; + nni_aio *txaio; + nni_aio *rxaio; + nni_aio *negaio; nni_msg *rxmsg; nni_mtx mtx; }; @@ -51,7 +51,7 @@ struct nni_ipc_ep { nni_plat_ipc_ep *iep; uint16_t proto; size_t rcvmax; - nni_aio aio; + nni_aio * aio; nni_aio * user_aio; nni_mtx mtx; }; @@ -94,13 +94,13 @@ nni_ipc_pipe_fini(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio_stop(&pipe->rxaio); - nni_aio_stop(&pipe->txaio); - nni_aio_stop(&pipe->negaio); + nni_aio_stop(pipe->rxaio); + nni_aio_stop(pipe->txaio); + nni_aio_stop(pipe->negaio); - nni_aio_fini(&pipe->rxaio); - nni_aio_fini(&pipe->txaio); - nni_aio_fini(&pipe->negaio); + nni_aio_fini(pipe->rxaio); + nni_aio_fini(pipe->txaio); + nni_aio_fini(pipe->negaio); if (pipe->ipp != NULL) { nni_plat_ipc_pipe_fini(pipe->ipp); } @@ -115,14 +115,18 @@ static int nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p); - nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p); - nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p); + if (((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) { + nni_ipc_pipe_fini(p); + return (rv); + } p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -146,7 +150,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv) pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->negaio, rv); + nni_aio_cancel(pipe->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -154,7 +158,7 @@ static void nni_ipc_pipe_nego_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio * aio = &pipe->negaio; + nni_aio * aio = pipe->negaio; int rv; nni_mtx_lock(&pipe->mtx); @@ -219,12 +223,13 @@ nni_ipc_pipe_send_cb(void *arg) return; } pipe->user_txaio = NULL; - if ((rv = nni_aio_result(&pipe->txaio)) != 0) { + if ((rv = nni_aio_result(pipe->txaio)) != 0) { len = 0; } else { - len = nni_msg_len(aio->a_msg); - nni_msg_free(aio->a_msg); - aio->a_msg = NULL; + nni_msg *msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_msg_free(msg); + nni_aio_set_msg(aio, NULL); } nni_aio_finish(aio, rv, len); nni_mtx_unlock(&pipe->mtx); @@ -245,7 +250,7 @@ nni_ipc_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(&pipe->rxaio)) != 0) { + if ((rv = nni_aio_result(pipe->rxaio)) != 0) { // Error on receive. This has to cause an error back // to the user. Also, if we had allocated an rxmsg, lets // toss it. @@ -264,6 +269,7 @@ nni_ipc_pipe_recv_cb(void *arg) // message to allocate and how much more to expect. if (pipe->rxmsg == NULL) { uint64_t len; + nni_aio *rxaio; // Check to make sure we got msg type 1. if (pipe->rxhead[0] != 1) { @@ -298,11 +304,12 @@ nni_ipc_pipe_recv_cb(void *arg) // Submit the rest of the data for a read -- we want to // read the entire message now. - pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); - pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); - pipe->rxaio.a_niov = 1; + rxaio = pipe->rxaio; + rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); + rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); + rxaio->a_niov = 1; - nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -329,7 +336,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv) pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->txaio, rv); + nni_aio_cancel(pipe->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -337,8 +344,9 @@ static void nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_msg * msg = aio->a_msg; + nni_msg * msg = nni_aio_get_msg(aio); uint64_t len; + nni_aio * txaio; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -353,15 +361,16 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); - pipe->txaio.a_iov[0].iov_buf = pipe->txhead; - pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txhead); - pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); - pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); - pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); - pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); - pipe->txaio.a_niov = 3; + txaio = pipe->txaio; + txaio->a_iov[0].iov_buf = pipe->txhead; + txaio->a_iov[0].iov_len = sizeof(pipe->txhead); + txaio->a_iov[1].iov_buf = nni_msg_header(msg); + txaio->a_iov[1].iov_len = nni_msg_header_len(msg); + txaio->a_iov[2].iov_buf = nni_msg_body(msg); + txaio->a_iov[2].iov_len = nni_msg_len(msg); + txaio->a_niov = 3; - nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); + nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); } @@ -378,7 +387,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv) pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(&pipe->rxaio, rv); + nni_aio_cancel(pipe->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -386,6 +395,7 @@ static void nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; + nni_aio * rxaio; nni_mtx_lock(&pipe->mtx); @@ -398,11 +408,12 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. - pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead; - pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxhead); - pipe->rxaio.a_niov = 1; + rxaio = pipe->rxaio; + rxaio->a_iov[0].iov_buf = pipe->rxhead; + rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead); + rxaio->a_niov = 1; - nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); + nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -411,6 +422,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; int rv; + nni_aio * negaio; nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; @@ -420,20 +432,21 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txhead[4], pipe->proto); NNI_PUT16(&pipe->txhead[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - pipe->negaio.a_niov = 1; - pipe->negaio.a_iov[0].iov_len = 8; - pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + negaio = pipe->negaio; + negaio->a_niov = 1; + negaio->a_iov[0].iov_len = 8; + negaio->a_iov[0].iov_buf = &pipe->txhead[0]; rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); if (rv != 0) { nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio); + nni_plat_ipc_pipe_send(pipe->ipp, negaio); nni_mtx_unlock(&pipe->mtx); } @@ -473,9 +486,9 @@ nni_ipc_ep_fini(void *arg) { nni_ipc_ep *ep = arg; - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); nni_plat_ipc_ep_fini(ep->iep); - nni_aio_fini(&ep->aio); + nni_aio_fini(ep->aio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } @@ -532,7 +545,7 @@ nni_ipc_ep_close(void *arg) nni_plat_ipc_ep_close(ep->iep); nni_mtx_unlock(&ep->mtx); - nni_aio_stop(&ep->aio); + nni_aio_stop(ep->aio); } static int @@ -554,19 +567,19 @@ nni_ipc_ep_finish(nni_ipc_ep *ep) int rv; nni_ipc_pipe *pipe = NULL; - if ((rv = nni_aio_result(&ep->aio)) != 0) { + if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(ep->aio.a_pipe != NULL); + NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe); + rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); done: - ep->aio.a_pipe = NULL; - aio = ep->user_aio; - ep->user_aio = NULL; + nni_aio_set_pipe(ep->aio, NULL); + aio = ep->user_aio; + ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { NNI_ASSERT(pipe != NULL); @@ -607,7 +620,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(&ep->aio, rv); + nni_aio_cancel(ep->aio, rv); nni_aio_finish_error(aio, rv); } @@ -627,7 +640,7 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_ipc_ep_accept(ep->iep, &ep->aio); + nni_plat_ipc_ep_accept(ep->iep, ep->aio); nni_mtx_unlock(&ep->mtx); } @@ -648,7 +661,7 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) ep->user_aio = aio; - nni_plat_ipc_ep_connect(ep->iep, &ep->aio); + nni_plat_ipc_ep_connect(ep->iep, ep->aio); nni_mtx_unlock(&ep->mtx); } |
