aboutsummaryrefslogtreecommitdiff
path: root/src/transport/ipc/ipc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/ipc/ipc.c')
-rw-r--r--src/transport/ipc/ipc.c124
1 files changed, 52 insertions, 72 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 9e8a3829..6475e43b 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -28,7 +28,7 @@ struct nni_ipc_pipe {
uint16_t peer;
uint16_t proto;
size_t rcvmax;
- nng_sockaddr sa;
+ nni_sockaddr sa;
uint8_t txhead[1 + sizeof(uint64_t)];
uint8_t rxhead[1 + sizeof(uint64_t)];
@@ -133,7 +133,7 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
static void
nni_ipc_cancel_start(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = aio->a_prov_data;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
if (pipe->user_negaio != aio) {
@@ -143,7 +143,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv)
pipe->user_negaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(pipe->negaio, rv);
+ nni_aio_abort(pipe->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -167,18 +167,20 @@ nni_ipc_pipe_nego_cb(void *arg)
}
if (pipe->gottxhead < pipe->wanttxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
- aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead];
+ nni_iov iov;
+ iov.iov_len = pipe->wanttxhead - pipe->gottxhead;
+ iov.iov_buf = &pipe->txhead[pipe->gottxhead];
+ nni_aio_set_iov(aio, 1, &iov);
// send it down...
nni_plat_ipc_pipe_send(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
if (pipe->gotrxhead < pipe->wantrxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
- aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead];
+ nni_iov iov;
+ iov.iov_len = pipe->wantrxhead - pipe->gotrxhead;
+ iov.iov_buf = &pipe->rxhead[pipe->gotrxhead];
+ nni_aio_set_iov(aio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
@@ -229,20 +231,8 @@ nni_ipc_pipe_send_cb(void *arg)
}
n = nni_aio_count(txaio);
- while (n) {
- NNI_ASSERT(txaio->a_niov != 0);
- if (txaio->a_iov[0].iov_len > n) {
- txaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(txaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= txaio->a_iov[0].iov_len;
- for (int i = 0; i < txaio->a_niov; i++) {
- txaio->a_iov[i] = txaio->a_iov[i + 1];
- }
- txaio->a_niov--;
- }
- if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) {
+ nni_aio_iov_advance(txaio, n);
+ if (nni_aio_iov_count(txaio) != 0) {
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
return;
@@ -281,22 +271,9 @@ nni_ipc_pipe_recv_cb(void *arg)
}
n = nni_aio_count(rxaio);
- while (n) {
- NNI_ASSERT(rxaio->a_niov != 0);
- if (rxaio->a_iov[0].iov_len > n) {
- rxaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(rxaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= rxaio->a_iov[0].iov_len;
- for (int i = 0; i < rxaio->a_niov; i++) {
- rxaio->a_iov[i] = rxaio->a_iov[i + 1];
- }
- rxaio->a_niov--;
- }
-
- // Was this a partial read? If so then resubmit for the rest.
- if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) {
+ nni_aio_iov_advance(rxaio, n);
+ if (nni_aio_iov_count(rxaio) != 0) {
+ // Was this a partial read? If so then resubmit for the rest.
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
@@ -329,17 +306,17 @@ nni_ipc_pipe_recv_cb(void *arg)
// lock for the read side in the future, so that we allow
// transmits to proceed normally. In practice this is
// unlikely to be much of an issue though.
- if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
+ if ((rv = nni_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
goto recv_error;
}
if (len != 0) {
+ nni_iov iov;
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
- rxaio->a_iov[0].iov_len = (size_t) len;
- rxaio->a_niov = 1;
-
+ iov.iov_buf = nni_msg_body(pipe->rxmsg);
+ iov.iov_len = (size_t) len;
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
@@ -368,7 +345,7 @@ recv_error:
static void
nni_ipc_cancel_tx(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = aio->a_prov_data;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
if (pipe->user_txaio != aio) {
@@ -378,7 +355,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv)
pipe->user_txaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(pipe->txaio, rv);
+ nni_aio_abort(pipe->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -390,6 +367,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
uint64_t len;
nni_aio * txaio;
int niov;
+ nni_iov iov[3];
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -404,22 +382,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
- txaio = pipe->txaio;
- niov = 0;
- txaio->a_iov[niov].iov_buf = pipe->txhead;
- txaio->a_iov[niov].iov_len = sizeof(pipe->txhead);
+ txaio = pipe->txaio;
+ niov = 0;
+ iov[0].iov_buf = pipe->txhead;
+ iov[0].iov_len = sizeof(pipe->txhead);
niov++;
if (nni_msg_header_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_header(msg);
- txaio->a_iov[niov].iov_len = nni_msg_header_len(msg);
+ iov[niov].iov_buf = nni_msg_header(msg);
+ iov[niov].iov_len = nni_msg_header_len(msg);
niov++;
}
if (nni_msg_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_body(msg);
- txaio->a_iov[niov].iov_len = nni_msg_len(msg);
+ iov[niov].iov_buf = nni_msg_body(msg);
+ iov[niov].iov_len = nni_msg_len(msg);
niov++;
}
- txaio->a_niov = niov;
+ nni_aio_set_iov(txaio, niov, iov);
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
@@ -428,7 +406,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
static void
nni_ipc_cancel_rx(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = aio->a_prov_data;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
if (pipe->user_rxaio != aio) {
@@ -438,7 +416,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv)
pipe->user_rxaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(pipe->rxaio, rv);
+ nni_aio_abort(pipe->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -447,6 +425,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
nni_aio * rxaio;
+ nni_iov iov;
nni_mtx_lock(&pipe->mtx);
@@ -459,10 +438,10 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
- rxaio = pipe->rxaio;
- rxaio->a_iov[0].iov_buf = pipe->rxhead;
- rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead);
- rxaio->a_niov = 1;
+ rxaio = pipe->rxaio;
+ iov.iov_buf = pipe->rxhead;
+ iov.iov_len = sizeof(pipe->rxhead);
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
@@ -474,6 +453,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
nni_ipc_pipe *pipe = arg;
int rv;
nni_aio * negaio;
+ nni_iov iov;
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
@@ -483,15 +463,15 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&pipe->txhead[4], pipe->proto);
NNI_PUT16(&pipe->txhead[6], 0);
- pipe->user_negaio = aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- negaio = pipe->negaio;
- negaio->a_niov = 1;
- negaio->a_iov[0].iov_len = 8;
- negaio->a_iov[0].iov_buf = &pipe->txhead[0];
+ pipe->user_negaio = aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ negaio = pipe->negaio;
+ iov.iov_len = 8;
+ iov.iov_buf = &pipe->txhead[0];
+ nni_aio_set_iov(negaio, 1, &iov);
rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
if (rv != 0) {
nni_mtx_unlock(&pipe->mtx);
@@ -638,7 +618,7 @@ nni_ipc_ep_cb(void *arg)
static void
nni_ipc_cancel_ep(nni_aio *aio, int rv)
{
- nni_ipc_ep *ep = aio->a_prov_data;
+ nni_ipc_ep *ep = nni_aio_get_prov_data(aio);
NNI_ASSERT(rv != 0);
nni_mtx_lock(&ep->mtx);
@@ -649,7 +629,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(ep->aio, rv);
+ nni_aio_abort(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}