aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c14
-rw-r--r--src/transport/ipc/ipc.c36
-rw-r--r--src/transport/tcp/tcp.c36
-rw-r--r--src/transport/tls/tls.c34
-rw-r--r--src/transport/ws/websocket.c29
-rw-r--r--src/transport/zerotier/zerotier.c68
6 files changed, 177 insertions, 40 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 8bfb097e..0f159d3a 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -349,6 +349,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -375,7 +376,11 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
// We don't have to worry about the case where a zero timeout
// on connect was specified, as there is no option to specify
// that in the upper API.
- nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&server->clients, ep);
nni_aio_list_append(&ep->aios, aio);
@@ -407,6 +412,7 @@ static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -416,7 +422,11 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
// We need not worry about the case where a non-blocking
// accept was tried -- there is no API to do such a thing.
- nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
// We are already on the master list of servers, thanks to bind.
// Insert us into pending server aios, and then run accept list.
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index c91606c6..2347e24c 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -418,12 +418,17 @@ static void
nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
- nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&pipe->sendq, aio);
if (nni_list_first(&pipe->sendq) == aio) {
nni_ipc_pipe_dosend(pipe, aio);
@@ -474,14 +479,18 @@ static void
nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
- // Transports never have a zero length timeout.
- (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&pipe->recvq, aio);
if (nni_list_first(&pipe->recvq) == aio) {
@@ -496,11 +505,17 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
nni_ipc_pipe *pipe = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_start, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
pipe->txhead[0] = 0;
pipe->txhead[1] = 'S';
pipe->txhead[2] = 'P';
@@ -517,7 +532,6 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &pipe->txhead[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_ipc_cancel_start, pipe);
nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -728,6 +742,7 @@ static void
nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -735,7 +750,11 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_ipc_ep_accept(ep->iep, ep->aio);
@@ -746,6 +765,7 @@ static void
nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -753,7 +773,11 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_ipc_ep_connect(ep->iep, ep->aio);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 3d738a98..f2cdf8ac 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -409,12 +409,17 @@ static void
nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tcp_cancel_tx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_tx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tcp_pipe_dosend(p, aio);
@@ -465,12 +470,18 @@ static void
nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tcp_cancel_rx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_rx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
nni_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tcp_pipe_dorecv(p);
@@ -535,11 +546,17 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_tcp_pipe *p = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_nego, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->txlen[0] = 0;
p->txlen[1] = 'S';
p->txlen[2] = 'P';
@@ -556,7 +573,6 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_tcp_cancel_nego, p);
nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -749,6 +765,7 @@ static void
nni_tcp_ep_accept(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -756,7 +773,11 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
@@ -767,6 +788,7 @@ static void
nni_tcp_ep_connect(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -774,7 +796,11 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 78fdd622..c863a85e 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -416,12 +416,17 @@ static void
nni_tls_pipe_send(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tls_cancel_tx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_tx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tls_pipe_dosend(p, aio);
@@ -472,13 +477,18 @@ static void
nni_tls_pipe_recv(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_rx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
- nni_aio_schedule(aio, nni_tls_cancel_rx, p);
nni_aio_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tls_pipe_dorecv(p);
@@ -542,11 +552,17 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
nni_tls_pipe *p = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_nego, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->txlen[0] = 0;
p->txlen[1] = 'S';
p->txlen[2] = 'P';
@@ -563,7 +579,6 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_tls_cancel_nego, p);
nni_tls_send(p->tls, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -769,13 +784,18 @@ static void
nni_tls_ep_accept(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
@@ -785,13 +805,17 @@ static void
nni_tls_ep_connect(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 9d967668..12f3aeb5 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -128,12 +128,17 @@ static void
ws_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, ws_pipe_recv_cancel, p);
+ if ((rv = nni_aio_schedule(aio, ws_pipe_recv_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->user_rxaio = aio;
nni_ws_recv_msg(p->ws, p->rxaio);
nni_mtx_unlock(&p->mtx);
@@ -158,12 +163,17 @@ static void
ws_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, ws_pipe_send_cancel, p);
+ if ((rv = nni_aio_schedule(aio, ws_pipe_send_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->user_txaio = aio;
nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
@@ -289,6 +299,7 @@ static void
ws_ep_accept(void *arg, nni_aio *aio)
{
ws_ep *ep = arg;
+ int rv;
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
@@ -297,7 +308,11 @@ ws_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- nni_aio_schedule(aio, ws_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&ep->aios, aio);
if (aio == nni_list_first(&ep->aios)) {
nni_ws_listener_accept(ep->listener, ep->accaio);
@@ -309,6 +324,7 @@ static void
ws_ep_connect(void *arg, nni_aio *aio)
{
ws_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -326,9 +342,12 @@ ws_ep_connect(void *arg, nni_aio *aio)
}
nni_mtx_lock(&ep->mtx);
+ if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
NNI_ASSERT(nni_list_empty(&ep->aios));
-
- nni_aio_schedule(aio, ws_ep_cancel, ep);
ep->started = true;
nni_list_append(&ep->aios, aio);
nni_ws_dialer_dial(ep->dialer, ep->connaio);
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 54e402be..f7139b94 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1902,6 +1902,7 @@ static void
zt_pipe_recv(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -1912,7 +1913,11 @@ zt_pipe_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- nni_aio_schedule(aio, zt_pipe_cancel_recv, p);
+ if ((rv = nni_aio_schedule(aio, zt_pipe_cancel_recv, p)) != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->zp_user_rxaio = aio;
zt_pipe_dorecv(p);
nni_mtx_unlock(&zt_lk);
@@ -2049,7 +2054,13 @@ zt_pipe_ping_cb(void *arg)
// going to send a ping. (We don't increment the try count
// unless we actually do send one though.)
if (nni_aio_begin(aio) == 0) {
- nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
+ int rv;
+ rv = nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
+ if (rv != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->zp_ping_active = 1;
if (now > (p->zp_last_recv + p->zp_ping_time)) {
p->zp_ping_try++;
@@ -2081,10 +2092,15 @@ zt_pipe_start(void *arg, nni_aio *aio)
p->zp_ping_try = 0;
nni_aio_set_timeout(aio, p->zp_ping_time);
if (nni_aio_begin(p->zp_ping_aio) == 0) {
- nni_aio_schedule(
+ int rv;
+ rv = nni_aio_schedule(
p->zp_ping_aio, zt_pipe_cancel_ping, p);
- p->zp_ping_active = 1;
- zt_pipe_send_ping(p);
+ if (rv != 0) {
+ nni_aio_finish_error(p->zp_ping_aio, rv);
+ } else {
+ p->zp_ping_active = 1;
+ zt_pipe_send_ping(p);
+ }
}
}
nni_aio_finish(aio, 0, 0);
@@ -2405,12 +2421,17 @@ static void
zt_ep_accept(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&zt_lk);
- nni_aio_schedule(aio, zt_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&ep->ze_aios, aio);
zt_ep_doaccept(ep);
nni_mtx_unlock(&zt_lk);
@@ -2486,10 +2507,14 @@ zt_ep_conn_req_cb(void *arg)
if (nni_list_first(&ep->ze_aios) != NULL) {
nni_aio_set_timeout(aio, ep->ze_conn_time);
if (nni_aio_begin(aio) == 0) {
- nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
- ep->ze_creq_active = 1;
- ep->ze_creq_try++;
- zt_ep_send_conn_req(ep);
+ rv = nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
+ if (rv != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ ep->ze_creq_active = 1;
+ ep->ze_creq_try++;
+ zt_ep_send_conn_req(ep);
+ }
}
}
@@ -2523,18 +2548,27 @@ zt_ep_connect(void *arg, nni_aio *aio)
if ((ep->ze_raddr >> 24) == 0) {
ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
}
+ if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&zt_lk);
+ return;
+ }
nni_aio_list_append(&ep->ze_aios, aio);
ep->ze_running = 1;
- nni_aio_schedule(aio, zt_ep_cancel, ep);
nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
if (nni_aio_begin(ep->ze_creq_aio) == 0) {
- nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
- // Send out the first connect message; if not
- // yet attached to network message will be dropped.
- ep->ze_creq_try = 1;
- ep->ze_creq_active = 1;
- zt_ep_send_conn_req(ep);
+ rv = nni_aio_schedule(
+ ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+ if (rv != 0) {
+ nni_aio_finish_error(ep->ze_creq_aio, rv);
+ } else {
+ // Send out the first connect message; if not
+ // yet attached to network message will be dropped.
+ ep->ze_creq_try = 1;
+ ep->ze_creq_active = 1;
+ zt_ep_send_conn_req(ep);
+ }
}
nni_mtx_unlock(&zt_lk);
}