diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/dialer.c | 8 | ||||
| -rw-r--r-- | src/core/dialer.h | 19 | ||||
| -rw-r--r-- | src/core/listener.c | 8 | ||||
| -rw-r--r-- | src/core/listener.h | 1 | ||||
| -rw-r--r-- | src/core/pipe.c | 2 | ||||
| -rw-r--r-- | src/core/transport.h | 19 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 29 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 33 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 26 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 26 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 26 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 58 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 88 |
13 files changed, 222 insertions, 121 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index 77ecbe7f..b76bff6d 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -112,7 +112,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || - ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) || + ((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) || ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) || ((rv = nni_sock_add_dialer(s, d)) != 0)) { nni_dialer_destroy(d); @@ -302,6 +302,12 @@ nni_dialer_start(nni_dialer *d, int flags) return (rv); } +nni_sock * +nni_dialer_sock(nni_dialer *d) +{ + return (d->d_sock); +} + int nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, nni_opt_type t) diff --git a/src/core/dialer.h b/src/core/dialer.h index 4a3e127c..361c6d5e 100644 --- a/src/core/dialer.h +++ b/src/core/dialer.h @@ -11,15 +11,16 @@ #ifndef CORE_DIALER_H #define CORE_DIALER_H -extern int nni_dialer_sys_init(void); -extern void nni_dialer_sys_fini(void); -extern int nni_dialer_find(nni_dialer **, uint32_t); -extern int nni_dialer_hold(nni_dialer *); -extern void nni_dialer_rele(nni_dialer *); -extern uint32_t nni_dialer_id(nni_dialer *); -extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *); -extern void nni_dialer_close(nni_dialer *); -extern int nni_dialer_start(nni_dialer *, int); +extern int nni_dialer_sys_init(void); +extern void nni_dialer_sys_fini(void); +extern int nni_dialer_find(nni_dialer **, uint32_t); +extern int nni_dialer_hold(nni_dialer *); +extern void nni_dialer_rele(nni_dialer *); +extern uint32_t nni_dialer_id(nni_dialer *); +extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *); +extern void nni_dialer_close(nni_dialer *); +extern int nni_dialer_start(nni_dialer *, int); +extern nni_sock *nni_dialer_sock(nni_dialer *); extern int nni_dialer_setopt( nni_dialer *, const char *, const void *, size_t, nni_opt_type); diff --git a/src/core/listener.c b/src/core/listener.c index c2e68bb5..1fe6faab 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -110,7 +110,7 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || - ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) || + ((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) || ((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) || ((rv = nni_sock_add_listener(s, l)) != 0)) { nni_listener_destroy(l); @@ -284,6 +284,12 @@ nni_listener_start(nni_listener *l, int flags) return (0); } +nni_sock * +nni_listener_sock(nni_listener *l) +{ + return (l->l_sock); +} + int nni_listener_setopt(nni_listener *l, const char *name, const void *val, size_t sz, nni_opt_type t) diff --git a/src/core/listener.h b/src/core/listener.h index 5806fc72..67a782bd 100644 --- a/src/core/listener.h +++ b/src/core/listener.h @@ -20,6 +20,7 @@ extern uint32_t nni_listener_id(nni_listener *); extern int nni_listener_create(nni_listener **, nni_sock *, const char *); extern void nni_listener_close(nni_listener *); extern int nni_listener_start(nni_listener *, int); +extern nni_sock *nni_listener_sock(nni_listener *); extern int nni_listener_setopt( nni_listener *, const char *, const void *, size_t, nni_opt_type); diff --git a/src/core/pipe.c b/src/core/pipe.c index b79b6a3a..bdbc76e0 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -216,7 +216,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) } nni_mtx_unlock(&nni_pipe_lk); - if ((rv != 0) || + if ((rv != 0) || ((rv = tran->tran_pipe->p_init(tdata, p)) != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) { nni_pipe_close(p); nni_pipe_rele(p); diff --git a/src/core/transport.h b/src/core/transport.h index 8b08e366..8e5bba48 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -11,13 +11,6 @@ #ifndef CORE_TRANSPORT_H #define CORE_TRANSPORT_H -// Endpoint modes. Currently used by transports. Remove this when we make -// transport dialers and listeners explicit. -enum nni_ep_mode { - NNI_EP_MODE_DIAL = 1, - NNI_EP_MODE_LISTEN = 2, -}; - // We quite intentionally use a signature where the upper word is nonzero, // which ensures that if we get garbage we will reject it. This is more // likely to mismatch than all zero bytes would. The actual version is @@ -31,7 +24,8 @@ enum nni_ep_mode { #define NNI_TRANSPORT_V2 0x54520002 #define NNI_TRANSPORT_V3 0x54520003 #define NNI_TRANSPORT_V4 0x54520004 -#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V4 +#define NNI_TRANSPORT_V5 0x54520005 +#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V5 // Option handlers. struct nni_tran_option { @@ -68,7 +62,7 @@ struct nni_tran_option { struct nni_tran_dialer_ops { // d_init creates a vanilla dialer. The value created is // used for the first argument for all other dialer functions. - int (*d_init)(void **, nni_url *, nni_sock *); + int (*d_init)(void **, nni_url *, nni_dialer *); // d_fini frees the resources associated with the dialer. // The dialer will already have been closed. @@ -93,7 +87,7 @@ struct nni_tran_dialer_ops { struct nni_tran_listener_ops { // l_init creates a vanilla listener. The value created is // used for the first argument for all other listener functions. - int (*l_init)(void **, nni_url *, nni_sock *); + int (*l_init)(void **, nni_url *, nni_listener *); // l_fini frees the resources associated with the listener. // The listener will already have been closed. @@ -126,6 +120,11 @@ struct nni_tran_listener_ops { // pointers back to socket or even enclosing pipe state, are not // provided.) struct nni_tran_pipe_ops { + // p_init initializes the pipe data structures. The main + // purpose of this is so that the pipe will see the upper + // layer nni_pipe and get a chance to register stats and such. + int (*p_init)(void *, nni_pipe *); + // p_fini destroys the pipe. This should clean up all local // resources, including closing files and freeing memory, used // by the pipe. After this call returns, the system will not diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index a1553704..f0a1a269 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -32,7 +32,7 @@ typedef struct ws_header { struct nni_ws { nni_list_node node; nni_reap_item reap; - int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN + bool server; bool closed; bool ready; bool wclose; @@ -296,10 +296,10 @@ ws_msg_init_control( frame->buf = frame->sdata; frame->bufsz = 0; - if (ws->mode == NNI_EP_MODE_DIAL) { - ws_mask_frame(frame); - } else { + if (ws->server) { frame->masked = false; + } else { + ws_mask_frame(frame); } wm->aio = NULL; @@ -375,10 +375,10 @@ ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio) frame->hlen += 8; } - if (ws->mode == NNI_EP_MODE_DIAL) { - ws_mask_frame(frame); - } else { + if (ws->server) { frame->masked = false; + } else { + ws_mask_frame(frame); } } while (len); @@ -960,12 +960,12 @@ ws_read_cb(void *arg) // here, because we don't have data yet.) if (frame->masked) { memcpy(frame->mask, frame->head + frame->hlen - 4, 4); - if (ws->mode == NNI_EP_MODE_DIAL) { + if (!ws->server) { ws_close(ws, WS_CLOSE_PROTOCOL_ERR); nni_mtx_unlock(&ws->mtx); return; } - } else if (ws->mode == NNI_EP_MODE_LISTEN) { + } else if (ws->server) { ws_close(ws, WS_CLOSE_PROTOCOL_ERR); nni_mtx_unlock(&ws->mtx); return; @@ -1369,13 +1369,10 @@ ws_http_cb(void *arg) nni_ws * ws = arg; nni_aio *aio = ws->httpaio; - switch (ws->mode) { - case NNI_EP_MODE_LISTEN: + if (ws->server) { ws_http_cb_listener(ws, aio); - break; - case NNI_EP_MODE_DIAL: + } else { ws_http_cb_dialer(ws, aio); - break; } } @@ -1586,7 +1583,7 @@ ws_handler(nni_aio *aio) ws->http = conn; ws->req = req; ws->res = res; - ws->mode = NNI_EP_MODE_LISTEN; + ws->server = true; ws->maxframe = l->maxframe; // XXX: Inherit fragmentation? (Frag is limited for now). @@ -2071,7 +2068,7 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) } ws->dialer = d; ws->useraio = aio; - ws->mode = NNI_EP_MODE_DIAL; + ws->server = false; ws->maxframe = d->maxframe; nni_list_append(&d->wspend, ws); nni_http_client_connect(d->client, ws->connaio); diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 3194a56d..ab6486bd 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -33,6 +33,7 @@ struct nni_inproc_pipe { nni_inproc_pair *pair; nni_msgq * rq; nni_msgq * wq; + nni_pipe * npipe; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -57,6 +58,8 @@ struct nni_inproc_ep { nni_list aios; size_t rcvmax; nni_mtx mtx; + nni_dialer * ndialer; + nni_listener *nlistener; }; // nni_inproc is our global state - this contains the list of active endpoints @@ -103,7 +106,7 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair) } static int -nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep) +nni_inproc_pipe_alloc(nni_inproc_pipe **pipep, nni_inproc_ep *ep) { nni_inproc_pipe *pipe; @@ -120,6 +123,14 @@ nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep) return (0); } +static int +nni_inproc_pipe_init(void *arg, nni_pipe *p) +{ + nni_inproc_pipe *pipe = arg; + pipe->npipe = p; + return (0); +} + static void nni_inproc_pipe_fini(void *arg) { @@ -196,9 +207,10 @@ nni_inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) } static int -nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock) +nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) { nni_inproc_ep *ep; + nni_sock * sock = nni_dialer_sock(ndialer); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -208,6 +220,7 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock) ep->listener = false; ep->proto = nni_sock_proto_id(sock); ep->rcvmax = 0; + ep->ndialer = ndialer; NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -215,19 +228,22 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock) *epp = ep; return (0); } + static int -nni_inproc_listener_init(void **epp, nni_url *url, nni_sock *sock) +nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) { nni_inproc_ep *ep; + nni_sock * sock = nni_listener_sock(nlistener); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&ep->mtx); - ep->listener = true; - ep->proto = nni_sock_proto_id(sock); - ep->rcvmax = 0; + ep->listener = true; + ep->proto = nni_sock_proto_id(sock); + ep->rcvmax = 0; + ep->nlistener = nlistener; NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -330,8 +346,8 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) nni_mtx_init(&pair->mx); spipe = cpipe = NULL; - if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) || - ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) || + if (((rv = nni_inproc_pipe_alloc(&cpipe, cli)) != 0) || + ((rv = nni_inproc_pipe_alloc(&spipe, srv)) != 0) || ((rv = nni_msgq_init(&pair->q[0], 1)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 1)) != 0)) { @@ -522,6 +538,7 @@ static nni_tran_option nni_inproc_pipe_options[] = { }; static nni_tran_pipe_ops nni_inproc_pipe_ops = { + .p_init = nni_inproc_pipe_init, .p_fini = nni_inproc_pipe_fini, .p_send = nni_inproc_pipe_send, .p_recv = nni_inproc_pipe_recv, diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 05e0b895..58fff1a7 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -32,6 +32,7 @@ struct ipctran_pipe { bool closed; nni_sockaddr sa; ipctran_ep * ep; + nni_pipe * npipe; nni_list_node node; nni_atomic_flag reaped; nni_reap_item reap; @@ -64,6 +65,8 @@ struct ipctran_ep { nni_ipc_dialer * dialer; nni_ipc_listener *listener; nni_reap_item reap; + nni_dialer * ndialer; + nni_listener * nlistener; }; static void ipctran_pipe_send_start(ipctran_pipe *); @@ -113,6 +116,14 @@ ipctran_pipe_stop(void *arg) nni_aio_stop(p->connaio); } +static int +ipctran_pipe_init(void *arg, nni_pipe *npipe) +{ + ipctran_pipe *p = arg; + p->npipe = npipe; + return (0); +} + static void ipctran_pipe_fini(void *arg) { @@ -154,7 +165,7 @@ ipctran_pipe_reap(ipctran_pipe *p) } static int -ipctran_pipe_init(ipctran_pipe **pipep, ipctran_ep *ep) +ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep) { ipctran_pipe *p; int rv; @@ -749,11 +760,12 @@ ipctran_ep_close(void *arg) } static int -ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) +ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) { ipctran_ep *ep; int rv; size_t sz; + nni_sock * sock = nni_dialer_sock(ndialer); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -764,6 +776,7 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) sz = sizeof(ep->sa.s_ipc.sa_path); ep->sa.s_ipc.sa_family = NNG_AF_IPC; ep->proto = nni_sock_proto_id(sock); + ep->ndialer = ndialer; if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { ipctran_ep_fini(ep); @@ -780,11 +793,12 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) } static int -ipctran_ep_init_listener(void **dp, nni_url *url, nni_sock *sock) +ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener) { ipctran_ep *ep; int rv; size_t sz; + nni_sock * sock = nni_listener_sock(nlistener); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -795,6 +809,7 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_sock *sock) sz = sizeof(ep->sa.s_ipc.sa_path); ep->sa.s_ipc.sa_family = NNG_AF_IPC; ep->proto = nni_sock_proto_id(sock); + ep->nlistener = nlistener; if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { ipctran_ep_fini(ep); @@ -821,7 +836,7 @@ ipctran_ep_connect(void *arg, nni_aio *aio) return; } nni_mtx_lock(&ep->mtx); - if ((rv = ipctran_pipe_init(&p, ep)) != 0) { + if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); return; @@ -887,7 +902,7 @@ ipctran_ep_accept(void *arg, nni_aio *aio) return; } nni_mtx_lock(&ep->mtx); - if ((rv = ipctran_pipe_init(&p, ep)) != 0) { + if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); return; @@ -1006,6 +1021,7 @@ static nni_tran_option ipctran_pipe_options[] = { }; static nni_tran_pipe_ops ipctran_pipe_ops = { + .p_init = ipctran_pipe_init, .p_fini = ipctran_pipe_fini, .p_stop = ipctran_pipe_stop, .p_send = ipctran_pipe_send, diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 017b4ccf..fed8872c 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -23,6 +23,7 @@ typedef struct tcptran_ep tcptran_ep; // tcp_pipe is one end of a TCP connection. struct tcptran_pipe { nni_tcp_conn * conn; + nni_pipe * npipe; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -69,6 +70,8 @@ struct tcptran_ep { nni_reap_item reap; nni_tcp_dialer * dialer; nni_tcp_listener *listener; + nni_dialer * ndialer; + nni_listener * nlistener; }; static void tcptran_pipe_send_start(tcptran_pipe *); @@ -121,6 +124,14 @@ tcptran_pipe_stop(void *arg) nni_aio_stop(p->rslvaio); } +static int +tcptran_pipe_init(void *arg, nni_pipe *npipe) +{ + tcptran_pipe *p = arg; + p->npipe = npipe; + return (0); +} + static void tcptran_pipe_fini(void *arg) { @@ -162,7 +173,7 @@ tcptran_pipe_reap(tcptran_pipe *p) } static int -tcptran_pipe_init(tcptran_pipe **pipep, tcptran_ep *ep) +tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep) { tcptran_pipe *p; int rv; @@ -750,13 +761,14 @@ tcptran_ep_close(void *arg) } static int -tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) +tcptran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) { tcptran_ep * ep; int rv; uint16_t af; char * host; nng_sockaddr srcsa; + nni_sock * sock = nni_dialer_sock(ndialer); if (strcmp(url->u_scheme, "tcp") == 0) { af = NNG_AF_UNSPEC; @@ -788,6 +800,7 @@ tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) ep->nodelay = true; ep->keepalive = false; ep->url = url; + ep->ndialer = ndialer; // Detect an embedded local interface name in the hostname. This // syntax is only valid with dialers. @@ -839,13 +852,14 @@ tcptran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) return (0); } static int -tcptran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock) +tcptran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) { tcptran_ep *ep; int rv; char * host; nni_aio * aio; uint16_t af; + nni_sock * sock = nni_listener_sock(nlistener); if (strcmp(url->u_scheme, "tcp") == 0) { af = NNG_AF_UNSPEC; @@ -876,6 +890,7 @@ tcptran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock) ep->nodelay = true; ep->keepalive = false; ep->url = url; + ep->nlistener = nlistener; if (strlen(url->u_hostname) == 0) { host = NULL; @@ -926,7 +941,7 @@ tcptran_ep_connect(void *arg, nni_aio *aio) return; } nni_mtx_lock(&ep->mtx); - if ((rv = tcptran_pipe_init(&p, ep)) != 0) { + if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); return; @@ -1065,7 +1080,7 @@ tcptran_ep_accept(void *arg, nni_aio *aio) return; } nni_mtx_lock(&ep->mtx); - if ((rv = tcptran_pipe_init(&p, ep)) != 0) { + if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); return; @@ -1135,6 +1150,7 @@ static nni_tran_option tcptran_pipe_options[] = { }; static nni_tran_pipe_ops tcptran_pipe_ops = { + .p_init = tcptran_pipe_init, .p_fini = tcptran_pipe_fini, .p_stop = tcptran_pipe_stop, .p_send = tcptran_pipe_send, diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 49bc932d..cf478796 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -30,6 +30,7 @@ typedef struct tlstran_pipe tlstran_pipe; // tlstran_pipe is one end of a TLS connection. struct tlstran_pipe { nni_tls * tls; + nni_pipe * npipe; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -77,6 +78,8 @@ struct tlstran_ep { nni_tcp_listener *listener; nng_sockaddr sa; nng_sockaddr bsa; + nni_dialer * ndialer; + nni_listener * nlistener; }; static void tlstran_pipe_send_start(tlstran_pipe *); @@ -125,6 +128,14 @@ tlstran_pipe_stop(void *arg) nni_aio_stop(p->rslvaio); } +static int +tlstran_pipe_init(void *arg, nni_pipe *npipe) +{ + tlstran_pipe *p = arg; + p->npipe = npipe; + return (0); +} + static void tlstran_pipe_fini(void *arg) { @@ -153,7 +164,7 @@ tlstran_pipe_fini(void *arg) } static int -tlstran_pipe_init(tlstran_pipe **pipep, tlstran_ep *ep) +tlstran_pipe_alloc(tlstran_pipe **pipep, tlstran_ep *ep) { tlstran_pipe *p; int rv; @@ -742,13 +753,14 @@ tlstran_ep_close(void *arg) } static int -tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) +tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) { tlstran_ep *ep; int rv; uint16_t af; char * host = url->u_hostname; char * port = url->u_port; + nni_sock * sock = nni_dialer_sock(ndialer); if (strcmp(url->u_scheme, "tls+tcp") == 0) { af = NNG_AF_UNSPEC; @@ -782,6 +794,7 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) ep->proto = nni_sock_proto_id(sock); ep->nodelay = true; ep->keepalive = false; + ep->ndialer = ndialer; if (((rv = nni_tcp_dialer_init(&ep->dialer)) != 0) || ((rv = nni_tls_config_init(&ep->cfg, NNG_TLS_MODE_CLIENT)) != 0) || @@ -796,13 +809,14 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_sock *sock) } static int -tlstran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock) +tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener) { tlstran_ep *ep; int rv; uint16_t af; char * host = url->u_hostname; nni_aio * aio; + nni_sock * sock = nni_listener_sock(nlistener); if (strcmp(url->u_scheme, "tls+tcp") == 0) { af = NNG_AF_UNSPEC; @@ -835,6 +849,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_sock *sock) ep->proto = nni_sock_proto_id(sock); ep->nodelay = true; ep->keepalive = false; + ep->nlistener = nlistener; if (strlen(host) == 0) { host = NULL; @@ -879,7 +894,7 @@ tlstran_ep_connect(void *arg, nni_aio *aio) return; } nni_mtx_lock(&ep->mtx); - if ((rv = tlstran_pipe_init(&p, ep)) != 0) { + if ((rv = tlstran_pipe_alloc(&p, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); return; @@ -923,7 +938,7 @@ tlstran_ep_accept(void *arg, nni_aio *aio) return; } nni_mtx_lock(&ep->mtx); - if ((rv = tlstran_pipe_init(&p, ep)) != 0) { + if ((rv = tlstran_pipe_alloc(&p, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); return; @@ -1224,6 +1239,7 @@ static nni_tran_option tlstran_pipe_options[] = { }; static nni_tran_pipe_ops tlstran_pipe_ops = { + .p_init = tlstran_pipe_init, .p_fini = tlstran_pipe_fini, .p_stop = tlstran_pipe_stop, .p_send = tlstran_pipe_send, diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 9b3f67f4..baa9ea34 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -42,6 +42,7 @@ struct ws_dialer { nni_ws_dialer *dialer; nni_list headers; // req headers bool started; + nni_dialer * ndialer; }; struct ws_listener { @@ -55,19 +56,21 @@ struct ws_listener { nni_ws_listener *listener; nni_list headers; // res headers bool started; + nni_listener * nlistener; }; struct ws_pipe { - nni_mtx mtx; - size_t rcvmax; - bool closed; - uint16_t rproto; - uint16_t lproto; - nni_aio *user_txaio; - nni_aio *user_rxaio; - nni_aio *txaio; - nni_aio *rxaio; - nni_ws * ws; + nni_mtx mtx; + nni_pipe *npipe; + size_t rcvmax; + bool closed; + uint16_t rproto; + uint16_t lproto; + nni_aio * user_txaio; + nni_aio * user_rxaio; + nni_aio * txaio; + nni_aio * rxaio; + nni_ws * ws; }; static void @@ -201,6 +204,14 @@ ws_pipe_stop(void *arg) nni_aio_stop(p->txaio); } +static int +ws_pipe_init(void *arg, nni_pipe *npipe) +{ + ws_pipe *p = arg; + p->npipe = npipe; + return (0); +} + static void ws_pipe_fini(void *arg) { @@ -230,7 +241,7 @@ ws_pipe_close(void *arg) } static int -ws_pipe_init(ws_pipe **pipep, void *ws) +ws_pipe_alloc(ws_pipe **pipep, void *ws) { ws_pipe *p; int rv; @@ -663,6 +674,7 @@ static nni_tran_option ws_pipe_options[] = { }; static nni_tran_pipe_ops ws_pipe_ops = { + .p_init = ws_pipe_init, .p_fini = ws_pipe_fini, .p_stop = ws_pipe_stop, .p_send = ws_pipe_send, @@ -782,7 +794,7 @@ ws_connect_cb(void *arg) NNI_ASSERT(nni_list_empty(&d->aios)); if ((rv = nni_aio_result(caio)) != 0) { nni_aio_finish_error(uaio, rv); - } else if ((rv = ws_pipe_init(&p, ws)) != 0) { + } else if ((rv = ws_pipe_alloc(&p, ws)) != 0) { nni_ws_fini(ws); nni_aio_finish_error(uaio, rv); } else { @@ -835,7 +847,7 @@ ws_accept_cb(void *arg) ws_pipe *p; // Make a pipe nni_aio_list_remove(uaio); - if ((rv = ws_pipe_init(&p, ws)) != 0) { + if ((rv = ws_pipe_alloc(&p, ws)) != 0) { nni_ws_close(ws); nni_aio_finish_error(uaio, rv); } else { @@ -855,9 +867,10 @@ ws_accept_cb(void *arg) } static int -ws_dialer_init(void **dp, nni_url *url, nni_sock *s) +ws_dialer_init(void **dp, nni_url *url, nni_dialer *ndialer) { ws_dialer * d; + nni_sock * s = nni_dialer_sock(ndialer); const char *n; int rv; @@ -869,9 +882,10 @@ ws_dialer_init(void **dp, nni_url *url, nni_sock *s) nni_aio_list_init(&d->aios); - d->lproto = nni_sock_proto_id(s); - d->rproto = nni_sock_peer_id(s); - n = nni_sock_peer_name(s); + d->lproto = nni_sock_proto_id(s); + d->rproto = nni_sock_peer_id(s); + d->ndialer = ndialer; + n = nni_sock_peer_name(s); if (((rv = nni_ws_dialer_init(&d->dialer, url)) != 0) || ((rv = nni_aio_init(&d->connaio, ws_connect_cb, d)) != 0) || @@ -886,11 +900,12 @@ ws_dialer_init(void **dp, nni_url *url, nni_sock *s) } static int -ws_listener_init(void **lp, nni_url *url, nni_sock *sock) +ws_listener_init(void **lp, nni_url *url, nni_listener *nlistener) { ws_listener *l; const char * n; int rv; + nni_sock * sock = nni_listener_sock(nlistener); if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { return (NNG_ENOMEM); @@ -900,9 +915,10 @@ ws_listener_init(void **lp, nni_url *url, nni_sock *sock) nni_aio_list_init(&l->aios); - l->lproto = nni_sock_proto_id(sock); - l->rproto = nni_sock_peer_id(sock); - n = nni_sock_proto_name(sock); + l->lproto = nni_sock_proto_id(sock); + l->rproto = nni_sock_peer_id(sock); + n = nni_sock_proto_name(sock); + l->nlistener = nlistener; if (((rv = nni_ws_listener_init(&l->listener, url)) != 0) || ((rv = nni_aio_init(&l->accaio, ws_accept_cb, l)) != 0) || diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 0a68c1ca..0b0303d9 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -186,6 +186,7 @@ struct zt_pipe { nni_list_node zp_link; const char * zp_addr; zt_node * zp_ztn; + nni_pipe * zp_npipe; uint64_t zp_nwid; uint64_t zp_laddr; uint64_t zp_raddr; @@ -219,7 +220,6 @@ struct zt_ep { char ze_home[NNG_MAXADDRLEN]; // should be enough zt_node * ze_ztn; uint64_t ze_nwid; - int ze_mode; int ze_running; uint64_t ze_raddr; // remote node address uint64_t ze_laddr; // local node address @@ -241,9 +241,11 @@ struct zt_ep { // established connection/pipe unless the application calls // accept. Since the "application" is our library, that should // be pretty much as fast we can run. - zt_creq ze_creqs[zt_listenq]; - int ze_creq_head; - int ze_creq_tail; + zt_creq ze_creqs[zt_listenq]; + int ze_creq_head; + int ze_creq_tail; + nni_dialer * ze_ndialer; + nni_listener *ze_nlistener; }; // Locking strategy. At present the ZeroTier core is not reentrant or fully @@ -274,7 +276,7 @@ static void zt_ep_send_conn_req(zt_ep *); static void zt_ep_conn_req_cb(void *); static void zt_ep_doaccept(zt_ep *); static void zt_pipe_dorecv(zt_pipe *); -static int zt_pipe_init(zt_pipe **, zt_ep *, uint64_t, uint64_t); +static int zt_pipe_alloc(zt_pipe **, zt_ep *, uint64_t, uint64_t); static void zt_pipe_ping_cb(void *); static void zt_fraglist_clear(zt_fraglist *); static void zt_fraglist_free(zt_fraglist *); @@ -524,7 +526,7 @@ zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid, } ep->ze_mtu = config->mtu; - if ((ep->ze_mode == NNI_EP_MODE_DIAL) && + if ((ep->ze_ndialer != NULL) && (nni_list_first(&ep->ze_aios) != NULL)) { zt_ep_send_conn_req(ep); } @@ -642,7 +644,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) zt_pipe *p; int rv; - if (ep->ze_mode != NNI_EP_MODE_DIAL) { + if (ep->ze_ndialer == NULL) { zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, zt_err_proto, "Inappropriate operation"); return; @@ -665,7 +667,7 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) return; } - if ((rv = zt_pipe_init(&p, ep, raddr, ep->ze_laddr)) != 0) { + if ((rv = zt_pipe_alloc(&p, ep, raddr, ep->ze_laddr)) != 0) { // We couldn't create the pipe, just drop it. nni_aio_finish_error(aio, rv); return; @@ -689,7 +691,7 @@ zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) zt_pipe *p; int i; - if (ep->ze_mode != NNI_EP_MODE_LISTEN) { + if (ep->ze_nlistener == NULL) { zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, zt_err_proto, "Inappropriate operation"); return; @@ -742,8 +744,8 @@ zt_ep_recv_error(zt_ep *ep, const uint8_t *data, size_t len) // is that when we have an outstanding CON_REQ, we would like to // process that appropriately. - if (ep->ze_mode != NNI_EP_MODE_DIAL) { - // Drop it. + if (ep->ze_ndialer == NULL) { + // Not a dialer. Drop it. return; } @@ -1637,6 +1639,14 @@ zt_pipe_close(void *arg) nni_mtx_unlock(&zt_lk); } +static int +zt_pipe_init(void *arg, nni_pipe *npipe) +{ + zt_pipe *p = arg; + p->zp_npipe = npipe; + return (0); +} + static void zt_pipe_fini(void *arg) { @@ -1669,7 +1679,7 @@ zt_pipe_reap(zt_pipe *p) } static int -zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr) +zt_pipe_alloc(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr) { zt_pipe *p; int rv; @@ -1698,10 +1708,12 @@ zt_pipe_init(zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr) p->zp_ping_try = 0; nni_atomic_flag_reset(&p->zp_reaped); - if (ep->ze_mode == NNI_EP_MODE_DIAL) { - rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p); - } else { + if (ep->ze_nlistener != NULL) { + // listener rv = nni_idhash_insert(ztn->zn_rpipes, raddr, p); + } else { + // dialer + rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p); } if ((rv != 0) || ((rv = nni_idhash_insert(ztn->zn_peers, p->zp_raddr, p)) != 0) || @@ -2129,7 +2141,8 @@ zt_parsedec(const char **sp, uint64_t *valp) } static int -zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) +zt_ep_init(void **epp, nni_url *url, nni_sock *sock, nni_dialer *ndialer, + nni_listener *nlistener) { zt_ep * ep; uint64_t node; @@ -2141,7 +2154,6 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (NNG_ENOMEM); } - ep->ze_mode = mode; ep->ze_mtu = ZT_MIN_MTU; ep->ze_aio = NULL; ep->ze_ping_tries = zt_ping_tries; @@ -2149,6 +2161,8 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) ep->ze_conn_time = zt_conn_time; ep->ze_conn_tries = zt_conn_tries; ep->ze_proto = nni_sock_proto_id(sock); + ep->ze_ndialer = ndialer; + ep->ze_nlistener = nlistener; nni_aio_list_init(&ep->ze_aios); @@ -2181,26 +2195,23 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) } // Parse the URL. - switch (mode) { - case NNI_EP_MODE_DIAL: - /// We have to have a non-zero port number to connect to. + if (nlistener != NULL) { + // listener + ep->ze_laddr = node; + ep->ze_laddr <<= 24; + ep->ze_laddr |= port; + ep->ze_raddr = 0; + ep->ze_nlistener = nlistener; + } else { + // dialer if (port == 0) { return (NNG_EADDRINVAL); } ep->ze_raddr = node; ep->ze_raddr <<= 24; ep->ze_raddr |= port; - ep->ze_laddr = 0; - break; - case NNI_EP_MODE_LISTEN: - ep->ze_laddr = node; - ep->ze_laddr <<= 24; - ep->ze_laddr |= port; - ep->ze_raddr = 0; - break; - default: - NNI_ASSERT(0); - break; + ep->ze_laddr = 0; + ep->ze_ndialer = ndialer; } nni_mtx_lock(&zt_lk); @@ -2217,15 +2228,15 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) } static int -zt_dialer_init(void **epp, nni_url *url, nni_sock *sock) +zt_dialer_init(void **epp, nni_url *url, nni_dialer *d) { - return (zt_ep_init(epp, url, sock, NNI_EP_MODE_DIAL)); + return (zt_ep_init(epp, url, nni_dialer_sock(d), d, NULL)); } static int -zt_listener_init(void **epp, nni_url *url, nni_sock *sock) +zt_listener_init(void **epp, nni_url *url, nni_listener *l) { - return (zt_ep_init(epp, url, sock, NNI_EP_MODE_LISTEN)); + return (zt_ep_init(epp, url, nni_listener_sock(l), NULL, l)); } static void @@ -2380,7 +2391,7 @@ zt_ep_doaccept(zt_ep *ep) // We remove this AIO. This keeps it from being canceled. nni_aio_list_remove(aio); - rv = zt_pipe_init(&p, ep, creq.cr_raddr, ep->ze_laddr); + rv = zt_pipe_alloc(&p, ep, creq.cr_raddr, ep->ze_laddr); if (rv != 0) { zt_send_err(ep->ze_ztn, ep->ze_nwid, creq.cr_raddr, ep->ze_laddr, zt_err_unknown, @@ -2439,8 +2450,6 @@ zt_ep_conn_req_cb(void *arg) nni_aio *uaio; int rv; - NNI_ASSERT(ep->ze_mode == NNI_EP_MODE_DIAL); - nni_mtx_lock(&zt_lk); ep->ze_creq_active = 0; @@ -2642,7 +2651,7 @@ zt_ep_get_url(void *arg, void *data, size_t *szp, nni_opt_type t) uint64_t addr; nni_mtx_lock(&zt_lk); - addr = ep->ze_mode == NNI_EP_MODE_DIAL ? ep->ze_raddr : ep->ze_laddr; + addr = ep->ze_nlistener != NULL ? ep->ze_laddr : ep->ze_raddr; snprintf(ustr, sizeof(ustr), "zt://%llx.%llx:%u", (unsigned long long) addr >> zt_port_shift, (unsigned long long) ep->ze_nwid, @@ -2957,6 +2966,7 @@ static nni_tran_option zt_pipe_options[] = { }; static nni_tran_pipe_ops zt_pipe_ops = { + .p_init = zt_pipe_init, .p_fini = zt_pipe_fini, .p_send = zt_pipe_send, .p_recv = zt_pipe_recv, |
