aboutsummaryrefslogtreecommitdiff
path: root/src/transport/zerotier
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-27 11:14:33 -0700
committerGarrett D'Amore <garrett@damore.org>2018-08-27 15:33:29 -0700
commitde8aca84eba4f52741fd49d1a57d1fe20a2ec7f5 (patch)
tree32ceb0fb7da277c9ac45529afd06b84edba5c35a /src/transport/zerotier
parent84a1e7455c158441dd7b33d2eb296cc33dd5a6df (diff)
downloadnng-de8aca84eba4f52741fd49d1a57d1fe20a2ec7f5.tar.gz
nng-de8aca84eba4f52741fd49d1a57d1fe20a2ec7f5.tar.bz2
nng-de8aca84eba4f52741fd49d1a57d1fe20a2ec7f5.zip
fixes #673 transports could benefit from access to upper layer
Diffstat (limited to 'src/transport/zerotier')
-rw-r--r--src/transport/zerotier/zerotier.c88
1 files changed, 49 insertions, 39 deletions
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 0a68c1ca..0b0303d9 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -186,6 +186,7 @@ struct zt_pipe {
nni_list_node zp_link;
const char * zp_addr;
zt_node * zp_ztn;
+ nni_pipe * zp_npipe;
uint64_t zp_nwid;
uint64_t zp_laddr;
uint64_t zp_raddr;
@@ -219,7 +220,6 @@ struct zt_ep {
char ze_home[NNG_MAXADDRLEN]; // should be enough
zt_node * ze_ztn;
uint64_t ze_nwid;
- int ze_mode;
int ze_running;
uint64_t ze_raddr; // remote node address
uint64_t ze_laddr; // local node address
@@ -241,9 +241,11 @@ struct zt_ep {
// established connection/pipe unless the application calls
// accept. Since the "application" is our library, that should
// be pretty much as fast we can run.
- zt_creq ze_creqs[zt_listenq];
- int ze_creq_head;
- int ze_creq_tail;
+ zt_creq ze_creqs[zt_listenq];
+ int ze_creq_head;
+ int ze_creq_tail;
+ nni_dialer * ze_ndialer;
+ nni_listener *ze_nlistener;
};
// Locking strategy. At present the ZeroTier core is not reentrant or fully
@@ -274,7 +276,7 @@ static void zt_ep_send_conn_req(zt_ep *);
static void zt_ep_conn_req_cb(void *);
static void zt_ep_doaccept(zt_ep *);
static void zt_pipe_dorecv(zt_pipe *);
-static int zt_pipe_init(zt_pipe **, zt_ep *, uint64_t, uint64_t);
+static int zt_pipe_alloc(zt_pipe **, zt_ep *, uint64_t, uint64_t);
static void zt_pipe_ping_cb(void *);
static void zt_fraglist_clear(zt_fraglist *);
static void zt_fraglist_free(zt_fraglist *);
@@ -524,7 +526,7 @@ zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
}
ep->ze_mtu = config->mtu;
- if ((ep->ze_mode == NNI_EP_MODE_DIAL) &&
+ if ((ep->ze_ndialer != NULL) &&
(nni_list_first(&ep->ze_aios) != NULL)) {
zt_ep_send_conn_req(ep);
}
@@ -642,7 +644,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
zt_pipe *p;
int rv;
- if (ep->ze_mode != NNI_EP_MODE_DIAL) {
+ if (ep->ze_ndialer == NULL) {
zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
zt_err_proto, "Inappropriate operation");
return;
@@ -665,7 +667,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
return;
}
- if ((rv = zt_pipe_init(&p, ep, raddr, ep->ze_laddr)) != 0) {
+ if ((rv = zt_pipe_alloc(&p, ep, raddr, ep->ze_laddr)) != 0) {
// We couldn't create the pipe, just drop it.
nni_aio_finish_error(aio, rv);
return;
@@ -689,7 +691,7 @@ zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
zt_pipe *p;
int i;
- if (ep->ze_mode != NNI_EP_MODE_LISTEN) {
+ if (ep->ze_nlistener == NULL) {
zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
zt_err_proto, "Inappropriate operation");
return;
@@ -742,8 +744,8 @@ zt_ep_recv_error(zt_ep *ep, const uint8_t *data, size_t len)
// is that when we have an outstanding CON_REQ, we would like to
// process that appropriately.
- if (ep->ze_mode != NNI_EP_MODE_DIAL) {
- // Drop it.
+ if (ep->ze_ndialer == NULL) {
+ // Not a dialer. Drop it.
return;
}
@@ -1637,6 +1639,14 @@ zt_pipe_close(void *arg)
nni_mtx_unlock(&zt_lk);
}
+static int
+zt_pipe_init(void *arg, nni_pipe *npipe)
+{
+ zt_pipe *p = arg;
+ p->zp_npipe = npipe;
+ return (0);
+}
+
static void
zt_pipe_fini(void *arg)
{
@@ -1669,7 +1679,7 @@ zt_pipe_reap(zt_pipe *p)
}
static int
-zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
+zt_pipe_alloc(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
{
zt_pipe *p;
int rv;
@@ -1698,10 +1708,12 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
p->zp_ping_try = 0;
nni_atomic_flag_reset(&p->zp_reaped);
- if (ep->ze_mode == NNI_EP_MODE_DIAL) {
- rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
- } else {
+ if (ep->ze_nlistener != NULL) {
+ // listener
rv = nni_idhash_insert(ztn->zn_rpipes, raddr, p);
+ } else {
+ // dialer
+ rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
}
if ((rv != 0) ||
((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) ||
@@ -2129,7 +2141,8 @@ zt_parsedec(const char **sp, uint64_t *valp)
}
static int
-zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
+zt_ep_init(void **epp, nni_url *url, nni_sock *sock, nni_dialer *ndialer,
+ nni_listener *nlistener)
{
zt_ep * ep;
uint64_t node;
@@ -2141,7 +2154,6 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (NNG_ENOMEM);
}
- ep->ze_mode = mode;
ep->ze_mtu = ZT_MIN_MTU;
ep->ze_aio = NULL;
ep->ze_ping_tries = zt_ping_tries;
@@ -2149,6 +2161,8 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
ep->ze_conn_time = zt_conn_time;
ep->ze_conn_tries = zt_conn_tries;
ep->ze_proto = nni_sock_proto_id(sock);
+ ep->ze_ndialer = ndialer;
+ ep->ze_nlistener = nlistener;
nni_aio_list_init(&ep->ze_aios);
@@ -2181,26 +2195,23 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
}
// Parse the URL.
- switch (mode) {
- case NNI_EP_MODE_DIAL:
- /// We have to have a non-zero port number to connect to.
+ if (nlistener != NULL) {
+ // listener
+ ep->ze_laddr = node;
+ ep->ze_laddr <<= 24;
+ ep->ze_laddr |= port;
+ ep->ze_raddr = 0;
+ ep->ze_nlistener = nlistener;
+ } else {
+ // dialer
if (port == 0) {
return (NNG_EADDRINVAL);
}
ep->ze_raddr = node;
ep->ze_raddr <<= 24;
ep->ze_raddr |= port;
- ep->ze_laddr = 0;
- break;
- case NNI_EP_MODE_LISTEN:
- ep->ze_laddr = node;
- ep->ze_laddr <<= 24;
- ep->ze_laddr |= port;
- ep->ze_raddr = 0;
- break;
- default:
- NNI_ASSERT(0);
- break;
+ ep->ze_laddr = 0;
+ ep->ze_ndialer = ndialer;
}
nni_mtx_lock(&zt_lk);
@@ -2217,15 +2228,15 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
}
static int
-zt_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+zt_dialer_init(void **epp, nni_url *url, nni_dialer *d)
{
- return (zt_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+ return (zt_ep_init(epp, url, nni_dialer_sock(d), d, NULL));
}
static int
-zt_listener_init(void **epp, nni_url *url, nni_sock *sock)
+zt_listener_init(void **epp, nni_url *url, nni_listener *l)
{
- return (zt_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+ return (zt_ep_init(epp, url, nni_listener_sock(l), NULL, l));
}
static void
@@ -2380,7 +2391,7 @@ zt_ep_doaccept(zt_ep *ep)
// We remove this AIO. This keeps it from being canceled.
nni_aio_list_remove(aio);
- rv = zt_pipe_init(&p, ep, creq.cr_raddr, ep->ze_laddr);
+ rv = zt_pipe_alloc(&p, ep, creq.cr_raddr, ep->ze_laddr);
if (rv != 0) {
zt_send_err(ep->ze_ztn, ep->ze_nwid, creq.cr_raddr,
ep->ze_laddr, zt_err_unknown,
@@ -2439,8 +2450,6 @@ zt_ep_conn_req_cb(void *arg)
nni_aio *uaio;
int rv;
- NNI_ASSERT(ep->ze_mode == NNI_EP_MODE_DIAL);
-
nni_mtx_lock(&zt_lk);
ep->ze_creq_active = 0;
@@ -2642,7 +2651,7 @@ zt_ep_get_url(void *arg, void *data, size_t *szp, nni_opt_type t)
uint64_t addr;
nni_mtx_lock(&zt_lk);
- addr = ep->ze_mode == NNI_EP_MODE_DIAL ? ep->ze_raddr : ep->ze_laddr;
+ addr = ep->ze_nlistener != NULL ? ep->ze_laddr : ep->ze_raddr;
snprintf(ustr, sizeof(ustr), "zt://%llx.%llx:%u",
(unsigned long long) addr >> zt_port_shift,
(unsigned long long) ep->ze_nwid,
@@ -2957,6 +2966,7 @@ static nni_tran_option zt_pipe_options[] = {
};
static nni_tran_pipe_ops zt_pipe_ops = {
+ .p_init = zt_pipe_init,
.p_fini = zt_pipe_fini,
.p_send = zt_pipe_send,
.p_recv = zt_pipe_recv,