aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-14 14:57:35 +0500
committerGarrett D'Amore <garrett@damore.org>2018-08-14 14:57:35 +0500
commit6c6bdba20795166d2909adfecfa2d152de410101 (patch)
tree75ca0e819beaf9a174473607ee2a3210d48aa2e3
parent0ad1c6e19abd8173a8c188b194799b901e9ffec5 (diff)
downloadnng-6c6bdba20795166d2909adfecfa2d152de410101.tar.gz
nng-6c6bdba20795166d2909adfecfa2d152de410101.tar.bz2
nng-6c6bdba20795166d2909adfecfa2d152de410101.zip
fixes #645 remove start from ZeroTier transport
-rw-r--r--src/transport/zerotier/zerotier.c158
1 files changed, 62 insertions, 96 deletions
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index a2163e3f..d1072931 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -183,27 +183,28 @@ struct zt_fraglist {
};
struct zt_pipe {
- nni_list_node zp_link;
- const char * zp_addr;
- zt_node * zp_ztn;
- uint64_t zp_nwid;
- uint64_t zp_laddr;
- uint64_t zp_raddr;
- uint16_t zp_peer;
- uint16_t zp_proto;
- uint16_t zp_next_msgid;
- size_t zp_rcvmax;
- size_t zp_mtu;
- int zp_closed;
- nni_aio * zp_user_rxaio;
- nni_time zp_last_recv;
- zt_fraglist zp_recvq[zt_recvq];
- int zp_ping_try;
- int zp_ping_tries;
- int zp_ping_active;
- nni_duration zp_ping_time;
- nni_aio * zp_ping_aio;
- uint8_t * zp_send_buf;
+ nni_list_node zp_link;
+ const char * zp_addr;
+ zt_node * zp_ztn;
+ uint64_t zp_nwid;
+ uint64_t zp_laddr;
+ uint64_t zp_raddr;
+ uint16_t zp_peer;
+ uint16_t zp_proto;
+ uint16_t zp_next_msgid;
+ size_t zp_rcvmax;
+ size_t zp_mtu;
+ nni_aio * zp_user_rxaio;
+ nni_time zp_last_recv;
+ zt_fraglist zp_recvq[zt_recvq];
+ int zp_ping_try;
+ int zp_ping_tries;
+ bool zp_closed;
+ nni_duration zp_ping_time;
+ nni_aio * zp_ping_aio;
+ uint8_t * zp_send_buf;
+ nni_atomic_flag zp_reaped;
+ nni_reap_item zp_reap;
};
typedef struct zt_creq zt_creq;
@@ -280,6 +281,7 @@ static void zt_fraglist_free(zt_fraglist *);
static void zt_virtual_recv(ZT_Node *, void *, void *, uint64_t, void **,
uint64_t, uint64_t, unsigned int, unsigned int, const void *,
unsigned int);
+static void zt_pipe_start_ping(zt_pipe *);
static int64_t
zt_now(void)
@@ -805,10 +807,8 @@ zt_pipe_close_err(zt_pipe *p, int err, uint8_t code, const char *msg)
p->zp_user_rxaio = NULL;
nni_aio_finish_error(aio, err);
}
- if ((aio = p->zp_ping_aio) != NULL) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- p->zp_closed = 1;
+ nni_aio_close(p->zp_ping_aio);
+ p->zp_closed = true;
if (msg != NULL) {
zt_pipe_send_err(p, code, msg);
}
@@ -974,7 +974,7 @@ zt_pipe_recv_disc_req(zt_pipe *p, const uint8_t *data, size_t len)
// Don't bother to check the length, going to disconnect anyway.
if ((aio = p->zp_user_rxaio) != NULL) {
p->zp_user_rxaio = NULL;
- p->zp_closed = 1;
+ p->zp_closed = true;
nni_aio_finish_error(aio, NNG_ECLOSED);
}
}
@@ -990,7 +990,7 @@ zt_pipe_recv_error(zt_pipe *p, const uint8_t *data, size_t len)
// the day, the details are just not that interesting.
if ((aio = p->zp_user_rxaio) != NULL) {
p->zp_user_rxaio = NULL;
- p->zp_closed = 1;
+ p->zp_closed = true;
nni_aio_finish_error(aio, NNG_ETRANERR);
}
}
@@ -1627,7 +1627,8 @@ zt_pipe_close(void *arg)
nni_aio *aio;
nni_mtx_lock(&zt_lk);
- p->zp_closed = 1;
+ p->zp_closed = true;
+ nni_aio_close(p->zp_ping_aio);
if ((aio = p->zp_user_rxaio) != NULL) {
p->zp_user_rxaio = NULL;
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -1659,6 +1660,14 @@ zt_pipe_fini(void *arg)
NNI_FREE_STRUCT(p);
}
+static void
+zt_pipe_reap(zt_pipe *p)
+{
+ if (!nni_atomic_flag_test_and_set(&p->zp_reaped)) {
+ nni_reap(&p->zp_reap, zt_pipe_fini, p);
+ }
+}
+
static int
zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
{
@@ -1687,6 +1696,7 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
p->zp_ping_time = ep->ze_ping_time;
p->zp_next_msgid = (uint16_t) nni_random();
p->zp_ping_try = 0;
+ nni_atomic_flag_reset(&p->zp_reaped);
if (ep->ze_mode == NNI_EP_MODE_DIAL) {
rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
@@ -1696,7 +1706,8 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
if ((rv != 0) ||
((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) ||
((rv = nni_aio_init(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) {
- zt_pipe_fini(p);
+ zt_pipe_reap(p);
+ return (rv);
}
// The largest fragment we can accept on this pipe. The MTU is
@@ -1717,7 +1728,7 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
fl->fl_missingsz = (maxfrags + 7) / 8;
fl->fl_missing = nni_alloc(fl->fl_missingsz);
if (fl->fl_missing == NULL) {
- zt_pipe_fini(p);
+ zt_pipe_reap(p);
return (NNG_ENOMEM);
}
}
@@ -2016,96 +2027,50 @@ zt_pipe_get_node(void *arg, void *buf, size_t *szp, nni_opt_type t)
}
static void
-zt_pipe_cancel_ping(nni_aio *aio, int rv)
-{
- zt_pipe *p = nni_aio_get_prov_data(aio);
-
- nni_mtx_lock(&zt_lk);
- if (p->zp_ping_active) {
- p->zp_ping_active = 0;
- nni_aio_finish_error(aio, rv);
- }
- nni_mtx_unlock(&zt_lk);
-}
-
-static void
zt_pipe_ping_cb(void *arg)
{
zt_pipe *p = arg;
nni_aio *aio = p->zp_ping_aio;
+ int rv;
+ if ((rv = nni_aio_result(aio)) != 0) {
+ // We were canceled. That means we're done.
+ return;
+ }
nni_mtx_lock(&zt_lk);
-
- p->zp_ping_active = 0;
if (p->zp_closed || aio == NULL || (p->zp_ping_tries == 0) ||
(p->zp_ping_time == NNG_DURATION_INFINITE) ||
(p->zp_ping_time == NNG_DURATION_ZERO)) {
nni_mtx_unlock(&zt_lk);
return;
}
- if (nni_aio_result(aio) != NNG_ETIMEDOUT) {
- nni_mtx_unlock(&zt_lk);
- return;
- }
- if (p->zp_ping_try < p->zp_ping_tries) {
- nni_time now = nni_clock();
- nni_aio_set_timeout(aio, p->zp_ping_time);
- // We want pings. We only send one if needed, but we
- // use the the timer to wake us up even if we aren't
- // going to send a ping. (We don't increment the try count
- // unless we actually do send one though.)
- if (nni_aio_begin(aio) == 0) {
- int rv;
- rv = nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
- if (rv != 0) {
- nni_mtx_unlock(&zt_lk);
- nni_aio_finish_error(aio, rv);
- return;
- }
- p->zp_ping_active = 1;
- if (now > (p->zp_last_recv + p->zp_ping_time)) {
- p->zp_ping_try++;
- zt_pipe_send_ping(p);
- }
- }
- } else {
+ if (p->zp_ping_try >= p->zp_ping_tries) {
// Ping count exceeded; the other side is AFK.
// Close the pipe, but no need to send a reason to the peer.
zt_pipe_close_err(p, NNG_ECLOSED, 0, NULL);
+ nni_mtx_unlock(&zt_lk);
+ return;
+ }
+
+ if (nni_clock() > (p->zp_last_recv + p->zp_ping_time)) {
+ p->zp_ping_try++;
+ zt_pipe_send_ping(p);
}
+
+ nni_sleep_aio(p->zp_ping_time, aio); // Schedule a recheck.
nni_mtx_unlock(&zt_lk);
}
static void
-zt_pipe_start(void *arg, nni_aio *aio)
+zt_pipe_start_ping(zt_pipe *p)
{
- zt_pipe *p = arg;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- nni_mtx_lock(&zt_lk);
- p->zp_ping_active = 0;
// send a gratuitous ping, and start the ping interval timer.
if ((p->zp_ping_tries > 0) && (p->zp_ping_time != NNG_DURATION_ZERO) &&
- (p->zp_ping_time != NNG_DURATION_INFINITE) &&
- (p->zp_ping_aio != NULL)) {
+ (p->zp_ping_time != NNG_DURATION_INFINITE)) {
p->zp_ping_try = 0;
- nni_aio_set_timeout(aio, p->zp_ping_time);
- if (nni_aio_begin(p->zp_ping_aio) == 0) {
- int rv;
- rv = nni_aio_schedule(
- p->zp_ping_aio, zt_pipe_cancel_ping, p);
- if (rv != 0) {
- nni_aio_finish_error(p->zp_ping_aio, rv);
- } else {
- p->zp_ping_active = 1;
- zt_pipe_send_ping(p);
- }
- }
+ zt_pipe_send_ping(p);
+ nni_sleep_aio(p->zp_ping_time, p->zp_ping_aio);
}
- nni_aio_finish(aio, 0, 0);
- nni_mtx_unlock(&zt_lk);
}
static void
@@ -2425,6 +2390,7 @@ zt_ep_doaccept(zt_ep *ep)
}
p->zp_peer = creq.cr_proto;
zt_pipe_send_conn_ack(p);
+ zt_pipe_start_ping(p);
nni_aio_set_output(aio, 0, p);
nni_aio_finish(aio, 0, 0);
}
@@ -2484,6 +2450,7 @@ zt_ep_conn_req_cb(void *arg)
// Already canceled, or already handled?
if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) {
nni_aio_list_remove(uaio);
+ zt_pipe_start_ping(p);
nni_aio_set_output(uaio, 0, p);
nni_aio_finish(uaio, 0, 0);
} else {
@@ -2991,7 +2958,6 @@ static nni_tran_option zt_pipe_options[] = {
static nni_tran_pipe_ops zt_pipe_ops = {
.p_fini = zt_pipe_fini,
- .p_start = zt_pipe_start,
.p_send = zt_pipe_send,
.p_recv = zt_pipe_recv,
.p_close = zt_pipe_close,