aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/dialer.c8
-rw-r--r--src/core/dialer.h19
-rw-r--r--src/core/listener.c8
-rw-r--r--src/core/listener.h1
-rw-r--r--src/core/pipe.c2
-rw-r--r--src/core/transport.h19
-rw-r--r--src/supplemental/websocket/websocket.c29
-rw-r--r--src/transport/inproc/inproc.c33
-rw-r--r--src/transport/ipc/ipc.c26
-rw-r--r--src/transport/tcp/tcp.c26
-rw-r--r--src/transport/tls/tls.c26
-rw-r--r--src/transport/ws/websocket.c58
-rw-r--r--src/transport/zerotier/zerotier.c88
13 files changed, 222 insertions, 121 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 77ecbe7f..b76bff6d 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -112,7 +112,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
- ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) ||
+ ((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) ||
((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) ||
((rv = nni_sock_add_dialer(s, d)) != 0)) {
nni_dialer_destroy(d);
@@ -302,6 +302,12 @@ nni_dialer_start(nni_dialer *d, int flags)
return (rv);
}
+nni_sock *
+nni_dialer_sock(nni_dialer *d)
+{
+ return (d->d_sock);
+}
+
int
nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
nni_opt_type t)
diff --git a/src/core/dialer.h b/src/core/dialer.h
index 4a3e127c..361c6d5e 100644
--- a/src/core/dialer.h
+++ b/src/core/dialer.h
@@ -11,15 +11,16 @@
#ifndef CORE_DIALER_H
#define CORE_DIALER_H
-extern int nni_dialer_sys_init(void);
-extern void nni_dialer_sys_fini(void);
-extern int nni_dialer_find(nni_dialer **, uint32_t);
-extern int nni_dialer_hold(nni_dialer *);
-extern void nni_dialer_rele(nni_dialer *);
-extern uint32_t nni_dialer_id(nni_dialer *);
-extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *);
-extern void nni_dialer_close(nni_dialer *);
-extern int nni_dialer_start(nni_dialer *, int);
+extern int nni_dialer_sys_init(void);
+extern void nni_dialer_sys_fini(void);
+extern int nni_dialer_find(nni_dialer **, uint32_t);
+extern int nni_dialer_hold(nni_dialer *);
+extern void nni_dialer_rele(nni_dialer *);
+extern uint32_t nni_dialer_id(nni_dialer *);
+extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *);
+extern void nni_dialer_close(nni_dialer *);
+extern int nni_dialer_start(nni_dialer *, int);
+extern nni_sock *nni_dialer_sock(nni_dialer *);
extern int nni_dialer_setopt(
nni_dialer *, const char *, const void *, size_t, nni_opt_type);
diff --git a/src/core/listener.c b/src/core/listener.c
index c2e68bb5..1fe6faab 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -110,7 +110,7 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
- ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) ||
+ ((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) ||
((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) ||
((rv = nni_sock_add_listener(s, l)) != 0)) {
nni_listener_destroy(l);
@@ -284,6 +284,12 @@ nni_listener_start(nni_listener *l, int flags)
return (0);
}
+nni_sock *
+nni_listener_sock(nni_listener *l)
+{
+ return (l->l_sock);
+}
+
int
nni_listener_setopt(nni_listener *l, const char *name, const void *val,
size_t sz, nni_opt_type t)
diff --git a/src/core/listener.h b/src/core/listener.h
index 5806fc72..67a782bd 100644
--- a/src/core/listener.h
+++ b/src/core/listener.h
@@ -20,6 +20,7 @@ extern uint32_t nni_listener_id(nni_listener *);
extern int nni_listener_create(nni_listener **, nni_sock *, const char *);
extern void nni_listener_close(nni_listener *);
extern int nni_listener_start(nni_listener *, int);
+extern nni_sock *nni_listener_sock(nni_listener *);
extern int nni_listener_setopt(
nni_listener *, const char *, const void *, size_t, nni_opt_type);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index b79b6a3a..bdbc76e0 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -216,7 +216,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
}
nni_mtx_unlock(&nni_pipe_lk);
- if ((rv != 0) ||
+ if ((rv != 0) || ((rv = tran->tran_pipe->p_init(tdata, p)) != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
nni_pipe_close(p);
nni_pipe_rele(p);
diff --git a/src/core/transport.h b/src/core/transport.h
index 8b08e366..8e5bba48 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -11,13 +11,6 @@
#ifndef CORE_TRANSPORT_H
#define CORE_TRANSPORT_H
-// Endpoint modes. Currently used by transports. Remove this when we make
-// transport dialers and listeners explicit.
-enum nni_ep_mode {
- NNI_EP_MODE_DIAL = 1,
- NNI_EP_MODE_LISTEN = 2,
-};
-
// We quite intentionally use a signature where the upper word is nonzero,
// which ensures that if we get garbage we will reject it. This is more
// likely to mismatch than all zero bytes would. The actual version is
@@ -31,7 +24,8 @@ enum nni_ep_mode {
#define NNI_TRANSPORT_V2 0x54520002
#define NNI_TRANSPORT_V3 0x54520003
#define NNI_TRANSPORT_V4 0x54520004
-#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4
+#define NNI_TRANSPORT_V5 0x54520005
+#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V5
// Option handlers.
struct nni_tran_option {
@@ -68,7 +62,7 @@ struct nni_tran_option {
struct nni_tran_dialer_ops {
// d_init creates a vanilla dialer. The value created is
// used for the first argument for all other dialer functions.
- int (*d_init)(void **, nni_url *, nni_sock *);
+ int (*d_init)(void **, nni_url *, nni_dialer *);
// d_fini frees the resources associated with the dialer.
// The dialer will already have been closed.
@@ -93,7 +87,7 @@ struct nni_tran_dialer_ops {
struct nni_tran_listener_ops {
// l_init creates a vanilla listener. The value created is
// used for the first argument for all other listener functions.
- int (*l_init)(void **, nni_url *, nni_sock *);
+ int (*l_init)(void **, nni_url *, nni_listener *);
// l_fini frees the resources associated with the listener.
// The listener will already have been closed.
@@ -126,6 +120,11 @@ struct nni_tran_listener_ops {
// pointers back to socket or even enclosing pipe state, are not
// provided.)
struct nni_tran_pipe_ops {
+ // p_init initializes the pipe data structures. The main
+ // purpose of this is so that the pipe will see the upper
+ // layer nni_pipe and get a chance to register stats and such.
+ int (*p_init)(void *, nni_pipe *);
+
// p_fini destroys the pipe. This should clean up all local
// resources, including closing files and freeing memory, used
// by the pipe. After this call returns, the system will not
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index a1553704..f0a1a269 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -32,7 +32,7 @@ typedef struct ws_header {
struct nni_ws {
nni_list_node node;
nni_reap_item reap;
- int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ bool server;
bool closed;
bool ready;
bool wclose;
@@ -296,10 +296,10 @@ ws_msg_init_control(
frame->buf = frame->sdata;
frame->bufsz = 0;
- if (ws->mode == NNI_EP_MODE_DIAL) {
- ws_mask_frame(frame);
- } else {
+ if (ws->server) {
frame->masked = false;
+ } else {
+ ws_mask_frame(frame);
}
wm->aio = NULL;
@@ -375,10 +375,10 @@ ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio)
frame->hlen += 8;
}
- if (ws->mode == NNI_EP_MODE_DIAL) {
- ws_mask_frame(frame);
- } else {
+ if (ws->server) {
frame->masked = false;
+ } else {
+ ws_mask_frame(frame);
}
} while (len);
@@ -960,12 +960,12 @@ ws_read_cb(void *arg)
// here, because we don't have data yet.)
if (frame->masked) {
memcpy(frame->mask, frame->head + frame->hlen - 4, 4);
- if (ws->mode == NNI_EP_MODE_DIAL) {
+ if (!ws->server) {
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
nni_mtx_unlock(&ws->mtx);
return;
}
- } else if (ws->mode == NNI_EP_MODE_LISTEN) {
+ } else if (ws->server) {
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
nni_mtx_unlock(&ws->mtx);
return;
@@ -1369,13 +1369,10 @@ ws_http_cb(void *arg)
nni_ws * ws = arg;
nni_aio *aio = ws->httpaio;
- switch (ws->mode) {
- case NNI_EP_MODE_LISTEN:
+ if (ws->server) {
ws_http_cb_listener(ws, aio);
- break;
- case NNI_EP_MODE_DIAL:
+ } else {
ws_http_cb_dialer(ws, aio);
- break;
}
}
@@ -1586,7 +1583,7 @@ ws_handler(nni_aio *aio)
ws->http = conn;
ws->req = req;
ws->res = res;
- ws->mode = NNI_EP_MODE_LISTEN;
+ ws->server = true;
ws->maxframe = l->maxframe;
// XXX: Inherit fragmentation? (Frag is limited for now).
@@ -2071,7 +2068,7 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
}
ws->dialer = d;
ws->useraio = aio;
- ws->mode = NNI_EP_MODE_DIAL;
+ ws->server = false;
ws->maxframe = d->maxframe;
nni_list_append(&d->wspend, ws);
nni_http_client_connect(d->client, ws->connaio);
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 3194a56d..ab6486bd 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -33,6 +33,7 @@ struct nni_inproc_pipe {
nni_inproc_pair *pair;
nni_msgq * rq;
nni_msgq * wq;
+ nni_pipe * npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -57,6 +58,8 @@ struct nni_inproc_ep {
nni_list aios;
size_t rcvmax;
nni_mtx mtx;
+ nni_dialer * ndialer;
+ nni_listener *nlistener;
};
// nni_inproc is our global state - this contains the list of active endpoints
@@ -103,7 +106,7 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair)
}
static int
-nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep)
+nni_inproc_pipe_alloc(nni_inproc_pipe **pipep, nni_inproc_ep *ep)
{
nni_inproc_pipe *pipe;
@@ -120,6 +123,14 @@ nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep)
return (0);
}
+static int
+nni_inproc_pipe_init(void *arg, nni_pipe *p)
+{
+ nni_inproc_pipe *pipe = arg;
+ pipe->npipe = p;
+ return (0);
+}
+
static void
nni_inproc_pipe_fini(void *arg)
{
@@ -196,9 +207,10 @@ nni_inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t)
}
static int
-nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer)
{
nni_inproc_ep *ep;
+ nni_sock * sock = nni_dialer_sock(ndialer);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -208,6 +220,7 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
ep->listener = false;
ep->proto = nni_sock_proto_id(sock);
ep->rcvmax = 0;
+ ep->ndialer = ndialer;
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
nni_aio_list_init(&ep->aios);
@@ -215,19 +228,22 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock)
*epp = ep;
return (0);
}
+
static int
-nni_inproc_listener_init(void **epp, nni_url *url, nni_sock *sock)
+nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener)
{
nni_inproc_ep *ep;
+ nni_sock * sock = nni_listener_sock(nlistener);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&ep->mtx);
- ep->listener = true;
- ep->proto = nni_sock_proto_id(sock);
- ep->rcvmax = 0;
+ ep->listener = true;
+ ep->proto = nni_sock_proto_id(sock);
+ ep->rcvmax = 0;
+ ep->nlistener = nlistener;
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
nni_aio_list_init(&ep->aios);
@@ -330,8 +346,8 @@ nni_inproc_accept_clients(nni_inproc_ep *srv)
nni_mtx_init(&pair->mx);
spipe = cpipe = NULL;
- if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) ||
- ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) ||
+ if (((rv = nni_inproc_pipe_alloc(&cpipe, cli)) != 0) ||
+ ((rv = nni_inproc_pipe_alloc(&spipe, srv)) != 0) ||
((rv = nni_msgq_init(&pair->q[0], 1)) != 0) ||
((rv = nni_msgq_init(&pair->q[1], 1)) != 0)) {
@@ -522,6 +538,7 @@ static nni_tran_option nni_inproc_pipe_options[] = {
};
static nni_tran_pipe_ops nni_inproc_pipe_ops = {
+ .p_init = nni_inproc_pipe_init,
.p_fini = nni_inproc_pipe_fini,
.p_send = nni_inproc_pipe_send,
.p_recv = nni_inproc_pipe_recv,
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 05e0b895..58fff1a7 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -32,6 +32,7 @@ struct ipctran_pipe {
bool closed;
nni_sockaddr sa;
ipctran_ep * ep;
+ nni_pipe * npipe;
nni_list_node node;
nni_atomic_flag reaped;
nni_reap_item reap;
@@ -64,6 +65,8 @@ struct ipctran_ep {
nni_ipc_dialer * dialer;
nni_ipc_listener *listener;
nni_reap_item reap;
+ nni_dialer * ndialer;
+ nni_listener * nlistener;
};
static void ipctran_pipe_send_start(ipctran_pipe *);
@@ -113,6 +116,14 @@ ipctran_pipe_stop(void *arg)
nni_aio_stop(p->connaio);
}
+static int
+ipctran_pipe_init(void *arg, nni_pipe *npipe)
+{
+ ipctran_pipe *p = arg;
+ p->npipe = npipe;
+ return (0);
+}
+
static void
ipctran_pipe_fini(void *arg)
{
@@ -154,7 +165,7 @@ ipctran_pipe_reap(ipctran_pipe *p)
}
static int
-ipctran_pipe_init(ipctran_pipe **pipep, ipctran_ep *ep)
+ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep)
{
ipctran_pipe *p;
int rv;
@@ -749,11 +760,12 @@ ipctran_ep_close(void *arg)
}
static int
-ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
+ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
{
ipctran_ep *ep;
int rv;
size_t sz;
+ nni_sock * sock = nni_dialer_sock(ndialer);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -764,6 +776,7 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
sz = sizeof(ep->sa.s_ipc.sa_path);
ep->sa.s_ipc.sa_family = NNG_AF_IPC;
ep->proto = nni_sock_proto_id(sock);
+ ep->ndialer = ndialer;
if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) {
ipctran_ep_fini(ep);
@@ -780,11 +793,12 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
}
static int
-ipctran_ep_init_listener(void **dp, nni_url *url, nni_sock *sock)
+ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener)
{
ipctran_ep *ep;
int rv;
size_t sz;
+ nni_sock * sock = nni_listener_sock(nlistener);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -795,6 +809,7 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_sock *sock)
sz = sizeof(ep->sa.s_ipc.sa_path);
ep->sa.s_ipc.sa_family = NNG_AF_IPC;
ep->proto = nni_sock_proto_id(sock);
+ ep->nlistener = nlistener;
if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) {
ipctran_ep_fini(ep);
@@ -821,7 +836,7 @@ ipctran_ep_connect(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = ipctran_pipe_init(&p, ep)) != 0) {
+ if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -887,7 +902,7 @@ ipctran_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = ipctran_pipe_init(&p, ep)) != 0) {
+ if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -1006,6 +1021,7 @@ static nni_tran_option ipctran_pipe_options[] = {
};
static nni_tran_pipe_ops ipctran_pipe_ops = {
+ .p_init = ipctran_pipe_init,
.p_fini = ipctran_pipe_fini,
.p_stop = ipctran_pipe_stop,
.p_send = ipctran_pipe_send,
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 017b4ccf..fed8872c 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -23,6 +23,7 @@ typedef struct tcptran_ep tcptran_ep;
// tcp_pipe is one end of a TCP connection.
struct tcptran_pipe {
nni_tcp_conn * conn;
+ nni_pipe * npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -69,6 +70,8 @@ struct tcptran_ep {
nni_reap_item reap;
nni_tcp_dialer * dialer;
nni_tcp_listener *listener;
+ nni_dialer * ndialer;
+ nni_listener * nlistener;
};
static void tcptran_pipe_send_start(tcptran_pipe *);
@@ -121,6 +124,14 @@ tcptran_pipe_stop(void *arg)
nni_aio_stop(p->rslvaio);
}
+static int
+tcptran_pipe_init(void *arg, nni_pipe *npipe)
+{
+ tcptran_pipe *p = arg;
+ p->npipe = npipe;
+ return (0);
+}
+
static void
tcptran_pipe_fini(void *arg)
{
@@ -162,7 +173,7 @@ tcptran_pipe_reap(tcptran_pipe *p)
}
static int
-tcptran_pipe_init(tcptran_pipe **pipep, tcptran_ep *ep)
+tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep)
{
tcptran_pipe *p;
int rv;
@@ -750,13 +761,14 @@ tcptran_ep_close(void *arg)
}
static int
-tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
+tcptran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
{
tcptran_ep * ep;
int rv;
uint16_t af;
char * host;
nng_sockaddr srcsa;
+ nni_sock * sock = nni_dialer_sock(ndialer);
if (strcmp(url->u_scheme, "tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -788,6 +800,7 @@ tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
ep->nodelay = true;
ep->keepalive = false;
ep->url = url;
+ ep->ndialer = ndialer;
// Detect an embedded local interface name in the hostname. This
// syntax is only valid with dialers.
@@ -839,13 +852,14 @@ tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
return (0);
}
static int
-tcptran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock)
+tcptran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
{
tcptran_ep *ep;
int rv;
char * host;
nni_aio * aio;
uint16_t af;
+ nni_sock * sock = nni_listener_sock(nlistener);
if (strcmp(url->u_scheme, "tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -876,6 +890,7 @@ tcptran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock)
ep->nodelay = true;
ep->keepalive = false;
ep->url = url;
+ ep->nlistener = nlistener;
if (strlen(url->u_hostname) == 0) {
host = NULL;
@@ -926,7 +941,7 @@ tcptran_ep_connect(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tcptran_pipe_init(&p, ep)) != 0) {
+ if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -1065,7 +1080,7 @@ tcptran_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tcptran_pipe_init(&p, ep)) != 0) {
+ if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -1135,6 +1150,7 @@ static nni_tran_option tcptran_pipe_options[] = {
};
static nni_tran_pipe_ops tcptran_pipe_ops = {
+ .p_init = tcptran_pipe_init,
.p_fini = tcptran_pipe_fini,
.p_stop = tcptran_pipe_stop,
.p_send = tcptran_pipe_send,
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 49bc932d..cf478796 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -30,6 +30,7 @@ typedef struct tlstran_pipe tlstran_pipe;
// tlstran_pipe is one end of a TLS connection.
struct tlstran_pipe {
nni_tls * tls;
+ nni_pipe * npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
@@ -77,6 +78,8 @@ struct tlstran_ep {
nni_tcp_listener *listener;
nng_sockaddr sa;
nng_sockaddr bsa;
+ nni_dialer * ndialer;
+ nni_listener * nlistener;
};
static void tlstran_pipe_send_start(tlstran_pipe *);
@@ -125,6 +128,14 @@ tlstran_pipe_stop(void *arg)
nni_aio_stop(p->rslvaio);
}
+static int
+tlstran_pipe_init(void *arg, nni_pipe *npipe)
+{
+ tlstran_pipe *p = arg;
+ p->npipe = npipe;
+ return (0);
+}
+
static void
tlstran_pipe_fini(void *arg)
{
@@ -153,7 +164,7 @@ tlstran_pipe_fini(void *arg)
}
static int
-tlstran_pipe_init(tlstran_pipe **pipep, tlstran_ep *ep)
+tlstran_pipe_alloc(tlstran_pipe **pipep, tlstran_ep *ep)
{
tlstran_pipe *p;
int rv;
@@ -742,13 +753,14 @@ tlstran_ep_close(void *arg)
}
static int
-tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
+tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
{
tlstran_ep *ep;
int rv;
uint16_t af;
char * host = url->u_hostname;
char * port = url->u_port;
+ nni_sock * sock = nni_dialer_sock(ndialer);
if (strcmp(url->u_scheme, "tls+tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -782,6 +794,7 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
ep->proto = nni_sock_proto_id(sock);
ep->nodelay = true;
ep->keepalive = false;
+ ep->ndialer = ndialer;
if (((rv = nni_tcp_dialer_init(&ep->dialer)) != 0) ||
((rv = nni_tls_config_init(&ep->cfg, NNG_TLS_MODE_CLIENT)) != 0) ||
@@ -796,13 +809,14 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock)
}
static int
-tlstran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock)
+tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
{
tlstran_ep *ep;
int rv;
uint16_t af;
char * host = url->u_hostname;
nni_aio * aio;
+ nni_sock * sock = nni_listener_sock(nlistener);
if (strcmp(url->u_scheme, "tls+tcp") == 0) {
af = NNG_AF_UNSPEC;
@@ -835,6 +849,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock)
ep->proto = nni_sock_proto_id(sock);
ep->nodelay = true;
ep->keepalive = false;
+ ep->nlistener = nlistener;
if (strlen(host) == 0) {
host = NULL;
@@ -879,7 +894,7 @@ tlstran_ep_connect(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tlstran_pipe_init(&p, ep)) != 0) {
+ if ((rv = tlstran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -923,7 +938,7 @@ tlstran_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tlstran_pipe_init(&p, ep)) != 0) {
+ if ((rv = tlstran_pipe_alloc(&p, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -1224,6 +1239,7 @@ static nni_tran_option tlstran_pipe_options[] = {
};
static nni_tran_pipe_ops tlstran_pipe_ops = {
+ .p_init = tlstran_pipe_init,
.p_fini = tlstran_pipe_fini,
.p_stop = tlstran_pipe_stop,
.p_send = tlstran_pipe_send,
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 9b3f67f4..baa9ea34 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -42,6 +42,7 @@ struct ws_dialer {
nni_ws_dialer *dialer;
nni_list headers; // req headers
bool started;
+ nni_dialer * ndialer;
};
struct ws_listener {
@@ -55,19 +56,21 @@ struct ws_listener {
nni_ws_listener *listener;
nni_list headers; // res headers
bool started;
+ nni_listener * nlistener;
};
struct ws_pipe {
- nni_mtx mtx;
- size_t rcvmax;
- bool closed;
- uint16_t rproto;
- uint16_t lproto;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
- nni_aio *txaio;
- nni_aio *rxaio;
- nni_ws * ws;
+ nni_mtx mtx;
+ nni_pipe *npipe;
+ size_t rcvmax;
+ bool closed;
+ uint16_t rproto;
+ uint16_t lproto;
+ nni_aio * user_txaio;
+ nni_aio * user_rxaio;
+ nni_aio * txaio;
+ nni_aio * rxaio;
+ nni_ws * ws;
};
static void
@@ -201,6 +204,14 @@ ws_pipe_stop(void *arg)
nni_aio_stop(p->txaio);
}
+static int
+ws_pipe_init(void *arg, nni_pipe *npipe)
+{
+ ws_pipe *p = arg;
+ p->npipe = npipe;
+ return (0);
+}
+
static void
ws_pipe_fini(void *arg)
{
@@ -230,7 +241,7 @@ ws_pipe_close(void *arg)
}
static int
-ws_pipe_init(ws_pipe **pipep, void *ws)
+ws_pipe_alloc(ws_pipe **pipep, void *ws)
{
ws_pipe *p;
int rv;
@@ -663,6 +674,7 @@ static nni_tran_option ws_pipe_options[] = {
};
static nni_tran_pipe_ops ws_pipe_ops = {
+ .p_init = ws_pipe_init,
.p_fini = ws_pipe_fini,
.p_stop = ws_pipe_stop,
.p_send = ws_pipe_send,
@@ -782,7 +794,7 @@ ws_connect_cb(void *arg)
NNI_ASSERT(nni_list_empty(&d->aios));
if ((rv = nni_aio_result(caio)) != 0) {
nni_aio_finish_error(uaio, rv);
- } else if ((rv = ws_pipe_init(&p, ws)) != 0) {
+ } else if ((rv = ws_pipe_alloc(&p, ws)) != 0) {
nni_ws_fini(ws);
nni_aio_finish_error(uaio, rv);
} else {
@@ -835,7 +847,7 @@ ws_accept_cb(void *arg)
ws_pipe *p;
// Make a pipe
nni_aio_list_remove(uaio);
- if ((rv = ws_pipe_init(&p, ws)) != 0) {
+ if ((rv = ws_pipe_alloc(&p, ws)) != 0) {
nni_ws_close(ws);
nni_aio_finish_error(uaio, rv);
} else {
@@ -855,9 +867,10 @@ ws_accept_cb(void *arg)
}
static int
-ws_dialer_init(void **dp, nni_url *url, nni_sock *s)
+ws_dialer_init(void **dp, nni_url *url, nni_dialer *ndialer)
{
ws_dialer * d;
+ nni_sock * s = nni_dialer_sock(ndialer);
const char *n;
int rv;
@@ -869,9 +882,10 @@ ws_dialer_init(void **dp, nni_url *url, nni_sock *s)
nni_aio_list_init(&d->aios);
- d->lproto = nni_sock_proto_id(s);
- d->rproto = nni_sock_peer_id(s);
- n = nni_sock_peer_name(s);
+ d->lproto = nni_sock_proto_id(s);
+ d->rproto = nni_sock_peer_id(s);
+ d->ndialer = ndialer;
+ n = nni_sock_peer_name(s);
if (((rv = nni_ws_dialer_init(&d->dialer, url)) != 0) ||
((rv = nni_aio_init(&d->connaio, ws_connect_cb, d)) != 0) ||
@@ -886,11 +900,12 @@ ws_dialer_init(void **dp, nni_url *url, nni_sock *s)
}
static int
-ws_listener_init(void **lp, nni_url *url, nni_sock *sock)
+ws_listener_init(void **lp, nni_url *url, nni_listener *nlistener)
{
ws_listener *l;
const char * n;
int rv;
+ nni_sock * sock = nni_listener_sock(nlistener);
if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
return (NNG_ENOMEM);
@@ -900,9 +915,10 @@ ws_listener_init(void **lp, nni_url *url, nni_sock *sock)
nni_aio_list_init(&l->aios);
- l->lproto = nni_sock_proto_id(sock);
- l->rproto = nni_sock_peer_id(sock);
- n = nni_sock_proto_name(sock);
+ l->lproto = nni_sock_proto_id(sock);
+ l->rproto = nni_sock_peer_id(sock);
+ n = nni_sock_proto_name(sock);
+ l->nlistener = nlistener;
if (((rv = nni_ws_listener_init(&l->listener, url)) != 0) ||
((rv = nni_aio_init(&l->accaio, ws_accept_cb, l)) != 0) ||
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 0a68c1ca..0b0303d9 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -186,6 +186,7 @@ struct zt_pipe {
nni_list_node zp_link;
const char * zp_addr;
zt_node * zp_ztn;
+ nni_pipe * zp_npipe;
uint64_t zp_nwid;
uint64_t zp_laddr;
uint64_t zp_raddr;
@@ -219,7 +220,6 @@ struct zt_ep {
char ze_home[NNG_MAXADDRLEN]; // should be enough
zt_node * ze_ztn;
uint64_t ze_nwid;
- int ze_mode;
int ze_running;
uint64_t ze_raddr; // remote node address
uint64_t ze_laddr; // local node address
@@ -241,9 +241,11 @@ struct zt_ep {
// established connection/pipe unless the application calls
// accept. Since the "application" is our library, that should
// be pretty much as fast we can run.
- zt_creq ze_creqs[zt_listenq];
- int ze_creq_head;
- int ze_creq_tail;
+ zt_creq ze_creqs[zt_listenq];
+ int ze_creq_head;
+ int ze_creq_tail;
+ nni_dialer * ze_ndialer;
+ nni_listener *ze_nlistener;
};
// Locking strategy. At present the ZeroTier core is not reentrant or fully
@@ -274,7 +276,7 @@ static void zt_ep_send_conn_req(zt_ep *);
static void zt_ep_conn_req_cb(void *);
static void zt_ep_doaccept(zt_ep *);
static void zt_pipe_dorecv(zt_pipe *);
-static int zt_pipe_init(zt_pipe **, zt_ep *, uint64_t, uint64_t);
+static int zt_pipe_alloc(zt_pipe **, zt_ep *, uint64_t, uint64_t);
static void zt_pipe_ping_cb(void *);
static void zt_fraglist_clear(zt_fraglist *);
static void zt_fraglist_free(zt_fraglist *);
@@ -524,7 +526,7 @@ zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
}
ep->ze_mtu = config->mtu;
- if ((ep->ze_mode == NNI_EP_MODE_DIAL) &&
+ if ((ep->ze_ndialer != NULL) &&
(nni_list_first(&ep->ze_aios) != NULL)) {
zt_ep_send_conn_req(ep);
}
@@ -642,7 +644,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
zt_pipe *p;
int rv;
- if (ep->ze_mode != NNI_EP_MODE_DIAL) {
+ if (ep->ze_ndialer == NULL) {
zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
zt_err_proto, "Inappropriate operation");
return;
@@ -665,7 +667,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
return;
}
- if ((rv = zt_pipe_init(&p, ep, raddr, ep->ze_laddr)) != 0) {
+ if ((rv = zt_pipe_alloc(&p, ep, raddr, ep->ze_laddr)) != 0) {
// We couldn't create the pipe, just drop it.
nni_aio_finish_error(aio, rv);
return;
@@ -689,7 +691,7 @@ zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
zt_pipe *p;
int i;
- if (ep->ze_mode != NNI_EP_MODE_LISTEN) {
+ if (ep->ze_nlistener == NULL) {
zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr,
zt_err_proto, "Inappropriate operation");
return;
@@ -742,8 +744,8 @@ zt_ep_recv_error(zt_ep *ep, const uint8_t *data, size_t len)
// is that when we have an outstanding CON_REQ, we would like to
// process that appropriately.
- if (ep->ze_mode != NNI_EP_MODE_DIAL) {
- // Drop it.
+ if (ep->ze_ndialer == NULL) {
+ // Not a dialer. Drop it.
return;
}
@@ -1637,6 +1639,14 @@ zt_pipe_close(void *arg)
nni_mtx_unlock(&zt_lk);
}
+static int
+zt_pipe_init(void *arg, nni_pipe *npipe)
+{
+ zt_pipe *p = arg;
+ p->zp_npipe = npipe;
+ return (0);
+}
+
static void
zt_pipe_fini(void *arg)
{
@@ -1669,7 +1679,7 @@ zt_pipe_reap(zt_pipe *p)
}
static int
-zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
+zt_pipe_alloc(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
{
zt_pipe *p;
int rv;
@@ -1698,10 +1708,12 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr)
p->zp_ping_try = 0;
nni_atomic_flag_reset(&p->zp_reaped);
- if (ep->ze_mode == NNI_EP_MODE_DIAL) {
- rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
- } else {
+ if (ep->ze_nlistener != NULL) {
+ // listener
rv = nni_idhash_insert(ztn->zn_rpipes, raddr, p);
+ } else {
+ // dialer
+ rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
}
if ((rv != 0) ||
((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) ||
@@ -2129,7 +2141,8 @@ zt_parsedec(const char **sp, uint64_t *valp)
}
static int
-zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
+zt_ep_init(void **epp, nni_url *url, nni_sock *sock, nni_dialer *ndialer,
+ nni_listener *nlistener)
{
zt_ep * ep;
uint64_t node;
@@ -2141,7 +2154,6 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (NNG_ENOMEM);
}
- ep->ze_mode = mode;
ep->ze_mtu = ZT_MIN_MTU;
ep->ze_aio = NULL;
ep->ze_ping_tries = zt_ping_tries;
@@ -2149,6 +2161,8 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
ep->ze_conn_time = zt_conn_time;
ep->ze_conn_tries = zt_conn_tries;
ep->ze_proto = nni_sock_proto_id(sock);
+ ep->ze_ndialer = ndialer;
+ ep->ze_nlistener = nlistener;
nni_aio_list_init(&ep->ze_aios);
@@ -2181,26 +2195,23 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
}
// Parse the URL.
- switch (mode) {
- case NNI_EP_MODE_DIAL:
- /// We have to have a non-zero port number to connect to.
+ if (nlistener != NULL) {
+ // listener
+ ep->ze_laddr = node;
+ ep->ze_laddr <<= 24;
+ ep->ze_laddr |= port;
+ ep->ze_raddr = 0;
+ ep->ze_nlistener = nlistener;
+ } else {
+ // dialer
if (port == 0) {
return (NNG_EADDRINVAL);
}
ep->ze_raddr = node;
ep->ze_raddr <<= 24;
ep->ze_raddr |= port;
- ep->ze_laddr = 0;
- break;
- case NNI_EP_MODE_LISTEN:
- ep->ze_laddr = node;
- ep->ze_laddr <<= 24;
- ep->ze_laddr |= port;
- ep->ze_raddr = 0;
- break;
- default:
- NNI_ASSERT(0);
- break;
+ ep->ze_laddr = 0;
+ ep->ze_ndialer = ndialer;
}
nni_mtx_lock(&zt_lk);
@@ -2217,15 +2228,15 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
}
static int
-zt_dialer_init(void **epp, nni_url *url, nni_sock *sock)
+zt_dialer_init(void **epp, nni_url *url, nni_dialer *d)
{
- return (zt_ep_init(epp, url, sock, NNI_EP_MODE_DIAL));
+ return (zt_ep_init(epp, url, nni_dialer_sock(d), d, NULL));
}
static int
-zt_listener_init(void **epp, nni_url *url, nni_sock *sock)
+zt_listener_init(void **epp, nni_url *url, nni_listener *l)
{
- return (zt_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN));
+ return (zt_ep_init(epp, url, nni_listener_sock(l), NULL, l));
}
static void
@@ -2380,7 +2391,7 @@ zt_ep_doaccept(zt_ep *ep)
// We remove this AIO. This keeps it from being canceled.
nni_aio_list_remove(aio);
- rv = zt_pipe_init(&p, ep, creq.cr_raddr, ep->ze_laddr);
+ rv = zt_pipe_alloc(&p, ep, creq.cr_raddr, ep->ze_laddr);
if (rv != 0) {
zt_send_err(ep->ze_ztn, ep->ze_nwid, creq.cr_raddr,
ep->ze_laddr, zt_err_unknown,
@@ -2439,8 +2450,6 @@ zt_ep_conn_req_cb(void *arg)
nni_aio *uaio;
int rv;
- NNI_ASSERT(ep->ze_mode == NNI_EP_MODE_DIAL);
-
nni_mtx_lock(&zt_lk);
ep->ze_creq_active = 0;
@@ -2642,7 +2651,7 @@ zt_ep_get_url(void *arg, void *data, size_t *szp, nni_opt_type t)
uint64_t addr;
nni_mtx_lock(&zt_lk);
- addr = ep->ze_mode == NNI_EP_MODE_DIAL ? ep->ze_raddr : ep->ze_laddr;
+ addr = ep->ze_nlistener != NULL ? ep->ze_laddr : ep->ze_raddr;
snprintf(ustr, sizeof(ustr), "zt://%llx.%llx:%u",
(unsigned long long) addr >> zt_port_shift,
(unsigned long long) ep->ze_nwid,
@@ -2957,6 +2966,7 @@ static nni_tran_option zt_pipe_options[] = {
};
static nni_tran_pipe_ops zt_pipe_ops = {
+ .p_init = zt_pipe_init,
.p_fini = zt_pipe_fini,
.p_send = zt_pipe_send,
.p_recv = zt_pipe_recv,