aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c41
-rw-r--r--src/transport/ipc/ipc.c124
-rw-r--r--src/transport/tcp/tcp.c137
-rw-r--r--src/transport/tls/tls.c135
-rw-r--r--src/transport/ws/websocket.c16
-rw-r--r--src/transport/zerotier/zerotier.c65
6 files changed, 234 insertions, 284 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 4cd1c97d..6329a627 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -143,7 +143,7 @@ static void
nni_inproc_pipe_send(void *arg, nni_aio *aio)
{
nni_inproc_pipe *pipe = arg;
- nni_msg * msg = aio->a_msg;
+ nni_msg * msg = nni_aio_get_msg(aio);
char * h;
size_t l;
int rv;
@@ -153,7 +153,7 @@ nni_inproc_pipe_send(void *arg, nni_aio *aio)
h = nni_msg_header(msg);
l = nni_msg_header_len(msg);
if ((rv = nni_msg_insert(msg, h, l)) != 0) {
- nni_aio_finish(aio, rv, aio->a_count);
+ nni_aio_finish(aio, rv, nni_aio_count(aio));
return;
}
nni_msg_header_chop(msg, l);
@@ -219,12 +219,12 @@ nni_inproc_ep_fini(void *arg)
static void
nni_inproc_conn_finish(nni_aio *aio, int rv)
{
- nni_inproc_ep *ep = aio->a_prov_extra[0];
+ nni_inproc_ep *ep = nni_aio_get_prov_data(aio);
void * pipe;
nni_aio_list_remove(aio);
- pipe = aio->a_pipe;
- aio->a_pipe = NULL;
+ pipe = nni_aio_get_pipe(aio);
+ nni_aio_set_pipe(aio, NULL);
if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) &&
nni_list_empty(&ep->aios)) {
@@ -298,8 +298,8 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
nni_mtx_init(&pair->mx);
- pair->pipes[0] = caio->a_pipe;
- pair->pipes[1] = saio->a_pipe;
+ pair->pipes[0] = nni_aio_get_pipe(caio);
+ pair->pipes[1] = nni_aio_get_pipe(saio);
pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0];
pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1];
pair->pipes[0]->pair = pair->pipes[1]->pair = pair;
@@ -324,15 +324,15 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
static void
nni_inproc_ep_cancel(nni_aio *aio, int rv)
{
- nni_inproc_ep * ep = aio->a_prov_data;
+ nni_inproc_ep * ep = nni_aio_get_prov_data(aio);
nni_inproc_pipe *pipe;
nni_mtx_lock(&nni_inproc.mx);
if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_list_node_remove(&ep->node);
- if ((pipe = aio->a_pipe) != NULL) {
- aio->a_pipe = NULL;
+ if ((pipe = nni_aio_get_pipe(aio)) != NULL) {
+ nni_aio_set_pipe(aio, NULL);
nni_inproc_pipe_fini(pipe);
}
nni_aio_finish_error(aio, rv);
@@ -343,9 +343,10 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv)
static void
nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
- nni_inproc_ep *ep = arg;
- nni_inproc_ep *server;
- int rv;
+ nni_inproc_ep * ep = arg;
+ nni_inproc_ep * server;
+ int rv;
+ nni_inproc_pipe *pipe;
if (ep->mode != NNI_EP_MODE_DIAL) {
nni_aio_finish_error(aio, NNG_EINVAL);
@@ -358,12 +359,12 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
return;
}
- aio->a_prov_extra[0] = ep;
- if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) {
+ if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) {
nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
+ nni_aio_set_pipe(aio, pipe);
// Find a server.
NNI_LIST_FOREACH (&nni_inproc.servers, server) {
@@ -406,8 +407,9 @@ nni_inproc_ep_bind(void *arg)
static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
- nni_inproc_ep *ep = arg;
- int rv;
+ nni_inproc_ep * ep = arg;
+ nni_inproc_pipe *pipe;
+ int rv;
nni_mtx_lock(&nni_inproc.mx);
@@ -416,15 +418,14 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
return;
}
- aio->a_prov_extra[0] = ep;
-
// We are already on the master list of servers, thanks to bind.
- if ((rv = nni_inproc_pipe_init((void *) &aio->a_pipe, ep)) != 0) {
+ if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) {
nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
+ nni_aio_set_pipe(aio, pipe);
// Insert us into the pending server aios, and then run the
// accept list.
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 9e8a3829..6475e43b 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -28,7 +28,7 @@ struct nni_ipc_pipe {
uint16_t peer;
uint16_t proto;
size_t rcvmax;
- nng_sockaddr sa;
+ nni_sockaddr sa;
uint8_t txhead[1 + sizeof(uint64_t)];
uint8_t rxhead[1 + sizeof(uint64_t)];
@@ -133,7 +133,7 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
static void
nni_ipc_cancel_start(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = aio->a_prov_data;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
if (pipe->user_negaio != aio) {
@@ -143,7 +143,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv)
pipe->user_negaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(pipe->negaio, rv);
+ nni_aio_abort(pipe->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -167,18 +167,20 @@ nni_ipc_pipe_nego_cb(void *arg)
}
if (pipe->gottxhead < pipe->wanttxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
- aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead];
+ nni_iov iov;
+ iov.iov_len = pipe->wanttxhead - pipe->gottxhead;
+ iov.iov_buf = &pipe->txhead[pipe->gottxhead];
+ nni_aio_set_iov(aio, 1, &iov);
// send it down...
nni_plat_ipc_pipe_send(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
}
if (pipe->gotrxhead < pipe->wantrxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
- aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead];
+ nni_iov iov;
+ iov.iov_len = pipe->wantrxhead - pipe->gotrxhead;
+ iov.iov_buf = &pipe->rxhead[pipe->gotrxhead];
+ nni_aio_set_iov(aio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, aio);
nni_mtx_unlock(&pipe->mtx);
return;
@@ -229,20 +231,8 @@ nni_ipc_pipe_send_cb(void *arg)
}
n = nni_aio_count(txaio);
- while (n) {
- NNI_ASSERT(txaio->a_niov != 0);
- if (txaio->a_iov[0].iov_len > n) {
- txaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(txaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= txaio->a_iov[0].iov_len;
- for (int i = 0; i < txaio->a_niov; i++) {
- txaio->a_iov[i] = txaio->a_iov[i + 1];
- }
- txaio->a_niov--;
- }
- if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) {
+ nni_aio_iov_advance(txaio, n);
+ if (nni_aio_iov_count(txaio) != 0) {
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
return;
@@ -281,22 +271,9 @@ nni_ipc_pipe_recv_cb(void *arg)
}
n = nni_aio_count(rxaio);
- while (n) {
- NNI_ASSERT(rxaio->a_niov != 0);
- if (rxaio->a_iov[0].iov_len > n) {
- rxaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(rxaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= rxaio->a_iov[0].iov_len;
- for (int i = 0; i < rxaio->a_niov; i++) {
- rxaio->a_iov[i] = rxaio->a_iov[i + 1];
- }
- rxaio->a_niov--;
- }
-
- // Was this a partial read? If so then resubmit for the rest.
- if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) {
+ nni_aio_iov_advance(rxaio, n);
+ if (nni_aio_iov_count(rxaio) != 0) {
+ // Was this a partial read? If so then resubmit for the rest.
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
@@ -329,17 +306,17 @@ nni_ipc_pipe_recv_cb(void *arg)
// lock for the read side in the future, so that we allow
// transmits to proceed normally. In practice this is
// unlikely to be much of an issue though.
- if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
+ if ((rv = nni_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
goto recv_error;
}
if (len != 0) {
+ nni_iov iov;
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
- rxaio->a_iov[0].iov_len = (size_t) len;
- rxaio->a_niov = 1;
-
+ iov.iov_buf = nni_msg_body(pipe->rxmsg);
+ iov.iov_len = (size_t) len;
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
@@ -368,7 +345,7 @@ recv_error:
static void
nni_ipc_cancel_tx(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = aio->a_prov_data;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
if (pipe->user_txaio != aio) {
@@ -378,7 +355,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv)
pipe->user_txaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(pipe->txaio, rv);
+ nni_aio_abort(pipe->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -390,6 +367,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
uint64_t len;
nni_aio * txaio;
int niov;
+ nni_iov iov[3];
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -404,22 +382,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
- txaio = pipe->txaio;
- niov = 0;
- txaio->a_iov[niov].iov_buf = pipe->txhead;
- txaio->a_iov[niov].iov_len = sizeof(pipe->txhead);
+ txaio = pipe->txaio;
+ niov = 0;
+ iov[0].iov_buf = pipe->txhead;
+ iov[0].iov_len = sizeof(pipe->txhead);
niov++;
if (nni_msg_header_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_header(msg);
- txaio->a_iov[niov].iov_len = nni_msg_header_len(msg);
+ iov[niov].iov_buf = nni_msg_header(msg);
+ iov[niov].iov_len = nni_msg_header_len(msg);
niov++;
}
if (nni_msg_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_body(msg);
- txaio->a_iov[niov].iov_len = nni_msg_len(msg);
+ iov[niov].iov_buf = nni_msg_body(msg);
+ iov[niov].iov_len = nni_msg_len(msg);
niov++;
}
- txaio->a_niov = niov;
+ nni_aio_set_iov(txaio, niov, iov);
nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
@@ -428,7 +406,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
static void
nni_ipc_cancel_rx(nni_aio *aio, int rv)
{
- nni_ipc_pipe *pipe = aio->a_prov_data;
+ nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio);
nni_mtx_lock(&pipe->mtx);
if (pipe->user_rxaio != aio) {
@@ -438,7 +416,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv)
pipe->user_rxaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(pipe->rxaio, rv);
+ nni_aio_abort(pipe->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -447,6 +425,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
nni_aio * rxaio;
+ nni_iov iov;
nni_mtx_lock(&pipe->mtx);
@@ -459,10 +438,10 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
- rxaio = pipe->rxaio;
- rxaio->a_iov[0].iov_buf = pipe->rxhead;
- rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead);
- rxaio->a_niov = 1;
+ rxaio = pipe->rxaio;
+ iov.iov_buf = pipe->rxhead;
+ iov.iov_len = sizeof(pipe->rxhead);
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
@@ -474,6 +453,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
nni_ipc_pipe *pipe = arg;
int rv;
nni_aio * negaio;
+ nni_iov iov;
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
@@ -483,15 +463,15 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&pipe->txhead[4], pipe->proto);
NNI_PUT16(&pipe->txhead[6], 0);
- pipe->user_negaio = aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- negaio = pipe->negaio;
- negaio->a_niov = 1;
- negaio->a_iov[0].iov_len = 8;
- negaio->a_iov[0].iov_buf = &pipe->txhead[0];
+ pipe->user_negaio = aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ negaio = pipe->negaio;
+ iov.iov_len = 8;
+ iov.iov_buf = &pipe->txhead[0];
+ nni_aio_set_iov(negaio, 1, &iov);
rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
if (rv != 0) {
nni_mtx_unlock(&pipe->mtx);
@@ -638,7 +618,7 @@ nni_ipc_ep_cb(void *arg)
static void
nni_ipc_cancel_ep(nni_aio *aio, int rv)
{
- nni_ipc_ep *ep = aio->a_prov_data;
+ nni_ipc_ep *ep = nni_aio_get_prov_data(aio);
NNI_ASSERT(rv != 0);
nni_mtx_lock(&ep->mtx);
@@ -649,7 +629,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(ep->aio, rv);
+ nni_aio_abort(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 28ec9438..1504d0ee 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -130,7 +130,7 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
static void
nni_tcp_cancel_nego(nni_aio *aio, int rv)
{
- nni_tcp_pipe *p = aio->a_prov_data;
+ nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_negaio != aio) {
@@ -140,7 +140,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv)
p->user_negaio = NULL;
nni_mtx_unlock(&p->mtx);
- nni_aio_cancel(p->negaio, rv);
+ nni_aio_abort(p->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -164,18 +164,20 @@ nni_tcp_pipe_nego_cb(void *arg)
}
if (p->gottxhead < p->wanttxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = p->wanttxhead - p->gottxhead;
- aio->a_iov[0].iov_buf = &p->txlen[p->gottxhead];
+ nni_iov iov;
+ iov.iov_len = p->wanttxhead - p->gottxhead;
+ iov.iov_buf = &p->txlen[p->gottxhead];
// send it down...
+ nni_aio_set_iov(aio, 1, &iov);
nni_plat_tcp_pipe_send(p->tpp, aio);
nni_mtx_unlock(&p->mtx);
return;
}
if (p->gotrxhead < p->wantrxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = p->wantrxhead - p->gotrxhead;
- aio->a_iov[0].iov_buf = &p->rxlen[p->gotrxhead];
+ nni_iov iov;
+ iov.iov_len = p->wantrxhead - p->gotrxhead;
+ iov.iov_buf = &p->rxlen[p->gotrxhead];
+ nni_aio_set_iov(aio, 1, &iov);
nni_plat_tcp_pipe_recv(p->tpp, aio);
nni_mtx_unlock(&p->mtx);
return;
@@ -206,7 +208,7 @@ nni_tcp_pipe_send_cb(void *arg)
int rv;
nni_aio * aio;
size_t n;
- nng_msg * msg;
+ nni_msg * msg;
nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
@@ -226,20 +228,8 @@ nni_tcp_pipe_send_cb(void *arg)
}
n = nni_aio_count(txaio);
- while (n) {
- NNI_ASSERT(txaio->a_niov != 0);
- if (txaio->a_iov[0].iov_len > n) {
- txaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(txaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= txaio->a_iov[0].iov_len;
- for (int i = 0; i < txaio->a_niov; i++) {
- txaio->a_iov[i] = txaio->a_iov[i + 1];
- }
- txaio->a_niov--;
- }
- if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) {
+ nni_aio_iov_advance(txaio, n);
+ if (nni_aio_iov_count(txaio) > 0) {
nni_plat_tcp_pipe_send(p->tpp, txaio);
nni_mtx_unlock(&p->mtx);
return;
@@ -261,7 +251,7 @@ nni_tcp_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg * msg;
- nni_aio * rxaio = p->rxaio;
+ nni_aio * rxaio;
nni_mtx_lock(&p->mtx);
@@ -271,26 +261,15 @@ nni_tcp_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(p->rxaio)) != 0) {
+ rxaio = p->rxaio;
+
+ if ((rv = nni_aio_result(rxaio)) != 0) {
goto recv_error;
}
- n = nni_aio_count(p->rxaio);
- while (n) {
- NNI_ASSERT(rxaio->a_niov != 0);
- if (rxaio->a_iov[0].iov_len > n) {
- rxaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(rxaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= rxaio->a_iov[0].iov_len;
- rxaio->a_niov--;
- for (int i = 0; i < rxaio->a_niov; i++) {
- rxaio->a_iov[i] = rxaio->a_iov[i + 1];
- }
- }
- // Was this a partial read? If so then resubmit for the rest.
- if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) {
+ n = nni_aio_count(rxaio);
+ nni_aio_iov_advance(rxaio, n);
+ if (nni_aio_iov_count(rxaio) > 0) {
nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
return;
@@ -311,17 +290,18 @@ nni_tcp_pipe_recv_cb(void *arg)
goto recv_error;
}
- if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
+ if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
goto recv_error;
}
// Submit the rest of the data for a read -- we want to
// read the entire message now.
if (len != 0) {
- rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
- rxaio->a_iov[0].iov_len = (size_t) len;
- rxaio->a_niov = 1;
+ nni_iov iov;
+ iov.iov_buf = nni_msg_body(p->rxmsg);
+ iov.iov_len = (size_t) len;
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
return;
@@ -348,7 +328,7 @@ recv_error:
static void
nni_tcp_cancel_tx(nni_aio *aio, int rv)
{
- nni_tcp_pipe *p = aio->a_prov_data;
+ nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_txaio != aio) {
@@ -359,7 +339,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(p->txaio, rv);
+ nni_aio_abort(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -371,6 +351,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
uint64_t len;
nni_aio * txaio;
int niov;
+ nni_iov iov[3];
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -385,22 +366,22 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
NNI_PUT64(p->txlen, len);
- niov = 0;
- txaio = p->txaio;
- txaio->a_iov[niov].iov_buf = p->txlen;
- txaio->a_iov[niov].iov_len = sizeof(p->txlen);
+ niov = 0;
+ txaio = p->txaio;
+ iov[niov].iov_buf = p->txlen;
+ iov[niov].iov_len = sizeof(p->txlen);
niov++;
if (nni_msg_header_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_header(msg);
- txaio->a_iov[niov].iov_len = nni_msg_header_len(msg);
+ iov[niov].iov_buf = nni_msg_header(msg);
+ iov[niov].iov_len = nni_msg_header_len(msg);
niov++;
}
if (nni_msg_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_body(msg);
- txaio->a_iov[niov].iov_len = nni_msg_len(msg);
+ iov[niov].iov_buf = nni_msg_body(msg);
+ iov[niov].iov_len = nni_msg_len(msg);
niov++;
}
- txaio->a_niov = niov;
+ nni_aio_set_iov(txaio, niov, iov);
nni_plat_tcp_pipe_send(p->tpp, txaio);
nni_mtx_unlock(&p->mtx);
@@ -409,7 +390,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
static void
nni_tcp_cancel_rx(nni_aio *aio, int rv)
{
- nni_tcp_pipe *p = aio->a_prov_data;
+ nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_rxaio != aio) {
@@ -420,7 +401,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(p->rxaio, rv);
+ nni_aio_abort(p->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -429,6 +410,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
nni_aio * rxaio;
+ nni_iov iov;
nni_mtx_lock(&p->mtx);
@@ -441,10 +423,10 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(p->rxmsg == NULL);
// Schedule a read of the TCP header.
- rxaio = p->rxaio;
- rxaio->a_iov[0].iov_buf = p->rxlen;
- rxaio->a_iov[0].iov_len = sizeof(p->rxlen);
- rxaio->a_niov = 1;
+ rxaio = p->rxaio;
+ iov.iov_buf = p->rxlen;
+ iov.iov_len = sizeof(p->rxlen);
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
@@ -463,7 +445,7 @@ nni_tcp_pipe_getopt_locaddr(void *arg, void *v, size_t *szp)
{
nni_tcp_pipe *p = arg;
int rv;
- nng_sockaddr sa;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_plat_tcp_pipe_sockname(p->tpp, &sa)) == 0) {
@@ -477,7 +459,7 @@ nni_tcp_pipe_getopt_remaddr(void *arg, void *v, size_t *szp)
{
nni_tcp_pipe *p = arg;
int rv;
- nng_sockaddr sa;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_plat_tcp_pipe_peername(p->tpp, &sa)) == 0) {
@@ -492,6 +474,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
nni_aio * negaio;
+ nni_iov iov;
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
@@ -501,15 +484,15 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&p->txlen[4], p->proto);
NNI_PUT16(&p->txlen[6], 0);
- p->user_negaio = aio;
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- negaio = p->negaio;
- negaio->a_niov = 1;
- negaio->a_iov[0].iov_len = 8;
- negaio->a_iov[0].iov_buf = &p->txlen[0];
+ p->user_negaio = aio;
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ negaio = p->negaio;
+ iov.iov_len = 8;
+ iov.iov_buf = &p->txlen[0];
+ nni_aio_set_iov(negaio, 1, &iov);
if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
@@ -572,7 +555,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
if (mode == NNI_EP_MODE_DIAL) {
passive = 0;
lsa.s_un.s_family = NNG_AF_UNSPEC;
- aio->a_addr = &rsa;
+ nni_aio_set_input(aio, 0, &rsa);
if ((host == NULL) || (serv == NULL)) {
nni_aio_fini(aio);
return (NNG_EADDRINVAL);
@@ -580,7 +563,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
} else {
passive = 1;
rsa.s_un.s_family = NNG_AF_UNSPEC;
- aio->a_addr = &lsa;
+ nni_aio_set_input(aio, 0, &lsa);
}
nni_plat_tcp_resolv(host, serv, NNG_AF_UNSPEC, passive, aio);
@@ -685,7 +668,7 @@ nni_tcp_ep_cb(void *arg)
static void
nni_tcp_cancel_ep(nni_aio *aio, int rv)
{
- nni_tcp_ep *ep = aio->a_prov_data;
+ nni_tcp_ep *ep = nni_aio_get_prov_data(aio);
nni_mtx_lock(&ep->mtx);
if (ep->user_aio != aio) {
@@ -695,7 +678,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(ep->aio, rv);
+ nni_aio_abort(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index f6c5bc6e..09c59582 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -138,7 +138,7 @@ nni_tls_pipe_init(nni_tls_pipe **pipep, nni_tls_ep *ep, void *tpp)
static void
nni_tls_cancel_nego(nni_aio *aio, int rv)
{
- nni_tls_pipe *p = aio->a_prov_data;
+ nni_tls_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_negaio != aio) {
@@ -148,7 +148,7 @@ nni_tls_cancel_nego(nni_aio *aio, int rv)
p->user_negaio = NULL;
nni_mtx_unlock(&p->mtx);
- nni_aio_cancel(p->negaio, rv);
+ nni_aio_abort(p->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -172,18 +172,20 @@ nni_tls_pipe_nego_cb(void *arg)
}
if (p->gottxhead < p->wanttxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = p->wanttxhead - p->gottxhead;
- aio->a_iov[0].iov_buf = &p->txlen[p->gottxhead];
+ nni_iov iov;
+ iov.iov_len = p->wanttxhead - p->gottxhead;
+ iov.iov_buf = &p->txlen[p->gottxhead];
+ nni_aio_set_iov(aio, 1, &iov);
// send it down...
nni_tls_send(p->tls, aio);
nni_mtx_unlock(&p->mtx);
return;
}
if (p->gotrxhead < p->wantrxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = p->wantrxhead - p->gotrxhead;
- aio->a_iov[0].iov_buf = &p->rxlen[p->gotrxhead];
+ nni_iov iov;
+ iov.iov_len = p->wantrxhead - p->gotrxhead;
+ iov.iov_buf = &p->rxlen[p->gotrxhead];
+ nni_aio_set_iov(aio, 1, &iov);
nni_tls_recv(p->tls, aio);
nni_mtx_unlock(&p->mtx);
return;
@@ -234,20 +236,8 @@ nni_tls_pipe_send_cb(void *arg)
}
n = nni_aio_count(txaio);
- while (n) {
- NNI_ASSERT(txaio->a_niov != 0);
- if (txaio->a_iov[0].iov_len > n) {
- txaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(txaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= txaio->a_iov[0].iov_len;
- for (int i = 0; i < txaio->a_niov; i++) {
- txaio->a_iov[i] = txaio->a_iov[i + 1];
- }
- txaio->a_niov--;
- }
- if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) {
+ nni_aio_iov_advance(txaio, n);
+ if (nni_aio_iov_count(txaio) > 0) {
nni_tls_send(p->tls, txaio);
nni_mtx_unlock(&p->mtx);
return;
@@ -269,7 +259,7 @@ nni_tls_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg * msg;
- nni_aio * rxaio = p->rxaio;
+ nni_aio * rxaio;
nni_mtx_lock(&p->mtx);
@@ -279,26 +269,16 @@ nni_tls_pipe_recv_cb(void *arg)
return;
}
+ rxaio = p->rxaio;
+
if ((rv = nni_aio_result(p->rxaio)) != 0) {
goto recv_error;
}
- n = nni_aio_count(p->rxaio);
- while (n) {
- NNI_ASSERT(rxaio->a_niov != 0);
- if (rxaio->a_iov[0].iov_len > n) {
- rxaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(rxaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= rxaio->a_iov[0].iov_len;
- rxaio->a_niov--;
- for (int i = 0; i < rxaio->a_niov; i++) {
- rxaio->a_iov[i] = rxaio->a_iov[i + 1];
- }
- }
- // Was this a partial read? If so then resubmit for the rest.
- if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) {
+ n = nni_aio_count(rxaio);
+ nni_aio_iov_advance(rxaio, n);
+ if (nni_aio_iov_count(rxaio) > 0) {
+ // Was this a partial read? If so then resubmit for the rest.
nni_tls_recv(p->tls, rxaio);
nni_mtx_unlock(&p->mtx);
return;
@@ -319,16 +299,17 @@ nni_tls_pipe_recv_cb(void *arg)
goto recv_error;
}
- if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
+ if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
goto recv_error;
}
// Submit the rest of the data for a read -- we want to
// read the entire message now.
if (len != 0) {
- rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
- rxaio->a_iov[0].iov_len = (size_t) len;
- rxaio->a_niov = 1;
+ nni_iov iov;
+ iov.iov_buf = nni_msg_body(p->rxmsg);
+ iov.iov_len = (size_t) len;
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_tls_recv(p->tls, rxaio);
nni_mtx_unlock(&p->mtx);
@@ -356,7 +337,7 @@ recv_error:
static void
nni_tls_cancel_tx(nni_aio *aio, int rv)
{
- nni_tls_pipe *p = aio->a_prov_data;
+ nni_tls_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_txaio != aio) {
@@ -367,7 +348,7 @@ nni_tls_cancel_tx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(p->txaio, rv);
+ nni_aio_abort(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -379,6 +360,7 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
uint64_t len;
nni_aio * txaio;
int niov;
+ nni_iov iov[3];
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -393,23 +375,23 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
NNI_PUT64(p->txlen, len);
- niov = 0;
- txaio = p->txaio;
- txaio->a_iov[niov].iov_buf = p->txlen;
- txaio->a_iov[niov].iov_len = sizeof(p->txlen);
+ niov = 0;
+ txaio = p->txaio;
+ iov[niov].iov_buf = p->txlen;
+ iov[niov].iov_len = sizeof(p->txlen);
niov++;
if (nni_msg_header_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_header(msg);
- txaio->a_iov[niov].iov_len = nni_msg_header_len(msg);
+ iov[niov].iov_buf = nni_msg_header(msg);
+ iov[niov].iov_len = nni_msg_header_len(msg);
niov++;
}
if (nni_msg_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_body(msg);
- txaio->a_iov[niov].iov_len = nni_msg_len(msg);
+ iov[niov].iov_buf = nni_msg_body(msg);
+ iov[niov].iov_len = nni_msg_len(msg);
niov++;
}
- txaio->a_niov = niov;
+ nni_aio_set_iov(txaio, niov, iov);
nni_tls_send(p->tls, txaio);
nni_mtx_unlock(&p->mtx);
}
@@ -417,7 +399,7 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
static void
nni_tls_cancel_rx(nni_aio *aio, int rv)
{
- nni_tls_pipe *p = aio->a_prov_data;
+ nni_tls_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_rxaio != aio) {
@@ -428,7 +410,7 @@ nni_tls_cancel_rx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(p->rxaio, rv);
+ nni_aio_abort(p->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -437,6 +419,7 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
nni_aio * rxaio;
+ nni_iov iov;
nni_mtx_lock(&p->mtx);
@@ -449,10 +432,11 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(p->rxmsg == NULL);
// Schedule a read of the TCP header.
- rxaio = p->rxaio;
- rxaio->a_iov[0].iov_buf = p->rxlen;
- rxaio->a_iov[0].iov_len = sizeof(p->rxlen);
- rxaio->a_niov = 1;
+ rxaio = p->rxaio;
+
+ iov.iov_buf = p->rxlen;
+ iov.iov_len = sizeof(p->rxlen);
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_tls_recv(p->tls, rxaio);
nni_mtx_unlock(&p->mtx);
@@ -471,7 +455,7 @@ nni_tls_pipe_getopt_locaddr(void *arg, void *v, size_t *szp)
{
nni_tls_pipe *p = arg;
int rv;
- nng_sockaddr sa;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_tls_sockname(p->tls, &sa)) == 0) {
@@ -485,7 +469,7 @@ nni_tls_pipe_getopt_remaddr(void *arg, void *v, size_t *szp)
{
nni_tls_pipe *p = arg;
int rv;
- nng_sockaddr sa;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_tls_peername(p->tls, &sa)) == 0) {
@@ -499,6 +483,7 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
nni_aio * negaio;
+ nni_iov iov;
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
@@ -508,15 +493,15 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&p->txlen[4], p->proto);
NNI_PUT16(&p->txlen[6], 0);
- p->user_negaio = aio;
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- negaio = p->negaio;
- negaio->a_niov = 1;
- negaio->a_iov[0].iov_len = 8;
- negaio->a_iov[0].iov_buf = &p->txlen[0];
+ p->user_negaio = aio;
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ negaio = p->negaio;
+ iov.iov_len = 8;
+ iov.iov_buf = &p->txlen[0];
+ nni_aio_set_iov(negaio, 1, &iov);
if (nni_aio_start(aio, nni_tls_cancel_nego, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
@@ -584,7 +569,7 @@ nni_tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
tlsmode = NNG_TLS_MODE_CLIENT;
authmode = NNG_TLS_AUTH_MODE_REQUIRED;
lsa.s_un.s_family = NNG_AF_UNSPEC;
- aio->a_addr = &rsa;
+ nni_aio_set_input(aio, 0, &rsa);
if ((host == NULL) || (serv == NULL)) {
nni_aio_fini(aio);
return (NNG_EADDRINVAL);
@@ -594,7 +579,7 @@ nni_tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
tlsmode = NNG_TLS_MODE_SERVER;
authmode = NNG_TLS_AUTH_MODE_NONE;
rsa.s_un.s_family = NNG_AF_UNSPEC;
- aio->a_addr = &lsa;
+ nni_aio_set_input(aio, 0, &lsa);
}
// XXX: arguably we could defer this part to the point we do a bind
@@ -705,7 +690,7 @@ nni_tls_ep_cb(void *arg)
static void
nni_tls_cancel_ep(nni_aio *aio, int rv)
{
- nni_tls_ep *ep = aio->a_prov_data;
+ nni_tls_ep *ep = nni_aio_get_prov_data(aio);
nni_mtx_lock(&ep->mtx);
if (ep->user_aio != aio) {
@@ -715,7 +700,7 @@ nni_tls_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(ep->aio, rv);
+ nni_aio_abort(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index aead6f59..2fe7804c 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -111,13 +111,13 @@ ws_pipe_recv_cb(void *arg)
static void
ws_pipe_recv_cancel(nni_aio *aio, int rv)
{
- ws_pipe *p = aio->a_prov_data;
+ ws_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_rxaio != aio) {
nni_mtx_unlock(&p->mtx);
return;
}
- nni_aio_cancel(p->rxaio, rv);
+ nni_aio_abort(p->rxaio, rv);
p->user_rxaio = NULL;
nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&p->mtx);
@@ -142,14 +142,14 @@ ws_pipe_recv(void *arg, nni_aio *aio)
static void
ws_pipe_send_cancel(nni_aio *aio, int rv)
{
- ws_pipe *p = aio->a_prov_data;
+ ws_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_txaio != aio) {
nni_mtx_unlock(&p->mtx);
return;
}
p->user_txaio = NULL;
- nni_aio_cancel(p->txaio, rv);
+ nni_aio_abort(p->txaio, rv);
nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&p->mtx);
}
@@ -255,7 +255,7 @@ ws_hook(void *arg, nni_http_req *req, nni_http_res *res)
NNI_LIST_FOREACH (&ep->headers, h) {
int rv;
- rv = nni_http_res_set_header(res, h->name, h->value);
+ rv = nng_http_res_set_header(res, h->name, h->value);
if (rv != 0) {
return (rv);
}
@@ -279,7 +279,7 @@ ws_ep_bind(void *arg)
static void
ws_ep_cancel(nni_aio *aio, int rv)
{
- ws_ep *ep = aio->a_prov_data;
+ ws_ep *ep = nni_aio_get_prov_data(aio);
nni_mtx_lock(&ep->mtx);
if (nni_aio_list_active(aio)) {
@@ -484,7 +484,7 @@ ws_pipe_getopt_locaddr(void *arg, void *v, size_t *szp)
{
ws_pipe * p = arg;
int rv;
- nng_sockaddr sa;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_ws_sock_addr(p->ws, &sa)) == 0) {
@@ -498,7 +498,7 @@ ws_pipe_getopt_remaddr(void *arg, void *v, size_t *szp)
{
ws_pipe * p = arg;
int rv;
- nng_sockaddr sa;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_ws_peer_addr(p->ws, &sa)) == 0) {
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index a5ca739c..d6762d25 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -367,18 +367,19 @@ zt_node_rcv4_cb(void *arg)
// XXX: CHECK THIS, if it fails then we have a fatal error with
// the znode, and have to shut everything down.
ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa,
- ztn->zn_rcv4_buf, aio->a_count, &now);
+ ztn->zn_rcv4_buf, nni_aio_count(aio), &now);
// Schedule background work
zt_node_resched(ztn, now);
// Schedule another receive.
if (ztn->zn_udp4 != NULL) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf;
- aio->a_iov[0].iov_len = zt_rcv_bufsize;
- aio->a_addr = &ztn->zn_rcv4_addr;
- aio->a_count = 0;
+ nni_iov iov;
+ iov.iov_buf = ztn->zn_rcv4_buf;
+ iov.iov_len = zt_rcv_bufsize;
+ nni_aio_set_iov(aio, 1, &iov);
+
+ nni_aio_set_input(aio, 0, &ztn->zn_rcv4_addr);
nni_plat_udp_recv(ztn->zn_udp4, aio);
}
@@ -416,18 +417,18 @@ zt_node_rcv6_cb(void *arg)
// We are not going to perform any validation of the data; we
// just pass this straight into the ZeroTier core.
ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa,
- ztn->zn_rcv6_buf, aio->a_count, &now);
+ ztn->zn_rcv6_buf, nni_aio_count(aio), &now);
// Schedule background work
zt_node_resched(ztn, now);
// Schedule another receive.
if (ztn->zn_udp6 != NULL) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf;
- aio->a_iov[0].iov_len = zt_rcv_bufsize;
- aio->a_addr = &ztn->zn_rcv6_addr;
- aio->a_count = 0;
+ nni_iov iov;
+ iov.iov_buf = ztn->zn_rcv6_buf;
+ iov.iov_len = zt_rcv_bufsize;
+ nni_aio_set_iov(aio, 1, &iov);
+ nni_aio_set_input(aio, 0, &ztn->zn_rcv6_addr);
nni_plat_udp_recv(ztn->zn_udp6, aio);
}
nni_mtx_unlock(&zt_lk);
@@ -1309,6 +1310,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
uint16_t port;
uint8_t * buf;
zt_send_hdr * hdr;
+ nni_iov iov;
NNI_ARG_UNUSED(thr);
NNI_ARG_UNUSED(socket);
@@ -1353,11 +1355,11 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
nni_aio_set_data(aio, 0, hdr);
hdr->sa = addr;
hdr->len = len;
+ nni_aio_set_input(aio, 0, &hdr->sa);
- aio->a_addr = &hdr->sa;
- aio->a_niov = 1;
- aio->a_iov[0].iov_buf = buf;
- aio->a_iov[0].iov_len = len;
+ iov.iov_buf = buf;
+ iov.iov_len = len;
+ nni_aio_set_iov(aio, 1, &iov);
// This should be non-blocking/best-effort, so while
// not great that we're holding the lock, also not tragic.
@@ -1423,6 +1425,7 @@ zt_node_create(zt_node **ztnp, const char *path)
nng_sockaddr sa6;
int rv;
enum ZT_ResultCode zrv;
+ nni_iov iov;
// We want to bind to any address we can (for now).
// Note that at the moment we only support IPv4. Its
@@ -1487,16 +1490,14 @@ zt_node_create(zt_node **ztnp, const char *path)
zt_node_resched(ztn, 1);
// Schedule receive
- ztn->zn_rcv4_aio->a_niov = 1;
- ztn->zn_rcv4_aio->a_iov[0].iov_buf = ztn->zn_rcv4_buf;
- ztn->zn_rcv4_aio->a_iov[0].iov_len = zt_rcv_bufsize;
- ztn->zn_rcv4_aio->a_addr = &ztn->zn_rcv4_addr;
- ztn->zn_rcv4_aio->a_count = 0;
- ztn->zn_rcv6_aio->a_niov = 1;
- ztn->zn_rcv6_aio->a_iov[0].iov_buf = ztn->zn_rcv6_buf;
- ztn->zn_rcv6_aio->a_iov[0].iov_len = zt_rcv_bufsize;
- ztn->zn_rcv6_aio->a_addr = &ztn->zn_rcv6_addr;
- ztn->zn_rcv6_aio->a_count = 0;
+ iov.iov_buf = ztn->zn_rcv4_buf;
+ iov.iov_len = zt_rcv_bufsize;
+ nni_aio_set_iov(ztn->zn_rcv4_aio, 1, &iov);
+ nni_aio_set_input(ztn->zn_rcv4_aio, 0, &ztn->zn_rcv4_addr);
+ iov.iov_buf = ztn->zn_rcv6_buf;
+ iov.iov_len = zt_rcv_bufsize;
+ nni_aio_set_iov(ztn->zn_rcv6_aio, 1, &iov);
+ nni_aio_set_input(ztn->zn_rcv6_aio, 0, &ztn->zn_rcv6_addr);
nni_plat_udp_recv(ztn->zn_udp4, ztn->zn_rcv4_aio);
nni_plat_udp_recv(ztn->zn_udp6, ztn->zn_rcv6_aio);
@@ -1812,7 +1813,7 @@ zt_pipe_send(void *arg, nni_aio *aio)
static void
zt_pipe_cancel_recv(nni_aio *aio, int rv)
{
- zt_pipe *p = aio->a_prov_data;
+ zt_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&zt_lk);
if (p->zp_user_rxaio == aio) {
p->zp_user_rxaio = NULL;
@@ -1968,7 +1969,7 @@ zt_pipe_get_node(void *arg, void *buf, size_t *szp)
static void
zt_pipe_cancel_ping(nni_aio *aio, int rv)
{
- zt_pipe *p = aio->a_prov_data;
+ zt_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&zt_lk);
if (p->zp_ping_active) {
@@ -2195,7 +2196,7 @@ zt_ep_close(void *arg)
zt_node *ztn;
nni_aio *aio;
- nni_aio_cancel(ep->ze_creq_aio, NNG_ECLOSED);
+ nni_aio_abort(ep->ze_creq_aio, NNG_ECLOSED);
// Cancel any outstanding user operation(s) - they should have
// been aborted by the above cancellation, but we need to be
@@ -2293,12 +2294,12 @@ zt_ep_bind(void *arg)
static void
zt_ep_cancel(nni_aio *aio, int rv)
{
- zt_ep *ep = aio->a_prov_data;
+ zt_ep *ep = nni_aio_get_prov_data(aio);
nni_mtx_lock(&zt_lk);
if (nni_aio_list_active(aio)) {
if (ep->ze_aio != NULL) {
- nni_aio_cancel(ep->ze_aio, rv);
+ nni_aio_abort(ep->ze_aio, rv);
}
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -2370,7 +2371,7 @@ zt_ep_accept(void *arg, nni_aio *aio)
static void
zt_ep_conn_req_cancel(nni_aio *aio, int rv)
{
- zt_ep *ep = aio->a_prov_data;
+ zt_ep *ep = nni_aio_get_prov_data(aio);
// We don't have much to do here. The AIO will have been
// canceled as a result of the "parent" AIO canceling.
nni_mtx_lock(&zt_lk);