aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-11-27 14:21:20 -0800
committerGarrett D'Amore <garrett@damore.org>2017-12-26 15:31:53 -0800
commit93db6fe3aaff421d61a15993ba6827b742ab00d1 (patch)
treed4d6372cb5d606ba9bcdb60b88b6271086940895 /src/transport
parentc9bf5a76b0d6aead6ae91af71ada51a17881ac0a (diff)
downloadnng-93db6fe3aaff421d61a15993ba6827b742ab00d1.tar.gz
nng-93db6fe3aaff421d61a15993ba6827b742ab00d1.tar.bz2
nng-93db6fe3aaff421d61a15993ba6827b742ab00d1.zip
fixes #2 Websocket transport
This is a rather large changeset -- it fundamentally adds websocket transport, but as part of this changeset we added a generic framework for both HTTP and websocket. We also made some supporting changes to the core, such as changing the way timeouts work for AIOs and adding additional state keeping for AIOs, and adding a common framework for deferred finalization (to avoid certain kinds of circular deadlocks during resource cleanup). We also invented a new initialization framework so that we can avoid wiring in knowledge about them into the master initialization framework. The HTTP framework is not yet complete, but it is good enough for simple static serving and building additional services on top of -- including websocket. We expect both websocket and HTTP support to evolve considerably, and so these are not part of the public API yet. Property support for the websocket transport (in particular address properties) is still missing, as is support for TLS. The websocket transport here is a bit more robust than the original nanomsg implementation, as it supports multiple sockets listening at the same port sharing the same HTTP server instance, discriminating between them based on URI (and possibly the virtual host). Websocket is enabled by default at present, and work to conditionalize HTTP and websocket further (to minimize bloat) is still pending.
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/tls/tls.c70
-rw-r--r--src/transport/ws/CMakeLists.txt19
-rw-r--r--src/transport/ws/README.adoc38
-rw-r--r--src/transport/ws/websocket.c533
-rw-r--r--src/transport/ws/websocket.h62
-rw-r--r--src/transport/zerotier/zerotier.c15
6 files changed, 680 insertions, 57 deletions
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 8dcb3f60..9c794e64 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -10,7 +10,6 @@
#include <stdbool.h>
#include <stdio.h>
-#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -498,46 +497,6 @@ nni_tls_pipe_getopt_remaddr(void *arg, void *v, size_t *szp)
return (rv);
}
-static int
-nni_tls_parse_pair(char *pair, char **hostp, char **servp)
-{
- char *host, *serv, *end;
-
- if (pair[0] == '[') {
- host = pair + 1;
- // IP address enclosed ... for IPv6 usually.
- if ((end = strchr(host, ']')) == NULL) {
- return (NNG_EADDRINVAL);
- }
- *end = '\0';
- serv = end + 1;
- if (*serv == ':') {
- serv++;
- } else if (*serv != '\0') {
- return (NNG_EADDRINVAL);
- }
- } else {
- host = pair;
- serv = strchr(host, ':');
- if (serv != NULL) {
- *serv = '\0';
- serv++;
- }
- }
- if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) {
- *hostp = NULL;
- } else {
- *hostp = host;
- }
- if ((serv == NULL) || (strlen(serv) == 0)) {
- *servp = NULL;
- } else {
- *servp = serv;
- }
- // Stash the port in big endian (network) byte order.
- return (0);
-}
-
// Note that the url *must* be in a modifiable buffer.
int
nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
@@ -555,8 +514,9 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
// is the second part.
*h1 = '\0';
h1++;
- if (((rv = nni_tls_parse_pair(h1, rhost, rserv)) != 0) ||
- ((rv = nni_tls_parse_pair(url, lhost, lserv)) != 0)) {
+ if (((rv = nni_tran_parse_host_port(h1, rhost, rserv)) != 0) ||
+ ((rv = nni_tran_parse_host_port(url, lhost, lserv)) !=
+ 0)) {
return (rv);
}
if ((*rserv == NULL) || (*rhost == NULL)) {
@@ -566,7 +526,7 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
} else if (mode == NNI_EP_MODE_DIAL) {
*lhost = NULL;
*lserv = NULL;
- if ((rv = nni_tls_parse_pair(url, rhost, rserv)) != 0) {
+ if ((rv = nni_tran_parse_host_port(url, rhost, rserv)) != 0) {
return (rv);
}
if ((*rserv == NULL) || (*rhost == NULL)) {
@@ -577,7 +537,7 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
NNI_ASSERT(mode == NNI_EP_MODE_LISTEN);
*rhost = NULL;
*rserv = NULL;
- if ((rv = nni_tls_parse_pair(url, lhost, lserv)) != 0) {
+ if ((rv = nni_tran_parse_host_port(url, lhost, lserv)) != 0) {
return (rv);
}
// We have to have a port to listen on!
@@ -657,9 +617,13 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
return (NNG_EADDRINVAL);
}
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
// Parse the URLs first.
rv = nni_tls_parse_url(buf, &lhost, &lserv, &rhost, &rserv, mode);
if (rv != 0) {
+ nni_aio_fini(aio);
return (rv);
}
if (mode == NNI_EP_MODE_DIAL) {
@@ -672,10 +636,6 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
authmode = NNI_TLS_CONFIG_AUTH_MODE_NONE;
}
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
-
// XXX: arguably we could defer this part to the point we do a bind
// or connect!
@@ -683,7 +643,11 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
aio->a_addr = &rsa;
nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio);
nni_aio_wait(aio);
+ nni_strfree(rserv);
if ((rv = nni_aio_result(aio)) != 0) {
+ nni_strfree(rhost);
+ nni_strfree(lhost);
+ nni_strfree(lserv);
nni_aio_fini(aio);
return (rv);
}
@@ -695,7 +659,10 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
aio->a_addr = &lsa;
nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio);
nni_aio_wait(aio);
+ nni_strfree(lhost);
+ nni_strfree(lserv);
if ((rv = nni_aio_result(aio)) != 0) {
+ nni_strfree(rhost);
nni_aio_fini(aio);
return (rv);
}
@@ -705,12 +672,14 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
nni_aio_fini(aio);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ nni_strfree(rhost);
return (NNG_ENOMEM);
}
nni_mtx_init(&ep->mtx);
if (nni_strlcpy(ep->addr, url, sizeof(ep->addr)) >= sizeof(ep->addr)) {
NNI_FREE_STRUCT(ep);
+ nni_strfree(rhost);
return (NNG_EADDRINVAL);
}
@@ -718,15 +687,18 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
((rv = nni_tls_config_init(&ep->cfg, tlsmode)) != 0) ||
((rv = nni_tls_config_auth_mode(ep->cfg, authmode)) != 0) ||
((rv = nni_aio_init(&ep->aio, nni_tls_ep_cb, ep)) != 0)) {
+ nni_strfree(rhost);
nni_tls_ep_fini(ep);
return (rv);
}
if ((tlsmode == NNI_TLS_CONFIG_CLIENT) && (rhost != NULL)) {
if ((rv = nni_tls_config_server_name(ep->cfg, rhost)) != 0) {
+ nni_strfree(rhost);
nni_tls_ep_fini(ep);
return (rv);
}
}
+ nni_strfree(rhost);
ep->proto = nni_sock_proto(sock);
ep->authmode = authmode;
diff --git a/src/transport/ws/CMakeLists.txt b/src/transport/ws/CMakeLists.txt
new file mode 100644
index 00000000..18842df9
--- /dev/null
+++ b/src/transport/ws/CMakeLists.txt
@@ -0,0 +1,19 @@
+#
+# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# WebSocket transport
+
+if (NNG_TRANSPORT_WS)
+ set(WS_SOURCES transport/ws/websocket.c transport/ws/websocket.h)
+ install(FILES websocket.h DESTINATION include/nng/transport/ws)
+
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${WS_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/ws/README.adoc b/src/transport/ws/README.adoc
new file mode 100644
index 00000000..e3101297
--- /dev/null
+++ b/src/transport/ws/README.adoc
@@ -0,0 +1,38 @@
+= websocket transport
+
+This transport provides support for SP over websocket using TCP or TLS.
+When using TCP, it is compatible with the libnanomsg legacy transport.
+It also is compatible with mangos (both TCP and TLS).
+
+TLS support requires the mbedTLS library.
+
+We set the "protocol" such as "pair.sp.nanomsg.org" in the
+Sec-WebSocket-Protocol field -- the client sets to the the server's
+protocol - i.e. the protocol that the server speaks. For example,
+if the the server is a REP, then a REQ client would send "rep.sp.nanomsg.org".
+
+The server sends the same value (it's own), per the WebSocket specs. (Note
+that the client's protocol is never sent, but assumed to be complementary
+to the protocol in the Sec-WebSocket-Protocol field.)
+
+Each SP message is a WebSocket message.
+
+WebSocket is defined in RFC 6455.
+
+== Design
+
+We unfortunately need to implement our own design for this -- the only
+reasonable client library would be libcurl, and there is a dearth of
+suitable server libraries. Since we don't have to support full HTTP, but
+just the initial handshake, this isn't too tragic.
+
+== Multiple Server Sockets
+
+In order to support Multiple Server sockets listening on the same port,
+the application must be long lived. We will set up a listener on the
+configured TCP (or TLS) port, and examine the PATH supplied in the GET.
+This will be used to match against the URL requested, and if the URL
+matches we will create the appropriate pipe.
+
+If no server endpoint at that address can be found, we return an
+HTTP error, and close the socket.
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
new file mode 100644
index 00000000..8a73bcfb
--- /dev/null
+++ b/src/transport/ws/websocket.c
@@ -0,0 +1,533 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "supplemental/websocket/websocket.h"
+
+typedef struct ws_ep ws_ep;
+typedef struct ws_pipe ws_pipe;
+
+struct ws_ep {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ char * addr;
+ uint16_t lproto; // local protocol
+ uint16_t rproto; // remote protocol
+ size_t rcvmax;
+ char * protoname;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_aio * connaio;
+ nni_aio * accaio;
+ nni_ws_listener *listener;
+ nni_ws_dialer * dialer;
+};
+
+struct ws_pipe {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ nni_mtx mtx;
+ size_t rcvmax; // inherited from EP
+ bool closed;
+ uint16_t rproto;
+ uint16_t lproto;
+ nni_aio *user_txaio;
+ nni_aio *user_rxaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_ws * ws;
+};
+
+static void
+ws_pipe_send_cb(void *arg)
+{
+ ws_pipe *p = arg;
+ nni_aio *taio;
+ nni_aio *uaio;
+
+ nni_mtx_lock(&p->mtx);
+ taio = p->txaio;
+ uaio = p->user_txaio;
+ p->user_txaio = NULL;
+
+ if (uaio != NULL) {
+ int rv;
+ if ((rv = nni_aio_result(taio)) != 0) {
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish(uaio, 0, 0);
+ }
+ }
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv_cb(void *arg)
+{
+ ws_pipe *p = arg;
+ nni_aio *raio = p->rxaio;
+ nni_aio *uaio;
+ int rv;
+
+ nni_mtx_lock(&p->mtx);
+ uaio = p->user_rxaio;
+ p->user_rxaio = NULL;
+ if ((rv = nni_aio_result(raio)) != 0) {
+ if (uaio != NULL) {
+ nni_aio_finish_error(uaio, rv);
+ }
+ } else {
+ nni_msg *msg = nni_aio_get_msg(raio);
+ if (uaio != NULL) {
+ nni_aio_finish_msg(uaio, msg);
+ } else {
+ nni_msg_free(msg);
+ }
+ }
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv_cancel(nni_aio *aio, int rv)
+{
+ ws_pipe *p = aio->a_prov_data;
+ nni_mtx_lock(&p->mtx);
+ if (p->user_rxaio != aio) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_cancel(p->rxaio, rv);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv(void *arg, nni_aio *aio)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ p->user_rxaio = aio;
+
+ nni_ws_recv_msg(p->ws, p->rxaio);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_send_cancel(nni_aio *aio, int rv)
+{
+ ws_pipe *p = aio->a_prov_data;
+ nni_mtx_lock(&p->mtx);
+ if (p->user_txaio != aio) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ // This aborts the upper send, which will call back with an error
+ // when it is done.
+ nni_aio_cancel(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_send(void *arg, nni_aio *aio)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ p->user_txaio = aio;
+ nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
+
+ nni_ws_send_msg(p->ws, p->txaio);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_fini(void *arg)
+{
+ ws_pipe *p = arg;
+
+ nni_aio_stop(p->rxaio);
+ nni_aio_stop(p->txaio);
+
+ nni_aio_fini(p->rxaio);
+ nni_aio_fini(p->txaio);
+
+ if (p->ws) {
+ nni_ws_fini(p->ws);
+ }
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
+}
+
+static void
+ws_pipe_close(void *arg)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ nni_ws_close(p->ws);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static int
+ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws)
+{
+ ws_pipe *p;
+ int rv;
+ nni_aio *aio;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&p->mtx);
+
+ // Initialize AIOs.
+ if (((rv = nni_aio_init(&p->txaio, ws_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, ws_pipe_recv_cb, p)) != 0)) {
+ ws_pipe_fini(p);
+ return (rv);
+ }
+
+ p->mode = ep->mode;
+ p->rcvmax = ep->rcvmax;
+ // p->addr = ep->addr;
+ p->rproto = ep->rproto;
+ p->lproto = ep->lproto;
+ p->ws = ws;
+
+ if ((aio = nni_list_first(&ep->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_pipe(aio, p);
+ }
+
+ *pipep = p;
+ return (0);
+}
+
+static uint16_t
+ws_pipe_peer(void *arg)
+{
+ ws_pipe *p = arg;
+
+ return (p->rproto);
+}
+
+static void
+ws_pipe_start(void *arg, nni_aio *aio)
+{
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+// We have very different approaches for server and client.
+// Servers use the HTTP server framework, and a request methodology.
+
+static int
+ws_ep_bind(void *arg)
+{
+ ws_ep *ep = arg;
+ return (nni_ws_listener_listen(ep->listener));
+}
+
+static void
+ws_ep_cancel(nni_aio *aio, int rv)
+{
+ ws_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_accept(void *arg, nni_aio *aio)
+{
+ ws_ep *ep = arg;
+
+ // We already bound, so we just need to look for an available
+ // pipe (created by the handler), and match it.
+ // Otherwise we stick the AIO in the accept list.
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_list_append(&ep->aios, aio);
+ if (aio == nni_list_first(&ep->aios)) {
+ nni_ws_listener_accept(ep->listener, ep->accaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_connect(void *arg, nni_aio *aio)
+{
+ ws_ep *ep = arg;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(nni_list_empty(&ep->aios));
+
+ // If we can't start, then its dying and we can't report
+ // either.
+ if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_list_append(&ep->aios, aio);
+ nni_ws_dialer_dial(ep->dialer, ep->connaio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+ws_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz)
+{
+ ws_ep *ep = arg;
+ if (ep == NULL) {
+ return (nni_chkopt_size(v, sz, 0, NNI_MAXSZ));
+ }
+ return (nni_setopt_size(&ep->rcvmax, v, sz, 0, NNI_MAXSZ));
+}
+
+static int
+ws_ep_getopt_recvmaxsz(void *arg, void *v, size_t *szp)
+{
+ ws_ep *ep = arg;
+ return (nni_getopt_size(ep->rcvmax, v, szp));
+}
+
+static nni_tran_pipe_option ws_pipe_options[] = {
+#if 0
+ // clang-format off
+ { NNG_OPT_LOCADDR, ws_pipe_getopt_locaddr },
+ { NNG_OPT_REMADDR, ws_pipe_getopt_remaddr },
+ // clang-format on
+#endif
+ // terminate list
+ { NULL, NULL }
+};
+
+static nni_tran_pipe ws_pipe_ops = {
+ .p_fini = ws_pipe_fini,
+ .p_start = ws_pipe_start,
+ .p_send = ws_pipe_send,
+ .p_recv = ws_pipe_recv,
+ .p_close = ws_pipe_close,
+ .p_peer = ws_pipe_peer,
+ .p_options = ws_pipe_options,
+};
+
+static nni_tran_ep_option ws_ep_options[] = {
+ {
+ .eo_name = NNG_OPT_RECVMAXSZ,
+ .eo_getopt = ws_ep_getopt_recvmaxsz,
+ .eo_setopt = ws_ep_setopt_recvmaxsz,
+ },
+ // terminate list
+ { NULL, NULL, NULL },
+};
+
+static void
+ws_ep_fini(void *arg)
+{
+ ws_ep *ep = arg;
+
+ nni_aio_stop(ep->accaio);
+ nni_aio_stop(ep->connaio);
+ nni_aio_fini(ep->accaio);
+ nni_aio_fini(ep->connaio);
+ if (ep->listener != NULL) {
+ nni_ws_listener_fini(ep->listener);
+ }
+ if (ep->dialer != NULL) {
+ nni_ws_dialer_fini(ep->dialer);
+ }
+ nni_strfree(ep->addr);
+ nni_strfree(ep->protoname);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
+}
+
+static void
+ws_ep_conn_cb(void *arg)
+{
+ ws_ep * ep = arg;
+ ws_pipe *p;
+ nni_aio *caio = ep->connaio;
+ nni_aio *uaio;
+ int rv;
+ nni_ws * ws = NULL;
+
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_result(caio) == 0) {
+ ws = nni_aio_get_pipe(caio);
+ }
+ if ((uaio = nni_list_first(&ep->aios)) == NULL) {
+ // The client stopped caring about this!
+ if (ws != NULL) {
+ nni_ws_fini(ws);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_aio_list_remove(uaio);
+ NNI_ASSERT(nni_list_empty(&ep->aios));
+ if ((rv = nni_aio_result(caio)) != 0) {
+ nni_aio_finish_error(uaio, rv);
+ } else if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ nni_ws_fini(ws);
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish_pipe(uaio, p);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_close(void *arg)
+{
+ ws_ep *ep = arg;
+
+ if (ep->mode == NNI_EP_MODE_LISTEN) {
+ nni_ws_listener_close(ep->listener);
+ } else {
+ nni_ws_dialer_close(ep->dialer);
+ }
+}
+
+static void
+ws_ep_acc_cb(void *arg)
+{
+ ws_ep * ep = arg;
+ nni_aio *aaio = ep->accaio;
+ nni_aio *uaio;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ uaio = nni_list_first(&ep->aios);
+ if ((rv = nni_aio_result(aaio)) != 0) {
+ if (uaio != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ } else {
+ nni_ws *ws = nni_aio_get_pipe(aaio);
+ if (uaio != NULL) {
+ ws_pipe *p;
+ // Make a pipe
+ nni_aio_list_remove(uaio);
+ if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ nni_ws_close(ws);
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish_pipe(uaio, p);
+ }
+ }
+ }
+ if (!nni_list_empty(&ep->aios)) {
+ nni_ws_listener_accept(ep->listener, aaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+ws_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
+{
+ ws_ep * ep;
+ const char *pname;
+ int rv;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&ep->mtx);
+
+ // List of pipes (server only).
+ nni_aio_list_init(&ep->aios);
+
+ ep->mode = mode;
+ ep->lproto = nni_sock_proto(sock);
+ ep->rproto = nni_sock_peer(sock);
+
+ if (mode == NNI_EP_MODE_DIAL) {
+ pname = nni_sock_peer_name(sock);
+ rv = nni_ws_dialer_init(&ep->dialer, url);
+ } else {
+ pname = nni_sock_proto_name(sock);
+ rv = nni_ws_listener_init(&ep->listener, url);
+ }
+
+ if ((rv == 0) && ((ep->addr = nni_strdup(url)) == NULL)) {
+ rv = NNG_ENOMEM;
+ }
+ if ((rv != 0) ||
+ ((rv = nni_aio_init(&ep->connaio, ws_ep_conn_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->accaio, ws_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_asprintf(&ep->protoname, "%s.sp.nanomsg.org", pname)) !=
+ 0)) {
+ ws_ep_fini(ep);
+ return (rv);
+ }
+
+ *epp = ep;
+ return (0);
+}
+static int
+ws_tran_init(void)
+{
+ return (0);
+}
+
+static void
+ws_tran_fini(void)
+{
+}
+
+static nni_tran_ep ws_ep_ops = {
+ .ep_init = ws_ep_init,
+ .ep_fini = ws_ep_fini,
+ .ep_connect = ws_ep_connect,
+ .ep_bind = ws_ep_bind,
+ .ep_accept = ws_ep_accept,
+ .ep_close = ws_ep_close,
+ .ep_options = ws_ep_options,
+};
+
+static nni_tran ws_tran = {
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "ws",
+ .tran_ep = &ws_ep_ops,
+ .tran_pipe = &ws_pipe_ops,
+ .tran_init = ws_tran_init,
+ .tran_fini = ws_tran_fini,
+};
+
+int
+nng_ws_register(void)
+{
+ return (nni_tran_register(&ws_tran));
+}
diff --git a/src/transport/ws/websocket.h b/src/transport/ws/websocket.h
new file mode 100644
index 00000000..1beb6156
--- /dev/null
+++ b/src/transport/ws/websocket.h
@@ -0,0 +1,62 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_TRANSPORT_WS_WEBSOCKET_H
+#define NNG_TRANSPORT_WS_WEBSOCKET_H
+
+// TLS transport. This is used for communication via TLS v1.2 over TCP/IP.
+
+NNG_DECL int nng_ws_register(void);
+
+// TLS options. Note that these can only be set *before* the endpoint is
+// started. Once started, it is no longer possible to alter the TLS
+// configuration.
+
+// NNG_OPT_TLS_CA_CERT is a string with one or more X.509 certificates,
+// representing the entire CA chain. The content may be either PEM or DER
+// encoded.
+#define NNG_OPT_TLS_CA_CERT "tls:ca-cert"
+
+// NNG_OPT_TLS_CRL is a PEM encoded CRL (revocation list). Multiple lists
+// may be loaded by using this option multiple times.
+#define NNG_OPT_TLS_CRL "tls:crl"
+
+// NNG_OPT_TLS_CERT is used to specify our own certificate. At present
+// only one certificate may be supplied. (In the future it may be
+// possible to call this multiple times, for servers that select different
+// certificates depending upon client capabilities.)
+#define NNG_OPT_TLS_CERT "tls:cert"
+
+// NNG_OPT_TLS_PRIVATE_KEY is used to specify the private key used
+// with the given certificate. This should be called after setting
+// the certificate. The private key may be in PEM or DER format.
+// If in PEM encoded, a terminating ZERO byte should be included.
+#define NNG_OPT_TLS_PRIVATE_KEY "tls:private-key"
+
+// NNG_OPT_TLS_PRIVATE_KEY_PASSWORD is used to specify a password
+// used for the private key. The value is an ASCIIZ string.
+#define NNG_OPT_TLS_PRIVATE_KEY_PASSWORD "tls:private-key-password"
+
+// NNG_OPT_TLS_AUTH_MODE is an integer indicating whether our
+// peer should be verified or not. It is required on clients/dialers,
+// and off on servers/listeners, by default.
+#define NNG_OPT_TLS_AUTH_MODE "tls:auth-mode"
+
+extern int nng_tls_auth_mode_required;
+extern int nng_tls_auth_mode_none;
+extern int nng_tls_auth_mode_optional;
+
+// NNG_OPT_TLS_AUTH_VERIFIED is a boolean that can be read on pipes,
+// indicating whether the peer certificate is verified.
+#define NNG_OPT_TLS_AUTH_VERIFIED "tls:auth-verified"
+
+// XXX: TBD: Ciphersuite selection and reporting. Session reuse?
+
+#endif // NNG_TRANSPORT_WS_WEBSOCKET_H
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 607c353c..16a29da1 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1292,7 +1292,7 @@ zt_wire_packet_send_cb(void *arg)
nni_aio * aio = arg;
zt_send_hdr *hdr;
- hdr = nni_aio_get_data(aio);
+ hdr = nni_aio_get_data(aio, 0);
nni_free(hdr, hdr->len + sizeof(*hdr));
nni_aio_fini_cb(aio);
}
@@ -1355,7 +1355,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
buf += sizeof(*hdr);
memcpy(buf, data, len);
- nni_aio_set_data(aio, hdr);
+ nni_aio_set_data(aio, hdr, 0);
hdr->sa = addr;
hdr->len = len;
@@ -2001,7 +2001,7 @@ zt_pipe_ping_cb(void *arg)
}
if (p->zp_ping_try < p->zp_ping_count) {
nni_time now = nni_clock();
- nni_aio_set_timeout(aio, now + p->zp_ping_time);
+ nni_aio_set_timeout(aio, p->zp_ping_time);
// We want pings. We only send one if needed, but we
// use the the timer to wake us up even if we aren't
// going to send a ping. (We don't increment the try count
@@ -2034,7 +2034,7 @@ zt_pipe_start(void *arg, nni_aio *aio)
if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNI_TIME_ZERO) &&
(p->zp_ping_time != NNI_TIME_NEVER) && (p->zp_ping_aio != NULL)) {
p->zp_ping_try = 0;
- nni_aio_set_timeout(aio, nni_clock() + p->zp_ping_time);
+ nni_aio_set_timeout(aio, p->zp_ping_time);
if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) ==
0) {
p->zp_ping_active = 1;
@@ -2456,7 +2456,7 @@ zt_ep_conn_req_cb(void *arg)
}
if (nni_list_first(&ep->ze_aios) != NULL) {
- nni_aio_set_timeout(aio, nni_clock() + zt_conn_interval);
+ nni_aio_set_timeout(aio, zt_conn_interval);
if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) {
ep->ze_creq_active = 1;
ep->ze_creq_try++;
@@ -2479,8 +2479,7 @@ zt_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&zt_lk);
if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- nni_time now = nni_clock();
- int rv;
+ int rv;
// Clear the port so we get an ephemeral port.
ep->ze_laddr &= ~((uint64_t) zt_port_mask);
@@ -2498,7 +2497,7 @@ zt_ep_connect(void *arg, nni_aio *aio)
ep->ze_running = 1;
- nni_aio_set_timeout(ep->ze_creq_aio, now + zt_conn_interval);
+ nni_aio_set_timeout(ep->ze_creq_aio, zt_conn_interval);
if (nni_aio_start(
ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) {