diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 145 |
1 files changed, 89 insertions, 56 deletions
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 75e25dd8..ddb8779e 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -211,6 +211,7 @@ struct zt_pipe { zt_fraglist zp_recvq[zt_recvq]; int zp_ping_try; int zp_ping_count; + int zp_ping_active; nni_duration zp_ping_time; nni_aio * zp_ping_aio; uint8_t * zp_send_buf; @@ -238,6 +239,7 @@ struct zt_ep { size_t ze_rcvmax; nni_aio * ze_aio; nni_aio * ze_creq_aio; + int ze_creq_active; int ze_creq_try; nni_list ze_aios; int ze_mtu; @@ -783,8 +785,9 @@ zt_ep_recv_error(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) break; } - if (ep->ze_creq_try > 0) { - ep->ze_creq_try = 0; + if (ep->ze_creq_active) { + ep->ze_creq_try = 0; + ep->ze_creq_active = 0; nni_aio_finish_error(ep->ze_creq_aio, code); } } @@ -1817,12 +1820,11 @@ zt_pipe_cancel_recv(nni_aio *aio, int rv) { zt_pipe *p = aio->a_prov_data; nni_mtx_lock(&zt_lk); - if (p->zp_user_rxaio != aio) { - nni_mtx_unlock(&zt_lk); + if (p->zp_user_rxaio == aio) { + p->zp_user_rxaio = NULL; + nni_aio_finish_error(aio, rv); } - p->zp_user_rxaio = NULL; nni_mtx_unlock(&zt_lk); - nni_aio_finish_error(aio, rv); } static void @@ -1970,7 +1972,14 @@ zt_pipe_get_node(void *arg, void *buf, size_t *szp) static void zt_pipe_cancel_ping(nni_aio *aio, int rv) { - nni_aio_finish_error(aio, rv); + zt_pipe *p = aio->a_prov_data; + + nni_mtx_lock(&zt_lk); + if (p->zp_ping_active) { + p->zp_ping_active = 0; + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&zt_lk); } static void @@ -1980,6 +1989,8 @@ zt_pipe_ping_cb(void *arg) nni_aio *aio = p->zp_ping_aio; nni_mtx_lock(&zt_lk); + + p->zp_ping_active = 0; if (p->zp_closed || aio == NULL || (p->zp_ping_count == 0) || (p->zp_ping_time == NNI_TIME_NEVER) || (p->zp_ping_time == NNI_TIME_ZERO)) { @@ -1993,20 +2004,22 @@ zt_pipe_ping_cb(void *arg) if (p->zp_ping_try < p->zp_ping_count) { nni_time now = nni_clock(); nni_aio_set_timeout(aio, now + p->zp_ping_time); - if (now > (p->zp_last_recv + p->zp_ping_time)) { - // We have to send a ping to keep the session up. - if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) { + // We want pings. We only send one if needed, but we + // use the the timer to wake us up even if we aren't + // going to send a ping. (We don't increment the try count + // unless we actually do send one though.) + if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) { + p->zp_ping_active = 1; + if (now > (p->zp_last_recv + p->zp_ping_time)) { + // We have to send a ping to keep the session + // up. p->zp_ping_try++; zt_pipe_send_ping(p); } - } else { - // We still need the timer to wake us up in case - // we haven't seen traffic for a while. - nni_aio_start(aio, zt_pipe_cancel_ping, p); } } else { - // Close the pipe, but no need to send a reason to the - // peer, it is already AFK. + // Ping count exceeded; the other side is AFK. + // Close the pipe, but no need to send a reason to the peer. zt_pipe_close_err(p, NNG_ECLOSED, 0, NULL); } nni_mtx_unlock(&zt_lk); @@ -2018,13 +2031,17 @@ zt_pipe_start(void *arg, nni_aio *aio) zt_pipe *p = arg; nni_mtx_lock(&zt_lk); + p->zp_ping_active = 0; // send a gratuitous ping, and start the ping interval timer. if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNI_TIME_ZERO) && (p->zp_ping_time != NNI_TIME_NEVER) && (p->zp_ping_aio != NULL)) { p->zp_ping_try = 0; nni_aio_set_timeout(aio, nni_clock() + p->zp_ping_time); - nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p); - zt_pipe_send_ping(p); + if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) == + 0) { + p->zp_ping_active = 1; + zt_pipe_send_ping(p); + } } nni_aio_finish(aio, 0, 0); nni_mtx_unlock(&zt_lk); @@ -2380,9 +2397,15 @@ zt_ep_accept(void *arg, nni_aio *aio) static void zt_ep_conn_req_cancel(nni_aio *aio, int rv) { + zt_ep *ep = aio->a_prov_data; // We don't have much to do here. The AIO will have been // canceled as a result of the "parent" AIO canceling. - nni_aio_finish_error(aio, rv); + nni_mtx_lock(&zt_lk); + if (ep->ze_creq_active) { + ep->ze_creq_active = 0; + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&zt_lk); } static void @@ -2397,43 +2420,53 @@ zt_ep_conn_req_cb(void *arg) NNI_ASSERT(ep->ze_mode == NNI_EP_MODE_DIAL); nni_mtx_lock(&zt_lk); - rv = nni_aio_result(aio); - switch (rv) { + + ep->ze_creq_active = 0; + switch ((rv = nni_aio_result(aio))) { case 0: // Already canceled, or already handled? - if (((uaio = nni_list_first(&ep->ze_aios)) == NULL) || - ((p = nni_aio_get_pipe(aio)) == NULL)) { - nni_mtx_unlock(&zt_lk); - return; + if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_pipe(uaio, nni_aio_get_pipe(aio)); + } else { + // We have a pipe, but nowhere to stick it. + // Just discard it. + zt_pipe_fini(nni_aio_get_pipe(aio)); } ep->ze_creq_try = 0; - nni_aio_list_remove(uaio); - nni_aio_finish_pipe(uaio, p); - nni_mtx_unlock(&zt_lk); - return; + break; case NNG_ETIMEDOUT: - if (ep->ze_creq_try <= zt_conn_attempts) { - // Timed out, but we can try again. - ep->ze_creq_try++; - nni_aio_set_timeout( - aio, nni_clock() + zt_conn_interval); - nni_aio_start(aio, zt_ep_conn_req_cancel, ep); - zt_ep_send_conn_req(ep); - nni_mtx_unlock(&zt_lk); - return; + if (ep->ze_creq_try > zt_conn_attempts) { + // Final timeout attempt. + if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + // reset the counter. + ep->ze_creq_try = 0; + } } break; - } - // These are failure modes. Either we timed out too many - // times, or an error occurred. + default: + // Failed hard? + if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + } + ep->ze_creq_try = 0; + break; + } - ep->ze_creq_try = 0; - while ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { - nni_aio_list_remove(uaio); - nni_aio_finish_error(uaio, rv); + if (nni_list_first(&ep->ze_aios) != NULL) { + nni_aio_set_timeout(aio, nni_clock() + zt_conn_interval); + if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) { + ep->ze_creq_active = 1; + ep->ze_creq_try++; + zt_ep_send_conn_req(ep); + } } + nni_mtx_unlock(&zt_lk); } @@ -2466,19 +2499,19 @@ zt_ep_connect(void *arg, nni_aio *aio) } nni_aio_list_append(&ep->ze_aios, aio); - ep->ze_creq_try = 1; - ep->ze_running = 1; + ep->ze_running = 1; nni_aio_set_timeout(ep->ze_creq_aio, now + zt_conn_interval); - // This can't fail -- the only way the ze_creq_aio gets - // terminated would have required us to have also - // canceled the user AIO and held the lock. - (void) nni_aio_start( - ep->ze_creq_aio, zt_ep_conn_req_cancel, ep); - - // We send out the first connect message; it we are not - // yet attached to the network the message will be dropped. - zt_ep_send_conn_req(ep); + + if (nni_aio_start( + ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) { + + // Send out the first connect message; if not + // yet attached to network message will be dropped. + ep->ze_creq_try = 1; + ep->ze_creq_active = 1; + zt_ep_send_conn_req(ep); + } } nni_mtx_unlock(&zt_lk); } |
