diff options
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/inproc/inproc.c | 41 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 124 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 137 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 135 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 16 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 65 |
6 files changed, 234 insertions, 284 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 4cd1c97d..6329a627 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -143,7 +143,7 @@ static void nni_inproc_pipe_send(void *arg, nni_aio *aio) { nni_inproc_pipe *pipe = arg; - nni_msg * msg = aio->a_msg; + nni_msg * msg = nni_aio_get_msg(aio); char * h; size_t l; int rv; @@ -153,7 +153,7 @@ nni_inproc_pipe_send(void *arg, nni_aio *aio) h = nni_msg_header(msg); l = nni_msg_header_len(msg); if ((rv = nni_msg_insert(msg, h, l)) != 0) { - nni_aio_finish(aio, rv, aio->a_count); + nni_aio_finish(aio, rv, nni_aio_count(aio)); return; } nni_msg_header_chop(msg, l); @@ -219,12 +219,12 @@ nni_inproc_ep_fini(void *arg) static void nni_inproc_conn_finish(nni_aio *aio, int rv) { - nni_inproc_ep *ep = aio->a_prov_extra[0]; + nni_inproc_ep *ep = nni_aio_get_prov_data(aio); void * pipe; nni_aio_list_remove(aio); - pipe = aio->a_pipe; - aio->a_pipe = NULL; + pipe = nni_aio_get_pipe(aio); + nni_aio_set_pipe(aio, NULL); if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && nni_list_empty(&ep->aios)) { @@ -298,8 +298,8 @@ nni_inproc_accept_clients(nni_inproc_ep *server) nni_mtx_init(&pair->mx); - pair->pipes[0] = caio->a_pipe; - pair->pipes[1] = saio->a_pipe; + pair->pipes[0] = nni_aio_get_pipe(caio); + pair->pipes[1] = nni_aio_get_pipe(saio); pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; pair->pipes[0]->pair = pair->pipes[1]->pair = pair; @@ -324,15 +324,15 @@ nni_inproc_accept_clients(nni_inproc_ep *server) static void nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep * ep = aio->a_prov_data; + nni_inproc_ep * ep = nni_aio_get_prov_data(aio); nni_inproc_pipe *pipe; nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_list_node_remove(&ep->node); - if ((pipe = aio->a_pipe) != NULL) { - aio->a_pipe = NULL; + if ((pipe = nni_aio_get_pipe(aio)) != NULL) { + nni_aio_set_pipe(aio, NULL); nni_inproc_pipe_fini(pipe); } nni_aio_finish_error(aio, rv); @@ -343,9 +343,10 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv) static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_ep *ep = arg; - nni_inproc_ep *server; - int rv; + nni_inproc_ep * ep = arg; + nni_inproc_ep * server; + int rv; + nni_inproc_pipe *pipe; if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish_error(aio, NNG_EINVAL); @@ -358,12 +359,12 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - aio->a_prov_extra[0] = ep; - if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) { + if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { nni_aio_finish_error(aio, rv); nni_mtx_unlock(&nni_inproc.mx); return; } + nni_aio_set_pipe(aio, pipe); // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { @@ -406,8 +407,9 @@ nni_inproc_ep_bind(void *arg) static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { - nni_inproc_ep *ep = arg; - int rv; + nni_inproc_ep * ep = arg; + nni_inproc_pipe *pipe; + int rv; nni_mtx_lock(&nni_inproc.mx); @@ -416,15 +418,14 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) return; } - aio->a_prov_extra[0] = ep; - // We are already on the master list of servers, thanks to bind. - if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) { + if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { nni_aio_finish_error(aio, rv); nni_mtx_unlock(&nni_inproc.mx); return; } + nni_aio_set_pipe(aio, pipe); // Insert us into the pending server aios, and then run the // accept list. diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 9e8a3829..6475e43b 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -28,7 +28,7 @@ struct nni_ipc_pipe { uint16_t peer; uint16_t proto; size_t rcvmax; - nng_sockaddr sa; + nni_sockaddr sa; uint8_t txhead[1 + sizeof(uint64_t)]; uint8_t rxhead[1 + sizeof(uint64_t)]; @@ -133,7 +133,7 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) static void nni_ipc_cancel_start(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_negaio != aio) { @@ -143,7 +143,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv) pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->negaio, rv); + nni_aio_abort(pipe->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -167,18 +167,20 @@ nni_ipc_pipe_nego_cb(void *arg) } if (pipe->gottxhead < pipe->wanttxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; - aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; + nni_iov iov; + iov.iov_len = pipe->wanttxhead - pipe->gottxhead; + iov.iov_buf = &pipe->txhead[pipe->gottxhead]; + nni_aio_set_iov(aio, 1, &iov); // send it down... nni_plat_ipc_pipe_send(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } if (pipe->gotrxhead < pipe->wantrxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; - aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; + nni_iov iov; + iov.iov_len = pipe->wantrxhead - pipe->gotrxhead; + iov.iov_buf = &pipe->rxhead[pipe->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; @@ -229,20 +231,8 @@ nni_ipc_pipe_send_cb(void *arg) } n = nni_aio_count(txaio); - while (n) { - NNI_ASSERT(txaio->a_niov != 0); - if (txaio->a_iov[0].iov_len > n) { - txaio->a_iov[0].iov_len -= n; - NNI_INCPTR(txaio->a_iov[0].iov_buf, n); - break; - } - n -= txaio->a_iov[0].iov_len; - for (int i = 0; i < txaio->a_niov; i++) { - txaio->a_iov[i] = txaio->a_iov[i + 1]; - } - txaio->a_niov--; - } - if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) != 0) { nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); return; @@ -281,22 +271,9 @@ nni_ipc_pipe_recv_cb(void *arg) } n = nni_aio_count(rxaio); - while (n) { - NNI_ASSERT(rxaio->a_niov != 0); - if (rxaio->a_iov[0].iov_len > n) { - rxaio->a_iov[0].iov_len -= n; - NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); - break; - } - n -= rxaio->a_iov[0].iov_len; - for (int i = 0; i < rxaio->a_niov; i++) { - rxaio->a_iov[i] = rxaio->a_iov[i + 1]; - } - rxaio->a_niov--; - } - - // Was this a partial read? If so then resubmit for the rest. - if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) != 0) { + // Was this a partial read? If so then resubmit for the rest. nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; @@ -329,17 +306,17 @@ nni_ipc_pipe_recv_cb(void *arg) // lock for the read side in the future, so that we allow // transmits to proceed normally. In practice this is // unlikely to be much of an issue though. - if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { goto recv_error; } if (len != 0) { + nni_iov iov; // Submit the rest of the data for a read -- we want to // read the entire message now. - rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); - rxaio->a_iov[0].iov_len = (size_t) len; - rxaio->a_niov = 1; - + iov.iov_buf = nni_msg_body(pipe->rxmsg); + iov.iov_len = (size_t) len; + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; @@ -368,7 +345,7 @@ recv_error: static void nni_ipc_cancel_tx(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_txaio != aio) { @@ -378,7 +355,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv) pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->txaio, rv); + nni_aio_abort(pipe->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -390,6 +367,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) uint64_t len; nni_aio * txaio; int niov; + nni_iov iov[3]; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -404,22 +382,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); - txaio = pipe->txaio; - niov = 0; - txaio->a_iov[niov].iov_buf = pipe->txhead; - txaio->a_iov[niov].iov_len = sizeof(pipe->txhead); + txaio = pipe->txaio; + niov = 0; + iov[0].iov_buf = pipe->txhead; + iov[0].iov_len = sizeof(pipe->txhead); niov++; if (nni_msg_header_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_header(msg); - txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); niov++; } if (nni_msg_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_body(msg); - txaio->a_iov[niov].iov_len = nni_msg_len(msg); + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); niov++; } - txaio->a_niov = niov; + nni_aio_set_iov(txaio, niov, iov); nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); @@ -428,7 +406,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) static void nni_ipc_cancel_rx(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_rxaio != aio) { @@ -438,7 +416,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv) pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->rxaio, rv); + nni_aio_abort(pipe->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -447,6 +425,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; nni_aio * rxaio; + nni_iov iov; nni_mtx_lock(&pipe->mtx); @@ -459,10 +438,10 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. - rxaio = pipe->rxaio; - rxaio->a_iov[0].iov_buf = pipe->rxhead; - rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead); - rxaio->a_niov = 1; + rxaio = pipe->rxaio; + iov.iov_buf = pipe->rxhead; + iov.iov_len = sizeof(pipe->rxhead); + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); @@ -474,6 +453,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_ipc_pipe *pipe = arg; int rv; nni_aio * negaio; + nni_iov iov; nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; @@ -483,15 +463,15 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txhead[4], pipe->proto); NNI_PUT16(&pipe->txhead[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - negaio = pipe->negaio; - negaio->a_niov = 1; - negaio->a_iov[0].iov_len = 8; - negaio->a_iov[0].iov_buf = &pipe->txhead[0]; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + negaio = pipe->negaio; + iov.iov_len = 8; + iov.iov_buf = &pipe->txhead[0]; + nni_aio_set_iov(negaio, 1, &iov); rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); if (rv != 0) { nni_mtx_unlock(&pipe->mtx); @@ -638,7 +618,7 @@ nni_ipc_ep_cb(void *arg) static void nni_ipc_cancel_ep(nni_aio *aio, int rv) { - nni_ipc_ep *ep = aio->a_prov_data; + nni_ipc_ep *ep = nni_aio_get_prov_data(aio); NNI_ASSERT(rv != 0); nni_mtx_lock(&ep->mtx); @@ -649,7 +629,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(ep->aio, rv); + nni_aio_abort(ep->aio, rv); nni_aio_finish_error(aio, rv); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 28ec9438..1504d0ee 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -130,7 +130,7 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) static void nni_tcp_cancel_nego(nni_aio *aio, int rv) { - nni_tcp_pipe *p = aio->a_prov_data; + nni_tcp_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_negaio != aio) { @@ -140,7 +140,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv) p->user_negaio = NULL; nni_mtx_unlock(&p->mtx); - nni_aio_cancel(p->negaio, rv); + nni_aio_abort(p->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -164,18 +164,20 @@ nni_tcp_pipe_nego_cb(void *arg) } if (p->gottxhead < p->wanttxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = p->wanttxhead - p->gottxhead; - aio->a_iov[0].iov_buf = &p->txlen[p->gottxhead]; + nni_iov iov; + iov.iov_len = p->wanttxhead - p->gottxhead; + iov.iov_buf = &p->txlen[p->gottxhead]; // send it down... + nni_aio_set_iov(aio, 1, &iov); nni_plat_tcp_pipe_send(p->tpp, aio); nni_mtx_unlock(&p->mtx); return; } if (p->gotrxhead < p->wantrxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = p->wantrxhead - p->gotrxhead; - aio->a_iov[0].iov_buf = &p->rxlen[p->gotrxhead]; + nni_iov iov; + iov.iov_len = p->wantrxhead - p->gotrxhead; + iov.iov_buf = &p->rxlen[p->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); nni_plat_tcp_pipe_recv(p->tpp, aio); nni_mtx_unlock(&p->mtx); return; @@ -206,7 +208,7 @@ nni_tcp_pipe_send_cb(void *arg) int rv; nni_aio * aio; size_t n; - nng_msg * msg; + nni_msg * msg; nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); @@ -226,20 +228,8 @@ nni_tcp_pipe_send_cb(void *arg) } n = nni_aio_count(txaio); - while (n) { - NNI_ASSERT(txaio->a_niov != 0); - if (txaio->a_iov[0].iov_len > n) { - txaio->a_iov[0].iov_len -= n; - NNI_INCPTR(txaio->a_iov[0].iov_buf, n); - break; - } - n -= txaio->a_iov[0].iov_len; - for (int i = 0; i < txaio->a_niov; i++) { - txaio->a_iov[i] = txaio->a_iov[i + 1]; - } - txaio->a_niov--; - } - if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) > 0) { nni_plat_tcp_pipe_send(p->tpp, txaio); nni_mtx_unlock(&p->mtx); return; @@ -261,7 +251,7 @@ nni_tcp_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg * msg; - nni_aio * rxaio = p->rxaio; + nni_aio * rxaio; nni_mtx_lock(&p->mtx); @@ -271,26 +261,15 @@ nni_tcp_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(p->rxaio)) != 0) { + rxaio = p->rxaio; + + if ((rv = nni_aio_result(rxaio)) != 0) { goto recv_error; } - n = nni_aio_count(p->rxaio); - while (n) { - NNI_ASSERT(rxaio->a_niov != 0); - if (rxaio->a_iov[0].iov_len > n) { - rxaio->a_iov[0].iov_len -= n; - NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); - break; - } - n -= rxaio->a_iov[0].iov_len; - rxaio->a_niov--; - for (int i = 0; i < rxaio->a_niov; i++) { - rxaio->a_iov[i] = rxaio->a_iov[i + 1]; - } - } - // Was this a partial read? If so then resubmit for the rest. - if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + n = nni_aio_count(rxaio); + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) > 0) { nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); return; @@ -311,17 +290,18 @@ nni_tcp_pipe_recv_cb(void *arg) goto recv_error; } - if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { goto recv_error; } // Submit the rest of the data for a read -- we want to // read the entire message now. if (len != 0) { - rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); - rxaio->a_iov[0].iov_len = (size_t) len; - rxaio->a_niov = 1; + nni_iov iov; + iov.iov_buf = nni_msg_body(p->rxmsg); + iov.iov_len = (size_t) len; + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); return; @@ -348,7 +328,7 @@ recv_error: static void nni_tcp_cancel_tx(nni_aio *aio, int rv) { - nni_tcp_pipe *p = aio->a_prov_data; + nni_tcp_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_txaio != aio) { @@ -359,7 +339,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(p->txaio, rv); + nni_aio_abort(p->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -371,6 +351,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) uint64_t len; nni_aio * txaio; int niov; + nni_iov iov[3]; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -385,22 +366,22 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) NNI_PUT64(p->txlen, len); - niov = 0; - txaio = p->txaio; - txaio->a_iov[niov].iov_buf = p->txlen; - txaio->a_iov[niov].iov_len = sizeof(p->txlen); + niov = 0; + txaio = p->txaio; + iov[niov].iov_buf = p->txlen; + iov[niov].iov_len = sizeof(p->txlen); niov++; if (nni_msg_header_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_header(msg); - txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); niov++; } if (nni_msg_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_body(msg); - txaio->a_iov[niov].iov_len = nni_msg_len(msg); + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); niov++; } - txaio->a_niov = niov; + nni_aio_set_iov(txaio, niov, iov); nni_plat_tcp_pipe_send(p->tpp, txaio); nni_mtx_unlock(&p->mtx); @@ -409,7 +390,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) static void nni_tcp_cancel_rx(nni_aio *aio, int rv) { - nni_tcp_pipe *p = aio->a_prov_data; + nni_tcp_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_rxaio != aio) { @@ -420,7 +401,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(p->rxaio, rv); + nni_aio_abort(p->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -429,6 +410,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; nni_aio * rxaio; + nni_iov iov; nni_mtx_lock(&p->mtx); @@ -441,10 +423,10 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(p->rxmsg == NULL); // Schedule a read of the TCP header. - rxaio = p->rxaio; - rxaio->a_iov[0].iov_buf = p->rxlen; - rxaio->a_iov[0].iov_len = sizeof(p->rxlen); - rxaio->a_niov = 1; + rxaio = p->rxaio; + iov.iov_buf = p->rxlen; + iov.iov_len = sizeof(p->rxlen); + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); @@ -463,7 +445,7 @@ nni_tcp_pipe_getopt_locaddr(void *arg, void *v, size_t *szp) { nni_tcp_pipe *p = arg; int rv; - nng_sockaddr sa; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_plat_tcp_pipe_sockname(p->tpp, &sa)) == 0) { @@ -477,7 +459,7 @@ nni_tcp_pipe_getopt_remaddr(void *arg, void *v, size_t *szp) { nni_tcp_pipe *p = arg; int rv; - nng_sockaddr sa; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_plat_tcp_pipe_peername(p->tpp, &sa)) == 0) { @@ -492,6 +474,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; nni_aio * negaio; + nni_iov iov; nni_mtx_lock(&p->mtx); p->txlen[0] = 0; @@ -501,15 +484,15 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&p->txlen[4], p->proto); NNI_PUT16(&p->txlen[6], 0); - p->user_negaio = aio; - p->gotrxhead = 0; - p->gottxhead = 0; - p->wantrxhead = 8; - p->wanttxhead = 8; - negaio = p->negaio; - negaio->a_niov = 1; - negaio->a_iov[0].iov_len = 8; - negaio->a_iov[0].iov_buf = &p->txlen[0]; + p->user_negaio = aio; + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + negaio = p->negaio; + iov.iov_len = 8; + iov.iov_buf = &p->txlen[0]; + nni_aio_set_iov(negaio, 1, &iov); if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) { nni_mtx_unlock(&p->mtx); return; @@ -572,7 +555,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) if (mode == NNI_EP_MODE_DIAL) { passive = 0; lsa.s_un.s_family = NNG_AF_UNSPEC; - aio->a_addr = &rsa; + nni_aio_set_input(aio, 0, &rsa); if ((host == NULL) || (serv == NULL)) { nni_aio_fini(aio); return (NNG_EADDRINVAL); @@ -580,7 +563,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) } else { passive = 1; rsa.s_un.s_family = NNG_AF_UNSPEC; - aio->a_addr = &lsa; + nni_aio_set_input(aio, 0, &lsa); } nni_plat_tcp_resolv(host, serv, NNG_AF_UNSPEC, passive, aio); @@ -685,7 +668,7 @@ nni_tcp_ep_cb(void *arg) static void nni_tcp_cancel_ep(nni_aio *aio, int rv) { - nni_tcp_ep *ep = aio->a_prov_data; + nni_tcp_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&ep->mtx); if (ep->user_aio != aio) { @@ -695,7 +678,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(ep->aio, rv); + nni_aio_abort(ep->aio, rv); nni_aio_finish_error(aio, rv); } diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index f6c5bc6e..09c59582 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -138,7 +138,7 @@ nni_tls_pipe_init(nni_tls_pipe **pipep, nni_tls_ep *ep, void *tpp) static void nni_tls_cancel_nego(nni_aio *aio, int rv) { - nni_tls_pipe *p = aio->a_prov_data; + nni_tls_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_negaio != aio) { @@ -148,7 +148,7 @@ nni_tls_cancel_nego(nni_aio *aio, int rv) p->user_negaio = NULL; nni_mtx_unlock(&p->mtx); - nni_aio_cancel(p->negaio, rv); + nni_aio_abort(p->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -172,18 +172,20 @@ nni_tls_pipe_nego_cb(void *arg) } if (p->gottxhead < p->wanttxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = p->wanttxhead - p->gottxhead; - aio->a_iov[0].iov_buf = &p->txlen[p->gottxhead]; + nni_iov iov; + iov.iov_len = p->wanttxhead - p->gottxhead; + iov.iov_buf = &p->txlen[p->gottxhead]; + nni_aio_set_iov(aio, 1, &iov); // send it down... nni_tls_send(p->tls, aio); nni_mtx_unlock(&p->mtx); return; } if (p->gotrxhead < p->wantrxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = p->wantrxhead - p->gotrxhead; - aio->a_iov[0].iov_buf = &p->rxlen[p->gotrxhead]; + nni_iov iov; + iov.iov_len = p->wantrxhead - p->gotrxhead; + iov.iov_buf = &p->rxlen[p->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); nni_tls_recv(p->tls, aio); nni_mtx_unlock(&p->mtx); return; @@ -234,20 +236,8 @@ nni_tls_pipe_send_cb(void *arg) } n = nni_aio_count(txaio); - while (n) { - NNI_ASSERT(txaio->a_niov != 0); - if (txaio->a_iov[0].iov_len > n) { - txaio->a_iov[0].iov_len -= n; - NNI_INCPTR(txaio->a_iov[0].iov_buf, n); - break; - } - n -= txaio->a_iov[0].iov_len; - for (int i = 0; i < txaio->a_niov; i++) { - txaio->a_iov[i] = txaio->a_iov[i + 1]; - } - txaio->a_niov--; - } - if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) > 0) { nni_tls_send(p->tls, txaio); nni_mtx_unlock(&p->mtx); return; @@ -269,7 +259,7 @@ nni_tls_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg * msg; - nni_aio * rxaio = p->rxaio; + nni_aio * rxaio; nni_mtx_lock(&p->mtx); @@ -279,26 +269,16 @@ nni_tls_pipe_recv_cb(void *arg) return; } + rxaio = p->rxaio; + if ((rv = nni_aio_result(p->rxaio)) != 0) { goto recv_error; } - n = nni_aio_count(p->rxaio); - while (n) { - NNI_ASSERT(rxaio->a_niov != 0); - if (rxaio->a_iov[0].iov_len > n) { - rxaio->a_iov[0].iov_len -= n; - NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); - break; - } - n -= rxaio->a_iov[0].iov_len; - rxaio->a_niov--; - for (int i = 0; i < rxaio->a_niov; i++) { - rxaio->a_iov[i] = rxaio->a_iov[i + 1]; - } - } - // Was this a partial read? If so then resubmit for the rest. - if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + n = nni_aio_count(rxaio); + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) > 0) { + // Was this a partial read? If so then resubmit for the rest. nni_tls_recv(p->tls, rxaio); nni_mtx_unlock(&p->mtx); return; @@ -319,16 +299,17 @@ nni_tls_pipe_recv_cb(void *arg) goto recv_error; } - if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { goto recv_error; } // Submit the rest of the data for a read -- we want to // read the entire message now. if (len != 0) { - rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); - rxaio->a_iov[0].iov_len = (size_t) len; - rxaio->a_niov = 1; + nni_iov iov; + iov.iov_buf = nni_msg_body(p->rxmsg); + iov.iov_len = (size_t) len; + nni_aio_set_iov(rxaio, 1, &iov); nni_tls_recv(p->tls, rxaio); nni_mtx_unlock(&p->mtx); @@ -356,7 +337,7 @@ recv_error: static void nni_tls_cancel_tx(nni_aio *aio, int rv) { - nni_tls_pipe *p = aio->a_prov_data; + nni_tls_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_txaio != aio) { @@ -367,7 +348,7 @@ nni_tls_cancel_tx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(p->txaio, rv); + nni_aio_abort(p->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -379,6 +360,7 @@ nni_tls_pipe_send(void *arg, nni_aio *aio) uint64_t len; nni_aio * txaio; int niov; + nni_iov iov[3]; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -393,23 +375,23 @@ nni_tls_pipe_send(void *arg, nni_aio *aio) NNI_PUT64(p->txlen, len); - niov = 0; - txaio = p->txaio; - txaio->a_iov[niov].iov_buf = p->txlen; - txaio->a_iov[niov].iov_len = sizeof(p->txlen); + niov = 0; + txaio = p->txaio; + iov[niov].iov_buf = p->txlen; + iov[niov].iov_len = sizeof(p->txlen); niov++; if (nni_msg_header_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_header(msg); - txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); niov++; } if (nni_msg_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_body(msg); - txaio->a_iov[niov].iov_len = nni_msg_len(msg); + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); niov++; } - txaio->a_niov = niov; + nni_aio_set_iov(txaio, niov, iov); nni_tls_send(p->tls, txaio); nni_mtx_unlock(&p->mtx); } @@ -417,7 +399,7 @@ nni_tls_pipe_send(void *arg, nni_aio *aio) static void nni_tls_cancel_rx(nni_aio *aio, int rv) { - nni_tls_pipe *p = aio->a_prov_data; + nni_tls_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_rxaio != aio) { @@ -428,7 +410,7 @@ nni_tls_cancel_rx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(p->rxaio, rv); + nni_aio_abort(p->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -437,6 +419,7 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; nni_aio * rxaio; + nni_iov iov; nni_mtx_lock(&p->mtx); @@ -449,10 +432,11 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(p->rxmsg == NULL); // Schedule a read of the TCP header. - rxaio = p->rxaio; - rxaio->a_iov[0].iov_buf = p->rxlen; - rxaio->a_iov[0].iov_len = sizeof(p->rxlen); - rxaio->a_niov = 1; + rxaio = p->rxaio; + + iov.iov_buf = p->rxlen; + iov.iov_len = sizeof(p->rxlen); + nni_aio_set_iov(rxaio, 1, &iov); nni_tls_recv(p->tls, rxaio); nni_mtx_unlock(&p->mtx); @@ -471,7 +455,7 @@ nni_tls_pipe_getopt_locaddr(void *arg, void *v, size_t *szp) { nni_tls_pipe *p = arg; int rv; - nng_sockaddr sa; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_tls_sockname(p->tls, &sa)) == 0) { @@ -485,7 +469,7 @@ nni_tls_pipe_getopt_remaddr(void *arg, void *v, size_t *szp) { nni_tls_pipe *p = arg; int rv; - nng_sockaddr sa; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_tls_peername(p->tls, &sa)) == 0) { @@ -499,6 +483,7 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; nni_aio * negaio; + nni_iov iov; nni_mtx_lock(&p->mtx); p->txlen[0] = 0; @@ -508,15 +493,15 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&p->txlen[4], p->proto); NNI_PUT16(&p->txlen[6], 0); - p->user_negaio = aio; - p->gotrxhead = 0; - p->gottxhead = 0; - p->wantrxhead = 8; - p->wanttxhead = 8; - negaio = p->negaio; - negaio->a_niov = 1; - negaio->a_iov[0].iov_len = 8; - negaio->a_iov[0].iov_buf = &p->txlen[0]; + p->user_negaio = aio; + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + negaio = p->negaio; + iov.iov_len = 8; + iov.iov_buf = &p->txlen[0]; + nni_aio_set_iov(negaio, 1, &iov); if (nni_aio_start(aio, nni_tls_cancel_nego, p) != 0) { nni_mtx_unlock(&p->mtx); return; @@ -584,7 +569,7 @@ nni_tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) tlsmode = NNG_TLS_MODE_CLIENT; authmode = NNG_TLS_AUTH_MODE_REQUIRED; lsa.s_un.s_family = NNG_AF_UNSPEC; - aio->a_addr = &rsa; + nni_aio_set_input(aio, 0, &rsa); if ((host == NULL) || (serv == NULL)) { nni_aio_fini(aio); return (NNG_EADDRINVAL); @@ -594,7 +579,7 @@ nni_tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) tlsmode = NNG_TLS_MODE_SERVER; authmode = NNG_TLS_AUTH_MODE_NONE; rsa.s_un.s_family = NNG_AF_UNSPEC; - aio->a_addr = &lsa; + nni_aio_set_input(aio, 0, &lsa); } // XXX: arguably we could defer this part to the point we do a bind @@ -705,7 +690,7 @@ nni_tls_ep_cb(void *arg) static void nni_tls_cancel_ep(nni_aio *aio, int rv) { - nni_tls_ep *ep = aio->a_prov_data; + nni_tls_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&ep->mtx); if (ep->user_aio != aio) { @@ -715,7 +700,7 @@ nni_tls_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(ep->aio, rv); + nni_aio_abort(ep->aio, rv); nni_aio_finish_error(aio, rv); } diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index aead6f59..2fe7804c 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -111,13 +111,13 @@ ws_pipe_recv_cb(void *arg) static void ws_pipe_recv_cancel(nni_aio *aio, int rv) { - ws_pipe *p = aio->a_prov_data; + ws_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_rxaio != aio) { nni_mtx_unlock(&p->mtx); return; } - nni_aio_cancel(p->rxaio, rv); + nni_aio_abort(p->rxaio, rv); p->user_rxaio = NULL; nni_aio_finish_error(aio, rv); nni_mtx_unlock(&p->mtx); @@ -142,14 +142,14 @@ ws_pipe_recv(void *arg, nni_aio *aio) static void ws_pipe_send_cancel(nni_aio *aio, int rv) { - ws_pipe *p = aio->a_prov_data; + ws_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_txaio != aio) { nni_mtx_unlock(&p->mtx); return; } p->user_txaio = NULL; - nni_aio_cancel(p->txaio, rv); + nni_aio_abort(p->txaio, rv); nni_aio_finish_error(aio, rv); nni_mtx_unlock(&p->mtx); } @@ -255,7 +255,7 @@ ws_hook(void *arg, nni_http_req *req, nni_http_res *res) NNI_LIST_FOREACH (&ep->headers, h) { int rv; - rv = nni_http_res_set_header(res, h->name, h->value); + rv = nng_http_res_set_header(res, h->name, h->value); if (rv != 0) { return (rv); } @@ -279,7 +279,7 @@ ws_ep_bind(void *arg) static void ws_ep_cancel(nni_aio *aio, int rv) { - ws_ep *ep = aio->a_prov_data; + ws_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&ep->mtx); if (nni_aio_list_active(aio)) { @@ -484,7 +484,7 @@ ws_pipe_getopt_locaddr(void *arg, void *v, size_t *szp) { ws_pipe * p = arg; int rv; - nng_sockaddr sa; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_ws_sock_addr(p->ws, &sa)) == 0) { @@ -498,7 +498,7 @@ ws_pipe_getopt_remaddr(void *arg, void *v, size_t *szp) { ws_pipe * p = arg; int rv; - nng_sockaddr sa; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_ws_peer_addr(p->ws, &sa)) == 0) { diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index a5ca739c..d6762d25 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -367,18 +367,19 @@ zt_node_rcv4_cb(void *arg) // XXX: CHECK THIS, if it fails then we have a fatal error with // the znode, and have to shut everything down. ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa, - ztn->zn_rcv4_buf, aio->a_count, &now); + ztn->zn_rcv4_buf, nni_aio_count(aio), &now); // Schedule background work zt_node_resched(ztn, now); // Schedule another receive. if (ztn->zn_udp4 != NULL) { - aio->a_niov = 1; - aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf; - aio->a_iov[0].iov_len = zt_rcv_bufsize; - aio->a_addr = &ztn->zn_rcv4_addr; - aio->a_count = 0; + nni_iov iov; + iov.iov_buf = ztn->zn_rcv4_buf; + iov.iov_len = zt_rcv_bufsize; + nni_aio_set_iov(aio, 1, &iov); + + nni_aio_set_input(aio, 0, &ztn->zn_rcv4_addr); nni_plat_udp_recv(ztn->zn_udp4, aio); } @@ -416,18 +417,18 @@ zt_node_rcv6_cb(void *arg) // We are not going to perform any validation of the data; we // just pass this straight into the ZeroTier core. ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa, - ztn->zn_rcv6_buf, aio->a_count, &now); + ztn->zn_rcv6_buf, nni_aio_count(aio), &now); // Schedule background work zt_node_resched(ztn, now); // Schedule another receive. if (ztn->zn_udp6 != NULL) { - aio->a_niov = 1; - aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf; - aio->a_iov[0].iov_len = zt_rcv_bufsize; - aio->a_addr = &ztn->zn_rcv6_addr; - aio->a_count = 0; + nni_iov iov; + iov.iov_buf = ztn->zn_rcv6_buf; + iov.iov_len = zt_rcv_bufsize; + nni_aio_set_iov(aio, 1, &iov); + nni_aio_set_input(aio, 0, &ztn->zn_rcv6_addr); nni_plat_udp_recv(ztn->zn_udp6, aio); } nni_mtx_unlock(&zt_lk); @@ -1309,6 +1310,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, uint16_t port; uint8_t * buf; zt_send_hdr * hdr; + nni_iov iov; NNI_ARG_UNUSED(thr); NNI_ARG_UNUSED(socket); @@ -1353,11 +1355,11 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, nni_aio_set_data(aio, 0, hdr); hdr->sa = addr; hdr->len = len; + nni_aio_set_input(aio, 0, &hdr->sa); - aio->a_addr = &hdr->sa; - aio->a_niov = 1; - aio->a_iov[0].iov_buf = buf; - aio->a_iov[0].iov_len = len; + iov.iov_buf = buf; + iov.iov_len = len; + nni_aio_set_iov(aio, 1, &iov); // This should be non-blocking/best-effort, so while // not great that we're holding the lock, also not tragic. @@ -1423,6 +1425,7 @@ zt_node_create(zt_node **ztnp, const char *path) nng_sockaddr sa6; int rv; enum ZT_ResultCode zrv; + nni_iov iov; // We want to bind to any address we can (for now). // Note that at the moment we only support IPv4. Its @@ -1487,16 +1490,14 @@ zt_node_create(zt_node **ztnp, const char *path) zt_node_resched(ztn, 1); // Schedule receive - ztn->zn_rcv4_aio->a_niov = 1; - ztn->zn_rcv4_aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf; - ztn->zn_rcv4_aio->a_iov[0].iov_len = zt_rcv_bufsize; - ztn->zn_rcv4_aio->a_addr = &ztn->zn_rcv4_addr; - ztn->zn_rcv4_aio->a_count = 0; - ztn->zn_rcv6_aio->a_niov = 1; - ztn->zn_rcv6_aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf; - ztn->zn_rcv6_aio->a_iov[0].iov_len = zt_rcv_bufsize; - ztn->zn_rcv6_aio->a_addr = &ztn->zn_rcv6_addr; - ztn->zn_rcv6_aio->a_count = 0; + iov.iov_buf = ztn->zn_rcv4_buf; + iov.iov_len = zt_rcv_bufsize; + nni_aio_set_iov(ztn->zn_rcv4_aio, 1, &iov); + nni_aio_set_input(ztn->zn_rcv4_aio, 0, &ztn->zn_rcv4_addr); + iov.iov_buf = ztn->zn_rcv6_buf; + iov.iov_len = zt_rcv_bufsize; + nni_aio_set_iov(ztn->zn_rcv6_aio, 1, &iov); + nni_aio_set_input(ztn->zn_rcv6_aio, 0, &ztn->zn_rcv6_addr); nni_plat_udp_recv(ztn->zn_udp4, ztn->zn_rcv4_aio); nni_plat_udp_recv(ztn->zn_udp6, ztn->zn_rcv6_aio); @@ -1812,7 +1813,7 @@ zt_pipe_send(void *arg, nni_aio *aio) static void zt_pipe_cancel_recv(nni_aio *aio, int rv) { - zt_pipe *p = aio->a_prov_data; + zt_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&zt_lk); if (p->zp_user_rxaio == aio) { p->zp_user_rxaio = NULL; @@ -1968,7 +1969,7 @@ zt_pipe_get_node(void *arg, void *buf, size_t *szp) static void zt_pipe_cancel_ping(nni_aio *aio, int rv) { - zt_pipe *p = aio->a_prov_data; + zt_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&zt_lk); if (p->zp_ping_active) { @@ -2195,7 +2196,7 @@ zt_ep_close(void *arg) zt_node *ztn; nni_aio *aio; - nni_aio_cancel(ep->ze_creq_aio, NNG_ECLOSED); + nni_aio_abort(ep->ze_creq_aio, NNG_ECLOSED); // Cancel any outstanding user operation(s) - they should have // been aborted by the above cancellation, but we need to be @@ -2293,12 +2294,12 @@ zt_ep_bind(void *arg) static void zt_ep_cancel(nni_aio *aio, int rv) { - zt_ep *ep = aio->a_prov_data; + zt_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&zt_lk); if (nni_aio_list_active(aio)) { if (ep->ze_aio != NULL) { - nni_aio_cancel(ep->ze_aio, rv); + nni_aio_abort(ep->ze_aio, rv); } nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -2370,7 +2371,7 @@ zt_ep_accept(void *arg, nni_aio *aio) static void zt_ep_conn_req_cancel(nni_aio *aio, int rv) { - zt_ep *ep = aio->a_prov_data; + zt_ep *ep = nni_aio_get_prov_data(aio); // We don't have much to do here. The AIO will have been // canceled as a result of the "parent" AIO canceling. nni_mtx_lock(&zt_lk); |
