aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-09 17:21:27 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-14 17:09:20 -0700
commit16b4c4019c7b7904de171c588ed8c72ca732d2cf (patch)
tree9e5a8416470631cfb48f5a6ebdd4b16e4b1be3d6 /src/transport
parente0beb13b066d27ce32347a1c18c9d441828dc553 (diff)
downloadnng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.gz
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.bz2
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.zip
fixes #352 aio lock is burning hot
fixes #326 consider nni_taskq_exec_synch() fixes #410 kqueue implementation could be smarter fixes #411 epoll_implementation could be smarter fixes #426 synchronous completion can lead to panic fixes #421 pipe close race condition/duplicate destroy This is a major refactoring of two significant parts of the code base, which are closely interrelated. First the aio and taskq framework have undergone a number of simplifications, and improvements. We have ditched a few parts of the internal API (for example tasks no longer support cancellation) that weren't terribly useful but added a lot of complexity, and we've made aio_schedule something that now checks for cancellation or other "premature" completions. The aio framework now uses the tasks more tightly, so that aio wait can devolve into just nni_task_wait(). We did have to add a "task_prep()" step to prevent race conditions. Second, the entire POSIX poller framework has been simplified, and made more robust, and more scalable. There were some fairly inherent race conditions around the shutdown/close code, where we *thought* we were synchronizing against the other thread, but weren't doing so adequately. With a cleaner design, we've been able to tighten up the implementation to remove these race conditions, while substantially reducing the chance for lock contention, thereby improving scalability. The illumos poller also got a performance boost by polling for multiple events. In highly "busy" systems, we expect to see vast reductions in lock contention, and therefore greater scalability, in addition to overall improved reliability. One area where we currently can do better is that there is still only a single poller thread run. Scaling this out is a task that has to be done differently for each poller, and carefuly to ensure that close conditions are safe on all pollers, and that no chance for deadlock/livelock waiting for pfd finalizers can occur.
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c14
-rw-r--r--src/transport/ipc/ipc.c36
-rw-r--r--src/transport/tcp/tcp.c36
-rw-r--r--src/transport/tls/tls.c34
-rw-r--r--src/transport/ws/websocket.c29
-rw-r--r--src/transport/zerotier/zerotier.c68
6 files changed, 177 insertions, 40 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 8bfb097e..0f159d3a 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -349,6 +349,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -375,7 +376,11 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
// We don't have to worry about the case where a zero timeout
// on connect was specified, as there is no option to specify
// that in the upper API.
- nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&server->clients, ep);
nni_aio_list_append(&ep->aios, aio);
@@ -407,6 +412,7 @@ static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -416,7 +422,11 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
// We need not worry about the case where a non-blocking
// accept was tried -- there is no API to do such a thing.
- nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
// We are already on the master list of servers, thanks to bind.
// Insert us into pending server aios, and then run accept list.
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index c91606c6..2347e24c 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -418,12 +418,17 @@ static void
nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
- nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&pipe->sendq, aio);
if (nni_list_first(&pipe->sendq) == aio) {
nni_ipc_pipe_dosend(pipe, aio);
@@ -474,14 +479,18 @@ static void
nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
- // Transports never have a zero length timeout.
- (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&pipe->recvq, aio);
if (nni_list_first(&pipe->recvq) == aio) {
@@ -496,11 +505,17 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
nni_ipc_pipe *pipe = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_start, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
pipe->txhead[0] = 0;
pipe->txhead[1] = 'S';
pipe->txhead[2] = 'P';
@@ -517,7 +532,6 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &pipe->txhead[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_ipc_cancel_start, pipe);
nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -728,6 +742,7 @@ static void
nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -735,7 +750,11 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_ipc_ep_accept(ep->iep, ep->aio);
@@ -746,6 +765,7 @@ static void
nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -753,7 +773,11 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_ipc_ep_connect(ep->iep, ep->aio);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 3d738a98..f2cdf8ac 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -409,12 +409,17 @@ static void
nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tcp_cancel_tx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_tx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tcp_pipe_dosend(p, aio);
@@ -465,12 +470,18 @@ static void
nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tcp_cancel_rx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_rx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
nni_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tcp_pipe_dorecv(p);
@@ -535,11 +546,17 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_tcp_pipe *p = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_nego, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->txlen[0] = 0;
p->txlen[1] = 'S';
p->txlen[2] = 'P';
@@ -556,7 +573,6 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_tcp_cancel_nego, p);
nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -749,6 +765,7 @@ static void
nni_tcp_ep_accept(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -756,7 +773,11 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
@@ -767,6 +788,7 @@ static void
nni_tcp_ep_connect(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -774,7 +796,11 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 78fdd622..c863a85e 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -416,12 +416,17 @@ static void
nni_tls_pipe_send(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tls_cancel_tx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_tx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tls_pipe_dosend(p, aio);
@@ -472,13 +477,18 @@ static void
nni_tls_pipe_recv(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_rx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
- nni_aio_schedule(aio, nni_tls_cancel_rx, p);
nni_aio_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tls_pipe_dorecv(p);
@@ -542,11 +552,17 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
nni_tls_pipe *p = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_nego, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->txlen[0] = 0;
p->txlen[1] = 'S';
p->txlen[2] = 'P';
@@ -563,7 +579,6 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_tls_cancel_nego, p);
nni_tls_send(p->tls, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -769,13 +784,18 @@ static void
nni_tls_ep_accept(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
@@ -785,13 +805,17 @@ static void
nni_tls_ep_connect(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 9d967668..12f3aeb5 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -128,12 +128,17 @@ static void
ws_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, ws_pipe_recv_cancel, p);
+ if ((rv = nni_aio_schedule(aio, ws_pipe_recv_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->user_rxaio = aio;
nni_ws_recv_msg(p->ws, p->rxaio);
nni_mtx_unlock(&p->mtx);
@@ -158,12 +163,17 @@ static void
ws_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, ws_pipe_send_cancel, p);
+ if ((rv = nni_aio_schedule(aio, ws_pipe_send_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->user_txaio = aio;
nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
@@ -289,6 +299,7 @@ static void
ws_ep_accept(void *arg, nni_aio *aio)
{
ws_ep *ep = arg;
+ int rv;
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
@@ -297,7 +308,11 @@ ws_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- nni_aio_schedule(aio, ws_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&ep->aios, aio);
if (aio == nni_list_first(&ep->aios)) {
nni_ws_listener_accept(ep->listener, ep->accaio);
@@ -309,6 +324,7 @@ static void
ws_ep_connect(void *arg, nni_aio *aio)
{
ws_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -326,9 +342,12 @@ ws_ep_connect(void *arg, nni_aio *aio)
}
nni_mtx_lock(&ep->mtx);
+ if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
NNI_ASSERT(nni_list_empty(&ep->aios));
-
- nni_aio_schedule(aio, ws_ep_cancel, ep);
ep->started = true;
nni_list_append(&ep->aios, aio);
nni_ws_dialer_dial(ep->dialer, ep->connaio);
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 54e402be..f7139b94 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1902,6 +1902,7 @@ static void
zt_pipe_recv(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -1912,7 +1913,11 @@ zt_pipe_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- nni_aio_schedule(aio, zt_pipe_cancel_recv, p);
+ if ((rv = nni_aio_schedule(aio, zt_pipe_cancel_recv, p)) != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->zp_user_rxaio = aio;
zt_pipe_dorecv(p);
nni_mtx_unlock(&zt_lk);
@@ -2049,7 +2054,13 @@ zt_pipe_ping_cb(void *arg)
// going to send a ping. (We don't increment the try count
// unless we actually do send one though.)
if (nni_aio_begin(aio) == 0) {
- nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
+ int rv;
+ rv = nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
+ if (rv != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->zp_ping_active = 1;
if (now > (p->zp_last_recv + p->zp_ping_time)) {
p->zp_ping_try++;
@@ -2081,10 +2092,15 @@ zt_pipe_start(void *arg, nni_aio *aio)
p->zp_ping_try = 0;
nni_aio_set_timeout(aio, p->zp_ping_time);
if (nni_aio_begin(p->zp_ping_aio) == 0) {
- nni_aio_schedule(
+ int rv;
+ rv = nni_aio_schedule(
p->zp_ping_aio, zt_pipe_cancel_ping, p);
- p->zp_ping_active = 1;
- zt_pipe_send_ping(p);
+ if (rv != 0) {
+ nni_aio_finish_error(p->zp_ping_aio, rv);
+ } else {
+ p->zp_ping_active = 1;
+ zt_pipe_send_ping(p);
+ }
}
}
nni_aio_finish(aio, 0, 0);
@@ -2405,12 +2421,17 @@ static void
zt_ep_accept(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&zt_lk);
- nni_aio_schedule(aio, zt_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&ep->ze_aios, aio);
zt_ep_doaccept(ep);
nni_mtx_unlock(&zt_lk);
@@ -2486,10 +2507,14 @@ zt_ep_conn_req_cb(void *arg)
if (nni_list_first(&ep->ze_aios) != NULL) {
nni_aio_set_timeout(aio, ep->ze_conn_time);
if (nni_aio_begin(aio) == 0) {
- nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
- ep->ze_creq_active = 1;
- ep->ze_creq_try++;
- zt_ep_send_conn_req(ep);
+ rv = nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
+ if (rv != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ ep->ze_creq_active = 1;
+ ep->ze_creq_try++;
+ zt_ep_send_conn_req(ep);
+ }
}
}
@@ -2523,18 +2548,27 @@ zt_ep_connect(void *arg, nni_aio *aio)
if ((ep->ze_raddr >> 24) == 0) {
ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
}
+ if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&zt_lk);
+ return;
+ }
nni_aio_list_append(&ep->ze_aios, aio);
ep->ze_running = 1;
- nni_aio_schedule(aio, zt_ep_cancel, ep);
nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
if (nni_aio_begin(ep->ze_creq_aio) == 0) {
- nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
- // Send out the first connect message; if not
- // yet attached to network message will be dropped.
- ep->ze_creq_try = 1;
- ep->ze_creq_active = 1;
- zt_ep_send_conn_req(ep);
+ rv = nni_aio_schedule(
+ ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+ if (rv != 0) {
+ nni_aio_finish_error(ep->ze_creq_aio, rv);
+ } else {
+ // Send out the first connect message; if not
+ // yet attached to network message will be dropped.
+ ep->ze_creq_try = 1;
+ ep->ze_creq_active = 1;
+ zt_ep_send_conn_req(ep);
+ }
}
nni_mtx_unlock(&zt_lk);
}