aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-18 20:38:00 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-20 07:34:16 -0700
commit5902d02ad0a056a146231568f1293ffbcd59f61c (patch)
treebe38584c02d703ec2322ab941d4d723c752fe187 /src/transport
parent40542e7af0f5003d7ad67876ea580a59174031ca (diff)
downloadnng-5902d02ad0a056a146231568f1293ffbcd59f61c.tar.gz
nng-5902d02ad0a056a146231568f1293ffbcd59f61c.tar.bz2
nng-5902d02ad0a056a146231568f1293ffbcd59f61c.zip
fixes #346 nng_recv() sometimes acts on null `msg` pointer
This closes a fundamental flaw in the way aio structures were handled. In paticular, aio expiration could race ahead, and fire before the aio was properly registered by the provider. This ultimately led to the possibility of duplicate completions on the same aio. The solution involved breaking up nni_aio_start into two functions. nni_aio_begin (which can be run outside of external locks) simply validates that nni_aio_fini() has not been called, and clears certain fields in the aio to make it ready for use by the provider. nni_aio_schedule does the work to register the aio with the expiration thread, and should only be called when the aio is actually scheduled for asynchronous completion. nni_aio_schedule_verify does the same thing, but returns NNG_ETIMEDOUT if the aio has a zero length timeout. This change has a small negative performance impact. We have plans to rectify that by converting nni_aio_begin to use a locklesss flag for the aio->a_fini bit. While we were here, we fixed some error paths in the POSIX subsystem, which would have returned incorrect error codes, and we made some optmizations in the message queues to reduce conditionals while holding locks in the hot code path.
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c29
-rw-r--r--src/transport/ipc/ipc.c47
-rw-r--r--src/transport/tcp/tcp.c41
-rw-r--r--src/transport/tls/tls.c49
-rw-r--r--src/transport/ws/websocket.c28
-rw-r--r--src/transport/zerotier/zerotier.c118
6 files changed, 150 insertions, 162 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 82f46fc4..8bfb097e 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -349,19 +349,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
- int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if (ep->mode != NNI_EP_MODE_DIAL) {
nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
nni_mtx_lock(&nni_inproc.mx);
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
-
// Find a server.
NNI_LIST_FOREACH (&nni_inproc.servers, server) {
if (strcmp(server->addr, ep->addr) == 0) {
@@ -369,11 +366,17 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
}
}
if (server == NULL) {
- nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
+ // nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, NNG_ECONNREFUSED);
return;
}
+ // We don't have to worry about the case where a zero timeout
+ // on connect was specified, as there is no option to specify
+ // that in the upper API.
+ nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+
nni_list_append(&server->clients, ep);
nni_aio_list_append(&ep->aios, aio);
@@ -404,15 +407,17 @@ static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
- int rv;
- nni_mtx_lock(&nni_inproc.mx);
-
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&nni_inproc.mx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&nni_inproc.mx);
+
+ // We need not worry about the case where a non-blocking
+ // accept was tried -- there is no API to do such a thing.
+ nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+
// We are already on the master list of servers, thanks to bind.
// Insert us into pending server aios, and then run accept list.
nni_aio_list_append(&ep->aios, aio);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index ecc9a962..61b89f20 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -418,11 +418,11 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_mtx_lock(&pipe->mtx);
- if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pipe->mtx);
+ nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe);
nni_list_append(&pipe->sendq, aio);
if (nni_list_first(&pipe->sendq) == aio) {
nni_ipc_pipe_dosend(pipe, aio);
@@ -474,12 +474,13 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_mtx_lock(&pipe->mtx);
-
- if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pipe->mtx);
+
+ // Transports never have a zero length timeout.
+ (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe);
nni_list_append(&pipe->recvq, aio);
if (nni_list_first(&pipe->recvq) == aio) {
@@ -492,10 +493,12 @@ static void
nni_ipc_pipe_start(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- int rv;
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
pipe->txhead[1] = 'S';
@@ -513,11 +516,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &pipe->txhead[0];
nni_aio_set_iov(negaio, 1, &iov);
- rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
- if (rv != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_ipc_cancel_start, pipe);
nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -680,16 +679,14 @@ static void
nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_ipc_ep_accept(ep->iep, ep->aio);
@@ -700,18 +697,14 @@ static void
nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- // If we can't start, then its dying and we can't report
- // either.
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_ipc_ep_connect(ep->iep, ep->aio);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 7ea77035..a551c5be 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -398,11 +398,11 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tcp_cancel_tx, p);
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tcp_pipe_dosend(p, aio);
@@ -454,11 +454,11 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tcp_cancel_rx, p);
nni_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tcp_pipe_dorecv(p);
@@ -510,6 +510,9 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
p->txlen[1] = 'S';
@@ -527,10 +530,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_tcp_cancel_nego, p);
nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -721,16 +721,14 @@ static void
nni_tcp_ep_accept(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
@@ -741,17 +739,14 @@ static void
nni_tcp_ep_connect(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index a852ddd8..2ea14e93 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -405,13 +405,11 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tls_cancel_tx, p);
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tls_pipe_dosend(p, aio);
@@ -463,12 +461,12 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+
+ nni_aio_schedule(aio, nni_tls_cancel_rx, p);
nni_aio_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tls_pipe_dorecv(p);
@@ -519,6 +517,9 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
p->txlen[1] = 'S';
@@ -536,10 +537,7 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- if (nni_aio_start(aio, nni_tls_cancel_nego, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_tls_cancel_nego, p);
nni_tls_send(p->tls, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -743,18 +741,14 @@ static void
nni_tls_ep_accept(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
ep->user_aio = aio;
-
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -763,19 +757,14 @@ static void
nni_tls_ep_connect(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
ep->user_aio = aio;
-
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 0542d0c7..761ba824 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -129,13 +129,12 @@ ws_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, ws_pipe_recv_cancel, p);
p->user_rxaio = aio;
-
nni_ws_recv_msg(p->ws, p->rxaio);
nni_mtx_unlock(&p->mtx);
}
@@ -160,11 +159,11 @@ ws_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, ws_pipe_send_cancel, p);
p->user_txaio = aio;
nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
@@ -294,11 +293,11 @@ ws_ep_accept(void *arg, nni_aio *aio)
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
// Otherwise we stick the AIO in the accept list.
- nni_mtx_lock(&ep->mtx);
- if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ nni_aio_schedule(aio, ws_ep_cancel, ep);
nni_list_append(&ep->aios, aio);
if (aio == nni_list_first(&ep->aios)) {
nni_ws_listener_accept(ep->listener, ep->accaio);
@@ -313,6 +312,9 @@ ws_ep_connect(void *arg, nni_aio *aio)
int rv = 0;
ws_hdr *h;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if (!ep->started) {
NNI_LIST_FOREACH (&ep->headers, h) {
rv = nni_ws_dialer_header(
@@ -327,11 +329,7 @@ ws_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(nni_list_empty(&ep->aios));
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
+ nni_aio_schedule(aio, ws_ep_cancel, ep);
ep->started = true;
nni_list_append(&ep->aios, aio);
nni_ws_dialer_dial(ep->dialer, ep->connaio);
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index d0790b37..05866cfb 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1745,26 +1745,24 @@ zt_pipe_send(void *arg, nni_aio *aio)
size_t bytes;
nni_msg *m;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, NULL, p) != 0) {
- nni_mtx_unlock(&zt_lk);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ if ((m = nni_aio_get_msg(aio)) == NULL) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
+ nni_mtx_lock(&zt_lk);
+
if (p->zp_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data);
- if ((m = nni_aio_get_msg(aio)) == NULL) {
- nni_aio_finish_error(aio, NNG_EINVAL);
- nni_mtx_unlock(&zt_lk);
- return;
- };
-
bytes = nni_msg_header_len(m) + nni_msg_len(m);
if (bytes >= (0xfffe * fragsz)) {
nni_aio_finish_error(aio, NNG_EMSGSIZE);
@@ -1821,11 +1819,16 @@ zt_pipe_send(void *arg, nni_aio *aio)
zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr,
p->zp_laddr, data, fraglen + zt_offset_data_data);
} while (nni_msg_len(m) != 0);
+ nni_mtx_unlock(&zt_lk);
+
+ // NB, We never bothered to call nn_aio_sched, because we run this
+ // synchronously, relying on UDP to simply discard messages if we
+ // cannot deliver them. This means that pipe send operations with
+ // this transport are not cancellable.
nni_aio_set_msg(aio, NULL);
nni_msg_free(m);
nni_aio_finish(aio, 0, offset);
- nni_mtx_unlock(&zt_lk);
}
static void
@@ -1903,17 +1906,18 @@ zt_pipe_recv(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) {
- nni_mtx_unlock(&zt_lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&zt_lk);
if (p->zp_closed) {
+ nni_mtx_unlock(&zt_lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
- } else {
- p->zp_user_rxaio = aio;
- zt_pipe_dorecv(p);
+ return;
}
+ nni_aio_schedule(aio, zt_pipe_cancel_recv, p);
+ p->zp_user_rxaio = aio;
+ zt_pipe_dorecv(p);
nni_mtx_unlock(&zt_lk);
}
@@ -2047,11 +2051,10 @@ zt_pipe_ping_cb(void *arg)
// use the the timer to wake us up even if we aren't
// going to send a ping. (We don't increment the try count
// unless we actually do send one though.)
- if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
p->zp_ping_active = 1;
if (now > (p->zp_last_recv + p->zp_ping_time)) {
- // We have to send a ping to keep the session
- // up.
p->zp_ping_try++;
zt_pipe_send_ping(p);
}
@@ -2069,6 +2072,9 @@ zt_pipe_start(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&zt_lk);
p->zp_ping_active = 0;
// send a gratuitous ping, and start the ping interval timer.
@@ -2077,8 +2083,9 @@ zt_pipe_start(void *arg, nni_aio *aio)
(p->zp_ping_aio != NULL)) {
p->zp_ping_try = 0;
nni_aio_set_timeout(aio, p->zp_ping_time);
- if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) ==
- 0) {
+ if (nni_aio_begin(p->zp_ping_aio) == 0) {
+ nni_aio_schedule(
+ p->zp_ping_aio, zt_pipe_cancel_ping, p);
p->zp_ping_active = 1;
zt_pipe_send_ping(p);
}
@@ -2402,11 +2409,13 @@ zt_ep_accept(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- nni_aio_list_append(&ep->ze_aios, aio);
- zt_ep_doaccept(ep);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&zt_lk);
+ nni_aio_schedule(aio, zt_ep_cancel, ep);
+ nni_aio_list_append(&ep->ze_aios, aio);
+ zt_ep_doaccept(ep);
nni_mtx_unlock(&zt_lk);
}
@@ -2479,7 +2488,8 @@ zt_ep_conn_req_cb(void *arg)
if (nni_list_first(&ep->ze_aios) != NULL) {
nni_aio_set_timeout(aio, ep->ze_conn_time);
- if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
ep->ze_creq_active = 1;
ep->ze_creq_try++;
zt_ep_send_conn_req(ep);
@@ -2493,43 +2503,41 @@ static void
zt_ep_connect(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
+ int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
// We bind locally. We'll use the address later when we give
// it to the pipe, but this allows us to receive the initial
// ack back from the server. (This gives us an ephemeral
// address to work with.)
nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- int rv;
-
- // Clear the port so we get an ephemeral port.
- ep->ze_laddr &= ~((uint64_t) zt_port_mask);
-
- if ((rv = zt_ep_bind_locked(ep)) != 0) {
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&zt_lk);
- return;
- }
-
- if ((ep->ze_raddr >> 24) == 0) {
- ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
- }
- nni_aio_list_append(&ep->ze_aios, aio);
-
- ep->ze_running = 1;
-
- nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
+ // Clear the port so we get an ephemeral port.
+ ep->ze_laddr &= ~((uint64_t) zt_port_mask);
- if (nni_aio_start(
- ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) {
+ if ((rv = zt_ep_bind_locked(ep)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&zt_lk);
+ return;
+ }
- // Send out the first connect message; if not
- // yet attached to network message will be dropped.
- ep->ze_creq_try = 1;
- ep->ze_creq_active = 1;
- zt_ep_send_conn_req(ep);
- }
+ if ((ep->ze_raddr >> 24) == 0) {
+ ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
+ }
+ nni_aio_list_append(&ep->ze_aios, aio);
+ ep->ze_running = 1;
+ nni_aio_schedule(aio, zt_ep_cancel, ep);
+
+ nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
+ if (nni_aio_begin(ep->ze_creq_aio) == 0) {
+ nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+ // Send out the first connect message; if not
+ // yet attached to network message will be dropped.
+ ep->ze_creq_try = 1;
+ ep->ze_creq_active = 1;
+ zt_ep_send_conn_req(ep);
}
nni_mtx_unlock(&zt_lk);
}