diff options
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/inproc/inproc.c | 14 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 36 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 36 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 34 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 29 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 68 |
6 files changed, 177 insertions, 40 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 8bfb097e..0f159d3a 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -349,6 +349,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; nni_inproc_ep *server; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -375,7 +376,11 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) // We don't have to worry about the case where a zero timeout // on connect was specified, as there is no option to specify // that in the upper API. - nni_aio_schedule(aio, nni_inproc_ep_cancel, ep); + if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); @@ -407,6 +412,7 @@ static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -416,7 +422,11 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) // We need not worry about the case where a non-blocking // accept was tried -- there is no API to do such a thing. - nni_aio_schedule(aio, nni_inproc_ep_cancel, ep); + if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, rv); + return; + } // We are already on the master list of servers, thanks to bind. // Insert us into pending server aios, and then run accept list. diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index c91606c6..2347e24c 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -418,12 +418,17 @@ static void nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&pipe->mtx); - nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&pipe->sendq, aio); if (nni_list_first(&pipe->sendq) == aio) { nni_ipc_pipe_dosend(pipe, aio); @@ -474,14 +479,18 @@ static void nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&pipe->mtx); - // Transports never have a zero length timeout. - (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&pipe->recvq, aio); if (nni_list_first(&pipe->recvq) == aio) { @@ -496,11 +505,17 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_ipc_pipe *pipe = arg; nni_aio * negaio; nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_start, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_error(aio, rv); + return; + } pipe->txhead[0] = 0; pipe->txhead[1] = 'S'; pipe->txhead[2] = 'P'; @@ -517,7 +532,6 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &pipe->txhead[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_aio_schedule(aio, nni_ipc_cancel_start, pipe); nni_plat_ipc_pipe_send(pipe->ipp, negaio); nni_mtx_unlock(&pipe->mtx); } @@ -728,6 +742,7 @@ static void nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -735,7 +750,11 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_ipc_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_ipc_ep_accept(ep->iep, ep->aio); @@ -746,6 +765,7 @@ static void nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -753,7 +773,11 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_ipc_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_ipc_ep_connect(ep->iep, ep->aio); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 3d738a98..f2cdf8ac 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -409,12 +409,17 @@ static void nni_tcp_pipe_send(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, nni_tcp_cancel_tx, p); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_tx, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { nni_tcp_pipe_dosend(p, aio); @@ -465,12 +470,18 @@ static void nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, nni_tcp_cancel_rx, p); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_rx, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { nni_tcp_pipe_dorecv(p); @@ -535,11 +546,17 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) nni_tcp_pipe *p = arg; nni_aio * negaio; nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_nego, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } p->txlen[0] = 0; p->txlen[1] = 'S'; p->txlen[2] = 'P'; @@ -556,7 +573,6 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_aio_schedule(aio, nni_tcp_cancel_nego, p); nni_plat_tcp_pipe_send(p->tpp, negaio); nni_mtx_unlock(&p->mtx); } @@ -749,6 +765,7 @@ static void nni_tcp_ep_accept(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -756,7 +773,11 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_tcp_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_tcp_ep_accept(ep->tep, ep->aio); @@ -767,6 +788,7 @@ static void nni_tcp_ep_connect(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -774,7 +796,11 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_tcp_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_tcp_ep_connect(ep->tep, ep->aio); diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 78fdd622..c863a85e 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -416,12 +416,17 @@ static void nni_tls_pipe_send(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, nni_tls_cancel_tx, p); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_tx, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { nni_tls_pipe_dosend(p, aio); @@ -472,13 +477,18 @@ static void nni_tls_pipe_recv(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_rx, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } - nni_aio_schedule(aio, nni_tls_cancel_rx, p); nni_aio_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { nni_tls_pipe_dorecv(p); @@ -542,11 +552,17 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) nni_tls_pipe *p = arg; nni_aio * negaio; nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_nego, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } p->txlen[0] = 0; p->txlen[1] = 'S'; p->txlen[2] = 'P'; @@ -563,7 +579,6 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_aio_schedule(aio, nni_tls_cancel_nego, p); nni_tls_send(p->tls, negaio); nni_mtx_unlock(&p->mtx); } @@ -769,13 +784,18 @@ static void nni_tls_ep_accept(void *arg, nni_aio *aio) { nni_tls_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_tls_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_tcp_ep_accept(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); @@ -785,13 +805,17 @@ static void nni_tls_ep_connect(void *arg, nni_aio *aio) { nni_tls_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_tls_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + } ep->user_aio = aio; nni_plat_tcp_ep_connect(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 9d967668..12f3aeb5 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -128,12 +128,17 @@ static void ws_pipe_recv(void *arg, nni_aio *aio) { ws_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, ws_pipe_recv_cancel, p); + if ((rv = nni_aio_schedule(aio, ws_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } p->user_rxaio = aio; nni_ws_recv_msg(p->ws, p->rxaio); nni_mtx_unlock(&p->mtx); @@ -158,12 +163,17 @@ static void ws_pipe_send(void *arg, nni_aio *aio) { ws_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, ws_pipe_send_cancel, p); + if ((rv = nni_aio_schedule(aio, ws_pipe_send_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } p->user_txaio = aio; nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio)); nni_aio_set_msg(aio, NULL); @@ -289,6 +299,7 @@ static void ws_ep_accept(void *arg, nni_aio *aio) { ws_ep *ep = arg; + int rv; // We already bound, so we just need to look for an available // pipe (created by the handler), and match it. @@ -297,7 +308,11 @@ ws_ep_accept(void *arg, nni_aio *aio) return; } nni_mtx_lock(&ep->mtx); - nni_aio_schedule(aio, ws_ep_cancel, ep); + if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&ep->aios, aio); if (aio == nni_list_first(&ep->aios)) { nni_ws_listener_accept(ep->listener, ep->accaio); @@ -309,6 +324,7 @@ static void ws_ep_connect(void *arg, nni_aio *aio) { ws_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -326,9 +342,12 @@ ws_ep_connect(void *arg, nni_aio *aio) } nni_mtx_lock(&ep->mtx); + if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } NNI_ASSERT(nni_list_empty(&ep->aios)); - - nni_aio_schedule(aio, ws_ep_cancel, ep); ep->started = true; nni_list_append(&ep->aios, aio); nni_ws_dialer_dial(ep->dialer, ep->connaio); diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 54e402be..f7139b94 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1902,6 +1902,7 @@ static void zt_pipe_recv(void *arg, nni_aio *aio) { zt_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -1912,7 +1913,11 @@ zt_pipe_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - nni_aio_schedule(aio, zt_pipe_cancel_recv, p); + if ((rv = nni_aio_schedule(aio, zt_pipe_cancel_recv, p)) != 0) { + nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, rv); + return; + } p->zp_user_rxaio = aio; zt_pipe_dorecv(p); nni_mtx_unlock(&zt_lk); @@ -2049,7 +2054,13 @@ zt_pipe_ping_cb(void *arg) // going to send a ping. (We don't increment the try count // unless we actually do send one though.) if (nni_aio_begin(aio) == 0) { - nni_aio_schedule(aio, zt_pipe_cancel_ping, p); + int rv; + rv = nni_aio_schedule(aio, zt_pipe_cancel_ping, p); + if (rv != 0) { + nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, rv); + return; + } p->zp_ping_active = 1; if (now > (p->zp_last_recv + p->zp_ping_time)) { p->zp_ping_try++; @@ -2081,10 +2092,15 @@ zt_pipe_start(void *arg, nni_aio *aio) p->zp_ping_try = 0; nni_aio_set_timeout(aio, p->zp_ping_time); if (nni_aio_begin(p->zp_ping_aio) == 0) { - nni_aio_schedule( + int rv; + rv = nni_aio_schedule( p->zp_ping_aio, zt_pipe_cancel_ping, p); - p->zp_ping_active = 1; - zt_pipe_send_ping(p); + if (rv != 0) { + nni_aio_finish_error(p->zp_ping_aio, rv); + } else { + p->zp_ping_active = 1; + zt_pipe_send_ping(p); + } } } nni_aio_finish(aio, 0, 0); @@ -2405,12 +2421,17 @@ static void zt_ep_accept(void *arg, nni_aio *aio) { zt_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&zt_lk); - nni_aio_schedule(aio, zt_ep_cancel, ep); + if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&ep->ze_aios, aio); zt_ep_doaccept(ep); nni_mtx_unlock(&zt_lk); @@ -2486,10 +2507,14 @@ zt_ep_conn_req_cb(void *arg) if (nni_list_first(&ep->ze_aios) != NULL) { nni_aio_set_timeout(aio, ep->ze_conn_time); if (nni_aio_begin(aio) == 0) { - nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep); - ep->ze_creq_active = 1; - ep->ze_creq_try++; - zt_ep_send_conn_req(ep); + rv = nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep); + if (rv != 0) { + nni_aio_finish_error(aio, rv); + } else { + ep->ze_creq_active = 1; + ep->ze_creq_try++; + zt_ep_send_conn_req(ep); + } } } @@ -2523,18 +2548,27 @@ zt_ep_connect(void *arg, nni_aio *aio) if ((ep->ze_raddr >> 24) == 0) { ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift); } + if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&zt_lk); + return; + } nni_aio_list_append(&ep->ze_aios, aio); ep->ze_running = 1; - nni_aio_schedule(aio, zt_ep_cancel, ep); nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time); if (nni_aio_begin(ep->ze_creq_aio) == 0) { - nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep); - // Send out the first connect message; if not - // yet attached to network message will be dropped. - ep->ze_creq_try = 1; - ep->ze_creq_active = 1; - zt_ep_send_conn_req(ep); + rv = nni_aio_schedule( + ep->ze_creq_aio, zt_ep_conn_req_cancel, ep); + if (rv != 0) { + nni_aio_finish_error(ep->ze_creq_aio, rv); + } else { + // Send out the first connect message; if not + // yet attached to network message will be dropped. + ep->ze_creq_try = 1; + ep->ze_creq_active = 1; + zt_ep_send_conn_req(ep); + } } nni_mtx_unlock(&zt_lk); } |
