diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 158 |
1 files changed, 62 insertions, 96 deletions
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index a2163e3f..d1072931 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -183,27 +183,28 @@ struct zt_fraglist { }; struct zt_pipe { - nni_list_node zp_link; - const char * zp_addr; - zt_node * zp_ztn; - uint64_t zp_nwid; - uint64_t zp_laddr; - uint64_t zp_raddr; - uint16_t zp_peer; - uint16_t zp_proto; - uint16_t zp_next_msgid; - size_t zp_rcvmax; - size_t zp_mtu; - int zp_closed; - nni_aio * zp_user_rxaio; - nni_time zp_last_recv; - zt_fraglist zp_recvq[zt_recvq]; - int zp_ping_try; - int zp_ping_tries; - int zp_ping_active; - nni_duration zp_ping_time; - nni_aio * zp_ping_aio; - uint8_t * zp_send_buf; + nni_list_node zp_link; + const char * zp_addr; + zt_node * zp_ztn; + uint64_t zp_nwid; + uint64_t zp_laddr; + uint64_t zp_raddr; + uint16_t zp_peer; + uint16_t zp_proto; + uint16_t zp_next_msgid; + size_t zp_rcvmax; + size_t zp_mtu; + nni_aio * zp_user_rxaio; + nni_time zp_last_recv; + zt_fraglist zp_recvq[zt_recvq]; + int zp_ping_try; + int zp_ping_tries; + bool zp_closed; + nni_duration zp_ping_time; + nni_aio * zp_ping_aio; + uint8_t * zp_send_buf; + nni_atomic_flag zp_reaped; + nni_reap_item zp_reap; }; typedef struct zt_creq zt_creq; @@ -280,6 +281,7 @@ static void zt_fraglist_free(zt_fraglist *); static void zt_virtual_recv(ZT_Node *, void *, void *, uint64_t, void **, uint64_t, uint64_t, unsigned int, unsigned int, const void *, unsigned int); +static void zt_pipe_start_ping(zt_pipe *); static int64_t zt_now(void) @@ -805,10 +807,8 @@ zt_pipe_close_err(zt_pipe *p, int err, uint8_t code, const char *msg) p->zp_user_rxaio = NULL; nni_aio_finish_error(aio, err); } - if ((aio = p->zp_ping_aio) != NULL) { - nni_aio_finish_error(aio, NNG_ECLOSED); - } - p->zp_closed = 1; + nni_aio_close(p->zp_ping_aio); + p->zp_closed = true; if (msg != NULL) { zt_pipe_send_err(p, code, msg); } @@ -974,7 +974,7 @@ zt_pipe_recv_disc_req(zt_pipe *p, const uint8_t *data, size_t len) // Don't bother to check the length, going to disconnect anyway. if ((aio = p->zp_user_rxaio) != NULL) { p->zp_user_rxaio = NULL; - p->zp_closed = 1; + p->zp_closed = true; nni_aio_finish_error(aio, NNG_ECLOSED); } } @@ -990,7 +990,7 @@ zt_pipe_recv_error(zt_pipe *p, const uint8_t *data, size_t len) // the day, the details are just not that interesting. if ((aio = p->zp_user_rxaio) != NULL) { p->zp_user_rxaio = NULL; - p->zp_closed = 1; + p->zp_closed = true; nni_aio_finish_error(aio, NNG_ETRANERR); } } @@ -1627,7 +1627,8 @@ zt_pipe_close(void *arg) nni_aio *aio; nni_mtx_lock(&zt_lk); - p->zp_closed = 1; + p->zp_closed = true; + nni_aio_close(p->zp_ping_aio); if ((aio = p->zp_user_rxaio) != NULL) { p->zp_user_rxaio = NULL; nni_aio_finish_error(aio, NNG_ECLOSED); @@ -1659,6 +1660,14 @@ zt_pipe_fini(void *arg) NNI_FREE_STRUCT(p); } +static void +zt_pipe_reap(zt_pipe *p) +{ + if (!nni_atomic_flag_test_and_set(&p->zp_reaped)) { + nni_reap(&p->zp_reap, zt_pipe_fini, p); + } +} + static int zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr) { @@ -1687,6 +1696,7 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr) p->zp_ping_time = ep->ze_ping_time; p->zp_next_msgid = (uint16_t) nni_random(); p->zp_ping_try = 0; + nni_atomic_flag_reset(&p->zp_reaped); if (ep->ze_mode == NNI_EP_MODE_DIAL) { rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p); @@ -1696,7 +1706,8 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr) if ((rv != 0) || ((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) || ((rv = nni_aio_init(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) { - zt_pipe_fini(p); + zt_pipe_reap(p); + return (rv); } // The largest fragment we can accept on this pipe. The MTU is @@ -1717,7 +1728,7 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr) fl->fl_missingsz = (maxfrags + 7) / 8; fl->fl_missing = nni_alloc(fl->fl_missingsz); if (fl->fl_missing == NULL) { - zt_pipe_fini(p); + zt_pipe_reap(p); return (NNG_ENOMEM); } } @@ -2016,96 +2027,50 @@ zt_pipe_get_node(void *arg, void *buf, size_t *szp, nni_opt_type t) } static void -zt_pipe_cancel_ping(nni_aio *aio, int rv) -{ - zt_pipe *p = nni_aio_get_prov_data(aio); - - nni_mtx_lock(&zt_lk); - if (p->zp_ping_active) { - p->zp_ping_active = 0; - nni_aio_finish_error(aio, rv); - } - nni_mtx_unlock(&zt_lk); -} - -static void zt_pipe_ping_cb(void *arg) { zt_pipe *p = arg; nni_aio *aio = p->zp_ping_aio; + int rv; + if ((rv = nni_aio_result(aio)) != 0) { + // We were canceled. That means we're done. + return; + } nni_mtx_lock(&zt_lk); - - p->zp_ping_active = 0; if (p->zp_closed || aio == NULL || (p->zp_ping_tries == 0) || (p->zp_ping_time == NNG_DURATION_INFINITE) || (p->zp_ping_time == NNG_DURATION_ZERO)) { nni_mtx_unlock(&zt_lk); return; } - if (nni_aio_result(aio) != NNG_ETIMEDOUT) { - nni_mtx_unlock(&zt_lk); - return; - } - if (p->zp_ping_try < p->zp_ping_tries) { - nni_time now = nni_clock(); - nni_aio_set_timeout(aio, p->zp_ping_time); - // We want pings. We only send one if needed, but we - // use the the timer to wake us up even if we aren't - // going to send a ping. (We don't increment the try count - // unless we actually do send one though.) - if (nni_aio_begin(aio) == 0) { - int rv; - rv = nni_aio_schedule(aio, zt_pipe_cancel_ping, p); - if (rv != 0) { - nni_mtx_unlock(&zt_lk); - nni_aio_finish_error(aio, rv); - return; - } - p->zp_ping_active = 1; - if (now > (p->zp_last_recv + p->zp_ping_time)) { - p->zp_ping_try++; - zt_pipe_send_ping(p); - } - } - } else { + if (p->zp_ping_try >= p->zp_ping_tries) { // Ping count exceeded; the other side is AFK. // Close the pipe, but no need to send a reason to the peer. zt_pipe_close_err(p, NNG_ECLOSED, 0, NULL); + nni_mtx_unlock(&zt_lk); + return; + } + + if (nni_clock() > (p->zp_last_recv + p->zp_ping_time)) { + p->zp_ping_try++; + zt_pipe_send_ping(p); } + + nni_sleep_aio(p->zp_ping_time, aio); // Schedule a recheck. nni_mtx_unlock(&zt_lk); } static void -zt_pipe_start(void *arg, nni_aio *aio) +zt_pipe_start_ping(zt_pipe *p) { - zt_pipe *p = arg; - - if (nni_aio_begin(aio) != 0) { - return; - } - nni_mtx_lock(&zt_lk); - p->zp_ping_active = 0; // send a gratuitous ping, and start the ping interval timer. if ((p->zp_ping_tries > 0) && (p->zp_ping_time != NNG_DURATION_ZERO) && - (p->zp_ping_time != NNG_DURATION_INFINITE) && - (p->zp_ping_aio != NULL)) { + (p->zp_ping_time != NNG_DURATION_INFINITE)) { p->zp_ping_try = 0; - nni_aio_set_timeout(aio, p->zp_ping_time); - if (nni_aio_begin(p->zp_ping_aio) == 0) { - int rv; - rv = nni_aio_schedule( - p->zp_ping_aio, zt_pipe_cancel_ping, p); - if (rv != 0) { - nni_aio_finish_error(p->zp_ping_aio, rv); - } else { - p->zp_ping_active = 1; - zt_pipe_send_ping(p); - } - } + zt_pipe_send_ping(p); + nni_sleep_aio(p->zp_ping_time, p->zp_ping_aio); } - nni_aio_finish(aio, 0, 0); - nni_mtx_unlock(&zt_lk); } static void @@ -2425,6 +2390,7 @@ zt_ep_doaccept(zt_ep *ep) } p->zp_peer = creq.cr_proto; zt_pipe_send_conn_ack(p); + zt_pipe_start_ping(p); nni_aio_set_output(aio, 0, p); nni_aio_finish(aio, 0, 0); } @@ -2484,6 +2450,7 @@ zt_ep_conn_req_cb(void *arg) // Already canceled, or already handled? if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { nni_aio_list_remove(uaio); + zt_pipe_start_ping(p); nni_aio_set_output(uaio, 0, p); nni_aio_finish(uaio, 0, 0); } else { @@ -2991,7 +2958,6 @@ static nni_tran_option zt_pipe_options[] = { static nni_tran_pipe_ops zt_pipe_ops = { .p_fini = zt_pipe_fini, - .p_start = zt_pipe_start, .p_send = zt_pipe_send, .p_recv = zt_pipe_recv, .p_close = zt_pipe_close, |
