summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-03-07 17:06:16 -0800
committerGarrett D'Amore <garrett@damore.org>2018-03-07 17:12:53 -0800
commit8a20acfa34e1eedf34254354b8a219bd8fbca2ed (patch)
tree66d8860b7576ebd6f1fa24c9042e929fb7848c93 /src
parent52ee317a007ad973a091b9cbd69e1b8ef29904ee (diff)
downloadnng-8a20acfa34e1eedf34254354b8a219bd8fbca2ed.tar.gz
nng-8a20acfa34e1eedf34254354b8a219bd8fbca2ed.tar.bz2
nng-8a20acfa34e1eedf34254354b8a219bd8fbca2ed.zip
fixes #269 nngcat unreliable with ZeroTier transport
This does a few things. First it closes a case where a dropped message could prevent subsequent connection attempts from getting through. Second, it changes the rate at which we retry, and the timeout, to be a lot more aggressive when attempting to establish a connection. We retry every 500 ms, for up to 2 minutes, before giving up. This gives a lot more resilience in the face of message loss that is typical of ZeroTier in some environments when first establishing communication. Third, makes the values for the connection attempts *tunable*, so that applications can adjust for different deployment scenarios. Fourth, it includes the ability to get the UDP socket name. This was needed during some debug, and may be useful for a real UDP transport later, so we're keeping it. Finally, we added documentation for the above items.
Diffstat (limited to 'src')
-rw-r--r--src/core/platform.h2
-rw-r--r--src/platform/posix/posix_udp.c15
-rw-r--r--src/platform/windows/win_udp.c14
-rw-r--r--src/transport/zerotier/zerotier.c120
-rw-r--r--src/transport/zerotier/zerotier.h12
5 files changed, 123 insertions, 40 deletions
diff --git a/src/core/platform.h b/src/core/platform.h
index d5fc40f7..5666b0ed 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -371,6 +371,8 @@ extern void nni_plat_pipe_clear(int);
// routine.
extern void nni_plat_pipe_close(int, int);
+extern int nni_plat_udp_sockname(nni_plat_udp *, nni_sockaddr *);
+
//
// File/Store Support
//
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 61546667..8a402acc 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -306,4 +306,19 @@ nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
nni_mtx_unlock(&udp->udp_mtx);
}
+int
+nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa)
+{
+ struct sockaddr_storage ss;
+ socklen_t sz;
+ int rv;
+
+ sz = sizeof(ss);
+ if ((rv = getsockname(udp->udp_fd, (struct sockaddr *) &ss, &sz)) <
+ 0) {
+ return (nni_plat_errno(errno));
+ }
+ return (nni_posix_sockaddr2nn(sa, &ss));
+}
+
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/windows/win_udp.c b/src/platform/windows/win_udp.c
index 81aa2c06..ba6b400c 100644
--- a/src/platform/windows/win_udp.c
+++ b/src/platform/windows/win_udp.c
@@ -290,6 +290,20 @@ nni_win_udp_finish_tx(nni_win_event *evt, nni_aio *aio)
}
int
+nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa)
+{
+ SOCKADDR_STORAGE ss;
+ int sz;
+ int rv;
+
+ sz = sizeof(ss);
+ if ((rv = getsockname(udp->s, (SOCKADDR *) &ss, &sz)) < 0) {
+ return (nni_win_error(GetLastError()));
+ }
+ return (nni_win_sockaddr2nn(sa, &ss));
+}
+
+int
nni_win_udp_sysinit(void)
{
WSADATA data;
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 8f83881c..254684b7 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -80,25 +80,25 @@ typedef struct zt_fraglist zt_fraglist;
(ptr)[2] = (uint8_t)((uint32_t)(u)); \
} while (0)
-static const uint16_t zt_ethertype = 0x901;
-static const uint8_t zt_version = 0x01;
-static const uint32_t zt_ephemeral = 0x800000u; // start of ephemeral ports
-static const uint32_t zt_max_port = 0xffffffu; // largest port
-static const uint32_t zt_port_mask = 0xffffffu; // mask of valid ports
-static const uint32_t zt_port_shift = 24;
+static const uint16_t zt_ethertype = 0x901;
+static const uint8_t zt_version = 0x01;
+static const uint32_t zt_ephemeral = 0x800000u; // start of ephemeral ports
+static const uint32_t zt_max_port = 0xffffffu; // largest port
+static const uint32_t zt_port_mask = 0xffffffu; // mask of valid ports
+static const uint32_t zt_port_shift = 24;
+static const int zt_conn_tries = 240; // max connect attempts
+static const nng_duration zt_conn_time = 500; // between attempts (msec)
+static const int zt_ping_tries = 5; // max keepalive attempts
+static const nng_duration zt_ping_time = 60000; // keepalive time (msec)
// These are compile time tunables for now.
enum zt_tunables {
zt_listenq = 128, // backlog queue length
zt_listen_expire = 60000, // maximum time in backlog (msec)
zt_rcv_bufsize = 4096, // max UDP recv
- zt_conn_attempts = 12, // connection attempts (default)
- zt_conn_interval = 5000, // between attempts (msec)
zt_udp_sendq = 16, // outgoing UDP queue length
zt_recvq = 2, // max pending recv (per pipe)
zt_recv_stale = 1000, // frags older than are stale (msec)
- zt_ping_time = 60000, // keepalive time (msec)
- zt_ping_count = 5, // keepalive attempts
};
enum zt_op_codes {
@@ -210,7 +210,7 @@ struct zt_pipe {
nni_time zp_last_recv;
zt_fraglist zp_recvq[zt_recvq];
int zp_ping_try;
- int zp_ping_count;
+ int zp_ping_tries;
int zp_ping_active;
nni_duration zp_ping_time;
nni_aio * zp_ping_aio;
@@ -241,8 +241,10 @@ struct zt_ep {
int ze_creq_try;
nni_list ze_aios;
int ze_mtu;
- int ze_ping_count;
+ int ze_ping_tries;
nni_duration ze_ping_time;
+ nni_duration ze_conn_time;
+ int ze_conn_tries;
// Incoming connection requests (server only). We only
// only have "accepted" requests -- that is we won't have an
@@ -535,11 +537,6 @@ zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
(nni_list_first(&ep->ze_aios) != NULL)) {
zt_ep_send_conn_req(ep);
}
- // if (ep->ze_mode == NNI_EP
- // zt_send_
- // nni_aio_finish(ep->ze_join_aio, 0);
- // }
- // XXX: schedule creqs if needed!
}
break;
case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY:
@@ -1034,6 +1031,9 @@ zt_pipe_virtual_recv(zt_pipe *p, uint8_t op, const uint8_t *data, size_t len)
case zt_op_error:
zt_pipe_recv_error(p, data, len);
return;
+ case zt_op_conn_req:
+ zt_pipe_send_conn_ack(p);
+ return;
}
}
@@ -1446,13 +1446,10 @@ zt_node_create(zt_node **ztnp, const char *path)
enum ZT_ResultCode zrv;
nni_iov iov;
+ // XXX: Right now we depend on having both IPv6 and IPv4 available.
+ // Probably we should support coping with the lack of either of them.
+
// We want to bind to any address we can (for now).
- // Note that at the moment we only support IPv4. Its
- // unclear how we are meant to handle underlying IPv6
- // in ZeroTier. Probably we can use IPv6 dual stock
- // sockets if they exist, but not all platforms support
- // dual-stack. Furhtermore, IPv6 is not available
- // everywhere, and the root servers may be IPv4 only.
memset(&sa4, 0, sizeof(sa4));
sa4.s_un.s_in.sa_family = NNG_AF_INET;
memset(&sa6, 0, sizeof(sa6));
@@ -1701,7 +1698,7 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
p->zp_nwid = ep->ze_nwid;
p->zp_mtu = ep->ze_mtu;
p->zp_rcvmax = ep->ze_rcvmax;
- p->zp_ping_count = ep->ze_ping_count;
+ p->zp_ping_tries = ep->ze_ping_tries;
p->zp_ping_time = ep->ze_ping_time;
p->zp_next_msgid = (uint16_t) nni_random();
p->zp_ping_try = 0;
@@ -2021,7 +2018,7 @@ zt_pipe_ping_cb(void *arg)
nni_mtx_lock(&zt_lk);
p->zp_ping_active = 0;
- if (p->zp_closed || aio == NULL || (p->zp_ping_count == 0) ||
+ if (p->zp_closed || aio == NULL || (p->zp_ping_tries == 0) ||
(p->zp_ping_time == NNG_DURATION_INFINITE) ||
(p->zp_ping_time == NNG_DURATION_ZERO)) {
nni_mtx_unlock(&zt_lk);
@@ -2031,7 +2028,7 @@ zt_pipe_ping_cb(void *arg)
nni_mtx_unlock(&zt_lk);
return;
}
- if (p->zp_ping_try < p->zp_ping_count) {
+ if (p->zp_ping_try < p->zp_ping_tries) {
nni_time now = nni_clock();
nni_aio_set_timeout(aio, p->zp_ping_time);
// We want pings. We only send one if needed, but we
@@ -2063,7 +2060,7 @@ zt_pipe_start(void *arg, nni_aio *aio)
nni_mtx_lock(&zt_lk);
p->zp_ping_active = 0;
// send a gratuitous ping, and start the ping interval timer.
- if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNG_DURATION_ZERO) &&
+ if ((p->zp_ping_tries > 0) && (p->zp_ping_time != NNG_DURATION_ZERO) &&
(p->zp_ping_time != NNG_DURATION_INFINITE) &&
(p->zp_ping_aio != NULL)) {
p->zp_ping_try = 0;
@@ -2149,8 +2146,10 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
ep->ze_mode = mode;
ep->ze_mtu = ZT_MIN_MTU;
ep->ze_aio = NULL;
- ep->ze_ping_count = zt_ping_count;
+ ep->ze_ping_tries = zt_ping_tries;
ep->ze_ping_time = zt_ping_time;
+ ep->ze_conn_time = zt_conn_time;
+ ep->ze_conn_tries = zt_conn_tries;
ep->ze_proto = nni_sock_proto(sock);
nni_aio_list_init(&ep->ze_aios);
@@ -2444,7 +2443,8 @@ zt_ep_conn_req_cb(void *arg)
break;
case NNG_ETIMEDOUT:
- if (ep->ze_creq_try > zt_conn_attempts) {
+ if ((ep->ze_creq_try > ep->ze_conn_tries) &&
+ (ep->ze_conn_tries > 0)) {
// Final timeout attempt.
if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) {
nni_aio_list_remove(uaio);
@@ -2466,7 +2466,7 @@ zt_ep_conn_req_cb(void *arg)
}
if (nni_list_first(&ep->ze_aios) != NULL) {
- nni_aio_set_timeout(aio, zt_conn_interval);
+ nni_aio_set_timeout(aio, ep->ze_conn_time);
if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) {
ep->ze_creq_active = 1;
ep->ze_creq_try++;
@@ -2507,7 +2507,7 @@ zt_ep_connect(void *arg, nni_aio *aio)
ep->ze_running = 1;
- nni_aio_set_timeout(ep->ze_creq_aio, zt_conn_interval);
+ nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
if (nni_aio_start(
ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) {
@@ -2679,20 +2679,54 @@ zt_ep_getopt_ping_time(void *arg, void *data, size_t *szp)
}
static int
-zt_ep_setopt_ping_count(void *arg, const void *data, size_t sz)
+zt_ep_setopt_ping_tries(void *arg, const void *data, size_t sz)
+{
+ zt_ep *ep = arg;
+ if (ep == NULL) {
+ return (nni_chkopt_int(data, sz, 0, 1000000));
+ }
+ return (nni_setopt_int(&ep->ze_ping_tries, data, sz, 0, 1000000));
+}
+
+static int
+zt_ep_getopt_ping_tries(void *arg, void *data, size_t *szp)
+{
+ zt_ep *ep = arg;
+ return (nni_getopt_int(ep->ze_ping_tries, data, szp));
+}
+
+static int
+zt_ep_setopt_conn_time(void *arg, const void *data, size_t sz)
+{
+ zt_ep *ep = arg;
+ if (ep == NULL) {
+ return (nni_chkopt_ms(data, sz));
+ }
+ return (nni_setopt_ms(&ep->ze_conn_time, data, sz));
+}
+
+static int
+zt_ep_getopt_conn_time(void *arg, void *data, size_t *szp)
+{
+ zt_ep *ep = arg;
+ return (nni_getopt_ms(ep->ze_conn_time, data, szp));
+}
+
+static int
+zt_ep_setopt_conn_tries(void *arg, const void *data, size_t sz)
{
zt_ep *ep = arg;
if (ep == NULL) {
return (nni_chkopt_int(data, sz, 0, 1000000));
}
- return (nni_setopt_int(&ep->ze_ping_count, data, sz, 0, 1000000));
+ return (nni_setopt_int(&ep->ze_conn_tries, data, sz, 0, 1000000));
}
static int
-zt_ep_getopt_ping_count(void *arg, void *data, size_t *szp)
+zt_ep_getopt_conn_tries(void *arg, void *data, size_t *szp)
{
zt_ep *ep = arg;
- return (nni_getopt_int(ep->ze_ping_count, data, szp));
+ return (nni_getopt_int(ep->ze_conn_tries, data, szp));
}
static int
@@ -2793,9 +2827,19 @@ static nni_tran_ep_option zt_ep_options[] = {
.eo_setopt = zt_ep_setopt_ping_time,
},
{
- .eo_name = NNG_OPT_ZT_PING_COUNT,
- .eo_getopt = zt_ep_getopt_ping_count,
- .eo_setopt = zt_ep_setopt_ping_count,
+ .eo_name = NNG_OPT_ZT_PING_TRIES,
+ .eo_getopt = zt_ep_getopt_ping_tries,
+ .eo_setopt = zt_ep_setopt_ping_tries,
+ },
+ {
+ .eo_name = NNG_OPT_ZT_CONN_TIME,
+ .eo_getopt = zt_ep_getopt_conn_time,
+ .eo_setopt = zt_ep_setopt_conn_time,
+ },
+ {
+ .eo_name = NNG_OPT_ZT_CONN_TRIES,
+ .eo_getopt = zt_ep_getopt_conn_tries,
+ .eo_setopt = zt_ep_setopt_conn_tries,
},
{
.eo_name = NNG_OPT_ZT_ORBIT,
diff --git a/src/transport/zerotier/zerotier.h b/src/transport/zerotier/zerotier.h
index 9083f665..a191b106 100644
--- a/src/transport/zerotier/zerotier.h
+++ b/src/transport/zerotier/zerotier.h
@@ -78,7 +78,7 @@
// is presented as an ASCIIZ string.
#define NNG_OPT_ZT_NETWORK_NAME "zt:network-name"
-// NNG_OPT_ZT_PING_TIME and NNG_OPT_ZT_PING_COUNT are used to send ping
+// NNG_OPT_ZT_PING_TIME and NNG_OPT_ZT_PING_TRIES are used to send ping
// requests when a connection appears to be idled. If a logical session
// has not received traffic from it's peer for ping-time, then a ping packet
// is sent. This will be done up to ping-count times. If no traffic from
@@ -88,7 +88,15 @@
// NNG_OPT_ZT_PING_COUNT is an integer.) This ping process can be disabled
// by setting either ping-time or ping-count to zero.
#define NNG_OPT_ZT_PING_TIME "zt:ping-time"
-#define NNG_OPT_ZT_PING_COUNT "zt:ping-count"
+#define NNG_OPT_ZT_PING_TRIES "zt:ping-tries"
+
+// NNG_OPT_ZT_CONN_TIME and NNG_OPT_ZT_CONN_TRIES are used to control
+// the interval between connection attempts, and the maximum number of
+// connection attempts to make before assuming that the peer is absent
+// (and returning NNG_ETIMEDOUT). The NNG_OPT_ZT_CONN_TIME is a duration,
+// the NNG_OPT_ZT_CONN_TRIES is an integer.
+#define NNG_OPT_ZT_CONN_TIME "zt:conn-time"
+#define NNG_OPT_ZT_CONN_TRIES "zt:conn-tries"
// NNG_OPT_ZT_MTU is a read-only size_t and contains the ZeroTier virtual
// network MTU (i.e. the L2 payload MTU). Messages that are larger than this