diff options
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 124 |
1 files changed, 52 insertions, 72 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 9e8a3829..6475e43b 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -28,7 +28,7 @@ struct nni_ipc_pipe { uint16_t peer; uint16_t proto; size_t rcvmax; - nng_sockaddr sa; + nni_sockaddr sa; uint8_t txhead[1 + sizeof(uint64_t)]; uint8_t rxhead[1 + sizeof(uint64_t)]; @@ -133,7 +133,7 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) static void nni_ipc_cancel_start(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_negaio != aio) { @@ -143,7 +143,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv) pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->negaio, rv); + nni_aio_abort(pipe->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -167,18 +167,20 @@ nni_ipc_pipe_nego_cb(void *arg) } if (pipe->gottxhead < pipe->wanttxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; - aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; + nni_iov iov; + iov.iov_len = pipe->wanttxhead - pipe->gottxhead; + iov.iov_buf = &pipe->txhead[pipe->gottxhead]; + nni_aio_set_iov(aio, 1, &iov); // send it down... nni_plat_ipc_pipe_send(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } if (pipe->gotrxhead < pipe->wantrxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; - aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; + nni_iov iov; + iov.iov_len = pipe->wantrxhead - pipe->gotrxhead; + iov.iov_buf = &pipe->rxhead[pipe->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; @@ -229,20 +231,8 @@ nni_ipc_pipe_send_cb(void *arg) } n = nni_aio_count(txaio); - while (n) { - NNI_ASSERT(txaio->a_niov != 0); - if (txaio->a_iov[0].iov_len > n) { - txaio->a_iov[0].iov_len -= n; - NNI_INCPTR(txaio->a_iov[0].iov_buf, n); - break; - } - n -= txaio->a_iov[0].iov_len; - for (int i = 0; i < txaio->a_niov; i++) { - txaio->a_iov[i] = txaio->a_iov[i + 1]; - } - txaio->a_niov--; - } - if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) != 0) { nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); return; @@ -281,22 +271,9 @@ nni_ipc_pipe_recv_cb(void *arg) } n = nni_aio_count(rxaio); - while (n) { - NNI_ASSERT(rxaio->a_niov != 0); - if (rxaio->a_iov[0].iov_len > n) { - rxaio->a_iov[0].iov_len -= n; - NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); - break; - } - n -= rxaio->a_iov[0].iov_len; - for (int i = 0; i < rxaio->a_niov; i++) { - rxaio->a_iov[i] = rxaio->a_iov[i + 1]; - } - rxaio->a_niov--; - } - - // Was this a partial read? If so then resubmit for the rest. - if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) != 0) { + // Was this a partial read? If so then resubmit for the rest. nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; @@ -329,17 +306,17 @@ nni_ipc_pipe_recv_cb(void *arg) // lock for the read side in the future, so that we allow // transmits to proceed normally. In practice this is // unlikely to be much of an issue though. - if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { goto recv_error; } if (len != 0) { + nni_iov iov; // Submit the rest of the data for a read -- we want to // read the entire message now. - rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); - rxaio->a_iov[0].iov_len = (size_t) len; - rxaio->a_niov = 1; - + iov.iov_buf = nni_msg_body(pipe->rxmsg); + iov.iov_len = (size_t) len; + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; @@ -368,7 +345,7 @@ recv_error: static void nni_ipc_cancel_tx(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_txaio != aio) { @@ -378,7 +355,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv) pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->txaio, rv); + nni_aio_abort(pipe->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -390,6 +367,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) uint64_t len; nni_aio * txaio; int niov; + nni_iov iov[3]; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -404,22 +382,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); - txaio = pipe->txaio; - niov = 0; - txaio->a_iov[niov].iov_buf = pipe->txhead; - txaio->a_iov[niov].iov_len = sizeof(pipe->txhead); + txaio = pipe->txaio; + niov = 0; + iov[0].iov_buf = pipe->txhead; + iov[0].iov_len = sizeof(pipe->txhead); niov++; if (nni_msg_header_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_header(msg); - txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); niov++; } if (nni_msg_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_body(msg); - txaio->a_iov[niov].iov_len = nni_msg_len(msg); + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); niov++; } - txaio->a_niov = niov; + nni_aio_set_iov(txaio, niov, iov); nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); @@ -428,7 +406,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) static void nni_ipc_cancel_rx(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_rxaio != aio) { @@ -438,7 +416,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv) pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->rxaio, rv); + nni_aio_abort(pipe->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -447,6 +425,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; nni_aio * rxaio; + nni_iov iov; nni_mtx_lock(&pipe->mtx); @@ -459,10 +438,10 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. - rxaio = pipe->rxaio; - rxaio->a_iov[0].iov_buf = pipe->rxhead; - rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead); - rxaio->a_niov = 1; + rxaio = pipe->rxaio; + iov.iov_buf = pipe->rxhead; + iov.iov_len = sizeof(pipe->rxhead); + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); @@ -474,6 +453,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_ipc_pipe *pipe = arg; int rv; nni_aio * negaio; + nni_iov iov; nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; @@ -483,15 +463,15 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txhead[4], pipe->proto); NNI_PUT16(&pipe->txhead[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - negaio = pipe->negaio; - negaio->a_niov = 1; - negaio->a_iov[0].iov_len = 8; - negaio->a_iov[0].iov_buf = &pipe->txhead[0]; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + negaio = pipe->negaio; + iov.iov_len = 8; + iov.iov_buf = &pipe->txhead[0]; + nni_aio_set_iov(negaio, 1, &iov); rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); if (rv != 0) { nni_mtx_unlock(&pipe->mtx); @@ -638,7 +618,7 @@ nni_ipc_ep_cb(void *arg) static void nni_ipc_cancel_ep(nni_aio *aio, int rv) { - nni_ipc_ep *ep = aio->a_prov_data; + nni_ipc_ep *ep = nni_aio_get_prov_data(aio); NNI_ASSERT(rv != 0); nni_mtx_lock(&ep->mtx); @@ -649,7 +629,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(ep->aio, rv); + nni_aio_abort(ep->aio, rv); nni_aio_finish_error(aio, rv); } |
