aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c29
-rw-r--r--src/transport/ipc/ipc.c47
-rw-r--r--src/transport/tcp/tcp.c41
-rw-r--r--src/transport/tls/tls.c49
-rw-r--r--src/transport/ws/websocket.c28
-rw-r--r--src/transport/zerotier/zerotier.c118
6 files changed, 150 insertions, 162 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 82f46fc4..8bfb097e 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -349,19 +349,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
- int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if (ep->mode != NNI_EP_MODE_DIAL) {
nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
nni_mtx_lock(&nni_inproc.mx);
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
-
// Find a server.
NNI_LIST_FOREACH (&nni_inproc.servers, server) {
if (strcmp(server->addr, ep->addr) == 0) {
@@ -369,11 +366,17 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
}
}
if (server == NULL) {
- nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
+ // nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, NNG_ECONNREFUSED);
return;
}
+ // We don't have to worry about the case where a zero timeout
+ // on connect was specified, as there is no option to specify
+ // that in the upper API.
+ nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+
nni_list_append(&server->clients, ep);
nni_aio_list_append(&ep->aios, aio);
@@ -404,15 +407,17 @@ static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
- int rv;
- nni_mtx_lock(&nni_inproc.mx);
-
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&nni_inproc.mx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&nni_inproc.mx);
+
+ // We need not worry about the case where a non-blocking
+ // accept was tried -- there is no API to do such a thing.
+ nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+
// We are already on the master list of servers, thanks to bind.
// Insert us into pending server aios, and then run accept list.
nni_aio_list_append(&ep->aios, aio);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index ecc9a962..61b89f20 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -418,11 +418,11 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_mtx_lock(&pipe->mtx);
- if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pipe->mtx);
+ nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe);
nni_list_append(&pipe->sendq, aio);
if (nni_list_first(&pipe->sendq) == aio) {
nni_ipc_pipe_dosend(pipe, aio);
@@ -474,12 +474,13 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_mtx_lock(&pipe->mtx);
-
- if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pipe->mtx);
+
+ // Transports never have a zero length timeout.
+ (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe);
nni_list_append(&pipe->recvq, aio);
if (nni_list_first(&pipe->recvq) == aio) {
@@ -492,10 +493,12 @@ static void
nni_ipc_pipe_start(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- int rv;
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
pipe->txhead[1] = 'S';
@@ -513,11 +516,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &pipe->txhead[0];
nni_aio_set_iov(negaio, 1, &iov);
- rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
- if (rv != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_ipc_cancel_start, pipe);
nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -680,16 +679,14 @@ static void
nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_ipc_ep_accept(ep->iep, ep->aio);
@@ -700,18 +697,14 @@ static void
nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- // If we can't start, then its dying and we can't report
- // either.
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_ipc_ep_connect(ep->iep, ep->aio);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 7ea77035..a551c5be 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -398,11 +398,11 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tcp_cancel_tx, p);
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tcp_pipe_dosend(p, aio);
@@ -454,11 +454,11 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tcp_cancel_rx, p);
nni_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tcp_pipe_dorecv(p);
@@ -510,6 +510,9 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
p->txlen[1] = 'S';
@@ -527,10 +530,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_tcp_cancel_nego, p);
nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -721,16 +721,14 @@ static void
nni_tcp_ep_accept(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
@@ -741,17 +739,14 @@ static void
nni_tcp_ep_connect(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index a852ddd8..2ea14e93 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -405,13 +405,11 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tls_cancel_tx, p);
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tls_pipe_dosend(p, aio);
@@ -463,12 +461,12 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+
+ nni_aio_schedule(aio, nni_tls_cancel_rx, p);
nni_aio_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tls_pipe_dorecv(p);
@@ -519,6 +517,9 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
p->txlen[1] = 'S';
@@ -536,10 +537,7 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- if (nni_aio_start(aio, nni_tls_cancel_nego, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_tls_cancel_nego, p);
nni_tls_send(p->tls, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -743,18 +741,14 @@ static void
nni_tls_ep_accept(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
ep->user_aio = aio;
-
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -763,19 +757,14 @@ static void
nni_tls_ep_connect(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
ep->user_aio = aio;
-
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 0542d0c7..761ba824 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -129,13 +129,12 @@ ws_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, ws_pipe_recv_cancel, p);
p->user_rxaio = aio;
-
nni_ws_recv_msg(p->ws, p->rxaio);
nni_mtx_unlock(&p->mtx);
}
@@ -160,11 +159,11 @@ ws_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, ws_pipe_send_cancel, p);
p->user_txaio = aio;
nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
@@ -294,11 +293,11 @@ ws_ep_accept(void *arg, nni_aio *aio)
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
// Otherwise we stick the AIO in the accept list.
- nni_mtx_lock(&ep->mtx);
- if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ nni_aio_schedule(aio, ws_ep_cancel, ep);
nni_list_append(&ep->aios, aio);
if (aio == nni_list_first(&ep->aios)) {
nni_ws_listener_accept(ep->listener, ep->accaio);
@@ -313,6 +312,9 @@ ws_ep_connect(void *arg, nni_aio *aio)
int rv = 0;
ws_hdr *h;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if (!ep->started) {
NNI_LIST_FOREACH (&ep->headers, h) {
rv = nni_ws_dialer_header(
@@ -327,11 +329,7 @@ ws_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(nni_list_empty(&ep->aios));
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
+ nni_aio_schedule(aio, ws_ep_cancel, ep);
ep->started = true;
nni_list_append(&ep->aios, aio);
nni_ws_dialer_dial(ep->dialer, ep->connaio);
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index d0790b37..05866cfb 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1745,26 +1745,24 @@ zt_pipe_send(void *arg, nni_aio *aio)
size_t bytes;
nni_msg *m;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, NULL, p) != 0) {
- nni_mtx_unlock(&zt_lk);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ if ((m = nni_aio_get_msg(aio)) == NULL) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
+ nni_mtx_lock(&zt_lk);
+
if (p->zp_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data);
- if ((m = nni_aio_get_msg(aio)) == NULL) {
- nni_aio_finish_error(aio, NNG_EINVAL);
- nni_mtx_unlock(&zt_lk);
- return;
- };
-
bytes = nni_msg_header_len(m) + nni_msg_len(m);
if (bytes >= (0xfffe * fragsz)) {
nni_aio_finish_error(aio, NNG_EMSGSIZE);
@@ -1821,11 +1819,16 @@ zt_pipe_send(void *arg, nni_aio *aio)
zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr,
p->zp_laddr, data, fraglen + zt_offset_data_data);
} while (nni_msg_len(m) != 0);
+ nni_mtx_unlock(&zt_lk);
+
+ // NB, We never bothered to call nn_aio_sched, because we run this
+ // synchronously, relying on UDP to simply discard messages if we
+ // cannot deliver them. This means that pipe send operations with
+ // this transport are not cancellable.
nni_aio_set_msg(aio, NULL);
nni_msg_free(m);
nni_aio_finish(aio, 0, offset);
- nni_mtx_unlock(&zt_lk);
}
static void
@@ -1903,17 +1906,18 @@ zt_pipe_recv(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) {
- nni_mtx_unlock(&zt_lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&zt_lk);
if (p->zp_closed) {
+ nni_mtx_unlock(&zt_lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
- } else {
- p->zp_user_rxaio = aio;
- zt_pipe_dorecv(p);
+ return;
}
+ nni_aio_schedule(aio, zt_pipe_cancel_recv, p);
+ p->zp_user_rxaio = aio;
+ zt_pipe_dorecv(p);
nni_mtx_unlock(&zt_lk);
}
@@ -2047,11 +2051,10 @@ zt_pipe_ping_cb(void *arg)
// use the the timer to wake us up even if we aren't
// going to send a ping. (We don't increment the try count
// unless we actually do send one though.)
- if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
p->zp_ping_active = 1;
if (now > (p->zp_last_recv + p->zp_ping_time)) {
- // We have to send a ping to keep the session
- // up.
p->zp_ping_try++;
zt_pipe_send_ping(p);
}
@@ -2069,6 +2072,9 @@ zt_pipe_start(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&zt_lk);
p->zp_ping_active = 0;
// send a gratuitous ping, and start the ping interval timer.
@@ -2077,8 +2083,9 @@ zt_pipe_start(void *arg, nni_aio *aio)
(p->zp_ping_aio != NULL)) {
p->zp_ping_try = 0;
nni_aio_set_timeout(aio, p->zp_ping_time);
- if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) ==
- 0) {
+ if (nni_aio_begin(p->zp_ping_aio) == 0) {
+ nni_aio_schedule(
+ p->zp_ping_aio, zt_pipe_cancel_ping, p);
p->zp_ping_active = 1;
zt_pipe_send_ping(p);
}
@@ -2402,11 +2409,13 @@ zt_ep_accept(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- nni_aio_list_append(&ep->ze_aios, aio);
- zt_ep_doaccept(ep);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&zt_lk);
+ nni_aio_schedule(aio, zt_ep_cancel, ep);
+ nni_aio_list_append(&ep->ze_aios, aio);
+ zt_ep_doaccept(ep);
nni_mtx_unlock(&zt_lk);
}
@@ -2479,7 +2488,8 @@ zt_ep_conn_req_cb(void *arg)
if (nni_list_first(&ep->ze_aios) != NULL) {
nni_aio_set_timeout(aio, ep->ze_conn_time);
- if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
ep->ze_creq_active = 1;
ep->ze_creq_try++;
zt_ep_send_conn_req(ep);
@@ -2493,43 +2503,41 @@ static void
zt_ep_connect(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
+ int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
// We bind locally. We'll use the address later when we give
// it to the pipe, but this allows us to receive the initial
// ack back from the server. (This gives us an ephemeral
// address to work with.)
nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- int rv;
-
- // Clear the port so we get an ephemeral port.
- ep->ze_laddr &= ~((uint64_t) zt_port_mask);
-
- if ((rv = zt_ep_bind_locked(ep)) != 0) {
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&zt_lk);
- return;
- }
-
- if ((ep->ze_raddr >> 24) == 0) {
- ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
- }
- nni_aio_list_append(&ep->ze_aios, aio);
-
- ep->ze_running = 1;
-
- nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
+ // Clear the port so we get an ephemeral port.
+ ep->ze_laddr &= ~((uint64_t) zt_port_mask);
- if (nni_aio_start(
- ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) {
+ if ((rv = zt_ep_bind_locked(ep)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&zt_lk);
+ return;
+ }
- // Send out the first connect message; if not
- // yet attached to network message will be dropped.
- ep->ze_creq_try = 1;
- ep->ze_creq_active = 1;
- zt_ep_send_conn_req(ep);
- }
+ if ((ep->ze_raddr >> 24) == 0) {
+ ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
+ }
+ nni_aio_list_append(&ep->ze_aios, aio);
+ ep->ze_running = 1;
+ nni_aio_schedule(aio, zt_ep_cancel, ep);
+
+ nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
+ if (nni_aio_begin(ep->ze_creq_aio) == 0) {
+ nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+ // Send out the first connect message; if not
+ // yet attached to network message will be dropped.
+ ep->ze_creq_try = 1;
+ ep->ze_creq_active = 1;
+ zt_ep_send_conn_req(ep);
}
nni_mtx_unlock(&zt_lk);
}