diff options
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/inproc/inproc.c | 29 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 47 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 41 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 49 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 28 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 118 |
6 files changed, 150 insertions, 162 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 82f46fc4..8bfb097e 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -349,19 +349,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; nni_inproc_ep *server; - int rv; + if (nni_aio_begin(aio) != 0) { + return; + } if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish_error(aio, NNG_EINVAL); return; } nni_mtx_lock(&nni_inproc.mx); - if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { - nni_mtx_unlock(&nni_inproc.mx); - return; - } - // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { if (strcmp(server->addr, ep->addr) == 0) { @@ -369,11 +366,17 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); + // nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, NNG_ECONNREFUSED); return; } + // We don't have to worry about the case where a zero timeout + // on connect was specified, as there is no option to specify + // that in the upper API. + nni_aio_schedule(aio, nni_inproc_ep_cancel, ep); + nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); @@ -404,15 +407,17 @@ static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; - int rv; - nni_mtx_lock(&nni_inproc.mx); - - if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) { - nni_mtx_unlock(&nni_inproc.mx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&nni_inproc.mx); + + // We need not worry about the case where a non-blocking + // accept was tried -- there is no API to do such a thing. + nni_aio_schedule(aio, nni_inproc_ep_cancel, ep); + // We are already on the master list of servers, thanks to bind. // Insert us into pending server aios, and then run accept list. nni_aio_list_append(&ep->aios, aio); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index ecc9a962..61b89f20 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -418,11 +418,11 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_mtx_lock(&pipe->mtx); - if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) { - nni_mtx_unlock(&pipe->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&pipe->mtx); + nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe); nni_list_append(&pipe->sendq, aio); if (nni_list_first(&pipe->sendq) == aio) { nni_ipc_pipe_dosend(pipe, aio); @@ -474,12 +474,13 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_mtx_lock(&pipe->mtx); - - if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) { - nni_mtx_unlock(&pipe->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&pipe->mtx); + + // Transports never have a zero length timeout. + (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe); nni_list_append(&pipe->recvq, aio); if (nni_list_first(&pipe->recvq) == aio) { @@ -492,10 +493,12 @@ static void nni_ipc_pipe_start(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - int rv; nni_aio * negaio; nni_iov iov; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; pipe->txhead[1] = 'S'; @@ -513,11 +516,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &pipe->txhead[0]; nni_aio_set_iov(negaio, 1, &iov); - rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); - if (rv != 0) { - nni_mtx_unlock(&pipe->mtx); - return; - } + nni_aio_schedule(aio, nni_ipc_cancel_start, pipe); nni_plat_ipc_pipe_send(pipe->ipp, negaio); nni_mtx_unlock(&pipe->mtx); } @@ -680,16 +679,14 @@ static void nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - int rv; - - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_ipc_cancel_ep, ep); ep->user_aio = aio; nni_plat_ipc_ep_accept(ep->iep, ep->aio); @@ -700,18 +697,14 @@ static void nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - int rv; - - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - // If we can't start, then its dying and we can't report - // either. - if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_ipc_cancel_ep, ep); ep->user_aio = aio; nni_plat_ipc_ep_connect(ep->iep, ep->aio); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 7ea77035..a551c5be 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -398,11 +398,11 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; - nni_mtx_lock(&p->mtx); - if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, nni_tcp_cancel_tx, p); nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { nni_tcp_pipe_dosend(p, aio); @@ -454,11 +454,11 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; - nni_mtx_lock(&p->mtx); - if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, nni_tcp_cancel_rx, p); nni_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { nni_tcp_pipe_dorecv(p); @@ -510,6 +510,9 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) nni_aio * negaio; nni_iov iov; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&p->mtx); p->txlen[0] = 0; p->txlen[1] = 'S'; @@ -527,10 +530,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) { - nni_mtx_unlock(&p->mtx); - return; - } + nni_aio_schedule(aio, nni_tcp_cancel_nego, p); nni_plat_tcp_pipe_send(p->tpp, negaio); nni_mtx_unlock(&p->mtx); } @@ -721,16 +721,14 @@ static void nni_tcp_ep_accept(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_tcp_cancel_ep, ep); ep->user_aio = aio; nni_plat_tcp_ep_accept(ep->tep, ep->aio); @@ -741,17 +739,14 @@ static void nni_tcp_ep_connect(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - // If we can't start, then its dying and we can't report either. - if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_tcp_cancel_ep, ep); ep->user_aio = aio; nni_plat_tcp_ep_connect(ep->tep, ep->aio); diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index a852ddd8..2ea14e93 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -405,13 +405,11 @@ nni_tls_pipe_send(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; - nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } - + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, nni_tls_cancel_tx, p); nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { nni_tls_pipe_dosend(p, aio); @@ -463,12 +461,12 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; - nni_mtx_lock(&p->mtx); - - if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + + nni_aio_schedule(aio, nni_tls_cancel_rx, p); nni_aio_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { nni_tls_pipe_dorecv(p); @@ -519,6 +517,9 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) nni_aio * negaio; nni_iov iov; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&p->mtx); p->txlen[0] = 0; p->txlen[1] = 'S'; @@ -536,10 +537,7 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - if (nni_aio_start(aio, nni_tls_cancel_nego, p) != 0) { - nni_mtx_unlock(&p->mtx); - return; - } + nni_aio_schedule(aio, nni_tls_cancel_nego, p); nni_tls_send(p->tls, negaio); nni_mtx_unlock(&p->mtx); } @@ -743,18 +741,14 @@ static void nni_tls_ep_accept(void *arg, nni_aio *aio) { nni_tls_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } - + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_tls_cancel_ep, ep); ep->user_aio = aio; - nni_plat_tcp_ep_accept(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); } @@ -763,19 +757,14 @@ static void nni_tls_ep_connect(void *arg, nni_aio *aio) { nni_tls_ep *ep = arg; - int rv; - nni_mtx_lock(&ep->mtx); - NNI_ASSERT(ep->user_aio == NULL); - - // If we can't start, then its dying and we can't report either. - if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } - + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + nni_aio_schedule(aio, nni_tls_cancel_ep, ep); ep->user_aio = aio; - nni_plat_tcp_ep_connect(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); } diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 0542d0c7..761ba824 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -129,13 +129,12 @@ ws_pipe_recv(void *arg, nni_aio *aio) { ws_pipe *p = arg; - nni_mtx_lock(&p->mtx); - if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, ws_pipe_recv_cancel, p); p->user_rxaio = aio; - nni_ws_recv_msg(p->ws, p->rxaio); nni_mtx_unlock(&p->mtx); } @@ -160,11 +159,11 @@ ws_pipe_send(void *arg, nni_aio *aio) { ws_pipe *p = arg; - nni_mtx_lock(&p->mtx); - if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) { - nni_mtx_unlock(&p->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&p->mtx); + nni_aio_schedule(aio, ws_pipe_send_cancel, p); p->user_txaio = aio; nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio)); nni_aio_set_msg(aio, NULL); @@ -294,11 +293,11 @@ ws_ep_accept(void *arg, nni_aio *aio) // We already bound, so we just need to look for an available // pipe (created by the handler), and match it. // Otherwise we stick the AIO in the accept list. - nni_mtx_lock(&ep->mtx); - if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) { - nni_mtx_unlock(&ep->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&ep->mtx); + nni_aio_schedule(aio, ws_ep_cancel, ep); nni_list_append(&ep->aios, aio); if (aio == nni_list_first(&ep->aios)) { nni_ws_listener_accept(ep->listener, ep->accaio); @@ -313,6 +312,9 @@ ws_ep_connect(void *arg, nni_aio *aio) int rv = 0; ws_hdr *h; + if (nni_aio_begin(aio) != 0) { + return; + } if (!ep->started) { NNI_LIST_FOREACH (&ep->headers, h) { rv = nni_ws_dialer_header( @@ -327,11 +329,7 @@ ws_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(nni_list_empty(&ep->aios)); - // If we can't start, then its dying and we can't report either. - if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } + nni_aio_schedule(aio, ws_ep_cancel, ep); ep->started = true; nni_list_append(&ep->aios, aio); nni_ws_dialer_dial(ep->dialer, ep->connaio); diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index d0790b37..05866cfb 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1745,26 +1745,24 @@ zt_pipe_send(void *arg, nni_aio *aio) size_t bytes; nni_msg *m; - nni_mtx_lock(&zt_lk); - if (nni_aio_start(aio, NULL, p) != 0) { - nni_mtx_unlock(&zt_lk); + if (nni_aio_begin(aio) != 0) { + return; + } + if ((m = nni_aio_get_msg(aio)) == NULL) { + nni_aio_finish_error(aio, NNG_EINVAL); return; } + nni_mtx_lock(&zt_lk); + if (p->zp_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, NNG_ECLOSED); return; } fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data); - if ((m = nni_aio_get_msg(aio)) == NULL) { - nni_aio_finish_error(aio, NNG_EINVAL); - nni_mtx_unlock(&zt_lk); - return; - }; - bytes = nni_msg_header_len(m) + nni_msg_len(m); if (bytes >= (0xfffe * fragsz)) { nni_aio_finish_error(aio, NNG_EMSGSIZE); @@ -1821,11 +1819,16 @@ zt_pipe_send(void *arg, nni_aio *aio) zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr, p->zp_laddr, data, fraglen + zt_offset_data_data); } while (nni_msg_len(m) != 0); + nni_mtx_unlock(&zt_lk); + + // NB, We never bothered to call nn_aio_sched, because we run this + // synchronously, relying on UDP to simply discard messages if we + // cannot deliver them. This means that pipe send operations with + // this transport are not cancellable. nni_aio_set_msg(aio, NULL); nni_msg_free(m); nni_aio_finish(aio, 0, offset); - nni_mtx_unlock(&zt_lk); } static void @@ -1903,17 +1906,18 @@ zt_pipe_recv(void *arg, nni_aio *aio) { zt_pipe *p = arg; - nni_mtx_lock(&zt_lk); - if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) { - nni_mtx_unlock(&zt_lk); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&zt_lk); if (p->zp_closed) { + nni_mtx_unlock(&zt_lk); nni_aio_finish_error(aio, NNG_ECLOSED); - } else { - p->zp_user_rxaio = aio; - zt_pipe_dorecv(p); + return; } + nni_aio_schedule(aio, zt_pipe_cancel_recv, p); + p->zp_user_rxaio = aio; + zt_pipe_dorecv(p); nni_mtx_unlock(&zt_lk); } @@ -2047,11 +2051,10 @@ zt_pipe_ping_cb(void *arg) // use the the timer to wake us up even if we aren't // going to send a ping. (We don't increment the try count // unless we actually do send one though.) - if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) { + if (nni_aio_begin(aio) == 0) { + nni_aio_schedule(aio, zt_pipe_cancel_ping, p); p->zp_ping_active = 1; if (now > (p->zp_last_recv + p->zp_ping_time)) { - // We have to send a ping to keep the session - // up. p->zp_ping_try++; zt_pipe_send_ping(p); } @@ -2069,6 +2072,9 @@ zt_pipe_start(void *arg, nni_aio *aio) { zt_pipe *p = arg; + if (nni_aio_begin(aio) != 0) { + return; + } nni_mtx_lock(&zt_lk); p->zp_ping_active = 0; // send a gratuitous ping, and start the ping interval timer. @@ -2077,8 +2083,9 @@ zt_pipe_start(void *arg, nni_aio *aio) (p->zp_ping_aio != NULL)) { p->zp_ping_try = 0; nni_aio_set_timeout(aio, p->zp_ping_time); - if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) == - 0) { + if (nni_aio_begin(p->zp_ping_aio) == 0) { + nni_aio_schedule( + p->zp_ping_aio, zt_pipe_cancel_ping, p); p->zp_ping_active = 1; zt_pipe_send_ping(p); } @@ -2402,11 +2409,13 @@ zt_ep_accept(void *arg, nni_aio *aio) { zt_ep *ep = arg; - nni_mtx_lock(&zt_lk); - if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) { - nni_aio_list_append(&ep->ze_aios, aio); - zt_ep_doaccept(ep); + if (nni_aio_begin(aio) != 0) { + return; } + nni_mtx_lock(&zt_lk); + nni_aio_schedule(aio, zt_ep_cancel, ep); + nni_aio_list_append(&ep->ze_aios, aio); + zt_ep_doaccept(ep); nni_mtx_unlock(&zt_lk); } @@ -2479,7 +2488,8 @@ zt_ep_conn_req_cb(void *arg) if (nni_list_first(&ep->ze_aios) != NULL) { nni_aio_set_timeout(aio, ep->ze_conn_time); - if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) { + if (nni_aio_begin(aio) == 0) { + nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep); ep->ze_creq_active = 1; ep->ze_creq_try++; zt_ep_send_conn_req(ep); @@ -2493,43 +2503,41 @@ static void zt_ep_connect(void *arg, nni_aio *aio) { zt_ep *ep = arg; + int rv; + if (nni_aio_begin(aio) != 0) { + return; + } // We bind locally. We'll use the address later when we give // it to the pipe, but this allows us to receive the initial // ack back from the server. (This gives us an ephemeral // address to work with.) nni_mtx_lock(&zt_lk); - if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) { - int rv; - - // Clear the port so we get an ephemeral port. - ep->ze_laddr &= ~((uint64_t) zt_port_mask); - - if ((rv = zt_ep_bind_locked(ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&zt_lk); - return; - } - - if ((ep->ze_raddr >> 24) == 0) { - ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift); - } - nni_aio_list_append(&ep->ze_aios, aio); - - ep->ze_running = 1; - - nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time); + // Clear the port so we get an ephemeral port. + ep->ze_laddr &= ~((uint64_t) zt_port_mask); - if (nni_aio_start( - ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) { + if ((rv = zt_ep_bind_locked(ep)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&zt_lk); + return; + } - // Send out the first connect message; if not - // yet attached to network message will be dropped. - ep->ze_creq_try = 1; - ep->ze_creq_active = 1; - zt_ep_send_conn_req(ep); - } + if ((ep->ze_raddr >> 24) == 0) { + ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift); + } + nni_aio_list_append(&ep->ze_aios, aio); + ep->ze_running = 1; + nni_aio_schedule(aio, zt_ep_cancel, ep); + + nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time); + if (nni_aio_begin(ep->ze_creq_aio) == 0) { + nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep); + // Send out the first connect message; if not + // yet attached to network message will be dropped. + ep->ze_creq_try = 1; + ep->ze_creq_active = 1; + zt_ep_send_conn_req(ep); } nni_mtx_unlock(&zt_lk); } |
