aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-27 11:14:33 -0700
committerGarrett D'Amore <garrett@damore.org>2018-08-27 15:33:29 -0700
commitde8aca84eba4f52741fd49d1a57d1fe20a2ec7f5 (patch)
tree32ceb0fb7da277c9ac45529afd06b84edba5c35a /src/transport
parent84a1e7455c158441dd7b33d2eb296cc33dd5a6df (diff)
downloadnng-de8aca84eba4f52741fd49d1a57d1fe20a2ec7f5.tar.gz
nng-de8aca84eba4f52741fd49d1a57d1fe20a2ec7f5.tar.bz2
nng-de8aca84eba4f52741fd49d1a57d1fe20a2ec7f5.zip
fixes #673 transports could benefit from access to upper layer
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c33
-rw-r--r--src/transport/ipc/ipc.c26
-rw-r--r--src/transport/tcp/tcp.c26
-rw-r--r--src/transport/tls/tls.c26
-rw-r--r--src/transport/ws/websocket.c58
-rw-r--r--src/transport/zerotier/zerotier.c88
6 files changed, 174 insertions, 83 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 3194a56d..ab6486bd 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -33,6 +33,7 @@ struct nni_inproc_pipe {
nni_inproc_pair *pair;
nni_msgq * rq;
nni_msgq * wq;
+ nni_pipe * npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -57,6 +58,8 @@ struct nni_inproc_ep {
nni_list aios;
size_t rcvmax;
nni_mtx mtx;
+ nni_dialer * ndialer;
+ nni_listener *nlistener;
};
// nni_inproc is our global state - this contains the list of active endpoints
@@ -103,7 +106,7 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair)
}
static int
-nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep)
+nni_inproc_pipe_alloc(nni_inproc_pipe **pipep, nni_inproc_ep *ep)
{
nni_inproc_pipe *pipe;
@@ -120,6 +123,14 @@ nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep)
return (0);
}
+static int
+nni_inproc_pipe_init(void *arg, nni_pipe *p)
+{
+ nni_inproc_pipe *pipe = arg;
+ pipe->npipe = p;
+ return (0);
+}
+
static void
nni_inproc_pipe_fini(void *arg)
{
@@ -196,9 +207,10 @@ nni_inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t)
}
static int
-nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer)
{
nni_inproc_ep *ep;
+ nni_sock * sock = nni_dialer_sock(ndialer);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -208,6 +220,7 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
ep->listener = false;
ep->proto = nni_sock_proto_id(sock);
ep->rcvmax = 0;
+ ep->ndialer = ndialer;
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
nni_aio_list_init(&ep->aios);
@@ -215,19 +228,22 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
*epp = ep;
return (0);
}
+
static int
-nni_inproc_listener_init(void **epp, nni_url *url, nni_sock *sock)
+nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener)
{
nni_inproc_ep *ep;
+ nni_sock * sock = nni_listener_sock(nlistener);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&ep->mtx);
- ep->listener = true;
- ep->proto = nni_sock_proto_id(sock);
- ep->rcvmax = 0;
+ ep->listener = true;
+ ep->proto = nni_sock_proto_id(sock);
+ ep->rcvmax = 0;
+ ep->nlistener = nlistener;
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
nni_aio_list_init(&ep->aios);
@@ -330,8 +346,8 @@ nni_inproc_accept_clients(nni_inproc_ep *srv)
nni_mtx_init(&pair->mx);
spipe = cpipe = NULL;
- if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) ||
- ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) ||
+ if (((rv = nni_inproc_pipe_alloc(&cpipe, cli)) != 0) ||
+ ((rv = nni_inproc_pipe_alloc(&spipe, srv)) != 0) ||
((rv = nni_msgq_init(&pair->q[0], 1)) != 0) ||
((rv = nni_msgq_init(&pair->q[1], 1)) != 0)) {
@@ -522,6 +538,7 @@ static nni_tran_option nni_inproc_pipe_options[] = {
};
static nni_tran_pipe_ops nni_inproc_pipe_ops = {
+ .p_init = nni_inproc_pipe_init,
.p_fini = nni_inproc_pipe_fini,
.p_send = nni_inproc_pipe_send,
.p_recv = nni_inproc_pipe_recv,
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 05e0b895..58fff1a7 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -32,6 +32,7 @@ struct ipctran_pipe {
bool closed;
nni_sockaddr sa;
ipctran_ep * ep;
+ nni_pipe * npipe;
nni_list_node node;
nni_atomic_flag reaped;
nni_reap_item reap;
@@ -64,6 +65,8 @@ struct ipctran_ep {
nni_ipc_dialer * dialer;
nni_ipc_listener *listener;
nni_reap_item reap;
+ nni_dialer * ndialer;
+ nni_listener * nlistener;
};
static void ipctran_pipe_send_start(ipctran_pipe *);
@@ -113,6 +116,14 @@ ipctran_pipe_stop(void *arg)
nni_aio_stop(p->connaio);
}
+static int
+ipctran_pipe_init(void *arg, nni_pipe *npipe)
+{
+ ipctran_pipe *p = arg;
+ p->npipe = npipe;
+ return (0);
+}
+
static void
ipctran_pipe_fini(void *arg)
{
@@ -154,7 +165,7 @@ ipctran_pipe_reap(ipctran_pipe *p)
}
static int
-ipctran_pipe_init(ipctran_pipe **pipep, ipctran_ep *ep)
+ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep)
{
ipctran_pipe *p;
int rv;
@@ -749,11 +760,12 @@ ipctran_ep_close(void *arg)
}
static int
-ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
+ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
{
ipctran_ep *ep;
int rv;
size_t sz;
+ nni_sock * sock = nni_dialer_sock(ndialer);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -764,6 +776,7 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
sz = sizeof(ep->sa.s_ipc.sa_path);
ep->sa.s_ipc.sa_family = NNG_AF_IPC;
ep->proto = nni_sock_proto_id(sock);
+ ep->ndialer = ndialer;
if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) {
ipctran_ep_fini(ep);
@@ -780,11 +793,12 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
}
static int
-ipctran_ep_init_listener(void **dp, nni_url *url, nni_sock *sock)
+ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener)
{
ipctran_ep *ep;
int rv;
size_t sz;
+ nni_sock * sock = nni_listener_sock(nlistener);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -795,6 +809,7 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_sock *sock)
sz = sizeof(ep->sa.s_ipc.sa_path);
ep->sa.s_ipc.sa_family = NNG_AF_IPC;
ep->proto = nni_sock_proto_id(sock);
+ ep->nlistener = nlistener;
if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) {
ipctran_ep_fini(ep);
@@ -821,7 +836,7 @@ ipctran_ep_connect(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = ipctran_pipe_init(&p, ep)) != 0) {
+ if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -887,7 +902,7 @@ ipctran_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = ipctran_pipe_init(&p, ep)) != 0) {
+ if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -1006,6 +1021,7 @@ static nni_tran_option ipctran_pipe_options[] = {
};
static nni_tran_pipe_ops ipctran_pipe_ops = {
+ .p_init = ipctran_pipe_init,
.p_fini = ipctran_pipe_fini,
.p_stop = ipctran_pipe_stop,
.p_send = ipctran_pipe_send,
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 017b4ccf..fed8872c 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -23,6 +23,7 @@ typedef struct tcptran_ep tcptran_ep;
// tcp_pipe is one end of a TCP connection.
struct tcptran_pipe {
nni_tcp_conn * conn;
+ nni_pipe * npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -69,6 +70,8 @@ struct tcptran_ep {
nni_reap_item reap;
nni_tcp_dialer * dialer;
nni_tcp_listener *listener;
+ nni_dialer * ndialer;
+ nni_listener * nlistener;
};
static void tcptran_pipe_send_start(tcptran_pipe *);
@@ -121,6 +124,14 @@ tcptran_pipe_stop(void *arg)
nni_aio_stop(p->rslvaio);
}
+static int
+tcptran_pipe_init(void *arg, nni_pipe *npipe)
+{
+ tcptran_pipe *p = arg;
+ p->npipe = npipe;
+ return (0);
+}
+
static void
tcptran_pipe_fini(void *arg)
{
@@ -162,7 +173,7 @@ tcptran_pipe_reap(tcptran_pipe *p)
}
static int
-tcptran_pipe_init(tcptran_pipe **pipep, tcptran_ep *ep)
+tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep)
{
tcptran_pipe *p;
int rv;
@@ -750,13 +761,14 @@ tcptran_ep_close(void *arg)
}
static int
-tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
+tcptran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
{
tcptran_ep * ep;
int rv;
uint16_t af;
char * host;
nng_sockaddr srcsa;
+ nni_sock * sock = nni_dialer_sock(ndialer);
if (strcmp(url->u_scheme, "tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -788,6 +800,7 @@ tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
ep->nodelay = true;
ep->keepalive = false;
ep->url = url;
+ ep->ndialer = ndialer;
// Detect an embedded local interface name in the hostname. This
// syntax is only valid with dialers.
@@ -839,13 +852,14 @@ tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
return (0);
}
static int
-tcptran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock)
+tcptran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
{
tcptran_ep *ep;
int rv;
char * host;
nni_aio * aio;
uint16_t af;
+ nni_sock * sock = nni_listener_sock(nlistener);
if (strcmp(url->u_scheme, "tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -876,6 +890,7 @@ tcptran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock)
ep->nodelay = true;
ep->keepalive = false;
ep->url = url;
+ ep->nlistener = nlistener;
if (strlen(url->u_hostname) == 0) {
host = NULL;
@@ -926,7 +941,7 @@ tcptran_ep_connect(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tcptran_pipe_init(&p, ep)) != 0) {
+ if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -1065,7 +1080,7 @@ tcptran_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tcptran_pipe_init(&p, ep)) != 0) {
+ if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -1135,6 +1150,7 @@ static nni_tran_option tcptran_pipe_options[] = {
};
static nni_tran_pipe_ops tcptran_pipe_ops = {
+ .p_init = tcptran_pipe_init,
.p_fini = tcptran_pipe_fini,
.p_stop = tcptran_pipe_stop,
.p_send = tcptran_pipe_send,
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 49bc932d..cf478796 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -30,6 +30,7 @@ typedef struct tlstran_pipe tlstran_pipe;
// tlstran_pipe is one end of a TLS connection.
struct tlstran_pipe {
nni_tls * tls;
+ nni_pipe * npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -77,6 +78,8 @@ struct tlstran_ep {
nni_tcp_listener *listener;
nng_sockaddr sa;
nng_sockaddr bsa;
+ nni_dialer * ndialer;
+ nni_listener * nlistener;
};
static void tlstran_pipe_send_start(tlstran_pipe *);
@@ -125,6 +128,14 @@ tlstran_pipe_stop(void *arg)
nni_aio_stop(p->rslvaio);
}
+static int
+tlstran_pipe_init(void *arg, nni_pipe *npipe)
+{
+ tlstran_pipe *p = arg;
+ p->npipe = npipe;
+ return (0);
+}
+
static void
tlstran_pipe_fini(void *arg)
{
@@ -153,7 +164,7 @@ tlstran_pipe_fini(void *arg)
}
static int
-tlstran_pipe_init(tlstran_pipe **pipep, tlstran_ep *ep)
+tlstran_pipe_alloc(tlstran_pipe **pipep, tlstran_ep *ep)
{
tlstran_pipe *p;
int rv;
@@ -742,13 +753,14 @@ tlstran_ep_close(void *arg)
}
static int
-tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
+tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
{
tlstran_ep *ep;
int rv;
uint16_t af;
char * host = url->u_hostname;
char * port = url->u_port;
+ nni_sock * sock = nni_dialer_sock(ndialer);
if (strcmp(url->u_scheme, "tls+tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -782,6 +794,7 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
ep->proto = nni_sock_proto_id(sock);
ep->nodelay = true;
ep->keepalive = false;
+ ep->ndialer = ndialer;
if (((rv = nni_tcp_dialer_init(&ep->dialer)) != 0) ||
((rv = nni_tls_config_init(&ep->cfg, NNG_TLS_MODE_CLIENT)) != 0) ||
@@ -796,13 +809,14 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
}
static int
-tlstran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock)
+tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
{
tlstran_ep *ep;
int rv;
uint16_t af;
char * host = url->u_hostname;
nni_aio * aio;
+ nni_sock * sock = nni_listener_sock(nlistener);
if (strcmp(url->u_scheme, "tls+tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -835,6 +849,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock)
ep->proto = nni_sock_proto_id(sock);
ep->nodelay = true;
ep->keepalive = false;
+ ep->nlistener = nlistener;
if (strlen(host) == 0) {
host = NULL;
@@ -879,7 +894,7 @@ tlstran_ep_connect(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tlstran_pipe_init(&p, ep)) != 0) {
+ if ((rv = tlstran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -923,7 +938,7 @@ tlstran_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tlstran_pipe_init(&p, ep)) != 0) {
+ if ((rv = tlstran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -1224,6 +1239,7 @@ static nni_tran_option tlstran_pipe_options[] = {
};
static nni_tran_pipe_ops tlstran_pipe_ops = {
+ .p_init = tlstran_pipe_init,
.p_fini = tlstran_pipe_fini,
.p_stop = tlstran_pipe_stop,
.p_send = tlstran_pipe_send,
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 9b3f67f4..baa9ea34 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -42,6 +42,7 @@ struct ws_dialer {
nni_ws_dialer *dialer;
nni_list headers; // req headers
bool started;
+ nni_dialer * ndialer;
};
struct ws_listener {
@@ -55,19 +56,21 @@ struct ws_listener {
nni_ws_listener *listener;
nni_list headers; // res headers
bool started;
+ nni_listener * nlistener;
};
struct ws_pipe {
- nni_mtx mtx;
- size_t rcvmax;
- bool closed;
- uint16_t rproto;
- uint16_t lproto;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
- nni_aio *txaio;
- nni_aio *rxaio;
- nni_ws * ws;
+ nni_mtx mtx;
+ nni_pipe *npipe;
+ size_t rcvmax;
+ bool closed;
+ uint16_t rproto;
+ uint16_t lproto;
+ nni_aio * user_txaio;
+ nni_aio * user_rxaio;
+ nni_aio * txaio;
+ nni_aio * rxaio;
+ nni_ws * ws;
};
static void
@@ -201,6 +204,14 @@ ws_pipe_stop(void *arg)
nni_aio_stop(p->txaio);
}
+static int
+ws_pipe_init(void *arg, nni_pipe *npipe)
+{
+ ws_pipe *p = arg;
+ p->npipe = npipe;
+ return (0);
+}
+
static void
ws_pipe_fini(void *arg)
{
@@ -230,7 +241,7 @@ ws_pipe_close(void *arg)
}
static int
-ws_pipe_init(ws_pipe **pipep, void *ws)
+ws_pipe_alloc(ws_pipe **pipep, void *ws)
{
ws_pipe *p;
int rv;
@@ -663,6 +674,7 @@ static nni_tran_option ws_pipe_options[] = {
};
static nni_tran_pipe_ops ws_pipe_ops = {
+ .p_init = ws_pipe_init,
.p_fini = ws_pipe_fini,
.p_stop = ws_pipe_stop,
.p_send = ws_pipe_send,
@@ -782,7 +794,7 @@ ws_connect_cb(void *arg)
NNI_ASSERT(nni_list_empty(&d->aios));
if ((rv = nni_aio_result(caio)) != 0) {
nni_aio_finish_error(uaio, rv);
- } else if ((rv = ws_pipe_init(&p, ws)) != 0) {
+ } else if ((rv = ws_pipe_alloc(&p, ws)) != 0) {
nni_ws_fini(ws);
nni_aio_finish_error(uaio, rv);
} else {
@@ -835,7 +847,7 @@ ws_accept_cb(void *arg)
ws_pipe *p;
// Make a pipe
nni_aio_list_remove(uaio);
- if ((rv = ws_pipe_init(&p, ws)) != 0) {
+ if ((rv = ws_pipe_alloc(&p, ws)) != 0) {
nni_ws_close(ws);
nni_aio_finish_error(uaio, rv);
} else {
@@ -855,9 +867,10 @@ ws_accept_cb(void *arg)
}
static int
-ws_dialer_init(void **dp, nni_url *url, nni_sock *s)
+ws_dialer_init(void **dp, nni_url *url, nni_dialer *ndialer)
{
ws_dialer * d;
+ nni_sock * s = nni_dialer_sock(ndialer);
const char *n;
int rv;
@@ -869,9 +882,10 @@ ws_dialer_init(void **dp, nni_url *url, nni_sock *s)
nni_aio_list_init(&d->aios);
- d->lproto = nni_sock_proto_id(s);
- d->rproto = nni_sock_peer_id(s);
- n = nni_sock_peer_name(s);
+ d->lproto = nni_sock_proto_id(s);
+ d->rproto = nni_sock_peer_id(s);
+ d->ndialer = ndialer;
+ n = nni_sock_peer_name(s);
if (((rv = nni_ws_dialer_init(&d->dialer, url)) != 0) ||
((rv = nni_aio_init(&d->connaio, ws_connect_cb, d)) != 0) ||
@@ -886,11 +900,12 @@ ws_dialer_init(void **dp, nni_url *url, nni_sock *s)
}
static int
-ws_listener_init(void **lp, nni_url *url, nni_sock *sock)
+ws_listener_init(void **lp, nni_url *url, nni_listener *nlistener)
{
ws_listener *l;
const char * n;
int rv;
+ nni_sock * sock = nni_listener_sock(nlistener);
if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
return (NNG_ENOMEM);
@@ -900,9 +915,10 @@ ws_listener_init(void **lp, nni_url *url, nni_sock *sock)
nni_aio_list_init(&l->aios);
- l->lproto = nni_sock_proto_id(sock);
- l->rproto = nni_sock_peer_id(sock);
- n = nni_sock_proto_name(sock);
+ l->lproto = nni_sock_proto_id(sock);
+ l->rproto = nni_sock_peer_id(sock);
+ n = nni_sock_proto_name(sock);
+ l->nlistener = nlistener;
if (((rv = nni_ws_listener_init(&l->listener, url)) != 0) ||
((rv = nni_aio_init(&l->accaio, ws_accept_cb, l)) != 0) ||
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 0a68c1ca..0b0303d9 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -186,6 +186,7 @@ struct zt_pipe {
nni_list_node zp_link;
const char * zp_addr;
zt_node * zp_ztn;
+ nni_pipe * zp_npipe;
uint64_t zp_nwid;
uint64_t zp_laddr;
uint64_t zp_raddr;
@@ -219,7 +220,6 @@ struct zt_ep {
char ze_home[NNG_MAXADDRLEN]; // should be enough
zt_node * ze_ztn;
uint64_t ze_nwid;
- int ze_mode;
int ze_running;
uint64_t ze_raddr; // remote node address
uint64_t ze_laddr; // local node address
@@ -241,9 +241,11 @@ struct zt_ep {
// established connection/pipe unless the application calls
// accept. Since the "application" is our library, that should
// be pretty much as fast we can run.
- zt_creq ze_creqs[zt_listenq];
- int ze_creq_head;
- int ze_creq_tail;
+ zt_creq ze_creqs[zt_listenq];
+ int ze_creq_head;
+ int ze_creq_tail;
+ nni_dialer * ze_ndialer;
+ nni_listener *ze_nlistener;
};
// Locking strategy. At present the ZeroTier core is not reentrant or fully
@@ -274,7 +276,7 @@ static void zt_ep_send_conn_req(zt_ep *);
static void zt_ep_conn_req_cb(void *);
static void zt_ep_doaccept(zt_ep *);
static void zt_pipe_dorecv(zt_pipe *);
-static int zt_pipe_init(zt_pipe **, zt_ep *, uint64_t, uint64_t);
+static int zt_pipe_alloc(zt_pipe **, zt_ep *, uint64_t, uint64_t);
static void zt_pipe_ping_cb(void *);
static void zt_fraglist_clear(zt_fraglist *);
static void zt_fraglist_free(zt_fraglist *);
@@ -524,7 +526,7 @@ zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
}
ep->ze_mtu = config->mtu;
- if ((ep->ze_mode == NNI_EP_MODE_DIAL) &&
+ if ((ep->ze_ndialer != NULL) &&
(nni_list_first(&ep->ze_aios) != NULL)) {
zt_ep_send_conn_req(ep);
}
@@ -642,7 +644,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
zt_pipe *p;
int rv;
- if (ep->ze_mode != NNI_EP_MODE_DIAL) {
+ if (ep->ze_ndialer == NULL) {
zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
zt_err_proto, "Inappropriate operation");
return;
@@ -665,7 +667,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
return;
}
- if ((rv = zt_pipe_init(&p, ep, raddr, ep->ze_laddr)) != 0) {
+ if ((rv = zt_pipe_alloc(&p, ep, raddr, ep->ze_laddr)) != 0) {
// We couldn't create the pipe, just drop it.
nni_aio_finish_error(aio, rv);
return;
@@ -689,7 +691,7 @@ zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
zt_pipe *p;
int i;
- if (ep->ze_mode != NNI_EP_MODE_LISTEN) {
+ if (ep->ze_nlistener == NULL) {
zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
zt_err_proto, "Inappropriate operation");
return;
@@ -742,8 +744,8 @@ zt_ep_recv_error(zt_ep *ep, const uint8_t *data, size_t len)
// is that when we have an outstanding CON_REQ, we would like to
// process that appropriately.
- if (ep->ze_mode != NNI_EP_MODE_DIAL) {
- // Drop it.
+ if (ep->ze_ndialer == NULL) {
+ // Not a dialer. Drop it.
return;
}
@@ -1637,6 +1639,14 @@ zt_pipe_close(void *arg)
nni_mtx_unlock(&zt_lk);
}
+static int
+zt_pipe_init(void *arg, nni_pipe *npipe)
+{
+ zt_pipe *p = arg;
+ p->zp_npipe = npipe;
+ return (0);
+}
+
static void
zt_pipe_fini(void *arg)
{
@@ -1669,7 +1679,7 @@ zt_pipe_reap(zt_pipe *p)
}
static int
-zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
+zt_pipe_alloc(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
{
zt_pipe *p;
int rv;
@@ -1698,10 +1708,12 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
p->zp_ping_try = 0;
nni_atomic_flag_reset(&p->zp_reaped);
- if (ep->ze_mode == NNI_EP_MODE_DIAL) {
- rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
- } else {
+ if (ep->ze_nlistener != NULL) {
+ // listener
rv = nni_idhash_insert(ztn->zn_rpipes, raddr, p);
+ } else {
+ // dialer
+ rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
}
if ((rv != 0) ||
((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) ||
@@ -2129,7 +2141,8 @@ zt_parsedec(const char **sp, uint64_t *valp)
}
static int
-zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
+zt_ep_init(void **epp, nni_url *url, nni_sock *sock, nni_dialer *ndialer,
+ nni_listener *nlistener)
{
zt_ep * ep;
uint64_t node;
@@ -2141,7 +2154,6 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (NNG_ENOMEM);
}
- ep->ze_mode = mode;
ep->ze_mtu = ZT_MIN_MTU;
ep->ze_aio = NULL;
ep->ze_ping_tries = zt_ping_tries;
@@ -2149,6 +2161,8 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
ep->ze_conn_time = zt_conn_time;
ep->ze_conn_tries = zt_conn_tries;
ep->ze_proto = nni_sock_proto_id(sock);
+ ep->ze_ndialer = ndialer;
+ ep->ze_nlistener = nlistener;
nni_aio_list_init(&ep->ze_aios);
@@ -2181,26 +2195,23 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
}
// Parse the URL.
- switch (mode) {
- case NNI_EP_MODE_DIAL:
- /// We have to have a non-zero port number to connect to.
+ if (nlistener != NULL) {
+ // listener
+ ep->ze_laddr = node;
+ ep->ze_laddr <<= 24;
+ ep->ze_laddr |= port;
+ ep->ze_raddr = 0;
+ ep->ze_nlistener = nlistener;
+ } else {
+ // dialer
if (port == 0) {
return (NNG_EADDRINVAL);
}
ep->ze_raddr = node;
ep->ze_raddr <<= 24;
ep->ze_raddr |= port;
- ep->ze_laddr = 0;
- break;
- case NNI_EP_MODE_LISTEN:
- ep->ze_laddr = node;
- ep->ze_laddr <<= 24;
- ep->ze_laddr |= port;
- ep->ze_raddr = 0;
- break;
- default:
- NNI_ASSERT(0);
- break;
+ ep->ze_laddr = 0;
+ ep->ze_ndialer = ndialer;
}
nni_mtx_lock(&zt_lk);
@@ -2217,15 +2228,15 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
}
static int
-zt_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+zt_dialer_init(void **epp, nni_url *url, nni_dialer *d)
{
- return (zt_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+ return (zt_ep_init(epp, url, nni_dialer_sock(d), d, NULL));
}
static int
-zt_listener_init(void **epp, nni_url *url, nni_sock *sock)
+zt_listener_init(void **epp, nni_url *url, nni_listener *l)
{
- return (zt_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+ return (zt_ep_init(epp, url, nni_listener_sock(l), NULL, l));
}
static void
@@ -2380,7 +2391,7 @@ zt_ep_doaccept(zt_ep *ep)
// We remove this AIO. This keeps it from being canceled.
nni_aio_list_remove(aio);
- rv = zt_pipe_init(&p, ep, creq.cr_raddr, ep->ze_laddr);
+ rv = zt_pipe_alloc(&p, ep, creq.cr_raddr, ep->ze_laddr);
if (rv != 0) {
zt_send_err(ep->ze_ztn, ep->ze_nwid, creq.cr_raddr,
ep->ze_laddr, zt_err_unknown,
@@ -2439,8 +2450,6 @@ zt_ep_conn_req_cb(void *arg)
nni_aio *uaio;
int rv;
- NNI_ASSERT(ep->ze_mode == NNI_EP_MODE_DIAL);
-
nni_mtx_lock(&zt_lk);
ep->ze_creq_active = 0;
@@ -2642,7 +2651,7 @@ zt_ep_get_url(void *arg, void *data, size_t *szp, nni_opt_type t)
uint64_t addr;
nni_mtx_lock(&zt_lk);
- addr = ep->ze_mode == NNI_EP_MODE_DIAL ? ep->ze_raddr : ep->ze_laddr;
+ addr = ep->ze_nlistener != NULL ? ep->ze_laddr : ep->ze_raddr;
snprintf(ustr, sizeof(ustr), "zt://%llx.%llx:%u",
(unsigned long long) addr >> zt_port_shift,
(unsigned long long) ep->ze_nwid,
@@ -2957,6 +2966,7 @@ static nni_tran_option zt_pipe_options[] = {
};
static nni_tran_pipe_ops zt_pipe_ops = {
+ .p_init = zt_pipe_init,
.p_fini = zt_pipe_fini,
.p_send = zt_pipe_send,
.p_recv = zt_pipe_recv,