aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental
diff options
context:
space:
mode:
Diffstat (limited to 'src/supplemental')
-rw-r--r--src/supplemental/http/http_api.h18
-rw-r--r--src/supplemental/http/http_client.c181
-rw-r--r--src/supplemental/http/http_conn.c125
-rw-r--r--src/supplemental/http/http_server.c192
-rw-r--r--src/supplemental/ipc/CMakeLists.txt16
-rw-r--r--src/supplemental/ipc/ipc.c138
-rw-r--r--src/supplemental/tcp/CMakeLists.txt6
-rw-r--r--src/supplemental/tcp/tcp.c444
-rw-r--r--src/supplemental/tls/mbedtls/tls.c363
-rw-r--r--src/supplemental/tls/none/tls.c92
-rw-r--r--src/supplemental/tls/tls_api.h52
-rw-r--r--src/supplemental/tls/tls_common.c754
-rw-r--r--src/supplemental/websocket/CMakeLists.txt2
-rw-r--r--src/supplemental/websocket/stub.c40
-rw-r--r--src/supplemental/websocket/websocket.c2066
-rw-r--r--src/supplemental/websocket/websocket.h53
16 files changed, 2621 insertions, 1921 deletions
diff --git a/src/supplemental/http/http_api.h b/src/supplemental/http/http_api.h
index 569e8532..45738318 100644
--- a/src/supplemental/http/http_api.h
+++ b/src/supplemental/http/http_api.h
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -96,9 +96,7 @@ extern void *nni_http_conn_get_ctx(nni_http_conn *);
// These initialization functions create stream for HTTP transactions.
// They should only be used by the server or client HTTP implementations,
// and are not for use by other code.
-extern int nni_http_conn_init_tcp(nni_http_conn **, nni_tcp_conn *);
-extern int nni_http_conn_init_tls(
- nni_http_conn **, struct nng_tls_config *, nni_tcp_conn *);
+extern int nni_http_conn_init(nni_http_conn **, nng_stream *);
extern void nni_http_conn_close(nni_http_conn *);
extern void nni_http_conn_fini(nni_http_conn *);
@@ -207,6 +205,11 @@ extern int nni_http_server_set_tls(nni_http_server *, struct nng_tls_config *);
extern int nni_http_server_get_tls(
nni_http_server *, struct nng_tls_config **);
+extern int nni_http_server_setx(
+ nni_http_server *, const char *, const void *, size_t, nni_type);
+extern int nni_http_server_getx(
+ nni_http_server *, const char *, void *, size_t *, nni_type);
+
// nni_http_server_start starts listening on the supplied port.
extern int nni_http_server_start(nni_http_server *);
@@ -350,6 +353,11 @@ extern int nni_http_client_set_tls(nni_http_client *, struct nng_tls_config *);
extern int nni_http_client_get_tls(
nni_http_client *, struct nng_tls_config **);
+extern int nni_http_client_setx(
+ nni_http_client *, const char *, const void *, size_t, nni_type);
+extern int nni_http_client_getx(
+ nni_http_client *, const char *, void *, size_t *, nni_type);
+
extern void nni_http_client_connect(nni_http_client *, nni_aio *);
// nni_http_transact_conn is used to perform a round-trip exchange (i.e. a
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index 798cbe14..c35dcaa8 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -1,6 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -14,25 +15,20 @@
#include <string.h>
#include "core/nng_impl.h"
-#include "nng/supplemental/tls/tls.h"
#include "supplemental/tls/tls_api.h"
+#include <nng/supplemental/tls/tls.h>
+
#include "http_api.h"
static nni_mtx http_txn_lk;
struct nng_http_client {
- nni_list aios;
- nni_mtx mtx;
- bool closed;
- bool resolving;
- nng_tls_config *tls;
- nni_aio * aio;
- nng_sockaddr sa;
- nni_tcp_dialer *dialer;
- char * host;
- char * port;
- nni_url * url;
+ nni_list aios;
+ nni_mtx mtx;
+ bool closed;
+ nni_aio * aio;
+ nng_stream_dialer *dialer;
};
static void
@@ -43,9 +39,7 @@ http_dial_start(nni_http_client *c)
if ((aio = nni_list_first(&c->aios)) == NULL) {
return;
}
- c->resolving = true;
- nni_aio_set_input(c->aio, 0, &c->sa);
- nni_tcp_resolv(c->host, c->port, NNG_AF_UNSPEC, 0, c->aio);
+ nng_stream_dialer_dial(c->dialer, c->aio);
}
static void
@@ -54,7 +48,7 @@ http_dial_cb(void *arg)
nni_http_client *c = arg;
nni_aio * aio;
int rv;
- nni_tcp_conn * tcp;
+ nng_stream * stream;
nni_http_conn * conn;
nni_mtx_lock(&c->mtx);
@@ -63,9 +57,9 @@ http_dial_cb(void *arg)
if ((aio = nni_list_first(&c->aios)) == NULL) {
// User abandoned request, and no residuals left.
nni_mtx_unlock(&c->mtx);
- if ((rv == 0) && !c->resolving) {
- tcp = nni_aio_get_output(c->aio, 0);
- nni_tcp_conn_fini(tcp);
+ if (rv == 0) {
+ stream = nni_aio_get_output(c->aio, 0);
+ nng_stream_free(stream);
}
return;
}
@@ -78,28 +72,16 @@ http_dial_cb(void *arg)
return;
}
- if (c->resolving) {
- // This was a DNS lookup -- advance to normal TCP connect.
- c->resolving = false;
- nni_tcp_dialer_dial(c->dialer, &c->sa, c->aio);
- nni_mtx_unlock(&c->mtx);
- return;
- }
-
nni_aio_list_remove(aio);
- tcp = nni_aio_get_output(c->aio, 0);
- NNI_ASSERT(tcp != NULL);
+ stream = nni_aio_get_output(c->aio, 0);
+ NNI_ASSERT(stream != NULL);
- if (c->tls != NULL) {
- rv = nni_http_conn_init_tls(&conn, c->tls, tcp);
- } else {
- rv = nni_http_conn_init_tcp(&conn, tcp);
- }
+ rv = nni_http_conn_init(&conn, stream);
http_dial_start(c);
nni_mtx_unlock(&c->mtx);
if (rv != 0) {
- // the conn_init function will have already discard tcp.
+ // the conn_init function will have already discard stream.
nni_aio_finish_error(aio, rv);
return;
}
@@ -112,16 +94,8 @@ void
nni_http_client_fini(nni_http_client *c)
{
nni_aio_fini(c->aio);
- nni_tcp_dialer_fini(c->dialer);
+ nng_stream_dialer_free(c->dialer);
nni_mtx_fini(&c->mtx);
-#ifdef NNG_SUPP_TLS
- if (c->tls != NULL) {
- nni_tls_config_fini(c->tls);
- }
-#endif
- nni_strfree(c->host);
- nni_strfree(c->port);
-
NNI_FREE_STRUCT(c);
}
@@ -130,59 +104,37 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url)
{
int rv;
nni_http_client *c;
+ nng_url myurl;
+
+ // Rewrite URLs to either TLS or TCP.
+ memcpy(&myurl, url, sizeof(myurl));
+ if ((strcmp(url->u_scheme, "http") == 0) ||
+ (strcmp(url->u_scheme, "ws") == 0)) {
+ myurl.u_scheme = "tcp";
+ } else if ((strcmp(url->u_scheme, "https") == 0) ||
+ (strcmp(url->u_scheme, "wss") == 0)) {
+ myurl.u_scheme = "tls+tcp";
+ } else {
+ return (NNG_EADDRINVAL);
+ }
if (strlen(url->u_hostname) == 0) {
// We require a valid hostname.
return (NNG_EADDRINVAL);
}
- if ((strcmp(url->u_scheme, "http") != 0) &&
-#ifdef NNG_SUPP_TLS
- (strcmp(url->u_scheme, "https") != 0) &&
- (strcmp(url->u_scheme, "wss") != 0) &&
-#endif
- (strcmp(url->u_scheme, "ws") != 0)) {
- return (NNG_EADDRINVAL);
- }
if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&c->mtx);
nni_aio_list_init(&c->aios);
- if (((c->host = nni_strdup(url->u_hostname)) == NULL) ||
- ((strlen(url->u_port) != 0) &&
- ((c->port = nni_strdup(url->u_port)) == NULL))) {
- nni_http_client_fini(c);
- return (NNG_ENOMEM);
- }
-
-#ifdef NNG_SUPP_TLS
- if ((strcmp(url->u_scheme, "https") == 0) ||
- (strcmp(url->u_scheme, "wss") == 0)) {
- rv = nni_tls_config_init(&c->tls, NNG_TLS_MODE_CLIENT);
- if (rv != 0) {
- nni_http_client_fini(c);
- return (rv);
- }
- // Take the server name right from the client URL. We only
- // consider the name, as the port is never part of the
- // certificate.
- rv = nng_tls_config_server_name(c->tls, url->u_hostname);
- if (rv != 0) {
- nni_http_client_fini(c);
- return (rv);
- }
- // Note that the application has to supply the location of
- // certificates. We could probably use a default based
- // on environment or common locations used by OpenSSL, but
- // as there is no way to *unload* the cert file, lets not
- // do that. (We might want to consider a mode to reset.)
+ if ((rv = nng_stream_dialer_alloc_url(&c->dialer, &myurl)) != 0) {
+ nni_http_client_fini(c);
+ return (rv);
}
-#endif
- if (((rv = nni_tcp_dialer_init(&c->dialer)) != 0) ||
- ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0)) {
+ if ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0) {
nni_http_client_fini(c);
return (rv);
}
@@ -192,46 +144,37 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url)
}
int
-nni_http_client_set_tls(nni_http_client *c, struct nng_tls_config *tls)
+nni_http_client_set_tls(nni_http_client *c, nng_tls_config *tls)
{
-#ifdef NNG_SUPP_TLS
- struct nng_tls_config *old;
- nni_mtx_lock(&c->mtx);
- old = c->tls;
- c->tls = tls;
- if (tls != NULL) {
- nni_tls_config_hold(tls);
- }
- nni_mtx_unlock(&c->mtx);
- if (old != NULL) {
- nni_tls_config_fini(old);
- }
- return (0);
-#else
- NNI_ARG_UNUSED(c);
- NNI_ARG_UNUSED(tls);
- return (NNG_EINVAL);
-#endif
+ int rv;
+ rv = nni_stream_dialer_setx(c->dialer, NNG_OPT_TLS_CONFIG, &tls,
+ sizeof(tls), NNI_TYPE_POINTER);
+ return (rv);
}
int
-nni_http_client_get_tls(nni_http_client *c, struct nng_tls_config **tlsp)
+nni_http_client_get_tls(nni_http_client *c, nng_tls_config **tlsp)
{
-#ifdef NNG_SUPP_TLS
- nni_mtx_lock(&c->mtx);
- if (c->tls == NULL) {
- nni_mtx_unlock(&c->mtx);
- return (NNG_EINVAL);
- }
- nni_tls_config_hold(c->tls);
- *tlsp = c->tls;
- nni_mtx_unlock(&c->mtx);
- return (0);
-#else
- NNI_ARG_UNUSED(c);
- NNI_ARG_UNUSED(tlsp);
- return (NNG_ENOTSUP);
-#endif
+ size_t sz = sizeof(*tlsp);
+ int rv;
+ rv = nni_stream_dialer_getx(
+ c->dialer, NNG_OPT_TLS_CONFIG, tlsp, &sz, NNI_TYPE_POINTER);
+ return (rv);
+}
+
+int
+nni_http_client_setx(nni_http_client *c, const char *name, const void *buf,
+ size_t sz, nni_type t)
+{
+ // We have no local options, but we just pass them straight through.
+ return (nni_stream_dialer_setx(c->dialer, name, buf, sz, t));
+}
+
+int
+nni_http_client_getx(
+ nni_http_client *c, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ return (nni_stream_dialer_getx(c->dialer, name, buf, szp, t));
}
static void
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index 7c6159cd..1fc2c34e 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -40,25 +40,6 @@ enum write_flavor {
HTTP_WR_RES,
};
-typedef void (*http_read_fn)(void *, nni_aio *);
-typedef void (*http_write_fn)(void *, nni_aio *);
-typedef void (*http_close_fn)(void *);
-typedef void (*http_fini_fn)(void *);
-typedef int (*http_addr_fn)(void *, nni_sockaddr *);
-typedef int (*http_getopt_fn)(
- void *, const char *, void *, size_t *, nni_type);
-typedef int (*http_setopt_fn)(
- void *, const char *, const void *, size_t, nni_type);
-
-typedef struct {
- http_read_fn h_read;
- http_write_fn h_write;
- http_getopt_fn h_getopt;
- http_setopt_fn h_setopt;
- http_close_fn h_close;
- http_fini_fn h_fini;
-} http_tran;
-
#define SET_RD_FLAVOR(aio, f) \
nni_aio_set_prov_extra(aio, 0, ((void *) (intptr_t)(f)))
#define GET_RD_FLAVOR(aio) (int) ((intptr_t) nni_aio_get_prov_extra(aio, 0))
@@ -67,17 +48,11 @@ typedef struct {
#define GET_WR_FLAVOR(aio) (int) ((intptr_t) nni_aio_get_prov_extra(aio, 0))
struct nng_http_conn {
- void * sock;
- http_read_fn rd;
- http_write_fn wr;
- http_setopt_fn setopt;
- http_getopt_fn getopt;
- http_close_fn close;
- http_fini_fn fini;
- void * ctx;
- bool closed;
- nni_list rdq; // high level http read requests
- nni_list wrq; // high level http write requests
+ nng_stream *sock;
+ void * ctx;
+ bool closed;
+ nni_list rdq; // high level http read requests
+ nni_list wrq; // high level http write requests
nni_aio *rd_uaio; // user aio for read
nni_aio *wr_uaio; // user aio for write
@@ -138,7 +113,7 @@ http_close(nni_http_conn *conn)
}
if (conn->sock != NULL) {
- conn->close(conn->sock);
+ nng_stream_close(conn->sock);
}
}
@@ -204,7 +179,7 @@ http_rd_buf(nni_http_conn *conn, nni_aio *aio)
// to get *any* data for a partial RAW read.)
nni_aio_set_data(conn->rd_aio, 1, NULL);
nni_aio_set_iov(conn->rd_aio, niov, iov);
- conn->rd(conn->sock, conn->rd_aio);
+ nng_stream_recv(conn->sock, conn->rd_aio);
return (NNG_EAGAIN);
case HTTP_RD_REQ:
@@ -220,7 +195,7 @@ http_rd_buf(nni_http_conn *conn, nni_aio *aio)
iov1.iov_len = conn->rd_bufsz - conn->rd_put;
nni_aio_set_iov(conn->rd_aio, 1, &iov1);
nni_aio_set_data(conn->rd_aio, 1, aio);
- conn->rd(conn->sock, conn->rd_aio);
+ nng_stream_recv(conn->sock, conn->rd_aio);
}
return (rv);
@@ -237,7 +212,7 @@ http_rd_buf(nni_http_conn *conn, nni_aio *aio)
iov1.iov_len = conn->rd_bufsz - conn->rd_put;
nni_aio_set_iov(conn->rd_aio, 1, &iov1);
nni_aio_set_data(conn->rd_aio, 1, aio);
- conn->rd(conn->sock, conn->rd_aio);
+ nng_stream_recv(conn->sock, conn->rd_aio);
}
return (rv);
@@ -254,7 +229,7 @@ http_rd_buf(nni_http_conn *conn, nni_aio *aio)
iov1.iov_len = conn->rd_bufsz - conn->rd_put;
nni_aio_set_iov(conn->rd_aio, 1, &iov1);
nni_aio_set_data(conn->rd_aio, 1, aio);
- conn->rd(conn->sock, conn->rd_aio);
+ nng_stream_recv(conn->sock, conn->rd_aio);
}
return (rv);
}
@@ -428,7 +403,7 @@ http_wr_start(nni_http_conn *conn)
nni_aio_get_iov(aio, &niov, &iov);
nni_aio_set_iov(conn->wr_aio, niov, iov);
- conn->wr(conn->sock, conn->wr_aio);
+ nng_stream_send(conn->sock, conn->wr_aio);
}
static void
@@ -475,7 +450,7 @@ http_wr_cb(void *arg)
if (nni_aio_iov_count(aio) > 0) {
// We have more to transmit - start another and leave
// (we will get called again when it is done).
- conn->wr(conn->sock, aio);
+ nng_stream_send(conn->sock, aio);
nni_mtx_unlock(&conn->mtx);
return;
}
@@ -680,7 +655,7 @@ nni_http_conn_getopt(
if (conn->closed) {
rv = NNG_ECLOSED;
} else {
- rv = conn->getopt(conn->sock, name, buf, szp, t);
+ rv = nni_stream_getx(conn->sock, name, buf, szp, t);
}
nni_mtx_unlock(&conn->mtx);
return (rv);
@@ -695,7 +670,7 @@ nni_http_conn_setopt(nni_http_conn *conn, const char *name, const void *buf,
if (conn->closed) {
rv = NNG_ECLOSED;
} else {
- rv = conn->setopt(conn->sock, name, buf, sz, t);
+ rv = nni_stream_setx(conn->sock, name, buf, sz, t);
}
nni_mtx_unlock(&conn->mtx);
return (rv);
@@ -709,8 +684,8 @@ nni_http_conn_fini(nni_http_conn *conn)
nni_mtx_lock(&conn->mtx);
http_close(conn);
- if ((conn->sock != NULL) && (conn->fini != NULL)) {
- conn->fini(conn->sock);
+ if (conn->sock != NULL) {
+ nng_stream_free(conn->sock);
conn->sock = NULL;
}
nni_mtx_unlock(&conn->mtx);
@@ -723,7 +698,7 @@ nni_http_conn_fini(nni_http_conn *conn)
}
static int
-http_init(nni_http_conn **connp, http_tran *tran, void *data)
+http_init(nni_http_conn **connp, nng_stream *data)
{
nni_http_conn *conn;
int rv;
@@ -747,73 +722,19 @@ http_init(nni_http_conn **connp, http_tran *tran, void *data)
return (rv);
}
- conn->sock = data;
- conn->rd = tran->h_read;
- conn->wr = tran->h_write;
- conn->close = tran->h_close;
- conn->fini = tran->h_fini;
- conn->getopt = tran->h_getopt;
- conn->setopt = tran->h_setopt;
+ conn->sock = data;
*connp = conn;
return (0);
}
-static http_tran http_tcp_ops = {
- .h_read = (http_read_fn) nni_tcp_conn_recv,
- .h_write = (http_write_fn) nni_tcp_conn_send,
- .h_close = (http_close_fn) nni_tcp_conn_close,
- .h_fini = (http_fini_fn) nni_tcp_conn_fini,
- .h_getopt = (http_getopt_fn) nni_tcp_conn_getopt,
- .h_setopt = (http_setopt_fn) nni_tcp_conn_setopt,
-};
-
int
-nni_http_conn_init_tcp(nni_http_conn **connp, nni_tcp_conn *tcp)
+nni_http_conn_init(nni_http_conn **connp, nng_stream *stream)
{
int rv;
- if ((rv = http_init(connp, &http_tcp_ops, tcp)) != 0) {
- nni_tcp_conn_fini(tcp);
- }
- return (rv);
-}
-
-#ifdef NNG_SUPP_TLS
-static http_tran http_tls_ops = {
- .h_read = (http_read_fn) nni_tls_recv,
- .h_write = (http_write_fn) nni_tls_send,
- .h_close = (http_close_fn) nni_tls_close,
- .h_fini = (http_fini_fn) nni_tls_fini,
- .h_getopt = (http_getopt_fn) nni_tls_getopt,
- .h_setopt = (http_setopt_fn) nni_tls_setopt,
-};
-
-int
-nni_http_conn_init_tls(
- nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp)
-{
- nni_tls *tls;
- int rv;
-
- if ((rv = nni_tls_init(&tls, cfg, tcp)) != 0) {
- nni_tcp_conn_fini(tcp);
- return (rv);
- }
-
- if ((rv = http_init(connp, &http_tls_ops, tls)) != 0) {
- nni_tls_fini(tls);
+ if ((rv = http_init(connp, stream)) != 0) {
+ nng_stream_free(stream);
}
return (rv);
}
-#else
-int
-nni_http_conn_init_tls(
- nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp)
-{
- NNI_ARG_UNUSED(connp);
- NNI_ARG_UNUSED(cfg);
- nni_tcp_conn_fini(tcp);
- return (NNG_ENOTSUP);
-}
-#endif // NNG_SUPP_TLS
diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c
index a6343d87..939273b7 100644
--- a/src/supplemental/http/http_server.c
+++ b/src/supplemental/http/http_server.c
@@ -1,7 +1,8 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 QXSoftware <lh563566994@126.com>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -68,22 +69,21 @@ typedef struct http_error {
} http_error;
struct nng_http_server {
- nng_sockaddr addr;
- nni_list_node node;
- int refcnt;
- int starts;
- nni_list handlers;
- nni_list conns;
- nni_mtx mtx;
- bool closed;
- nng_tls_config * tls;
- nni_aio * accaio;
- nni_tcp_listener *listener;
- char * port;
- char * hostname;
- nni_list errors;
- nni_mtx errors_mtx;
- nni_reap_item reap;
+ nng_sockaddr addr;
+ nni_list_node node;
+ int refcnt;
+ int starts;
+ nni_list handlers;
+ nni_list conns;
+ nni_mtx mtx;
+ bool closed;
+ nni_aio * accaio;
+ nng_stream_listener *listener;
+ char * port;
+ char * hostname;
+ nni_list errors;
+ nni_mtx errors_mtx;
+ nni_reap_item reap;
};
int
@@ -720,13 +720,13 @@ http_sconn_cbdone(void *arg)
}
static int
-http_sconn_init(http_sconn **scp, nni_http_server *s, nni_tcp_conn *tcp)
+http_sconn_init(http_sconn **scp, nng_stream *stream)
{
http_sconn *sc;
int rv;
if ((sc = NNI_ALLOC_STRUCT(sc)) == NULL) {
- nni_tcp_conn_fini(tcp);
+ nng_stream_free(stream);
return (NNG_ENOMEM);
}
@@ -741,11 +741,7 @@ http_sconn_init(http_sconn **scp, nni_http_server *s, nni_tcp_conn *tcp)
return (rv);
}
- if (s->tls != NULL) {
- rv = nni_http_conn_init_tls(&sc->conn, s->tls, tcp);
- } else {
- rv = nni_http_conn_init_tcp(&sc->conn, tcp);
- }
+ rv = nni_http_conn_init(&sc->conn, stream);
if (rv != 0) {
http_sconn_close(sc);
return (rv);
@@ -760,7 +756,7 @@ http_server_acccb(void *arg)
{
nni_http_server *s = arg;
nni_aio * aio = s->accaio;
- nni_tcp_conn * tcp;
+ nng_stream * stream;
http_sconn * sc;
int rv;
@@ -768,22 +764,22 @@ http_server_acccb(void *arg)
if ((rv = nni_aio_result(aio)) != 0) {
if (!s->closed) {
// try again?
- nni_tcp_listener_accept(s->listener, s->accaio);
+ nng_stream_listener_accept(s->listener, s->accaio);
}
nni_mtx_unlock(&s->mtx);
return;
}
- tcp = nni_aio_get_output(aio, 0);
+ stream = nni_aio_get_output(aio, 0);
if (s->closed) {
// If we're closing, then reject this one.
- nni_tcp_conn_fini(tcp);
+ nng_stream_free(stream);
nni_mtx_unlock(&s->mtx);
return;
}
- if (http_sconn_init(&sc, s, tcp) != 0) {
- // The TCP structure is already cleaned up.
+ if (http_sconn_init(&sc, stream) != 0) {
+ // The stream structure is already cleaned up.
// Start another accept attempt.
- nni_tcp_listener_accept(s->listener, s->accaio);
+ nng_stream_listener_accept(s->listener, s->accaio);
nni_mtx_unlock(&s->mtx);
return;
}
@@ -792,7 +788,7 @@ http_server_acccb(void *arg)
sc->handler = NULL;
nni_http_read_req(sc->conn, sc->req, sc->rxaio);
- nni_tcp_listener_accept(s->listener, s->accaio);
+ nng_stream_listener_accept(s->listener, s->accaio);
nni_mtx_unlock(&s->mtx);
}
@@ -812,20 +808,13 @@ http_server_fini(nni_http_server *s)
nni_mtx_unlock(&s->mtx);
return;
}
- if (s->listener != NULL) {
- nni_tcp_listener_fini(s->listener);
- }
+ nng_stream_listener_free(s->listener);
while ((h = nni_list_first(&s->handlers)) != NULL) {
nni_list_remove(&s->handlers, h);
h->refcnt--;
nni_http_handler_fini(h);
}
nni_mtx_unlock(&s->mtx);
-#ifdef NNG_SUPP_TLS
- if (s->tls != NULL) {
- nni_tls_config_fini(s->tls);
- }
-#endif
nni_mtx_lock(&s->errors_mtx);
while ((epage = nni_list_first(&s->errors)) != NULL) {
nni_list_remove(&s->errors, epage);
@@ -847,16 +836,20 @@ http_server_init(nni_http_server **serverp, const nni_url *url)
{
nni_http_server *s;
int rv;
- nni_aio * aio;
-
- if ((strcmp(url->u_scheme, "http") != 0) &&
-#ifdef NNG_SUPP_TLS
- (strcmp(url->u_scheme, "https") != 0) &&
- (strcmp(url->u_scheme, "wss") != 0) &&
-#endif
- (strcmp(url->u_scheme, "ws") != 0)) {
+ nng_url myurl;
+
+ // Rewrite URLs to either TLS or TCP.
+ memcpy(&myurl, url, sizeof(myurl));
+ if ((strcmp(url->u_scheme, "http") == 0) ||
+ (strcmp(url->u_scheme, "ws") == 0)) {
+ myurl.u_scheme = "tcp";
+ } else if ((strcmp(url->u_scheme, "https") == 0) ||
+ (strcmp(url->u_scheme, "wss") == 0)) {
+ myurl.u_scheme = "tls+tcp";
+ } else {
return (NNG_EADDRINVAL);
}
+
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
@@ -884,34 +877,11 @@ http_server_init(nni_http_server **serverp, const nni_url *url)
return (NNG_ENOMEM);
}
-#ifdef NNG_SUPP_TLS
- if ((strcmp(url->u_scheme, "https") == 0) ||
- (strcmp(url->u_scheme, "wss") == 0)) {
- rv = nni_tls_config_init(&s->tls, NNG_TLS_MODE_SERVER);
- if (rv != 0) {
- http_server_fini(s);
- return (rv);
- }
- }
-#endif
-
- // Do the DNS lookup *now*. This means that this is
- // synchronous, but it should be fast, since it should either
- // resolve as a number, or resolve locally, without having to
- // hit up DNS.
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- http_server_fini(s);
- return (rv);
- }
- nni_aio_set_input(aio, 0, &s->addr);
- nni_tcp_resolv(s->hostname, s->port, NNG_AF_UNSPEC, true, aio);
- nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- nni_aio_fini(aio);
- if (rv != 0) {
+ if ((rv = nng_stream_listener_alloc_url(&s->listener, &myurl)) != 0) {
http_server_fini(s);
return (rv);
}
+
s->refcnt = 1;
*serverp = s;
return (0);
@@ -950,15 +920,10 @@ static int
http_server_start(nni_http_server *s)
{
int rv;
- if ((rv = nni_tcp_listener_init(&s->listener)) != 0) {
- return (rv);
- }
- if ((rv = nni_tcp_listener_listen(s->listener, &s->addr)) != 0) {
- nni_tcp_listener_fini(s->listener);
- s->listener = NULL;
+ if ((rv = nng_stream_listener_listen(s->listener)) != 0) {
return (rv);
}
- nni_tcp_listener_accept(s->listener, s->accaio);
+ nng_stream_listener_accept(s->listener, s->accaio);
return (0);
}
@@ -992,7 +957,7 @@ http_server_stop(nni_http_server *s)
// Close the TCP endpoint that is listening.
if (s->listener) {
- nni_tcp_listener_close(s->listener);
+ nng_stream_listener_close(s->listener);
}
// Stopping the server is a hard stop -- it aborts any work
@@ -1764,50 +1729,37 @@ nni_http_handler_init_static(nni_http_handler **hpp, const char *uri,
}
int
-nni_http_server_set_tls(nni_http_server *s, nng_tls_config *tcfg)
+nni_http_server_set_tls(nni_http_server *s, nng_tls_config *tls)
{
-#ifdef NNG_SUPP_TLS
- nng_tls_config *old;
- nni_mtx_lock(&s->mtx);
- if (s->starts) {
- nni_mtx_unlock(&s->mtx);
- return (NNG_EBUSY);
- }
- old = s->tls;
- s->tls = tcfg;
- if (tcfg) {
- nni_tls_config_hold(tcfg);
- }
- nni_mtx_unlock(&s->mtx);
- if (old) {
- nni_tls_config_fini(old);
- }
- return (0);
-#else
- NNI_ARG_UNUSED(s);
- NNI_ARG_UNUSED(tcfg);
- return (NNG_ENOTSUP);
-#endif
+ int rv;
+ rv = nni_stream_listener_setx(s->listener, NNG_OPT_TLS_CONFIG, &tls,
+ sizeof(tls), NNI_TYPE_POINTER);
+ return (rv);
}
int
-nni_http_server_get_tls(nni_http_server *s, nng_tls_config **tp)
+nni_http_server_get_tls(nni_http_server *s, nng_tls_config **tlsp)
{
-#ifdef NNG_SUPP_TLS
- nni_mtx_lock(&s->mtx);
- if (s->tls == NULL) {
- nni_mtx_unlock(&s->mtx);
- return (NNG_EINVAL);
- }
- nni_tls_config_hold(s->tls);
- *tp = s->tls;
- nni_mtx_unlock(&s->mtx);
- return (0);
-#else
- NNI_ARG_UNUSED(s);
- NNI_ARG_UNUSED(tp);
- return (NNG_ENOTSUP);
-#endif
+ size_t sz = sizeof(*tlsp);
+ int rv;
+ rv = nni_stream_listener_getx(
+ s->listener, NNG_OPT_TLS_CONFIG, tlsp, &sz, NNI_TYPE_POINTER);
+ return (rv);
+}
+
+int
+nni_http_server_setx(nni_http_server *s, const char *name, const void *buf,
+ size_t sz, nni_type t)
+{
+ // We have no local options, but we just pass them straight through.
+ return (nni_stream_listener_setx(s->listener, name, buf, sz, t));
+}
+
+int
+nni_http_server_getx(
+ nni_http_server *s, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ return (nni_stream_listener_getx(s->listener, name, buf, szp, t));
}
void
diff --git a/src/supplemental/ipc/CMakeLists.txt b/src/supplemental/ipc/CMakeLists.txt
deleted file mode 100644
index 3bc0e4de..00000000
--- a/src/supplemental/ipc/CMakeLists.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Copyright 2018 Capitar IT Group BV <info@capitar.com>
-# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-# Copyright 2018 Devolutions <info@devolutions.net>
-#
-# This software is supplied under the terms of the MIT License, a
-# copy of which should be located in the distribution where this
-# file was obtained (LICENSE.txt). A copy of the license may also be
-# found online at https://opensource.org/licenses/MIT.
-#
-
-set(_SRCS supplemental/ipc/ipc.c
- ${PROJECT_SOURCE_DIR}/include/nng/supplemental/ipc/ipc.h
-)
-
-set(NNG_SRCS ${NNG_SRCS} ${_SRCS} PARENT_SCOPE)
diff --git a/src/supplemental/ipc/ipc.c b/src/supplemental/ipc/ipc.c
deleted file mode 100644
index cf78dfbd..00000000
--- a/src/supplemental/ipc/ipc.c
+++ /dev/null
@@ -1,138 +0,0 @@
-//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <nng/nng.h>
-#include <nng/supplemental/ipc/ipc.h>
-
-#include "core/nng_impl.h"
-
-// This is our "public" IPC API. This allows applications to access
-// basic IPC functions, using our AIO framework. Most applications will
-// not need this.
-
-// We treat nng_ipc as nni_ipc_conn, nng_ipc_dialer as nni_ipc_dialer,
-// and nng_ipc_listener as nni_ipc_listener. We cast through void to
-// provide isolation of the names in a way that makes the compiler happy.
-// It turns out we can pretty much just wrap the platform API for IPC that
-// we have already created.
-
-void
-nng_ipc_close(nng_ipc *ipc)
-{
- nni_ipc_conn_close((void *) ipc);
-}
-
-void
-nng_ipc_free(nng_ipc *ipc)
-{
- nni_ipc_conn_fini((void *) ipc);
-}
-
-void
-nng_ipc_send(nng_ipc *ipc, nng_aio *aio)
-{
- nni_ipc_conn_send((void *) ipc, aio);
-}
-
-void
-nng_ipc_recv(nng_ipc *ipc, nng_aio *aio)
-{
- nni_ipc_conn_recv((void *) ipc, aio);
-}
-
-int
-nng_ipc_setopt(nng_ipc *ipc, const char *name, const void *val, size_t sz)
-{
- return (
- nni_ipc_conn_setopt((void *) ipc, name, val, sz, NNI_TYPE_OPAQUE));
-}
-
-int
-nng_ipc_getopt(nng_ipc *ipc, const char *name, void *val, size_t *szp)
-{
- return (nni_ipc_conn_getopt(
- (void *) ipc, name, val, szp, NNI_TYPE_OPAQUE));
-}
-
-int
-nng_ipc_dialer_alloc(nng_ipc_dialer **dp)
-{
- nni_ipc_dialer *d;
- int rv;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
- if ((rv = nni_ipc_dialer_init(&d)) == 0) {
- *dp = (void *) d;
- }
- return (rv);
-}
-
-void
-nng_ipc_dialer_close(nng_ipc_dialer *d)
-{
- nni_ipc_dialer_close((void *) d);
-}
-
-void
-nng_ipc_dialer_free(nng_ipc_dialer *d)
-{
- nni_ipc_dialer_fini((void *) d);
-}
-
-void
-nng_ipc_dialer_dial(nng_ipc_dialer *d, const nng_sockaddr *sa, nng_aio *aio)
-{
- nni_ipc_dialer_dial((void *) d, sa, aio);
-}
-
-int
-nng_ipc_listener_alloc(nng_ipc_listener **lp)
-{
- nni_ipc_listener *l;
- int rv;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
- if ((rv = nni_ipc_listener_init(&l)) == 0) {
- *lp = (void *) l;
- }
- return (rv);
-}
-
-void
-nng_ipc_listener_close(nng_ipc_listener *l)
-{
- nni_ipc_listener_close((void *) l);
-}
-
-void
-nng_ipc_listener_free(nng_ipc_listener *l)
-{
- nni_ipc_listener_fini((void *) l);
-}
-
-int
-nng_ipc_listener_listen(nng_ipc_listener *l, const nng_sockaddr *sa)
-{
- return (nni_ipc_listener_listen((void *) l, sa));
-}
-
-void
-nng_ipc_listener_accept(nng_ipc_listener *l, nng_aio *aio)
-{
- nni_ipc_listener_accept((void *) l, aio);
-}
diff --git a/src/supplemental/tcp/CMakeLists.txt b/src/supplemental/tcp/CMakeLists.txt
index ef82b098..09f917f8 100644
--- a/src/supplemental/tcp/CMakeLists.txt
+++ b/src/supplemental/tcp/CMakeLists.txt
@@ -1,6 +1,6 @@
#
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
-# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
@@ -8,8 +8,6 @@
# found online at https://opensource.org/licenses/MIT.
#
-set(_SRCS supplemental/tcp/tcp.c
- ${PROJECT_SOURCE_DIR}/include/nng/supplemental/tcp/tcp.h
-)
+set(_SRCS supplemental/tcp/tcp.c)
set(NNG_SRCS ${NNG_SRCS} ${_SRCS} PARENT_SCOPE)
diff --git a/src/supplemental/tcp/tcp.c b/src/supplemental/tcp/tcp.c
index b8410b16..3ec396b8 100644
--- a/src/supplemental/tcp/tcp.c
+++ b/src/supplemental/tcp/tcp.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -11,144 +11,428 @@
#include <stddef.h>
#include <stdint.h>
+#include <string.h>
#include <nng/nng.h>
-#include <nng/supplemental/tcp/tcp.h>
#include "core/nng_impl.h"
+#include "core/tcp.h"
-// This is our "public" TCP API. This allows applications to access
-// basic TCP functions, using our AIO framework. Most applications will
-// not need this.
+typedef struct {
+ nng_stream_dialer ops;
+ char * host;
+ char * port;
+ int af; // address family
+ bool closed;
+ nng_sockaddr sa;
+ nni_tcp_dialer * d; // platform dialer implementation
+ nni_aio * resaio; // resolver aio
+ nni_aio * conaio; // platform connection aio
+ nni_list resaios;
+ nni_list conaios;
+ nni_mtx mtx;
+} tcp_dialer;
-// We treat nng_tcp as nni_tcp_conn, nng_tcp_dialer as nni_tcp_dialer,
-// and nng_tcp_listener as nni_tcp_listener. We cast through void to
-// provide isolation of the names in a way that makes the compiler happy.
-// It turns out we can pretty much just wrap the platform API for TCP that
-// we have already created.
-
-void
-nng_tcp_close(nng_tcp *tcp)
+static void
+tcp_dial_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_tcp_conn_close((void *) tcp);
-}
+ tcp_dialer *d = arg;
-void
-nng_tcp_free(nng_tcp *tcp)
-{
- nni_tcp_conn_fini((void *) tcp);
+ nni_mtx_lock(&d->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+
+ if (nni_list_empty(&d->conaios)) {
+ nni_aio_abort(d->conaio, NNG_ECANCELED);
+ }
+ if (nni_list_empty(&d->resaios)) {
+ nni_aio_abort(d->resaio, NNG_ECANCELED);
+ }
+ }
+ nni_mtx_unlock(&d->mtx);
}
-void
-nng_tcp_send(nng_tcp *tcp, nng_aio *aio)
+static void
+tcp_dial_res_cb(void *arg)
{
- nni_tcp_conn_send((void *) tcp, aio);
+ tcp_dialer *d = arg;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ if (d->closed || ((aio = nni_list_first(&d->resaios)) == NULL)) {
+ // ignore this.
+ while ((aio = nni_list_first(&d->resaios)) != NULL) {
+ nni_list_remove(&d->resaios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ nni_list_remove(&d->resaios, aio);
+
+ if ((rv = nni_aio_result(d->resaio)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nng_sockaddr sa;
+ nni_aio_get_sockaddr(d->resaio, &sa);
+ nni_aio_set_sockaddr(aio, &sa);
+ nni_list_append(&d->conaios, aio);
+ if (nni_list_first(&d->conaios) == aio) {
+ nni_aio_set_sockaddr(d->conaio, &sa);
+ nni_tcp_dial(d->d, d->conaio);
+ }
+ }
+
+ if (!nni_list_empty(&d->resaios)) {
+ nni_tcp_resolv(d->host, d->port, d->af, 0, d->resaio);
+ }
+ nni_mtx_unlock(&d->mtx);
}
-void
-nng_tcp_recv(nng_tcp *tcp, nng_aio *aio)
+static void
+tcp_dial_con_cb(void *arg)
{
- nni_tcp_conn_recv((void *) tcp, aio);
+ tcp_dialer *d = arg;
+ nng_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&d->mtx);
+ rv = nni_aio_result(d->conaio);
+ if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) {
+ if (rv == 0) {
+ // Make sure we discard the underlying connection.
+ nng_stream_free(nni_aio_get_output(d->conaio, 0));
+ nni_aio_set_output(d->conaio, 0, NULL);
+ }
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_list_remove(&d->conaios, aio);
+ if (rv != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nni_aio_set_output(aio, 0, nni_aio_get_output(d->conaio, 0));
+ nni_aio_finish(aio, 0, 0);
+ }
+
+ if ((aio = nni_list_first(&d->conaios)) != NULL) {
+ nng_sockaddr sa;
+ nni_aio_get_sockaddr(aio, &sa);
+ nni_aio_set_sockaddr(d->conaio, &sa);
+ nni_tcp_dial(d->d, d->conaio);
+ }
+ nni_mtx_unlock(&d->mtx);
}
-int
-nng_tcp_getopt(nng_tcp *tcp, const char *name, void *buf, size_t *szp)
+static void
+tcp_dialer_close(void *arg)
{
- return (nni_tcp_conn_getopt(
- (void *) tcp, name, buf, szp, NNI_TYPE_OPAQUE));
+ tcp_dialer *d = arg;
+ nni_aio * aio;
+ nni_mtx_lock(&d->mtx);
+ d->closed = true;
+ while ((aio = nni_list_first(&d->resaios)) != NULL) {
+ nni_list_remove(&d->resaios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ while ((aio = nni_list_first(&d->conaios)) != NULL) {
+ nni_list_remove(&d->conaios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_tcp_dialer_close(d->d);
+ nni_mtx_unlock(&d->mtx);
}
-int
-nng_tcp_setopt(nng_tcp *tcp, const char *name, const void *buf, size_t sz)
+static void
+tcp_dialer_free(void *arg)
{
- return (
- nni_tcp_conn_setopt((void *) tcp, name, buf, sz, NNI_TYPE_OPAQUE));
+ tcp_dialer *d = arg;
+
+ if (d == NULL) {
+ return;
+ }
+
+ if (d->d != NULL) {
+ nni_tcp_dialer_close(d->d);
+ nni_tcp_dialer_fini(d->d);
+ }
+ nni_strfree(d->host);
+ nni_strfree(d->port);
+ nni_aio_fini(d->resaio);
+ nni_aio_fini(d->conaio);
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
}
-int
-nng_tcp_dialer_alloc(nng_tcp_dialer **dp)
+static void
+tcp_dialer_dial(void *arg, nng_aio *aio)
{
- nni_tcp_dialer *d;
- int rv;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
+ tcp_dialer *d = arg;
+ int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- if ((rv = nni_tcp_dialer_init(&d)) == 0) {
- *dp = (void *) d;
+ if (d->host != NULL) {
+ nni_list_append(&d->resaios, aio);
+ if (nni_list_first(&d->resaios) == aio) {
+ nni_tcp_resolv(d->host, d->port, d->af, 0, d->resaio);
+ }
+ } else {
+ nni_list_append(&d->conaios, aio);
+ if (nni_list_first(&d->conaios) == aio) {
+ nni_aio_set_sockaddr(d->conaio, &d->sa);
+ nni_tcp_dial(d->d, d->conaio);
+ }
}
- return (rv);
+ nni_mtx_unlock(&d->mtx);
}
-void
-nng_tcp_dialer_close(nng_tcp_dialer *d)
+static int
+tcp_dialer_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
- nni_tcp_dialer_close((void *) d);
+ tcp_dialer *d = arg;
+ return (nni_tcp_dialer_getopt(d->d, name, buf, szp, t));
}
-void
-nng_tcp_dialer_free(nng_tcp_dialer *d)
+static int
+tcp_dialer_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
- nni_tcp_dialer_fini((void *) d);
+ tcp_dialer *d = arg;
+ return (nni_tcp_dialer_setopt(d->d, name, buf, sz, t));
}
-void
-nng_tcp_dialer_dial(nng_tcp_dialer *d, const nng_sockaddr *sa, nng_aio *aio)
+static int
+tcp_dialer_alloc(tcp_dialer **dp)
{
- nni_tcp_dialer_dial((void *) d, sa, aio);
+ int rv;
+ tcp_dialer *d;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&d->mtx);
+ nni_aio_list_init(&d->resaios);
+ nni_aio_list_init(&d->conaios);
+
+ if (((rv = nni_aio_init(&d->resaio, tcp_dial_res_cb, d)) != 0) ||
+ ((rv = nni_aio_init(&d->conaio, tcp_dial_con_cb, d)) != 0) ||
+ ((rv = nni_tcp_dialer_init(&d->d)) != 0)) {
+ tcp_dialer_free(d);
+ return (rv);
+ }
+
+ d->ops.sd_close = tcp_dialer_close;
+ d->ops.sd_free = tcp_dialer_free;
+ d->ops.sd_dial = tcp_dialer_dial;
+ d->ops.sd_getx = tcp_dialer_getx;
+ d->ops.sd_setx = tcp_dialer_setx;
+
+ *dp = d;
+ return (0);
}
int
-nng_tcp_listener_alloc(nng_tcp_listener **lp)
+nni_tcp_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
{
- nni_tcp_listener *l;
- int rv;
+ tcp_dialer *d;
+ int rv;
+ const char *p;
if ((rv = nni_init()) != 0) {
return (rv);
}
- if ((rv = nni_tcp_listener_init(&l)) == 0) {
- *lp = (void *) l;
+
+ if ((rv = tcp_dialer_alloc(&d)) != 0) {
+ return (rv);
}
- return (rv);
+
+ if (((p = url->u_port) == NULL) || (strlen(p) == 0)) {
+ p = nni_url_default_port(url->u_scheme);
+ }
+
+ if ((strlen(p) == 0) || (strlen(url->u_hostname) == 0)) {
+ // Dialer needs both a destination hostname and port.
+ tcp_dialer_free(d);
+ return (NNG_EADDRINVAL);
+ }
+
+ if (strchr(url->u_scheme, '4') != NULL) {
+ d->af = NNG_AF_INET;
+ } else if (strchr(url->u_scheme, '6') != NULL) {
+ d->af = NNG_AF_INET6;
+ } else {
+ d->af = NNG_AF_UNSPEC;
+ }
+
+ if (((d->host = nng_strdup(url->u_hostname)) == NULL) ||
+ ((d->port = nng_strdup(p)) == NULL)) {
+ tcp_dialer_free(d);
+ return (NNG_ENOMEM);
+ }
+
+ *dp = (void *) d;
+ return (0);
}
-void
-nng_tcp_listener_close(nng_tcp_listener *l)
+typedef struct {
+ nng_stream_listener ops;
+ nni_tcp_listener * l;
+ nng_sockaddr sa;
+} tcp_listener;
+
+static void
+tcp_listener_close(void *arg)
{
- nni_tcp_listener_close((void *) l);
+ tcp_listener *l = arg;
+ nni_tcp_listener_close(l->l);
}
-void
-nng_tcp_listener_free(nng_tcp_listener *l)
+static void
+tcp_listener_free(void *arg)
{
- nni_tcp_listener_fini((void *) l);
+ tcp_listener *l = arg;
+ nni_tcp_listener_fini(l->l);
+ NNI_FREE_STRUCT(l);
}
-int
-nng_tcp_listener_listen(nng_tcp_listener *l, const nng_sockaddr *sa)
+static int
+tcp_listener_listen(void *arg)
+{
+ tcp_listener *l = arg;
+ return (nni_tcp_listener_listen(l->l, &l->sa));
+}
+
+static void
+tcp_listener_accept(void *arg, nng_aio *aio)
{
- return (nni_tcp_listener_listen((void *) l, sa));
+ tcp_listener *l = arg;
+ nni_tcp_listener_accept(l->l, aio);
}
-void
-nng_tcp_listener_accept(nng_tcp_listener *l, nng_aio *aio)
+static int
+tcp_listener_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
- nni_tcp_listener_accept((void *) l, aio);
+ tcp_listener *l = arg;
+ return (nni_tcp_listener_getopt(l->l, name, buf, szp, t));
+}
+
+static int
+tcp_listener_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ tcp_listener *l = arg;
+ return (nni_tcp_listener_setopt(l->l, name, buf, sz, t));
+}
+
+static int
+tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa)
+{
+ tcp_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_tcp_listener_init(&l->l)) != 0) {
+ NNI_FREE_STRUCT(l);
+ return (rv);
+ }
+ l->sa = *sa;
+
+ l->ops.sl_free = tcp_listener_free;
+ l->ops.sl_close = tcp_listener_close;
+ l->ops.sl_listen = tcp_listener_listen;
+ l->ops.sl_accept = tcp_listener_accept;
+ l->ops.sl_getx = tcp_listener_getx;
+ l->ops.sl_setx = tcp_listener_setx;
+
+ *lp = (void *) l;
+ return (0);
}
int
-nng_tcp_listener_getopt(
- nng_tcp_listener *l, const char *name, void *buf, size_t *szp)
+nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url)
{
- return (nni_tcp_listener_getopt(
- (void *) l, name, buf, szp, NNI_TYPE_OPAQUE));
+ nni_aio * aio;
+ int af;
+ int rv;
+ nng_sockaddr sa;
+ const char * h;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if (strchr(url->u_scheme, '4') != NULL) {
+ af = NNG_AF_INET;
+ } else if (strchr(url->u_scheme, '6') != NULL) {
+ af = NNG_AF_INET6;
+ } else {
+ af = NNG_AF_UNSPEC;
+ }
+
+ if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+
+ h = url->u_hostname;
+
+ // Wildcard special case, which means bind to INADDR_ANY.
+ if ((h != NULL) && ((strcmp(h, "*") == 0) || (strlen(h) == 0))) {
+ h = NULL;
+ }
+ nni_tcp_resolv(h, url->u_port, af, 1, aio);
+ nni_aio_wait(aio);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ nni_aio_fini(aio);
+ return (rv);
+ }
+ nni_aio_get_sockaddr(aio, &sa);
+ nni_aio_fini(aio);
+
+ return (tcp_listener_alloc_addr(lp, &sa));
}
+static int
+tcp_check_bool(const void *val, size_t sz, nni_type t)
+{
+ return (nni_copyin_bool(NULL, val, sz, t));
+}
+
+static const nni_chkoption tcp_chkopts[] = {
+ {
+ .o_name = NNG_OPT_TCP_KEEPALIVE,
+ .o_check = tcp_check_bool,
+ },
+ {
+ .o_name = NNG_OPT_TCP_NODELAY,
+ .o_check = tcp_check_bool,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
int
-nng_tcp_listener_setopt(
- nng_tcp_listener *l, const char *name, const void *buf, size_t sz)
+nni_tcp_checkopt(const char *name, const void *data, size_t sz, nni_type t)
{
- return (nni_tcp_listener_setopt(
- (void *) l, name, buf, sz, NNI_TYPE_OPAQUE));
-} \ No newline at end of file
+ return (nni_chkopt(tcp_chkopts, name, data, sz, t));
+}
diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c
index 9f1e8f83..d3816747 100644
--- a/src/supplemental/tls/mbedtls/tls.c
+++ b/src/supplemental/tls/mbedtls/tls.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -28,10 +28,9 @@
#include "mbedtls/ssl.h"
#include "core/nng_impl.h"
+#include "core/tcp.h"
#include "supplemental/tls/tls_api.h"
-#include <nng/supplemental/tls/tls.h>
-
// Implementation note. This implementation buffers data between the TLS
// encryption layer (mbedTLS) and the underlying TCP socket. As a result,
// there may be some additional latency caused by buffer draining and
@@ -59,8 +58,9 @@ typedef struct nni_tls_certkey {
nni_list_node node;
} nni_tls_certkey;
-struct nni_tls {
- nni_tcp_conn * tcp;
+typedef struct {
+ nni_tls_common com;
+ nng_stream * tcp;
mbedtls_ssl_context ctx;
nng_tls_config * cfg; // kept so we can release it
nni_mtx lk;
@@ -81,7 +81,7 @@ struct nni_tls {
nni_list sends; // upper side sends
nni_list recvs; // upper recv aios
nni_aio * handshake; // handshake aio (upper)
-};
+} tls;
struct nng_tls_config {
mbedtls_ssl_config cfg_ctx;
@@ -100,18 +100,18 @@ struct nng_tls_config {
nni_list certkeys;
};
-static void nni_tls_send_cb(void *);
-static void nni_tls_recv_cb(void *);
+static void tls_send_cb(void *);
+static void tls_recv_cb(void *);
-static void nni_tls_do_send(nni_tls *);
-static void nni_tls_do_recv(nni_tls *);
-static void nni_tls_do_handshake(nni_tls *);
+static void tls_do_send(tls *);
+static void tls_do_recv(tls *);
+static void tls_do_handshake(tls *);
-static int nni_tls_net_send(void *, const unsigned char *, size_t);
-static int nni_tls_net_recv(void *, unsigned char *, size_t);
+static int tls_net_send(void *, const unsigned char *, size_t);
+static int tls_net_recv(void *, unsigned char *, size_t);
static void
-nni_tls_dbg(void *ctx, int level, const char *file, int line, const char *s)
+tls_dbg(void *ctx, int level, const char *file, int line, const char *s)
{
char buf[128];
NNI_ARG_UNUSED(ctx);
@@ -121,7 +121,7 @@ nni_tls_dbg(void *ctx, int level, const char *file, int line, const char *s)
}
static int
-nni_tls_get_entropy(void *arg, unsigned char *buf, size_t len)
+tls_get_entropy(void *arg, unsigned char *buf, size_t len)
{
NNI_ARG_UNUSED(arg);
while (len) {
@@ -137,7 +137,7 @@ nni_tls_get_entropy(void *arg, unsigned char *buf, size_t len)
}
static int
-nni_tls_random(void *arg, unsigned char *buf, size_t sz)
+tls_random(void *arg, unsigned char *buf, size_t sz)
{
#ifdef NNG_TLS_USE_CTR_DRBG
int rv;
@@ -149,7 +149,7 @@ nni_tls_random(void *arg, unsigned char *buf, size_t sz)
nni_mtx_unlock(&cfg->rng_lk);
return (rv);
#else
- return (nni_tls_get_entropy(arg, buf, sz));
+ return (tls_get_entropy(arg, buf, sz));
#endif
}
@@ -230,15 +230,15 @@ nni_tls_config_init(nng_tls_config **cpp, enum nng_tls_mode mode)
#ifdef NNG_TLS_USE_CTR_DRBG
mbedtls_ctr_drbg_init(&cfg->rng_ctx);
rv = mbedtls_ctr_drbg_seed(
- &cfg->rng_ctx, nni_tls_get_entropy, NULL, NULL, 0);
+ &cfg->rng_ctx, tls_get_entropy, NULL, NULL, 0);
if (rv != 0) {
nni_tls_config_fini(cfg);
return (rv);
}
#endif
- mbedtls_ssl_conf_rng(&cfg->cfg_ctx, nni_tls_random, cfg);
+ mbedtls_ssl_conf_rng(&cfg->cfg_ctx, tls_random, cfg);
- mbedtls_ssl_conf_dbg(&cfg->cfg_ctx, nni_tls_dbg, cfg);
+ mbedtls_ssl_conf_dbg(&cfg->cfg_ctx, tls_dbg, cfg);
*cpp = cfg;
return (0);
@@ -252,41 +252,12 @@ nni_tls_config_hold(nng_tls_config *cfg)
nni_mtx_unlock(&cfg->lk);
}
-void
-nni_tls_fini(nni_tls *tp)
-{
- // Shut it all down first.
- if (tp != NULL) {
- if (tp->tcp) {
- nni_tcp_conn_close(tp->tcp);
- }
- nni_aio_stop(tp->tcp_send);
- nni_aio_stop(tp->tcp_recv);
-
- // And finalize / free everything.
- if (tp->tcp) {
- nni_tcp_conn_fini(tp->tcp);
- }
- nni_aio_fini(tp->tcp_send);
- nni_aio_fini(tp->tcp_recv);
- mbedtls_ssl_free(&tp->ctx);
- nni_mtx_fini(&tp->lk);
- nni_free(tp->recvbuf, NNG_TLS_MAX_RECV_SIZE);
- nni_free(tp->sendbuf, NNG_TLS_MAX_RECV_SIZE);
- if (tp->cfg != NULL) {
- // release the hold we got on it
- nni_tls_config_fini(tp->cfg);
- }
- NNI_FREE_STRUCT(tp);
- }
-}
-
-// nni_tls_mkerr converts an mbed error to an NNG error. In all cases
+// tls_mkerr converts an mbed error to an NNG error. In all cases
// we just encode with NNG_ETRANERR.
static struct {
int tls;
int nng;
-} nni_tls_errs[] = {
+} tls_errs[] = {
{ MBEDTLS_ERR_SSL_NO_CLIENT_CERTIFICATE, NNG_EPEERAUTH },
{ MBEDTLS_ERR_SSL_CA_CHAIN_REQUIRED, NNG_EPEERAUTH },
{ MBEDTLS_ERR_SSL_PEER_VERIFY_FAILED, NNG_EPEERAUTH },
@@ -300,60 +271,74 @@ static struct {
};
static int
-nni_tls_mkerr(int err)
+tls_mkerr(int err)
{
- for (int i = 0; nni_tls_errs[i].tls != 0; i++) {
- if (nni_tls_errs[i].tls == err) {
- return (nni_tls_errs[i].nng);
+ for (int i = 0; tls_errs[i].tls != 0; i++) {
+ if (tls_errs[i].tls == err) {
+ return (tls_errs[i].nng);
}
}
return (NNG_ECRYPTO);
}
-int
-nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp)
+// The common code should call this only after it has released
+// it's upper layer stuff.
+static void
+tls_free(void *arg)
{
- nni_tls *tp;
- int rv;
- bool on = true;
+ tls *tls = arg;
+
+ // Shut it all down first.
+ if (tls != NULL) {
+ if (tls->tcp != NULL) {
+ nng_stream_close(tls->tcp);
+ }
+ nni_aio_stop(tls->tcp_send);
+ nni_aio_stop(tls->tcp_recv);
- // During the handshake, disable Nagle to shorten the
- // negotiation. Once things are set up the caller can
- // re-enable Nagle if so desired.
- (void) nni_tcp_conn_setopt(
- tcp, NNG_OPT_TCP_NODELAY, &on, sizeof(on), NNI_TYPE_BOOL);
+ nni_aio_fini(tls->com.aio);
+ nng_tls_config_free(tls->com.cfg);
- if ((tp = NNI_ALLOC_STRUCT(tp)) == NULL) {
- return (NNG_ENOMEM);
+ // And finalize / free everything.
+ nng_stream_free(tls->tcp);
+ nni_aio_fini(tls->tcp_send);
+ nni_aio_fini(tls->tcp_recv);
+ mbedtls_ssl_free(&tls->ctx);
+ if (tls->recvbuf != NULL) {
+ nni_free(tls->recvbuf, NNG_TLS_MAX_RECV_SIZE);
+ }
+ if (tls->sendbuf != NULL) {
+ nni_free(tls->sendbuf, NNG_TLS_MAX_RECV_SIZE);
+ }
+ nni_mtx_fini(&tls->lk);
+ NNI_FREE_STRUCT(tls);
}
+}
+
+int
+nni_tls_start(nng_stream *arg, nng_stream *tcp)
+{
+ tls * tp = (void *) arg;
+ nng_tls_config *cfg = tp->com.cfg;
+ int rv;
+
if ((tp->recvbuf = nni_zalloc(NNG_TLS_MAX_RECV_SIZE)) == NULL) {
- NNI_FREE_STRUCT(tp);
return (NNG_ENOMEM);
}
if ((tp->sendbuf = nni_zalloc(NNG_TLS_MAX_SEND_SIZE)) == NULL) {
- nni_free(tp->sendbuf, NNG_TLS_MAX_RECV_SIZE);
- NNI_FREE_STRUCT(tp);
return (NNG_ENOMEM);
}
nni_mtx_lock(&cfg->lk);
// No more changes allowed to config.
cfg->active = true;
- cfg->refcnt++;
- tp->cfg = cfg;
nni_mtx_unlock(&cfg->lk);
- nni_aio_list_init(&tp->sends);
- nni_aio_list_init(&tp->recvs);
- nni_mtx_init(&tp->lk);
mbedtls_ssl_init(&tp->ctx);
- mbedtls_ssl_set_bio(
- &tp->ctx, tp, nni_tls_net_send, nni_tls_net_recv, NULL);
+ mbedtls_ssl_set_bio(&tp->ctx, tp, tls_net_send, tls_net_recv, NULL);
if ((rv = mbedtls_ssl_setup(&tp->ctx, &cfg->cfg_ctx)) != 0) {
- rv = nni_tls_mkerr(rv);
- nni_tls_fini(tp);
- return (rv);
+ return (tls_mkerr(rv));
}
if (cfg->server_name) {
@@ -362,25 +347,23 @@ nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp)
tp->tcp = tcp;
- if (((rv = nni_aio_init(&tp->tcp_send, nni_tls_send_cb, tp)) != 0) ||
- ((rv = nni_aio_init(&tp->tcp_recv, nni_tls_recv_cb, tp)) != 0)) {
- nni_tls_fini(tp);
+ if (((rv = nni_aio_init(&tp->tcp_send, tls_send_cb, tp)) != 0) ||
+ ((rv = nni_aio_init(&tp->tcp_recv, tls_recv_cb, tp)) != 0)) {
return (rv);
}
nni_mtx_lock(&tp->lk);
// Kick off a handshake operation.
- nni_tls_do_handshake(tp);
+ tls_do_handshake(tp);
nni_mtx_unlock(&tp->lk);
- *tpp = tp;
return (0);
}
static void
-nni_tls_cancel(nni_aio *aio, void *arg, int rv)
+tls_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_tls *tp = arg;
+ tls *tp = arg;
nni_mtx_lock(&tp->lk);
if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
@@ -389,33 +372,16 @@ nni_tls_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&tp->lk);
}
+// tls_send_cb is called when the underlying TCP send completes.
static void
-nni_tls_fail(nni_tls *tp, int rv)
-{
- nni_aio *aio;
- tp->tls_closed = true;
- nni_tcp_conn_close(tp->tcp);
- tp->tcp_closed = true;
- while ((aio = nni_list_first(&tp->recvs)) != NULL) {
- nni_list_remove(&tp->recvs, aio);
- nni_aio_finish_error(aio, rv);
- }
- while ((aio = nni_list_first(&tp->sends)) != NULL) {
- nni_list_remove(&tp->recvs, aio);
- nni_aio_finish_error(aio, rv);
- }
-}
-
-// nni_tls_send_cb is called when the underlying TCP send completes.
-static void
-nni_tls_send_cb(void *ctx)
+tls_send_cb(void *ctx)
{
- nni_tls *tp = ctx;
+ tls * tp = ctx;
nni_aio *aio = tp->tcp_send;
nni_mtx_lock(&tp->lk);
if (nni_aio_result(aio) != 0) {
- nni_tcp_conn_close(tp->tcp);
+ nng_stream_close(tp->tcp);
tp->tcp_closed = true;
} else {
size_t n = nni_aio_count(aio);
@@ -428,25 +394,24 @@ nni_tls_send_cb(void *ctx)
iov.iov_len = tp->sendlen;
nni_aio_set_iov(aio, 1, &iov);
nni_aio_set_timeout(aio, NNG_DURATION_INFINITE);
- nni_tcp_conn_send(tp->tcp, aio);
+ nng_stream_send(tp->tcp, aio);
nni_mtx_unlock(&tp->lk);
return;
}
tp->sendoff = 0;
tp->sending = false;
}
- if (!tp->hsdone) {
- nni_tls_do_handshake(tp);
- }
+
+ tls_do_handshake(tp);
if (tp->hsdone) {
- nni_tls_do_send(tp);
- nni_tls_do_recv(tp);
+ tls_do_send(tp);
+ tls_do_recv(tp);
}
nni_mtx_unlock(&tp->lk);
}
static void
-nni_tls_recv_start(nni_tls *tp)
+tls_recv_start(tls *tp)
{
nni_aio *aio;
nni_iov iov;
@@ -467,13 +432,13 @@ nni_tls_recv_start(nni_tls *tp)
iov.iov_len = NNG_TLS_MAX_RECV_SIZE;
nni_aio_set_iov(aio, 1, &iov);
nni_aio_set_timeout(tp->tcp_recv, NNG_DURATION_INFINITE);
- nni_tcp_conn_recv(tp->tcp, aio);
+ nng_stream_recv(tp->tcp, aio);
}
static void
-nni_tls_recv_cb(void *ctx)
+tls_recv_cb(void *ctx)
{
- nni_tls *tp = ctx;
+ tls * tp = ctx;
nni_aio *aio = tp->tcp_recv;
nni_mtx_lock(&tp->lk);
@@ -481,7 +446,7 @@ nni_tls_recv_cb(void *ctx)
if (nni_aio_result(aio) != 0) {
// Close the underlying TCP channel, but permit data we
// already received to continue to be received.
- nni_tcp_conn_close(tp->tcp);
+ nng_stream_close(tp->tcp);
tp->tcp_closed = true;
} else {
NNI_ASSERT(tp->recvlen == 0);
@@ -492,12 +457,10 @@ nni_tls_recv_cb(void *ctx)
// If we were closed (above), the upper layer will detect and
// react properly. Otherwise the upper layer will consume
// data.
- if (!tp->hsdone) {
- nni_tls_do_handshake(tp);
- }
+ tls_do_handshake(tp);
if (tp->hsdone) {
- nni_tls_do_recv(tp);
- nni_tls_do_send(tp);
+ tls_do_recv(tp);
+ tls_do_send(tp);
}
nni_mtx_unlock(&tp->lk);
@@ -511,10 +474,10 @@ nni_tls_recv_cb(void *ctx)
// ridiculous over queueing. This is always called with the pipe
// lock held, and never blocks.
static int
-nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len)
+tls_net_send(void *ctx, const unsigned char *buf, size_t len)
{
- nni_tls *tp = ctx;
- nni_iov iov;
+ tls * tp = ctx;
+ nni_iov iov;
if (len > NNG_TLS_MAX_SEND_SIZE) {
len = NNG_TLS_MAX_SEND_SIZE;
@@ -538,22 +501,24 @@ nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len)
iov.iov_len = len;
nni_aio_set_iov(tp->tcp_send, 1, &iov);
nni_aio_set_timeout(tp->tcp_send, NNG_DURATION_INFINITE);
- nni_tcp_conn_send(tp->tcp, tp->tcp_send);
+ nng_stream_send(tp->tcp, tp->tcp_send);
return (len);
}
static int
-nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len)
+tls_net_recv(void *ctx, unsigned char *buf, size_t len)
{
- nni_tls *tp = ctx;
+ tls *tp = ctx;
// We should already be running with the pipe lock held,
// as we are running in that context.
- if (tp->tcp_closed && tp->recvlen == 0) {
- return (MBEDTLS_ERR_NET_RECV_FAILED);
- }
if (tp->recvlen == 0) {
+ if (tp->tcp_closed) {
+ // The underlying TCP transport has closed, and we
+ // have no more data in our receive buffer.
+ return (MBEDTLS_ERR_NET_RECV_FAILED);
+ }
len = MBEDTLS_ERR_SSL_WANT_READ;
} else {
if (len > tp->recvlen) {
@@ -564,16 +529,15 @@ nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len)
tp->recvlen -= len;
}
- nni_tls_recv_start(tp);
+ tls_recv_start(tp);
return ((int) len);
}
-// nni_tls_send is the exported send function. It has a similar
-// calling convention as the platform TCP pipe.
-void
-nni_tls_send(nni_tls *tp, nni_aio *aio)
+static void
+tls_send(void *arg, nni_aio *aio)
{
- int rv;
+ int rv;
+ tls *tp = arg;
if (nni_aio_begin(aio) != 0) {
return;
@@ -584,20 +548,21 @@ nni_tls_send(nni_tls *tp, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, nni_tls_cancel, tp)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tls_cancel, tp)) != 0) {
nni_mtx_unlock(&tp->lk);
nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&tp->sends, aio);
- nni_tls_do_send(tp);
+ tls_do_send(tp);
nni_mtx_unlock(&tp->lk);
}
-void
-nni_tls_recv(nni_tls *tp, nni_aio *aio)
+static void
+tls_recv(void *arg, nni_aio *aio)
{
- int rv;
+ int rv;
+ tls *tp = arg;
if (nni_aio_begin(aio) != 0) {
return;
@@ -608,22 +573,22 @@ nni_tls_recv(nni_tls *tp, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, nni_tls_cancel, tp)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tls_cancel, tp)) != 0) {
nni_mtx_unlock(&tp->lk);
nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&tp->recvs, aio);
- nni_tls_do_recv(tp);
+ tls_do_recv(tp);
nni_mtx_unlock(&tp->lk);
}
static int
tls_get_verified(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_tls *tp = arg;
- bool v = (mbedtls_ssl_get_verify_result(&tp->ctx) == 0);
+ tls *tp = arg;
+ bool v = (mbedtls_ssl_get_verify_result(&tp->ctx) == 0);
return (nni_copyout_bool(v, buf, szp, t));
}
@@ -638,26 +603,28 @@ static const nni_option tls_options[] = {
},
};
-int
-nni_tls_setopt(
- nni_tls *tp, const char *name, const void *buf, size_t sz, nni_type t)
+static int
+tls_setx(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
- int rv;
+ tls * tp = arg;
+ int rv;
+ nng_stream *tcp;
- if ((rv = nni_tcp_conn_setopt(tp->tcp, name, buf, sz, t)) !=
- NNG_ENOTSUP) {
+ tcp = (tp != NULL) ? tp->tcp : NULL;
+
+ if ((rv = nni_stream_setx(tcp, name, buf, sz, t)) != NNG_ENOTSUP) {
return (rv);
}
return (nni_setopt(tls_options, name, tp, buf, sz, t));
}
-int
-nni_tls_getopt(
- nni_tls *tp, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+tls_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
- int rv;
+ tls *tp = arg;
+ int rv;
- if ((rv = nni_tcp_conn_getopt(tp->tcp, name, buf, szp, t)) !=
+ if ((rv = nni_stream_getx(tp->tcp, name, buf, szp, t)) !=
NNG_ENOTSUP) {
return (rv);
}
@@ -665,11 +632,12 @@ nni_tls_getopt(
}
static void
-nni_tls_do_handshake(nni_tls *tp)
+tls_do_handshake(tls *tp)
{
- int rv;
+ int rv;
+ nni_aio *aio;
- if (tp->tls_closed) {
+ if (tp->hsdone || tp->tls_closed) {
return;
}
rv = mbedtls_ssl_handshake(&tp->ctx);
@@ -686,15 +654,24 @@ nni_tls_do_handshake(nni_tls *tp)
default:
// some other error occurred, this causes us to tear it down
- nni_tls_fail(tp, nni_tls_mkerr(rv));
+ nng_stream_close(tp->tcp);
+ tp->tls_closed = true;
+ tp->tcp_closed = true;
+ rv = tls_mkerr(rv);
+
+ while (((aio = nni_list_first(&tp->recvs)) != NULL) ||
+ ((aio = nni_list_first(&tp->sends)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
}
}
-// nni_tls_do_send is called to try to send more data if we have not
+// tls_do_send is called to try to send more data if we have not
// yet completed the I/O. It also completes any transactions that
// *have* completed. It must be called with the lock held.
static void
-nni_tls_do_send(nni_tls *tp)
+tls_do_send(tls *tp)
{
nni_aio *aio;
@@ -732,7 +709,7 @@ nni_tls_do_send(nni_tls *tp)
// Want better diagnostics.
nni_aio_list_remove(aio);
if (n < 0) {
- nni_aio_finish_error(aio, nni_tls_mkerr(n));
+ nni_aio_finish_error(aio, tls_mkerr(n));
} else {
nni_aio_finish(aio, 0, n);
}
@@ -740,7 +717,7 @@ nni_tls_do_send(nni_tls *tp)
}
static void
-nni_tls_do_recv(nni_tls *tp)
+tls_do_recv(tls *tp)
{
nni_aio *aio;
@@ -777,16 +754,17 @@ nni_tls_do_recv(nni_tls *tp)
nni_aio_list_remove(aio);
if (n < 0) {
- nni_aio_finish_error(aio, nni_tls_mkerr(n));
+ nni_aio_finish_error(aio, tls_mkerr(n));
} else {
nni_aio_finish(aio, 0, n);
}
}
}
-void
-nni_tls_close(nni_tls *tp)
+static void
+tls_close(void *arg)
{
+ tls * tp = arg;
nni_aio *aio;
nni_aio_close(tp->tcp_send);
@@ -795,12 +773,8 @@ nni_tls_close(nni_tls *tp)
nni_mtx_lock(&tp->lk);
tp->tls_closed = true;
- while ((aio = nni_list_first(&tp->sends)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
-
- while ((aio = nni_list_first(&tp->recvs)) != NULL) {
+ while (((aio = nni_list_first(&tp->sends)) != NULL) ||
+ ((aio = nni_list_first(&tp->recvs)) != NULL)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
@@ -813,11 +787,36 @@ nni_tls_close(nni_tls *tp)
// connection at this point.
(void) mbedtls_ssl_close_notify(&tp->ctx);
} else {
- nni_tcp_conn_close(tp->tcp);
+ nng_stream_close(tp->tcp);
}
nni_mtx_unlock(&tp->lk);
}
+// This allocates a TLS structure, that can be used by the caller.
+// The reason we have this API is so that the base structure can be
+// embedded in the parent structure.
+int
+nni_tls_alloc(nng_stream **tlsp)
+{
+ tls *tp;
+
+ if ((tp = NNI_ALLOC_STRUCT(tp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_aio_list_init(&tp->sends);
+ nni_aio_list_init(&tp->recvs);
+ nni_mtx_init(&tp->lk);
+ tp->com.ops.s_close = tls_close;
+ tp->com.ops.s_free = tls_free;
+ tp->com.ops.s_send = tls_send;
+ tp->com.ops.s_recv = tls_recv;
+ tp->com.ops.s_getx = tls_getx;
+ tp->com.ops.s_setx = tls_setx;
+
+ *tlsp = (void *) tp;
+ return (0);
+}
+
int
nng_tls_config_server_name(nng_tls_config *cfg, const char *name)
{
@@ -882,14 +881,14 @@ nng_tls_config_ca_chain(
pem = (const uint8_t *) certs;
len = strlen(certs) + 1;
if ((rv = mbedtls_x509_crt_parse(&cfg->ca_certs, pem, len)) != 0) {
- rv = nni_tls_mkerr(rv);
+ rv = tls_mkerr(rv);
goto err;
}
if (crl != NULL) {
pem = (const uint8_t *) crl;
len = strlen(crl) + 1;
if ((rv = mbedtls_x509_crl_parse(&cfg->crl, pem, len)) != 0) {
- rv = nni_tls_mkerr(rv);
+ rv = tls_mkerr(rv);
goto err;
}
}
@@ -919,7 +918,7 @@ nng_tls_config_own_cert(
pem = (const uint8_t *) cert;
len = strlen(cert) + 1;
if ((rv = mbedtls_x509_crt_parse(&ck->crt, pem, len)) != 0) {
- rv = nni_tls_mkerr(rv);
+ rv = tls_mkerr(rv);
goto err;
}
@@ -928,7 +927,7 @@ nng_tls_config_own_cert(
rv = mbedtls_pk_parse_key(&ck->key, pem, len, (const uint8_t *) pass,
pass != NULL ? strlen(pass) : 0);
if (rv != 0) {
- rv = nni_tls_mkerr(rv);
+ rv = tls_mkerr(rv);
goto err;
}
@@ -941,7 +940,7 @@ nng_tls_config_own_cert(
rv = mbedtls_ssl_conf_own_cert(&cfg->cfg_ctx, &ck->crt, &ck->key);
if (rv != 0) {
nni_mtx_unlock(&cfg->lk);
- rv = nni_tls_mkerr(rv);
+ rv = tls_mkerr(rv);
goto err;
}
diff --git a/src/supplemental/tls/none/tls.c b/src/supplemental/tls/none/tls.c
index 257bb6b1..a1e70b73 100644
--- a/src/supplemental/tls/none/tls.c
+++ b/src/supplemental/tls/none/tls.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -20,8 +20,6 @@
#include "core/nng_impl.h"
#include "supplemental/tls/tls_api.h"
-#include <nng/supplemental/tls/tls.h>
-
void
nni_tls_config_fini(nng_tls_config *cfg)
{
@@ -42,68 +40,6 @@ nni_tls_config_hold(nng_tls_config *cfg)
NNI_ARG_UNUSED(cfg);
}
-void
-nni_tls_fini(nni_tls *tp)
-{
- NNI_ARG_UNUSED(tp);
-}
-
-int
-nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp)
-{
- NNI_ARG_UNUSED(tpp);
- NNI_ARG_UNUSED(cfg);
- NNI_ARG_UNUSED(tcp);
-
- return (NNG_ENOTSUP);
-}
-
-// nni_tls_send is the exported send function. It has a similar
-// calling convention as the platform TCP pipe.
-void
-nni_tls_send(nni_tls *tp, nni_aio *aio)
-{
- NNI_ARG_UNUSED(tp);
- nni_aio_finish_error(aio, NNG_ENOTSUP);
-}
-
-void
-nni_tls_recv(nni_tls *tp, nni_aio *aio)
-{
- NNI_ARG_UNUSED(tp);
- nni_aio_finish_error(aio, NNG_ENOTSUP);
-}
-
-void
-nni_tls_close(nni_tls *tp)
-{
- NNI_ARG_UNUSED(tp);
-}
-
-int
-nni_tls_getopt(
- nni_tls *tp, const char *name, void *buf, size_t *szp, nni_type t)
-{
- NNI_ARG_UNUSED(tp);
- NNI_ARG_UNUSED(name);
- NNI_ARG_UNUSED(buf);
- NNI_ARG_UNUSED(szp);
- NNI_ARG_UNUSED(t);
- return (NNG_ENOTSUP);
-}
-
-int
-nni_tls_setopt(
- nni_tls *tp, const char *name, const void *buf, size_t sz, nni_type t)
-{
- NNI_ARG_UNUSED(tp);
- NNI_ARG_UNUSED(name);
- NNI_ARG_UNUSED(buf);
- NNI_ARG_UNUSED(sz);
- NNI_ARG_UNUSED(t);
- return (NNG_ENOTSUP);
-}
-
int
nng_tls_config_server_name(nng_tls_config *cfg, const char *name)
{
@@ -190,3 +126,29 @@ nng_tls_config_free(nng_tls_config *cfg)
{
NNI_ARG_UNUSED(cfg);
}
+
+int
+nni_tls_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
+{
+ NNI_ARG_UNUSED(dp);
+ NNI_ARG_UNUSED(url);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_tls_listener_alloc(nng_stream_listener **lp, const nng_url *url)
+{
+ NNI_ARG_UNUSED(lp);
+ NNI_ARG_UNUSED(url);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_tls_checkopt(const char *nm, const void *buf, size_t sz, nni_type t)
+{
+ NNI_ARG_UNUSED(nm);
+ NNI_ARG_UNUSED(buf);
+ NNI_ARG_UNUSED(sz);
+ NNI_ARG_UNUSED(t);
+ return (NNG_ENOTSUP);
+} \ No newline at end of file
diff --git a/src/supplemental/tls/tls_api.h b/src/supplemental/tls/tls_api.h
index 22dd68a0..4e6146b1 100644
--- a/src/supplemental/tls/tls_api.h
+++ b/src/supplemental/tls/tls_api.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -12,12 +12,28 @@
#ifndef NNG_SUPPLEMENTAL_TLS_TLS_API_H
#define NNG_SUPPLEMENTAL_TLS_TLS_API_H
-#include <stdbool.h>
-
#include <nng/supplemental/tls/tls.h>
-// nni_tls represents the context for a single TLS stream.
-typedef struct nni_tls nni_tls;
+// This nni_tls_common structure represents the "base" structure for
+// an implementation to extend. One of these must be the first member
+// of the implementation specific TLS stream struct.
+typedef struct {
+ nng_stream ops;
+ nni_aio * aio; // system aio for connect/accept
+ nni_aio * uaio; // user aio for connect/accept
+ nng_tls_config *cfg;
+} nni_tls_common;
+
+// The implementation supplies this function to create the TLS connection
+// object. All fields will be zeroed.
+extern int nni_tls_alloc(nng_stream **);
+extern int nni_tls_dialer_alloc(nng_stream_dialer **, const nng_url *);
+extern int nni_tls_listener_alloc(nng_stream_listener **, const nng_url *);
+extern int nni_tls_checkopt(const char *, const void *, size_t, nni_type);
+
+// nni_tls_start is called by the common TLS dialer/listener completions
+// to start the TLS stream activity. This may also do allocations, etc.
+extern int nni_tls_start(nng_stream *, nng_stream *);
// nni_tls_config_init creates a new TLS configuration object.
// The object is created with a reference count of one.
@@ -34,30 +50,4 @@ extern void nni_tls_config_fini(nng_tls_config *);
// the configuration object is created with a hold on it.
extern void nni_tls_config_hold(nng_tls_config *);
-extern int nni_tls_init(nni_tls **, nng_tls_config *, nni_tcp_conn *);
-extern void nni_tls_close(nni_tls *);
-extern void nni_tls_fini(nni_tls *);
-extern void nni_tls_send(nni_tls *, nng_aio *);
-extern void nni_tls_recv(nni_tls *, nng_aio *);
-
-extern int nni_tls_setopt(
- nni_tls *, const char *, const void *, size_t, nni_type);
-extern int nni_tls_getopt(nni_tls *, const char *, void *, size_t *, nni_type);
-
-extern int nni_tls_set(
- nng_tls *, const char *, const void *, size_t, nni_type);
-extern int nni_tls_get(nng_tls *, const char *, void *, size_t *, nni_type);
-
-extern int nni_tls_dialer_setopt(
- nng_tls_dialer *, const char *, const void *, size_t, nni_type);
-
-extern int nni_tls_dialer_getopt(
- nng_tls_dialer *, const char *, void *, size_t *, nni_type);
-
-extern int nni_tls_listener_setopt(
- nng_tls_listener *, const char *, const void *, size_t, nni_type);
-
-extern int nni_tls_listener_getopt(
- nng_tls_listener *, const char *, void *, size_t *, nni_type);
-
#endif // NNG_SUPPLEMENTAL_TLS_TLS_API_H
diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c
index f93ca0ba..990d3add 100644
--- a/src/supplemental/tls/tls_common.c
+++ b/src/supplemental/tls/tls_common.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -15,9 +15,9 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "core/tcp.h"
#include "supplemental/tls/tls_api.h"
-#include <nng/supplemental/tcp/tcp.h>
#include <nng/supplemental/tls/tls.h>
// This file contains common code for TLS, and is only compiled if we
@@ -25,126 +25,29 @@
// parts of TLS support that are invariant relative to different TLS
// libraries, such as dialer and listener support.
-struct nng_tls_s {
- nni_tls * c;
- nni_aio * aio; // system aio for connect/accept
- nni_aio * uaio; // user aio for connect/accept
- nng_tls_config *cfg;
-};
-
-// We use a union and share an "endpoint" for both dialers and listeners.
-// This allows us to reuse the bulk of the code for things like option
-// handlers for both dialers and listeners.
-typedef union tls_tcp_ep_u {
- nni_tcp_dialer * d;
- nni_tcp_listener *l;
-} tls_tcp_ep;
-
-typedef struct nng_tls_ep_s {
- tls_tcp_ep tcp;
- nng_tls_config *cfg;
- nni_mtx lk;
-} tls_ep;
-
-void
-nng_tls_close(nng_tls *tls)
-{
- nni_tls_close(tls->c);
-}
+typedef struct {
+ nng_stream_dialer ops;
+ nng_stream_dialer *d; // underlying TCP dialer
+ nng_tls_config * cfg;
+ nni_mtx lk; // protects the config
+} tls_dialer;
-void
-nng_tls_free(nng_tls *tls)
-{
- if (tls != NULL) {
- nni_tls_fini(tls->c);
- nni_aio_fini(tls->aio);
- nng_tls_config_free(tls->cfg);
- NNI_FREE_STRUCT(tls);
- }
-}
-
-void
-nng_tls_send(nng_tls *tls, nng_aio *aio)
-{
- nni_tls_send(tls->c, aio);
-}
-
-void
-nng_tls_recv(nng_tls *tls, nng_aio *aio)
-{
- nni_tls_recv(tls->c, aio);
-}
-
-int
-nni_tls_get(nng_tls *tls, const char *name, void *buf, size_t *szp, nni_type t)
-{
- return (nni_tls_getopt(tls->c, name, buf, szp, t));
-}
-
-int
-nni_tls_set(
- nng_tls *tls, const char *name, const void *buf, size_t sz, nni_type t)
-{
- return (nni_tls_setopt(tls->c, name, buf, sz, t));
-}
-
-int
-nng_tls_getopt(nng_tls *tls, const char *name, void *buf, size_t *szp)
-{
- return (nni_tls_getopt(tls->c, name, buf, szp, NNI_TYPE_OPAQUE));
-}
-
-int
-nng_tls_setopt(nng_tls *tls, const char *name, const void *buf, size_t sz)
-{
- return (nni_tls_setopt(tls->c, name, buf, sz, NNI_TYPE_OPAQUE));
-}
-
-int
-nng_tls_dialer_alloc(nng_tls_dialer **dp)
-{
- tls_ep *ep;
- int rv;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&ep->lk);
-
- if ((rv = nni_tcp_dialer_init(&ep->tcp.d)) != 0) {
- nni_mtx_fini(&ep->lk);
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
- if ((rv = nng_tls_config_alloc(&ep->cfg, NNG_TLS_MODE_CLIENT)) != 0) {
- nni_tcp_dialer_fini(ep->tcp.d);
- nni_mtx_fini(&ep->lk);
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
- *dp = (void *) ep;
- return (rv);
-}
-
-void
-nng_tls_dialer_close(nng_tls_dialer *d)
+static void
+tls_dialer_close(void *arg)
{
- tls_ep *ep = (void *) d;
- nni_tcp_dialer_close(ep->tcp.d);
+ tls_dialer *d = arg;
+ nng_stream_dialer_close(d->d);
}
-void
-nng_tls_dialer_free(nng_tls_dialer *d)
+static void
+tls_dialer_free(void *arg)
{
- tls_ep *ep = (void *) d;
- if (ep != NULL) {
- nni_tcp_dialer_fini(ep->tcp.d);
- nng_tls_config_free(ep->cfg);
- nni_mtx_fini(&ep->lk);
- NNI_FREE_STRUCT(ep);
+ tls_dialer *d;
+ if ((d = arg) != NULL) {
+ nng_stream_dialer_free(d->d);
+ nng_tls_config_free(d->cfg);
+ nni_mtx_fini(&d->lk);
+ NNI_FREE_STRUCT(d);
}
}
@@ -154,28 +57,28 @@ nng_tls_dialer_free(nng_tls_dialer *d)
static void
tls_conn_cb(void *arg)
{
- nng_tls * tls = arg;
- nni_tcp_conn *tcp;
- int rv;
+ nng_stream * tls = arg;
+ nni_tls_common *com = arg;
+ nng_stream * tcp;
+ int rv;
- if ((rv = nni_aio_result(tls->aio)) != 0) {
- nni_aio_finish_error(tls->uaio, rv);
- nng_tls_free(tls);
+ if ((rv = nni_aio_result(com->aio)) != 0) {
+ nni_aio_finish_error(com->uaio, rv);
+ nng_stream_free(tls);
return;
}
- tcp = nni_aio_get_output(tls->aio, 0);
+ tcp = nni_aio_get_output(com->aio, 0);
- rv = nni_tls_init(&tls->c, tls->cfg, tcp);
- if (rv != 0) {
- nni_aio_finish_error(tls->uaio, rv);
- nni_tcp_conn_fini(tcp);
- nng_tls_free(tls);
+ if ((rv = nni_tls_start(tls, tcp)) != 0) {
+ nni_aio_finish_error(com->uaio, rv);
+ nng_stream_free(tcp);
+ nng_stream_free(tls);
return;
}
- nni_aio_set_output(tls->uaio, 0, tls);
- nni_aio_finish(tls->uaio, 0, 0);
+ nni_aio_set_output(com->uaio, 0, tls);
+ nni_aio_finish(com->uaio, 0, 0);
}
// Dialer cancel is called when the user has indicated that they no longer
@@ -183,49 +86,52 @@ tls_conn_cb(void *arg)
static void
tls_conn_cancel(nni_aio *aio, void *arg, int rv)
{
- nng_tls *tls = arg;
- NNI_ASSERT(tls->uaio == aio);
+ nni_tls_common *com = arg;
+ NNI_ASSERT(com->uaio == aio);
// Just pass this down. If the connection is already done, this
// will have no effect.
- nni_aio_abort(tls->aio, rv);
+ nni_aio_abort(com->aio, rv);
}
-void
-nng_tls_dialer_dial(nng_tls_dialer *d, const nng_sockaddr *sa, nng_aio *aio)
+static void
+tls_dialer_dial(void *arg, nng_aio *aio)
{
- int rv;
- nng_tls *tls;
- tls_ep * ep = (void *) d;
+ tls_dialer * d = arg;
+ int rv;
+ nng_stream * tls;
+ nni_tls_common *com;
if (nni_aio_begin(aio) != 0) {
return;
}
- if ((tls = NNI_ALLOC_STRUCT(tls)) == NULL) {
- nni_aio_finish_error(aio, NNG_ENOMEM);
+ if ((rv = nni_tls_alloc(&tls)) != 0) {
+ nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_aio_init(&tls->aio, tls_conn_cb, tls)) != 0) {
+
+ com = (void *) tls;
+ if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) {
nni_aio_finish_error(aio, rv);
- NNI_FREE_STRUCT(tls);
+ nng_stream_free(tls);
return;
}
- tls->uaio = aio;
+ com->uaio = aio;
// Save a copy of the TLS configuration. This way we don't have
// to ensure that the dialer outlives the connection, because the
// only shared data is the configuration which is reference counted.
- nni_mtx_lock(&ep->lk);
- tls->cfg = ep->cfg;
- nng_tls_config_hold(tls->cfg);
- nni_mtx_unlock(&ep->lk);
+ nni_mtx_lock(&d->lk);
+ com->cfg = d->cfg;
+ nng_tls_config_hold(com->cfg);
+ nni_mtx_unlock(&d->lk);
if ((rv = nni_aio_schedule(aio, tls_conn_cancel, tls)) != 0) {
nni_aio_finish_error(aio, rv);
- nng_tls_free(tls);
+ nng_stream_free(tls);
return;
}
- nni_tcp_dialer_dial(ep->tcp.d, sa, tls->aio);
+ nng_stream_dialer_dial(d->d, com->aio);
}
static int
@@ -241,11 +147,12 @@ tls_check_string(const void *v, size_t sz, nni_opt_type t)
}
static int
-tls_ep_set_config(void *arg, const void *buf, size_t sz, nni_type t)
+tls_dialer_set_config(void *arg, const void *buf, size_t sz, nni_type t)
{
int rv;
nng_tls_config *cfg;
- tls_ep * ep;
+ tls_dialer * d = arg;
+ nng_tls_config *old;
if ((rv = nni_copyin_ptr((void **) &cfg, buf, sz, t)) != 0) {
return (rv);
@@ -253,308 +160,523 @@ tls_ep_set_config(void *arg, const void *buf, size_t sz, nni_type t)
if (cfg == NULL) {
return (NNG_EINVAL);
}
- if ((ep = arg) != NULL) {
- nng_tls_config *old;
-
- nni_mtx_lock(&ep->lk);
- old = ep->cfg;
- nng_tls_config_hold(cfg);
- ep->cfg = cfg;
- nni_mtx_unlock(&ep->lk);
- if (old != NULL) {
- nng_tls_config_free(old);
- }
+ nni_mtx_lock(&d->lk);
+ old = d->cfg;
+ nng_tls_config_hold(cfg);
+ d->cfg = cfg;
+ nni_mtx_unlock(&d->lk);
+ if (old != NULL) {
+ nng_tls_config_free(old);
}
return (0);
}
static int
-tls_ep_get_config(void *arg, void *buf, size_t *szp, nni_type t)
+tls_dialer_get_config(void *arg, void *buf, size_t *szp, nni_type t)
{
- tls_ep * ep = arg;
+ tls_dialer * d = arg;
nng_tls_config *cfg;
int rv;
- nni_mtx_lock(&ep->lk);
- if ((cfg = ep->cfg) != NULL) {
+ nni_mtx_lock(&d->lk);
+ if ((cfg = d->cfg) != NULL) {
nng_tls_config_hold(cfg);
}
if ((rv = nni_copyout_ptr(cfg, buf, szp, t)) != 0) {
nng_tls_config_free(cfg);
}
- nni_mtx_unlock(&ep->lk);
+ nni_mtx_unlock(&d->lk);
return (rv);
}
static int
-tls_ep_set_server_name(void *arg, const void *buf, size_t sz, nni_type t)
+tls_dialer_set_server_name(void *arg, const void *buf, size_t sz, nni_type t)
{
- tls_ep *ep = arg;
- int rv;
- if ((rv = tls_check_string(buf, sz, t)) != 0) {
- return (rv);
- }
- if ((ep = arg) != NULL) {
- nni_mtx_lock(&ep->lk);
- rv = nng_tls_config_server_name(ep->cfg, buf);
- nni_mtx_unlock(&ep->lk);
+ tls_dialer *d = arg;
+ int rv;
+ if ((rv = tls_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->lk);
+ rv = nng_tls_config_server_name(d->cfg, buf);
+ nni_mtx_unlock(&d->lk);
}
return (rv);
}
static int
-tls_ep_set_auth_mode(void *arg, const void *buf, size_t sz, nni_type t)
+tls_dialer_set_auth_mode(void *arg, const void *buf, size_t sz, nni_type t)
{
- int mode;
- int rv;
- tls_ep *ep;
+ int mode;
+ int rv;
+ tls_dialer *d = arg;
rv = nni_copyin_int(&mode, buf, sz, NNG_TLS_AUTH_MODE_NONE,
NNG_TLS_AUTH_MODE_REQUIRED, t);
- if ((rv == 0) && ((ep = arg) != NULL)) {
- nni_mtx_lock(&ep->lk);
- rv = nng_tls_config_auth_mode(ep->cfg, mode);
- nni_mtx_unlock(&ep->lk);
+ if (rv == 0) {
+ nni_mtx_lock(&d->lk);
+ rv = nng_tls_config_auth_mode(d->cfg, mode);
+ nni_mtx_unlock(&d->lk);
}
return (rv);
}
static int
-tls_ep_set_ca_file(void *arg, const void *buf, size_t sz, nni_opt_type t)
+tls_dialer_set_ca_file(void *arg, const void *buf, size_t sz, nni_opt_type t)
{
- tls_ep *ep;
- int rv;
+ tls_dialer *d = arg;
+ int rv;
- if (((rv = tls_check_string(buf, sz, t)) == 0) &&
- ((ep = arg) != NULL)) {
- nni_mtx_lock(&ep->lk);
- rv = nng_tls_config_ca_file(ep->cfg, buf);
- nni_mtx_unlock(&ep->lk);
+ if ((rv = tls_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->lk);
+ rv = nng_tls_config_ca_file(d->cfg, buf);
+ nni_mtx_unlock(&d->lk);
}
return (rv);
}
static int
-tls_ep_set_cert_key_file(void *arg, const void *buf, size_t sz, nni_opt_type t)
+tls_dialer_set_cert_key_file(
+ void *arg, const void *buf, size_t sz, nni_opt_type t)
{
- tls_ep *ep;
- int rv;
+ tls_dialer *d = arg;
+ int rv;
- if (((rv = tls_check_string(buf, sz, t)) == 0) &&
- ((ep = arg) != NULL)) {
- nni_mtx_lock(&ep->lk);
- rv = nng_tls_config_cert_key_file(ep->cfg, buf, NULL);
- nni_mtx_unlock(&ep->lk);
+ if ((rv = tls_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->lk);
+ rv = nng_tls_config_cert_key_file(d->cfg, buf, NULL);
+ nni_mtx_unlock(&d->lk);
}
return (rv);
}
-static const nni_option tls_ep_opts[] = {
+static const nni_option tls_dialer_opts[] = {
{
.o_name = NNG_OPT_TLS_CONFIG,
- .o_get = tls_ep_get_config,
- .o_set = tls_ep_set_config,
+ .o_get = tls_dialer_get_config,
+ .o_set = tls_dialer_set_config,
},
{
.o_name = NNG_OPT_TLS_SERVER_NAME,
- .o_set = tls_ep_set_server_name,
+ .o_set = tls_dialer_set_server_name,
},
{
.o_name = NNG_OPT_TLS_CA_FILE,
- .o_set = tls_ep_set_ca_file,
+ .o_set = tls_dialer_set_ca_file,
},
{
.o_name = NNG_OPT_TLS_CERT_KEY_FILE,
- .o_set = tls_ep_set_cert_key_file,
+ .o_set = tls_dialer_set_cert_key_file,
},
{
.o_name = NNG_OPT_TLS_AUTH_MODE,
- .o_set = tls_ep_set_auth_mode,
+ .o_set = tls_dialer_set_auth_mode,
},
{
.o_name = NULL,
},
};
-// private version of getopt and setopt take the type
-int
-nni_tls_dialer_getopt(
- nng_tls_dialer *d, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+tls_dialer_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
- int rv;
- tls_ep *ep = (void *) d;
+ tls_dialer *d = arg;
+ int rv;
- rv = nni_tcp_dialer_getopt(ep->tcp.d, name, buf, szp, t);
+ rv = nni_stream_dialer_getx(d->d, name, buf, szp, t);
if (rv == NNG_ENOTSUP) {
- rv = nni_getopt(tls_ep_opts, name, ep, buf, szp, t);
+ rv = nni_getopt(tls_dialer_opts, name, d, buf, szp, t);
}
return (rv);
}
-int
-nni_tls_dialer_setopt(nng_tls_dialer *d, const char *name, const void *buf,
- size_t sz, nni_type t)
+static int
+tls_dialer_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
- int rv;
- tls_ep *ep = (void *) d;
+ tls_dialer *d = arg;
+ int rv;
- rv = nni_tcp_dialer_setopt(
- ep != NULL ? ep->tcp.d : NULL, name, buf, sz, t);
+ rv = nni_stream_dialer_setx(d->d, name, buf, sz, t);
if (rv == NNG_ENOTSUP) {
- rv = nni_setopt(tls_ep_opts, name, ep, buf, sz, t);
+ rv = nni_setopt(tls_dialer_opts, name, d, buf, sz, t);
}
return (rv);
}
-// public versions of option handlers here
-
-int
-nng_tls_dialer_getopt(
- nng_tls_dialer *d, const char *name, void *buf, size_t *szp)
-{
- return (nni_tls_dialer_getopt(d, name, buf, szp, NNI_TYPE_OPAQUE));
-}
-
int
-nng_tls_dialer_setopt(
- nng_tls_dialer *d, const char *name, const void *buf, size_t sz)
+nni_tls_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
{
- return (nni_tls_dialer_setopt(d, name, buf, sz, NNI_TYPE_OPAQUE));
-}
+ tls_dialer *d;
+ int rv;
+ nng_url myurl;
-void
-nng_tls_listener_close(nng_tls_listener *l)
-{
- tls_ep *ep = (void *) l;
- nni_tcp_listener_close(ep->tcp.l);
-}
-
-void
-nng_tls_listener_free(nng_tls_listener *l)
-{
- tls_ep *ep = (void *) l;
- if (ep != NULL) {
- nng_tls_listener_close(l);
- nng_tls_config_free(ep->cfg);
- nni_mtx_fini(&ep->lk);
- NNI_FREE_STRUCT(ep);
- }
-}
-
-int
-nng_tls_listener_alloc(nng_tls_listener **lp)
-{
- tls_ep *ep;
- int rv;
+ memcpy(&myurl, url, sizeof(myurl));
+ myurl.u_scheme = url->u_scheme + strlen("tls+");
if ((rv = nni_init()) != 0) {
return (rv);
}
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&ep->lk);
+ nni_mtx_init(&d->lk);
- if ((rv = nni_tcp_listener_init(&ep->tcp.l)) != 0) {
- nni_mtx_fini(&ep->lk);
- NNI_FREE_STRUCT(ep);
+ if ((rv = nng_stream_dialer_alloc_url(&d->d, &myurl)) != 0) {
+ nni_mtx_fini(&d->lk);
+ NNI_FREE_STRUCT(d);
return (rv);
}
- if ((rv = nng_tls_config_alloc(&ep->cfg, NNG_TLS_MODE_SERVER)) != 0) {
- nni_tcp_listener_fini(ep->tcp.l);
- nni_mtx_fini(&ep->lk);
- NNI_FREE_STRUCT(ep);
+ if ((rv = nng_tls_config_alloc(&d->cfg, NNG_TLS_MODE_CLIENT)) != 0) {
+ nng_stream_dialer_free(d->d);
+ nni_mtx_fini(&d->lk);
+ NNI_FREE_STRUCT(d);
return (rv);
}
- *lp = (void *) ep;
- return (0);
+
+ // Set the expected outbound hostname
+ nng_tls_config_server_name(d->cfg, url->u_hostname);
+
+ d->ops.sd_close = tls_dialer_close;
+ d->ops.sd_free = tls_dialer_free;
+ d->ops.sd_dial = tls_dialer_dial;
+ d->ops.sd_getx = tls_dialer_getx;
+ d->ops.sd_setx = tls_dialer_setx;
+ *dp = (void *) d;
+ return (rv);
}
-int
-nng_tls_listener_listen(nng_tls_listener *l, const nng_sockaddr *sa)
+typedef struct {
+ nng_stream_listener ops;
+ nng_stream_listener *l;
+ nng_tls_config * cfg;
+ nni_mtx lk;
+} tls_listener;
+
+static void
+tls_listener_close(void *arg)
{
- tls_ep *ep = (void *) l;
- return (nni_tcp_listener_listen(ep->tcp.l, sa));
+ tls_listener *l = arg;
+ nng_stream_listener_close(l->l);
}
-void
-nng_tls_listener_accept(nng_tls_listener *l, nng_aio *aio)
+static void
+tls_listener_free(void *arg)
{
- int rv;
- nng_tls *tls;
- tls_ep * ep = (void *) l;
+ tls_listener *l;
+ if ((l = arg) != NULL) {
+ tls_listener_close(l);
+ nng_tls_config_free(l->cfg);
+ nni_mtx_fini(&l->lk);
+ NNI_FREE_STRUCT(l);
+ }
+}
+
+static int
+tls_listener_listen(void *arg)
+{
+ tls_listener *l = arg;
+ return (nng_stream_listener_listen(l->l));
+}
+
+static void
+tls_listener_accept(void *arg, nng_aio *aio)
+{
+ tls_listener * l = arg;
+ int rv;
+ nng_stream * tls;
+ nni_tls_common *com;
if (nni_aio_begin(aio) != 0) {
return;
}
- if ((tls = NNI_ALLOC_STRUCT(tls)) == NULL) {
- nni_aio_finish_error(aio, NNG_ENOMEM);
+ if ((rv = nni_tls_alloc(&tls)) != 0) {
+ nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_aio_init(&tls->aio, tls_conn_cb, tls)) != 0) {
+ com = (void *) tls;
+ if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) {
nni_aio_finish_error(aio, rv);
- NNI_FREE_STRUCT(tls);
+ nng_stream_free(tls);
return;
}
- tls->uaio = aio;
+ com->uaio = aio;
// Save a copy of the TLS configuration. This way we don't have
// to ensure that the dialer outlives the connection, because the
// only shared data is the configuration which is reference counted.
- nni_mtx_lock(&ep->lk);
- tls->cfg = ep->cfg;
- nng_tls_config_hold(tls->cfg);
- nni_mtx_unlock(&ep->lk);
+ nni_mtx_lock(&l->lk);
+ com->cfg = l->cfg;
+ nng_tls_config_hold(com->cfg);
+ nni_mtx_unlock(&l->lk);
if ((rv = nni_aio_schedule(aio, tls_conn_cancel, tls)) != 0) {
nni_aio_finish_error(aio, rv);
- nng_tls_free(tls);
+ nng_stream_free(tls);
return;
}
- nni_tcp_listener_accept(ep->tcp.l, tls->aio);
+ nng_stream_listener_accept(l->l, com->aio);
}
-int
-nni_tls_listener_getopt(
- nng_tls_listener *l, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+tls_listener_set_config(void *arg, const void *buf, size_t sz, nni_type t)
{
- int rv;
- tls_ep *ep = (void *) l;
+ int rv;
+ nng_tls_config *cfg;
+ tls_listener * l = arg;
+ nng_tls_config *old;
- rv = nni_tcp_listener_getopt(ep->tcp.l, name, buf, szp, t);
- if (rv == NNG_ENOTSUP) {
- rv = nni_getopt(tls_ep_opts, name, ep, buf, szp, t);
+ if ((rv = nni_copyin_ptr((void **) &cfg, buf, sz, t)) != 0) {
+ return (rv);
+ }
+ if (cfg == NULL) {
+ return (NNG_EINVAL);
+ }
+
+ nni_mtx_lock(&l->lk);
+ old = l->cfg;
+ nng_tls_config_hold(cfg);
+ l->cfg = cfg;
+ nni_mtx_unlock(&l->lk);
+ if (old != NULL) {
+ nng_tls_config_free(old);
+ }
+ return (0);
+}
+
+static int
+tls_listener_get_config(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ tls_listener * l = arg;
+ nng_tls_config *cfg;
+ int rv;
+ nni_mtx_lock(&l->lk);
+ if ((cfg = l->cfg) != NULL) {
+ nng_tls_config_hold(cfg);
+ }
+ if ((rv = nni_copyout_ptr(cfg, buf, szp, t)) != 0) {
+ nng_tls_config_free(cfg);
}
+ nni_mtx_unlock(&l->lk);
return (rv);
}
-int
-nni_tls_listener_setopt(nng_tls_listener *l, const char *name, const void *buf,
- size_t sz, nni_type t)
+static int
+tls_listener_set_server_name(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ tls_listener *l = arg;
+ int rv;
+ if ((rv = tls_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->lk);
+ rv = nng_tls_config_server_name(l->cfg, buf);
+ nni_mtx_unlock(&l->lk);
+ }
+ return (rv);
+}
+
+static int
+tls_listener_set_auth_mode(void *arg, const void *buf, size_t sz, nni_type t)
{
- int rv;
- tls_ep *ep = (void *) l;
+ int mode;
+ int rv;
+ tls_listener *l = arg;
- rv = nni_tcp_listener_setopt(
- ep != NULL ? ep->tcp.l : NULL, name, buf, sz, t);
+ rv = nni_copyin_int(&mode, buf, sz, NNG_TLS_AUTH_MODE_NONE,
+ NNG_TLS_AUTH_MODE_REQUIRED, t);
+ if (rv == 0) {
+ nni_mtx_lock(&l->lk);
+ rv = nng_tls_config_auth_mode(l->cfg, mode);
+ nni_mtx_unlock(&l->lk);
+ }
+ return (rv);
+}
+
+static int
+tls_listener_set_ca_file(void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ tls_listener *l = arg;
+ int rv;
+
+ if ((rv = tls_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->lk);
+ rv = nng_tls_config_ca_file(l->cfg, buf);
+ nni_mtx_unlock(&l->lk);
+ }
+ return (rv);
+}
+
+static int
+tls_listener_set_cert_key_file(
+ void *arg, const void *buf, size_t sz, nni_opt_type t)
+{
+ tls_listener *l = arg;
+ int rv;
+
+ if ((rv = tls_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->lk);
+ rv = nng_tls_config_cert_key_file(l->cfg, buf, NULL);
+ nni_mtx_unlock(&l->lk);
+ }
+ return (rv);
+}
+
+static const nni_option tls_listener_opts[] = {
+ {
+ .o_name = NNG_OPT_TLS_CONFIG,
+ .o_get = tls_listener_get_config,
+ .o_set = tls_listener_set_config,
+ },
+ {
+ .o_name = NNG_OPT_TLS_SERVER_NAME,
+ .o_set = tls_listener_set_server_name,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CA_FILE,
+ .o_set = tls_listener_set_ca_file,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CERT_KEY_FILE,
+ .o_set = tls_listener_set_cert_key_file,
+ },
+ {
+ .o_name = NNG_OPT_TLS_AUTH_MODE,
+ .o_set = tls_listener_set_auth_mode,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+tls_listener_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ int rv;
+ tls_listener *l = arg;
+
+ rv = nni_stream_listener_getx(l->l, name, buf, szp, t);
if (rv == NNG_ENOTSUP) {
- rv = nni_setopt(tls_ep_opts, name, ep, buf, sz, t);
+ rv = nni_getopt(tls_listener_opts, name, l, buf, szp, t);
}
return (rv);
}
-// public versions of option handlers here
+static int
+tls_listener_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ int rv;
+ tls_listener *l = arg;
+
+ rv = nni_stream_listener_setx(l->l, name, buf, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_setopt(tls_listener_opts, name, l, buf, sz, t);
+ }
+ return (rv);
+}
int
-nng_tls_listener_getopt(
- nng_tls_listener *l, const char *name, void *buf, size_t *szp)
+nni_tls_listener_alloc(nng_stream_listener **lp, const nng_url *url)
{
- return (nni_tls_listener_getopt(l, name, buf, szp, NNI_TYPE_OPAQUE));
+ tls_listener *l;
+ int rv;
+ nng_url myurl;
+
+ memcpy(&myurl, url, sizeof(myurl));
+ myurl.u_scheme = url->u_scheme + strlen("tls+");
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&l->lk);
+
+ if ((rv = nng_stream_listener_alloc_url(&l->l, &myurl)) != 0) {
+ nni_mtx_fini(&l->lk);
+ NNI_FREE_STRUCT(l);
+ return (rv);
+ }
+ if ((rv = nng_tls_config_alloc(&l->cfg, NNG_TLS_MODE_SERVER)) != 0) {
+ nng_stream_listener_free(l->l);
+ nni_mtx_fini(&l->lk);
+ NNI_FREE_STRUCT(l);
+ return (rv);
+ }
+ l->ops.sl_free = tls_listener_free;
+ l->ops.sl_close = tls_listener_close;
+ l->ops.sl_accept = tls_listener_accept;
+ l->ops.sl_listen = tls_listener_listen;
+ l->ops.sl_getx = tls_listener_getx;
+ l->ops.sl_setx = tls_listener_setx;
+ *lp = (void *) l;
+ return (0);
}
+// The following checks exist for socket configuration, when we need to
+// configure an option on a socket before any transport is configured
+// underneath.
+
+static int
+tls_check_config(const void *buf, size_t sz, nni_type t)
+{
+ int rv;
+ nng_tls_config *cfg;
+
+ if ((rv = nni_copyin_ptr((void **) &cfg, buf, sz, t)) != 0) {
+ return (rv);
+ }
+ if (cfg == NULL) {
+ return (NNG_EINVAL);
+ }
+ return (0);
+}
+
+static int
+tls_check_auth_mode(const void *buf, size_t sz, nni_type t)
+{
+ int mode;
+ int rv;
+
+ rv = nni_copyin_int(&mode, buf, sz, NNG_TLS_AUTH_MODE_NONE,
+ NNG_TLS_AUTH_MODE_REQUIRED, t);
+ return (rv);
+}
+
+static const nni_chkoption tls_chkopts[] = {
+ {
+ .o_name = NNG_OPT_TLS_CONFIG,
+ .o_check = tls_check_config,
+ },
+ {
+ .o_name = NNG_OPT_TLS_SERVER_NAME,
+ .o_check = tls_check_string,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CA_FILE,
+ .o_check = tls_check_string,
+ },
+ {
+ .o_name = NNG_OPT_TLS_CERT_KEY_FILE,
+ .o_check = tls_check_string,
+ },
+ {
+ .o_name = NNG_OPT_TLS_AUTH_MODE,
+ .o_check = tls_check_auth_mode,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
int
-nng_tls_listener_setopt(
- nng_tls_listener *l, const char *name, const void *buf, size_t sz)
+nni_tls_checkopt(const char *name, const void *data, size_t sz, nni_type t)
{
- return (nni_tls_listener_setopt(l, name, buf, sz, NNI_TYPE_OPAQUE));
+ int rv;
+
+ rv = nni_chkopt(tls_chkopts, name, data, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_stream_checkopt("tcp", name, data, sz, t);
+ }
+ return (rv);
}
diff --git a/src/supplemental/websocket/CMakeLists.txt b/src/supplemental/websocket/CMakeLists.txt
index 22ee955d..3b102c80 100644
--- a/src/supplemental/websocket/CMakeLists.txt
+++ b/src/supplemental/websocket/CMakeLists.txt
@@ -12,5 +12,7 @@ if (NNG_SUPP_WEBSOCKET)
set(_SRCS
supplemental/websocket/websocket.c
supplemental/websocket/websocket.h)
+else()
+ set(_SRCS supplemental/websocket/stubs.c)
endif()
set(NNG_SRCS ${NNG_SRCS} ${_SRCS} PARENT_SCOPE)
diff --git a/src/supplemental/websocket/stub.c b/src/supplemental/websocket/stub.c
new file mode 100644
index 00000000..94e7f1b5
--- /dev/null
+++ b/src/supplemental/websocket/stub.c
@@ -0,0 +1,40 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Devolutions <info@devolutions.net>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+// This stub file exists to support configuration of the stream subsystem
+// when websocket support is unconfigured.
+
+int
+nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
+{
+ NNI_ARG_UNUSED(dp);
+ NNI_ARG_UNUSED(url);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ws_listener_alloc(nng_stream_listener **lp, const nng_url *url)
+{
+ NNI_ARG_UNUSED(lp);
+ NNI_ARG_UNUSED(url);
+ return (NNG_ENOTSUP);
+}
+
+int
+nni_ws_checkopt(const char *name, const void *data, size_t sz, nni_type t)
+{
+ NNI_ARG_UNUSED(name);
+ NNI_ARG_UNUSED(data);
+ NNI_ARG_UNUSED(sz);
+ NNI_ARG_UNUSED(t);
+ return (NNG_ENOTSUP);
+}
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 3d3a68cb..4cecf430 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -18,11 +18,25 @@
#include "supplemental/http/http_api.h"
#include "supplemental/sha1/sha1.h"
+#include <nng/transport/ws/websocket.h>
+
#include "websocket.h"
+// This should be removed or handled differently in the future.
+typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *);
+
+// We have chosen to be a bit more stringent in the size of the frames that
+// we send, while we more generously allow larger incoming frames. These
+// may be tuned by options.
+#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only)
+#define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv)
+#define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send)
+
+// Alias for checking the prefix of a string.
+#define startswith(s, t) (strncmp(s, t, strlen(t)) == 0)
+
// Pre-defined types for some prototypes. These are from other subsystems.
typedef struct ws_frame ws_frame;
-typedef struct ws_msg ws_msg;
typedef struct ws_header {
nni_list_node node;
@@ -31,17 +45,20 @@ typedef struct ws_header {
} ws_header;
struct nni_ws {
+ nng_stream ops;
nni_list_node node;
nni_reap_item reap;
bool server;
bool closed;
bool ready;
bool wclose;
+ bool isstream;
+ bool inmsg;
nni_mtx mtx;
- nni_list txmsgs;
- nni_list rxmsgs;
nni_list sendq;
nni_list recvq;
+ nni_list txq;
+ nni_list rxq;
ws_frame * txframe;
ws_frame * rxframe;
nni_aio * txaio; // physical aios
@@ -57,26 +74,31 @@ struct nni_ws {
char * reshdrs;
size_t maxframe;
size_t fragsize;
+ size_t recvmax; // largest message size
nni_ws_listener *listener;
nni_ws_dialer * dialer;
};
struct nni_ws_listener {
- nni_http_server * server;
- char * proto;
- nni_mtx mtx;
- nni_cv cv;
- nni_list pend;
- nni_list reply;
- nni_list aios;
- nni_url * url;
- bool started;
- bool closed;
- nni_http_handler * handler;
- nni_ws_listen_hook hookfn;
- void * hookarg;
- nni_list headers; // response headers
- size_t maxframe;
+ nng_stream_listener ops;
+ nni_http_server * server;
+ char * proto;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_list pend;
+ nni_list reply;
+ nni_list aios;
+ nng_url * url;
+ bool started;
+ bool closed;
+ bool isstream;
+ nni_http_handler * handler;
+ nni_ws_listen_hook hookfn;
+ void * hookarg;
+ nni_list headers; // response headers
+ size_t maxframe;
+ size_t fragsize;
+ size_t recvmax; // largest message size
};
// The dialer tracks user aios in two lists. The first list is for aios
@@ -86,18 +108,21 @@ struct nni_ws_listener {
// completion of an earlier connection. (We don't want to establish
// requests when we already have connects negotiating.)
struct nni_ws_dialer {
- nni_http_req * req;
- nni_http_res * res;
- nni_http_client *client;
- nni_mtx mtx;
- nni_cv cv;
- char * proto;
- nni_url * url;
- nni_list wspend; // ws structures still negotiating
- bool closed;
- nng_sockaddr sa;
- nni_list headers; // request headers
- size_t maxframe;
+ nng_stream_dialer ops;
+ nni_http_req * req;
+ nni_http_res * res;
+ nni_http_client * client;
+ nni_mtx mtx;
+ nni_cv cv;
+ char * proto;
+ nng_url * url;
+ nni_list wspend; // ws structures still negotiating
+ bool closed;
+ bool isstream;
+ nni_list headers; // request headers
+ size_t maxframe;
+ size_t fragsize;
+ size_t recvmax;
};
typedef enum ws_type {
@@ -133,16 +158,7 @@ struct ws_frame {
bool masked;
size_t bufsz; // allocated size
uint8_t * buf;
- ws_msg * wmsg;
-};
-
-struct ws_msg {
- nni_list frames;
- nni_list_node node;
- nni_ws * ws;
- nni_aio * aio;
- uint8_t * buf;
- size_t bufsz;
+ nng_aio * aio;
};
static void ws_send_close(nni_ws *ws, uint16_t code);
@@ -150,6 +166,127 @@ static void ws_conn_cb(void *);
static void ws_close_cb(void *);
static void ws_read_cb(void *);
static void ws_write_cb(void *);
+static void ws_close_error(nni_ws *ws, uint16_t code);
+
+static void ws_str_free(void *);
+static void ws_str_close(void *);
+static void ws_str_send(void *, nng_aio *);
+static void ws_str_recv(void *, nng_aio *);
+static int ws_str_getx(void *, const char *, void *, size_t *, nni_type);
+static int ws_str_setx(void *, const char *, const void *, size_t, nni_type);
+
+static void ws_listener_close(void *);
+static void ws_listener_free(void *);
+
+static int
+ws_check_string(const void *v, size_t sz, nni_opt_type t)
+{
+ if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) {
+ return (NNG_EBADTYPE);
+ }
+ if (nni_strnlen(v, sz) >= sz) {
+ return (NNG_EINVAL);
+ }
+ return (0);
+}
+
+static int
+ws_set_header_ext(nni_list *l, const char *n, const char *v, bool strip_dups)
+{
+ ws_header *hdr;
+ char * nv;
+
+ if ((nv = nni_strdup(v)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if (strip_dups) {
+ NNI_LIST_FOREACH (l, hdr) {
+ if (nni_strcasecmp(hdr->name, n) == 0) {
+ nni_strfree(hdr->value);
+ hdr->value = nv;
+ return (0);
+ }
+ }
+ }
+
+ if ((hdr = NNI_ALLOC_STRUCT(hdr)) == NULL) {
+ nni_strfree(nv);
+ return (NNG_ENOMEM);
+ }
+ if ((hdr->name = nni_strdup(n)) == NULL) {
+ nni_strfree(nv);
+ NNI_FREE_STRUCT(hdr);
+ return (NNG_ENOMEM);
+ }
+ hdr->value = nv;
+ nni_list_append(l, hdr);
+ return (0);
+}
+
+static int
+ws_set_header(nni_list *l, const char *n, const char *v)
+{
+ return (ws_set_header_ext(l, n, v, true));
+}
+
+static int
+ws_set_headers(nni_list *l, const char *str)
+{
+ char * dupstr;
+ size_t duplen;
+ char * n;
+ char * v;
+ char * nl;
+ int rv;
+
+ if ((dupstr = nni_strdup(str)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ duplen = strlen(dupstr) + 1; // so we can free it later
+
+ n = dupstr;
+ for (;;) {
+ if ((v = strchr(n, ':')) == NULL) {
+ // Note that this also means that if
+ // a bare word is present, we ignore it.
+ break;
+ }
+ *v = '\0';
+ v++;
+ while (*v == ' ') {
+ // Skip leading whitespace. Not strictly
+ // necessary, but still a good idea.
+ v++;
+ }
+ nl = v;
+ // Find the end of the line -- should be CRLF, but can
+ // also be unterminated or just LF if user
+ while ((*nl != '\0') && (*nl != '\r') && (*nl != '\n')) {
+ nl++;
+ }
+ while ((*nl == '\r') || (*nl == '\n')) {
+ *nl = '\0';
+ nl++;
+ }
+
+ // Note that this can lead to a partial failure. As this
+ // is most likely ENOMEM, don't worry too much about it.
+ // This method does *not* eliminate duplicates.
+ if ((rv = ws_set_header_ext(l, n, v, false)) != 0) {
+ goto done;
+ }
+
+ // Advance to the next name.
+ n = nl;
+ }
+
+ rv = 0;
+
+done:
+ nni_free(dupstr, duplen);
+ return (rv);
+}
// This looks, case independently for a word in a list, which is either
// space or comma separated.
@@ -204,30 +341,13 @@ ws_make_accept(const char *key, char *accept)
static void
ws_frame_fini(ws_frame *frame)
{
- if (frame->bufsz) {
+ if (frame->bufsz != 0) {
nni_free(frame->buf, frame->bufsz);
}
NNI_FREE_STRUCT(frame);
}
static void
-ws_msg_fini(ws_msg *wm)
-{
- ws_frame *frame;
-
- NNI_ASSERT(!nni_list_node_active(&wm->node));
- while ((frame = nni_list_first(&wm->frames)) != NULL) {
- nni_list_remove(&wm->frames, frame);
- ws_frame_fini(frame);
- }
- if (wm->bufsz != 0) {
- nni_free(wm->buf, wm->bufsz);
- }
-
- NNI_FREE_STRUCT(wm);
-}
-
-static void
ws_mask_frame(ws_frame *frame)
{
uint32_t r;
@@ -263,31 +383,19 @@ ws_unmask_frame(ws_frame *frame)
static int
ws_msg_init_control(
- ws_msg **wmp, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len)
+ ws_frame **framep, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len)
{
- ws_msg * wm;
ws_frame *frame;
if (len > 125) {
return (NNG_EINVAL);
}
- if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
- return (NNG_ENOMEM);
- }
- wm->buf = NULL;
- wm->bufsz = 0;
-
if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
- ws_msg_fini(wm);
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&wm->frames, ws_frame, node);
memcpy(frame->sdata, buf, len);
-
- nni_list_append(&wm->frames, frame);
- frame->wmsg = wm;
frame->len = len;
frame->final = true;
frame->op = op;
@@ -303,114 +411,100 @@ ws_msg_init_control(
ws_mask_frame(frame);
}
- wm->aio = NULL;
- wm->ws = ws;
- *wmp = wm;
+ *framep = frame;
return (0);
}
static int
-ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio)
+ws_frame_prep_tx(nni_ws *ws, ws_frame *frame)
{
- ws_msg * wm;
+ nng_aio *aio = frame->aio;
+ nni_iov *iov;
+ unsigned niov;
size_t len;
- size_t maxfrag = ws->fragsize; // make this tunable. (1MB default)
uint8_t *buf;
- uint8_t op;
-
- if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
- return (NNG_ENOMEM);
- }
- NNI_LIST_INIT(&wm->frames, ws_frame, node);
-
- len = nni_msg_len(msg) + nni_msg_header_len(msg);
- wm->bufsz = len;
- if ((wm->buf = nni_alloc(len)) == NULL) {
- NNI_FREE_STRUCT(wm);
- return (NNG_ENOMEM);
- }
- buf = wm->buf;
- memcpy(buf, nni_msg_header(msg), nni_msg_header_len(msg));
- memcpy(buf + nni_msg_header_len(msg), nni_msg_body(msg),
- nni_msg_len(msg));
-
- op = WS_BINARY; // to start -- no support for sending TEXT frames
-
- // do ... while because we want at least one frame (even for empty
- // messages.) Headers get their own frame, if present. Best bet
- // is to try not to have a header when coming here.
- do {
- ws_frame *frame;
- if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
- ws_msg_fini(wm);
+ // Figure out how much we need for the entire aio.
+ frame->len = 0;
+ nni_aio_get_iov(aio, &niov, &iov);
+ for (unsigned i = 0; i < niov; i++) {
+ frame->len += iov[i].iov_len;
+ }
+
+ if ((frame->len > ws->fragsize) && (ws->fragsize > 0)) {
+ // Limit it to a single frame per policy (fragsize), as needed.
+ frame->len = ws->fragsize;
+ // For stream mode, we constrain ourselves to one frame
+ // per message. Submitter may see a partial transmit, and
+ // should resubmit as needed. For message mode, we will
+ // continue to resubmit.
+ frame->final = ws->isstream ? true : false;
+ } else {
+ // It all fits in this frame (which might not be the first),
+ // so we're done.
+ frame->final = true;
+ }
+ // Potentially allocate space for the data if we need to.
+ // Note that an empty message is legal.
+ if ((frame->bufsz < frame->len) && (frame->len > 0)) {
+ frame->buf = nni_alloc(frame->len);
+ if (frame->buf == NULL) {
return (NNG_ENOMEM);
}
- nni_list_append(&wm->frames, frame);
- frame->wmsg = wm;
- frame->len = len > maxfrag ? maxfrag : len;
- frame->buf = buf;
- frame->op = op;
-
- buf += frame->len;
- len -= frame->len;
- op = WS_CONT;
-
- if (len == 0) {
- frame->final = true;
- }
- frame->head[0] = frame->op;
- frame->hlen = 2;
- if (frame->final) {
- frame->head[0] |= 0x80; // final frame bit
- }
- if (frame->len < 126) {
- frame->head[1] = frame->len & 0x7f;
- } else if (frame->len < 65536) {
- frame->head[1] = 126;
- NNI_PUT16(frame->head + 2, (frame->len & 0xffff));
- frame->hlen += 2;
- } else {
- frame->head[1] = 127;
- NNI_PUT64(frame->head + 2, (uint64_t) frame->len);
- frame->hlen += 8;
- }
+ }
+ buf = frame->buf;
- if (ws->server) {
- frame->masked = false;
- } else {
- ws_mask_frame(frame);
+ // Now copy the data into the frame.
+ len = frame->len;
+ while (len != 0) {
+ size_t n = len;
+ if (n > iov->iov_len) {
+ n = iov->iov_len;
}
+ memcpy(buf, iov->iov_buf, n);
+ iov++;
+ len -= n;
+ buf += n;
+ }
- } while (len);
-
- wm->aio = aio;
- wm->ws = ws;
- *wmp = wm;
- return (0);
-}
+ if (nni_aio_count(aio) == 0) {
+ // This is the first frame.
+ frame->op = WS_BINARY;
+ } else {
+ frame->op = WS_CONT;
+ }
-static int
-ws_msg_init_rx(ws_msg **wmp, nni_ws *ws, nni_aio *aio)
-{
- ws_msg *wm;
+ // Populate the frame header.
+ frame->head[0] = frame->op;
+ frame->hlen = 2;
+ if (frame->final) {
+ frame->head[0] |= 0x80; // final frame bit
+ }
+ if (frame->len < 126) {
+ frame->head[1] = frame->len & 0x7f;
+ } else if (frame->len < 65536) {
+ frame->head[1] = 126;
+ NNI_PUT16(frame->head + 2, (frame->len & 0xffff));
+ frame->hlen += 2;
+ } else {
+ frame->head[1] = 127;
+ NNI_PUT64(frame->head + 2, (uint64_t) frame->len);
+ frame->hlen += 8;
+ }
- if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
- return (NNG_ENOMEM);
+ // If we are on the client, then we need to mask the frame.
+ frame->masked = false;
+ if (!ws->server) {
+ ws_mask_frame(frame);
}
- NNI_LIST_INIT(&wm->frames, ws_frame, node);
- wm->aio = aio;
- wm->ws = ws;
- *wmp = wm;
return (0);
}
static void
ws_close_cb(void *arg)
{
- nni_ws * ws = arg;
- ws_msg * wm;
- nni_aio *aio;
+ nni_ws * ws = arg;
+ ws_frame *frame;
nni_aio_close(ws->txaio);
nni_aio_close(ws->rxaio);
@@ -422,31 +516,13 @@ ws_close_cb(void *arg)
nni_http_conn_close(ws->http);
- // This list (receive) should be empty.
- while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
- nni_list_remove(&ws->rxmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- ws_msg_fini(wm);
- }
-
- while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
- nni_list_remove(&ws->txmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ while ((frame = nni_list_first(&ws->txq)) != NULL) {
+ nni_list_remove(&ws->txq, frame);
+ if (frame->aio != NULL) {
+ nni_aio_list_remove(frame->aio);
+ nni_aio_finish_error(frame->aio, NNG_ECLOSED);
}
- ws_msg_fini(wm);
- }
- ws->txframe = NULL;
-
- if (ws->rxframe != NULL) {
- ws_frame_fini(ws->rxframe);
- ws->rxframe = NULL;
+ ws_frame_fini(frame);
}
// Any txframe should have been killed with its wmsg.
@@ -456,19 +532,13 @@ ws_close_cb(void *arg)
static void
ws_close(nni_ws *ws, uint16_t code)
{
- ws_msg *wm;
+ nng_aio *aio;
// Receive stuff gets aborted always. No further receives
// once we get a close.
- while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
- nni_aio *aio;
- nni_list_remove(&ws->rxmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- ws_msg_fini(wm);
+ while ((aio = nni_list_first(&ws->recvq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
// If were closing "gracefully", then don't abort in-flight
@@ -487,7 +557,6 @@ static void
ws_start_write(nni_ws *ws)
{
ws_frame *frame;
- ws_msg * wm;
nni_iov iov[2];
int niov;
@@ -495,13 +564,11 @@ ws_start_write(nni_ws *ws)
return; // busy
}
- if ((wm = nni_list_first(&ws->txmsgs)) == NULL) {
- // Nothing to send.
- return;
+ if ((frame = nni_list_first(&ws->txq)) == NULL) {
+ return; // nothing to send
}
-
- frame = nni_list_first(&wm->frames);
NNI_ASSERT(frame != NULL);
+ nni_list_remove(&ws->txq, frame);
// Push it out.
ws->txframe = frame;
@@ -534,7 +601,6 @@ ws_write_cb(void *arg)
{
nni_ws * ws = arg;
ws_frame *frame;
- ws_msg * wm;
nni_aio * aio;
int rv;
@@ -545,17 +611,20 @@ ws_write_cb(void *arg)
return;
}
ws->txframe = NULL;
+
if (frame->op == WS_CLOSE) {
// If this was a close frame, we are done.
// No other messages may succeed..
- while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
- nni_list_remove(&ws->txmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
+ ws->txframe = NULL;
+ ws_frame_fini(frame);
+ while ((frame = nni_list_first(&ws->txq)) != NULL) {
+ nni_list_remove(&ws->txq, frame);
+ if ((aio = frame->aio) != NULL) {
+ frame->aio = NULL;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
+ ws_frame_fini(frame);
}
- ws_msg_fini(wm);
}
if (ws->wclose) {
ws->wclose = false;
@@ -565,54 +634,55 @@ ws_write_cb(void *arg)
return;
}
- wm = frame->wmsg;
- aio = wm->aio;
-
+ aio = frame->aio;
if ((rv = nni_aio_result(ws->txaio)) != 0) {
-
- nni_list_remove(&ws->txmsgs, wm);
- ws_msg_fini(wm);
+ frame->aio = NULL;
if (aio != NULL) {
- wm->aio = NULL;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
-
+ ws_frame_fini(frame);
ws->closed = true;
nni_http_conn_close(ws->http);
nni_mtx_unlock(&ws->mtx);
return;
}
- // good frame, was it the last
- nni_list_remove(&wm->frames, frame);
- ws_frame_fini(frame);
-
- // if we still have more frames to transmit, then schedule it.
- if (!nni_list_empty(&wm->frames)) {
- ws_start_write(ws);
- nni_mtx_unlock(&ws->mtx);
- return;
+ if (aio != NULL) {
+ nni_aio_iov_advance(aio, frame->len);
+ nni_aio_bump_count(aio, frame->len);
+ if (frame->final) {
+ frame->aio = NULL;
+ nni_aio_list_remove(aio);
+ } else {
+ // Clear the aio so that we won't attempt to finish
+ // it outside the lock
+ aio = NULL;
+ }
}
- if (aio != NULL) {
- wm->aio = NULL;
- nni_aio_list_remove(aio);
+ if (frame->final) {
+ ws_frame_fini(frame);
+ } else {
+ // This one cannot fail here, since we only do allocation
+ // at initial scheduling.
+ ws_frame_prep_tx(ws, frame);
+ // Schedule at end. This permits other frames to interleave.
+ nni_list_append(&ws->txq, frame);
}
- nni_list_remove(&ws->txmsgs, wm);
- // Write the next frame.
ws_start_write(ws);
nni_mtx_unlock(&ws->mtx);
- // discard while not holding lock (just deallocations)
- ws_msg_fini(wm);
-
+ // We attempt to finish the operation synchronously, outside the lock.
if (aio != NULL) {
- nng_msg *msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
- nni_msg_free(msg);
+ nni_msg *msg;
+ // Successful send, don't leak the message!
+ if ((msg = nni_aio_get_msg(aio)) != NULL) {
+ nni_aio_set_msg(aio, NULL);
+ nni_msg_free(msg);
+ }
+ nni_aio_finish_synch(aio, 0, nni_aio_count(aio));
}
}
@@ -620,7 +690,6 @@ static void
ws_write_cancel(nni_aio *aio, void *arg, int rv)
{
nni_ws * ws = arg;
- ws_msg * wm;
ws_frame *frame;
// Is this aio active? We can tell by looking at the active tx frame.
@@ -630,17 +699,17 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&ws->mtx);
return;
}
- wm = nni_aio_get_prov_extra(aio, 0);
- if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) {
+ frame = nni_aio_get_prov_extra(aio, 0);
+ if (frame == ws->txframe) {
nni_aio_abort(ws->txaio, rv);
// We will wait for callback on the txaio to finish aio.
- } else if (nni_list_active(&ws->txmsgs, wm)) {
+ } else {
// If scheduled, just need to remove node and complete it.
- nni_list_remove(&ws->txmsgs, wm);
- wm->aio = NULL;
+ nni_list_remove(&ws->txq, frame);
+ frame->aio = NULL;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
- ws_msg_fini(wm);
+ ws_frame_fini(frame);
}
nni_mtx_unlock(&ws->mtx);
}
@@ -648,10 +717,10 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv)
static void
ws_send_close(nni_ws *ws, uint16_t code)
{
- ws_msg * wm;
- uint8_t buf[sizeof(uint16_t)];
- int rv;
- nni_aio *aio;
+ ws_frame *frame;
+ uint8_t buf[sizeof(uint16_t)];
+ int rv;
+ nni_aio * aio;
NNI_PUT16(buf, code);
@@ -665,125 +734,45 @@ ws_send_close(nni_ws *ws, uint16_t code)
return;
}
ws->wclose = true;
- rv = ws_msg_init_control(&wm, ws, WS_CLOSE, buf, sizeof(buf));
+ rv = ws_msg_init_control(&frame, ws, WS_CLOSE, buf, sizeof(buf));
if (rv != 0) {
ws->wclose = false;
nni_aio_finish_error(aio, rv);
return;
}
- // Close frames get priority!
if ((rv = nni_aio_schedule(aio, ws_cancel_close, ws)) != 0) {
ws->wclose = false;
nni_aio_finish_error(aio, rv);
+ ws_frame_fini(frame);
return;
}
- nni_list_prepend(&ws->txmsgs, wm);
+ // This gets inserted at the head.
+ nni_list_prepend(&ws->txq, frame);
ws_start_write(ws);
}
static void
ws_send_control(nni_ws *ws, uint8_t op, uint8_t *buf, size_t len)
{
- ws_msg *wm;
+ ws_frame *frame;
// Note that we do not care if this works or not. So no AIO needed.
if ((ws->closed) ||
- (ws_msg_init_control(&wm, ws, op, buf, len) != 0)) {
+ (ws_msg_init_control(&frame, ws, op, buf, len) != 0)) {
return;
}
// Control frames at head of list. (Note that this may preempt
// the close frame or other ping/pong requests. Oh well.)
- nni_list_prepend(&ws->txmsgs, wm);
+ nni_list_prepend(&ws->txq, frame);
ws_start_write(ws);
}
-static const nni_option ws_options[] = {
- {
- .o_name = NULL,
- },
-};
-
-int
-nni_ws_getopt(nni_ws *ws, const char *name, void *buf, size_t *szp, nni_type t)
-{
- int rv;
-
- nni_mtx_lock(&ws->mtx);
- if (ws->closed) {
- nni_mtx_unlock(&ws->mtx);
- return (NNG_ECLOSED);
- }
- rv = nni_http_conn_getopt(ws->http, name, buf, szp, t);
- if (rv == NNG_ENOTSUP) {
- rv = nni_getopt(ws_options, name, ws, buf, szp, t);
- }
- nni_mtx_unlock(&ws->mtx);
- return (rv);
-}
-
-int
-nni_ws_setopt(
- nni_ws *ws, const char *name, const void *buf, size_t sz, nni_type t)
-{
- int rv;
-
- nni_mtx_lock(&ws->mtx);
- if (ws->closed) {
- nni_mtx_unlock(&ws->mtx);
- return (NNG_ECLOSED);
- }
- rv = nni_http_conn_setopt(ws->http, name, buf, sz, t);
- if (rv == NNG_ENOTSUP) {
- rv = nni_setopt(ws_options, name, ws, buf, sz, t);
- }
- nni_mtx_unlock(&ws->mtx);
- return (rv);
-}
-
-void
-nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
-{
- ws_msg * wm;
- nni_msg *msg;
- int rv;
-
- msg = nni_aio_get_msg(aio);
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- nni_mtx_lock(&ws->mtx);
- if (ws->closed) {
- nni_mtx_unlock(&ws->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- ws_msg_fini(wm);
- return;
- }
- if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) {
- nni_mtx_unlock(&ws->mtx);
- nni_aio_finish_error(aio, rv);
- ws_msg_fini(wm);
- return;
- }
- nni_aio_set_prov_extra(aio, 0, wm);
- nni_list_append(&ws->sendq, aio);
- nni_list_append(&ws->txmsgs, wm);
- ws_start_write(ws);
- nni_mtx_unlock(&ws->mtx);
-}
-
static void
ws_start_read(nni_ws *ws)
{
ws_frame *frame;
- ws_msg * wm;
nni_aio * aio;
nni_iov iov;
@@ -791,18 +780,18 @@ ws_start_read(nni_ws *ws)
return; // already reading or closed
}
- if ((wm = nni_list_first(&ws->rxmsgs)) == NULL) {
- return; // no body expecting a message.
+ // If nobody is waiting for recv, and we already have a data
+ // frame, stop reading. This keeps us from buffering infinitely.
+ if (nni_list_empty(&ws->recvq) && !nni_list_empty(&ws->rxq)) {
+ return;
}
if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
- nni_list_remove(&ws->rxmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
+ if ((aio = nni_list_first(&ws->recvq)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ENOMEM);
}
- ws_msg_fini(wm);
+ ws_close(ws, WS_CLOSE_INTERNAL);
return;
}
@@ -820,34 +809,143 @@ ws_start_read(nni_ws *ws)
}
static void
-ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp, nni_aio **aiop)
+ws_read_finish_str(nni_ws *ws)
{
- ws_msg *wm = nni_list_first(&ws->rxmsgs);
+ for (;;) {
+ nni_aio * aio;
+ nni_iov * iov;
+ unsigned niov;
+ ws_frame *frame;
- switch (frame->op) {
- case WS_CONT:
- if (wm == NULL) {
- ws_close(ws, WS_CLOSE_GOING_AWAY);
+ if ((aio = nni_list_first(&ws->recvq)) == NULL) {
return;
}
- if (nni_list_empty(&wm->frames)) {
+
+ if ((frame = nni_list_first(&ws->rxq)) == NULL) {
+ return;
+ }
+
+ // Discard 0 length frames -- in stream mode they are not used.
+ if (frame->len == 0) {
+ nni_list_remove(&ws->rxq, frame);
+ ws_frame_fini(frame);
+ continue;
+ }
+
+ // We are completing this aio one way or the other.
+ nni_aio_list_remove(aio);
+ nni_aio_get_iov(aio, &niov, &iov);
+
+ while ((frame != NULL) && (niov != 0)) {
+ size_t n;
+
+ if ((n = frame->len) > iov->iov_len) {
+ // This eats the entire iov.
+ n = iov->iov_len;
+ }
+ iov->iov_buf = ((uint8_t *) iov->iov_buf) + n;
+ iov->iov_len -= n;
+ if (iov->iov_len == 0) {
+ iov++;
+ niov--;
+ }
+
+ if (frame->len == n) {
+ nni_list_remove(&ws->rxq, frame);
+ ws_frame_fini(frame);
+ frame = nni_list_first(&ws->rxq);
+ } else {
+ frame->len -= n;
+ frame->buf += n;
+ }
+
+ nni_aio_bump_count(aio, n);
+ }
+
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+ }
+}
+
+static void
+ws_read_finish_msg(nni_ws *ws)
+{
+ nni_aio * aio;
+ size_t len;
+ ws_frame *frame;
+ nni_msg * msg;
+ int rv;
+ uint8_t * body;
+
+ // If we have no data, no waiter, or have not received the complete
+ // message yet, then there is nothing to do.
+ if (ws->inmsg || nni_list_empty(&ws->rxq) ||
+ ((aio = nni_list_first(&ws->recvq)) == NULL)) {
+ return;
+ }
+
+ // At this point, we have both a complete message in the queue (and
+ // there should not be any frames other than the for the message),
+ // and a waiting reader.
+ len = 0;
+ NNI_LIST_FOREACH (&ws->rxq, frame) {
+ len += frame->len;
+ }
+
+ nni_aio_list_remove(aio);
+
+ if ((rv = nni_msg_alloc(&msg, len)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ ws_close_error(ws, WS_CLOSE_INTERNAL);
+ return;
+ }
+ body = nni_msg_body(msg);
+ while ((frame = nni_list_first(&ws->rxq)) != NULL) {
+ nni_list_remove(&ws->rxq, frame);
+ memcpy(body, frame->buf, frame->len);
+ body += frame->len;
+ ws_frame_fini(frame);
+ }
+
+ nni_aio_set_msg(aio, msg);
+ nni_aio_bump_count(aio, nni_msg_len(msg));
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+}
+
+static void
+ws_read_finish(nni_ws *ws)
+{
+ if (ws->isstream) {
+ ws_read_finish_str(ws);
+ } else {
+ ws_read_finish_msg(ws);
+ }
+}
+
+static void
+ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
+{
+ switch (frame->op) {
+ case WS_CONT:
+ if (!ws->inmsg) {
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
return;
}
+ if (frame->final) {
+ ws->inmsg = false;
+ }
ws->rxframe = NULL;
- nni_list_append(&wm->frames, frame);
+ nni_list_append(&ws->rxq, frame);
break;
case WS_BINARY:
- if (wm == NULL) {
- ws_close(ws, WS_CLOSE_GOING_AWAY);
- return;
- }
- if (!nni_list_empty(&wm->frames)) {
+ if (ws->inmsg) {
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
return;
}
+ if (!frame->final) {
+ ws->inmsg = true;
+ }
ws->rxframe = NULL;
- nni_list_append(&wm->frames, frame);
+ nni_list_append(&ws->rxq, frame);
break;
case WS_TEXT:
// No support for text mode at present.
@@ -880,23 +978,13 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp, nni_aio **aiop)
return;
}
- // If this was the last (final) frame, then complete it. But
- // we have to look at the msg, since we might have got a
- // control frame.
- if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) {
- nni_list_remove(&ws->rxmsgs, wm);
- *wmp = wm;
- *aiop = wm->aio;
- nni_aio_list_remove(wm->aio);
- wm->aio = NULL;
- }
+ ws_read_finish(ws);
}
static void
ws_read_cb(void *arg)
{
- nni_ws * ws = arg;
- ws_msg * wm;
+ nni_ws * ws = arg;
nni_aio * aio = ws->rxaio;
ws_frame *frame;
int rv;
@@ -976,6 +1064,21 @@ ws_read_cb(void *arg)
nni_mtx_unlock(&ws->mtx);
return;
}
+ // For message mode, also check to make sure that the overall
+ // length of the message has not exceeded our recvmax.
+ // (Protect against an infinite stream of small messages!)
+ if ((!ws->isstream) && (ws->recvmax > 0)) {
+ size_t totlen = frame->len;
+ ws_frame *fr2;
+ NNI_LIST_FOREACH (&ws->rxq, fr2) {
+ totlen += fr2->len;
+ }
+ if (totlen > ws->recvmax) {
+ ws_close(ws, WS_CLOSE_TOO_BIG);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ }
// Check for masking. (We don't actually do the unmask
// here, because we don't have data yet.)
@@ -1023,134 +1126,40 @@ ws_read_cb(void *arg)
// At this point, we have a complete frame.
ws_unmask_frame(frame); // idempotent
- wm = NULL;
- aio = NULL;
- ws_read_frame_cb(ws, frame, &wm, &aio);
+ ws_read_frame_cb(ws, frame);
ws_start_read(ws);
nni_mtx_unlock(&ws->mtx);
-
- // Got a good message, so we have to do the work to send it up.
- if (wm != NULL) {
- size_t len = 0;
- nni_msg *msg;
- uint8_t *body;
- int rv;
-
- NNI_LIST_FOREACH (&wm->frames, frame) {
- len += frame->len;
- }
- if ((rv = nni_msg_alloc(&msg, len)) != 0) {
- nni_aio_finish_error(aio, rv);
- ws_msg_fini(wm);
- nni_ws_close_error(ws, WS_CLOSE_INTERNAL);
- return;
- }
- body = nni_msg_body(msg);
- NNI_LIST_FOREACH (&wm->frames, frame) {
- memcpy(body, frame->buf, frame->len);
- body += frame->len;
- }
- nni_aio_set_msg(aio, msg);
- nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
- ws_msg_fini(wm);
- }
}
static void
ws_read_cancel(nni_aio *aio, void *arg, int rv)
{
nni_ws *ws = arg;
- ws_msg *wm;
nni_mtx_lock(&ws->mtx);
- if (!nni_aio_list_active(aio)) {
- nni_mtx_unlock(&ws->mtx);
- return;
- }
- wm = nni_aio_get_prov_extra(aio, 0);
- if (wm == nni_list_first(&ws->rxmsgs)) {
- // Cancellation will percolate back up.
- nni_aio_abort(ws->rxaio, rv);
- } else if (nni_list_active(&ws->rxmsgs, wm)) {
- nni_list_remove(&ws->rxmsgs, wm);
- ws_msg_fini(wm);
+ if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&ws->mtx);
}
-void
-nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
-{
- ws_msg *wm;
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_mtx_lock(&ws->mtx);
- if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) {
- nni_mtx_unlock(&ws->mtx);
- ws_msg_fini(wm);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_set_prov_extra(aio, 0, wm);
- nni_list_append(&ws->recvq, aio);
- nni_list_append(&ws->rxmsgs, wm);
- ws_start_read(ws);
-
- nni_mtx_unlock(&ws->mtx);
-}
-
-void
-nni_ws_close_error(nni_ws *ws, uint16_t code)
+static void
+ws_close_error(nni_ws *ws, uint16_t code)
{
nni_mtx_lock(&ws->mtx);
ws_close(ws, code);
nni_mtx_unlock(&ws->mtx);
}
-void
-nni_ws_close(nni_ws *ws)
-{
- nni_ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
-}
-
-const char *
-nni_ws_request_headers(nni_ws *ws)
-{
- nni_mtx_lock(&ws->mtx);
- if (ws->reqhdrs == NULL) {
- ws->reqhdrs = nni_http_req_headers(ws->req);
- }
- nni_mtx_unlock(&ws->mtx);
- return (ws->reqhdrs);
-}
-
-const char *
-nni_ws_response_headers(nni_ws *ws)
-{
- nni_mtx_lock(&ws->mtx);
- if (ws->reshdrs == NULL) {
- ws->reshdrs = nni_http_res_headers(ws->res);
- }
- nni_mtx_unlock(&ws->mtx);
- return (ws->reshdrs);
-}
-
static void
ws_fini(void *arg)
{
- nni_ws *ws = arg;
- ws_msg *wm;
+ nni_ws * ws = arg;
+ ws_frame *frame;
+ nng_aio * aio;
- nni_ws_close(ws);
+ ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
// Give a chance for the close frame to drain.
if (ws->closeaio) {
@@ -1175,25 +1184,29 @@ ws_fini(void *arg)
}
nni_mtx_lock(&ws->mtx);
- while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
- nni_list_remove(&ws->rxmsgs, wm);
- if (wm->aio) {
- nni_aio_finish_error(wm->aio, NNG_ECLOSED);
- }
- ws_msg_fini(wm);
+ while ((frame = nni_list_first(&ws->rxq)) != NULL) {
+ nni_list_remove(&ws->rxq, frame);
+ ws_frame_fini(frame);
}
- while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
- nni_list_remove(&ws->txmsgs, wm);
- if (wm->aio) {
- nni_aio_finish_error(wm->aio, NNG_ECLOSED);
- }
- ws_msg_fini(wm);
+ while ((frame = nni_list_first(&ws->txq)) != NULL) {
+ nni_list_remove(&ws->txq, frame);
+ ws_frame_fini(frame);
}
- if (ws->rxframe) {
+ if (ws->rxframe != NULL) {
ws_frame_fini(ws->rxframe);
}
+ if (ws->txframe != NULL) {
+ ws_frame_fini(ws->txframe);
+ }
+
+ while (((aio = nni_list_first(&ws->recvq)) != NULL) ||
+ ((aio = nni_list_first(&ws->sendq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
nni_mtx_unlock(&ws->mtx);
if (ws->http) {
@@ -1217,8 +1230,8 @@ ws_fini(void *arg)
NNI_FREE_STRUCT(ws);
}
-void
-nni_ws_fini(nni_ws *ws)
+static void
+ws_reap(nni_ws *ws)
{
nni_reap(&ws->reap, ws_fini, ws);
}
@@ -1232,8 +1245,8 @@ ws_http_cb_listener(nni_ws *ws, nni_aio *aio)
nni_mtx_lock(&l->mtx);
nni_list_remove(&l->reply, ws);
if (nni_aio_result(aio) != 0) {
- nni_ws_fini(ws);
nni_mtx_unlock(&l->mtx);
+ ws_reap(ws);
return;
}
ws->ready = true;
@@ -1320,14 +1333,14 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
(!ws_contains_word(ptr, "upgrade")) ||
((ptr = GETH("Upgrade")) == NULL) ||
(strcmp(ptr, "websocket") != 0)) {
- nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
rv = NNG_EPROTO;
goto err;
}
if (d->proto != NULL) {
if (((ptr = GETH("Sec-WebSocket-Protocol")) == NULL) ||
(!ws_contains_word(d->proto, ptr))) {
- nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
rv = NNG_EPROTO;
goto err;
}
@@ -1358,7 +1371,7 @@ err:
}
nni_mtx_unlock(&d->mtx);
- nni_ws_fini(ws);
+ ws_reap(ws);
}
static void
@@ -1384,8 +1397,8 @@ ws_init(nni_ws **wsp)
return (NNG_ENOMEM);
}
nni_mtx_init(&ws->mtx);
- NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node);
- NNI_LIST_INIT(&ws->txmsgs, ws_msg, node);
+ NNI_LIST_INIT(&ws->rxq, ws_frame, node);
+ NNI_LIST_INIT(&ws->txq, ws_frame, node);
nni_aio_list_init(&ws->sendq);
nni_aio_list_init(&ws->recvq);
@@ -1401,17 +1414,25 @@ ws_init(nni_ws **wsp)
nni_aio_set_timeout(ws->closeaio, 100);
nni_aio_set_timeout(ws->httpaio, 2000);
+ ws->ops.s_close = ws_str_close;
+ ws->ops.s_free = ws_str_free;
+ ws->ops.s_send = ws_str_send;
+ ws->ops.s_recv = ws_str_recv;
+ ws->ops.s_getx = ws_str_getx;
+ ws->ops.s_setx = ws_str_setx;
+
ws->fragsize = 1 << 20; // we won't send a frame larger than this
*wsp = ws;
return (0);
}
-void
-nni_ws_listener_fini(nni_ws_listener *l)
+static void
+ws_listener_free(void *arg)
{
- ws_header *hdr;
+ nni_ws_listener *l = arg;
+ ws_header * hdr;
- nni_ws_listener_close(l);
+ ws_listener_close(l);
nni_mtx_lock(&l->mtx);
while (!nni_list_empty(&l->reply)) {
@@ -1437,7 +1458,7 @@ nni_ws_listener_fini(nni_ws_listener *l)
NNI_FREE_STRUCT(hdr);
}
if (l->url) {
- nni_url_free(l->url);
+ nng_url_free(l->url);
}
NNI_FREE_STRUCT(l);
}
@@ -1456,6 +1477,7 @@ ws_handler(nni_aio *aio)
uint16_t status;
int rv;
char key[29];
+ ws_header * hdr;
req = nni_aio_get_input(aio, 0);
h = nni_aio_get_input(aio, 1);
@@ -1542,6 +1564,20 @@ ws_handler(nni_aio *aio)
goto err;
}
+ // Set any user supplied headers. This is better than using a hook
+ // for most things, because it is loads easier.
+ NNI_LIST_FOREACH (&l->headers, hdr) {
+ if (SETH(hdr->name, hdr->value) != 0) {
+ status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_free(res);
+ goto err;
+ }
+ }
+
+ // The hook function gives us the ability to intercept the HTTP
+ // response altogether. Its best not to do this unless you really
+ // need to, because it's much more complex. But if you want to set
+ // up an HTTP Authorization handler this might be the only choice.
if (l->hookfn != NULL) {
rv = l->hookfn(l->hookarg, req, res);
if (rv != 0) {
@@ -1583,8 +1619,9 @@ ws_handler(nni_aio *aio)
ws->res = res;
ws->server = true;
ws->maxframe = l->maxframe;
-
- // XXX: Inherit fragmentation? (Frag is limited for now).
+ ws->fragsize = l->fragsize;
+ ws->recvmax = l->recvmax;
+ ws->isstream = l->isstream;
nni_list_append(&l->reply, ws);
nni_aio_set_data(ws->httpaio, 0, l);
@@ -1603,71 +1640,6 @@ err:
}
}
-int
-nni_ws_listener_init(nni_ws_listener **wslp, nni_url *url)
-{
- nni_ws_listener *l;
- int rv;
- char * host;
-
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&l->mtx);
- nni_cv_init(&l->cv, &l->mtx);
- nni_aio_list_init(&l->aios);
-
- NNI_LIST_INIT(&l->pend, nni_ws, node);
- NNI_LIST_INIT(&l->reply, nni_ws, node);
-
- // make a private copy of the url structure.
- if ((rv = nni_url_clone(&l->url, url)) != 0) {
- nni_ws_listener_fini(l);
- return (rv);
- }
-
- host = l->url->u_hostname;
- if (strlen(host) == 0) {
- host = NULL;
- }
- rv = nni_http_handler_init(&l->handler, url->u_path, ws_handler);
- if (rv != 0) {
- nni_ws_listener_fini(l);
- return (rv);
- }
-
- if (((rv = nni_http_handler_set_host(l->handler, host)) != 0) ||
- ((rv = nni_http_handler_set_data(l->handler, l, 0)) != 0) ||
- ((rv = nni_http_server_init(&l->server, url)) != 0)) {
- nni_ws_listener_fini(l);
- return (rv);
- }
-
- l->maxframe = 0;
- *wslp = l;
- return (0);
-}
-
-int
-nni_ws_listener_proto(nni_ws_listener *l, const char *proto)
-{
- int rv = 0;
- char *ns;
- nni_mtx_lock(&l->mtx);
- if (l->started) {
- rv = NNG_EBUSY;
- } else if ((ns = nni_strdup(proto)) == NULL) {
- rv = NNG_ENOMEM;
- } else {
- if (l->proto != NULL) {
- nni_strfree(l->proto);
- }
- l->proto = ns;
- }
- nni_mtx_unlock(&l->mtx);
- return (rv);
-}
-
static void
ws_accept_cancel(nni_aio *aio, void *arg, int rv)
{
@@ -1681,11 +1653,12 @@ ws_accept_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&l->mtx);
}
-void
-nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
+static void
+ws_listener_accept(void *arg, nni_aio *aio)
{
- nni_ws *ws;
- int rv;
+ nni_ws_listener *l = arg;
+ nni_ws * ws;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -1717,10 +1690,11 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
nni_mtx_unlock(&l->mtx);
}
-void
-nni_ws_listener_close(nni_ws_listener *l)
+static void
+ws_listener_close(void *arg)
{
- nni_ws *ws;
+ nni_ws_listener *l = arg;
+ nni_ws * ws;
nni_mtx_lock(&l->mtx);
if (l->closed) {
nni_mtx_unlock(&l->mtx);
@@ -1733,18 +1707,30 @@ nni_ws_listener_close(nni_ws_listener *l)
l->started = false;
}
NNI_LIST_FOREACH (&l->pend, ws) {
- nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ ws_close_error(ws, WS_CLOSE_GOING_AWAY);
}
NNI_LIST_FOREACH (&l->reply, ws) {
- nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ ws_close_error(ws, WS_CLOSE_GOING_AWAY);
}
nni_mtx_unlock(&l->mtx);
}
-int
-nni_ws_listener_listen(nni_ws_listener *l)
+// XXX: Consider replacing this with an option.
+void
+nni_ws_listener_hook(
+ nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg)
{
- int rv;
+ nni_mtx_lock(&l->mtx);
+ l->hookfn = hookfn;
+ l->hookarg = hookarg;
+ nni_mtx_unlock(&l->mtx);
+}
+
+static int
+ws_listener_listen(void *arg)
+{
+ nni_ws_listener *l = arg;
+ int rv;
nni_mtx_lock(&l->mtx);
if (l->closed) {
@@ -1777,42 +1763,290 @@ nni_ws_listener_listen(nni_ws_listener *l)
return (0);
}
-void
-nni_ws_listener_hook(
- nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg)
+static int
+ws_listener_set_size(
+ nni_ws_listener *l, size_t *valp, const void *buf, size_t sz, nni_type t)
{
- nni_mtx_lock(&l->mtx);
- l->hookfn = hookfn;
- l->hookarg = hookarg;
- nni_mtx_unlock(&l->mtx);
+ size_t val;
+ int rv;
+
+ // Max size is limited to 4 GB, but you really never want to have
+ // to have a larger value. If you think you need that, you're doing it
+ // wrong. You *can* set the size to 0 for unlimited.
+ if ((rv = nni_copyin_size(&val, buf, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ *valp = val;
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
}
-int
-nni_ws_listener_set_tls(nni_ws_listener *l, nng_tls_config *tls)
+static int
+ws_listener_get_size(
+ nni_ws_listener *l, size_t *valp, void *buf, size_t *szp, nni_type t)
{
- int rv;
+ size_t val;
nni_mtx_lock(&l->mtx);
- rv = nni_http_server_set_tls(l->server, tls);
+ val = *valp;
nni_mtx_unlock(&l->mtx);
+ return (nni_copyout_size(val, buf, szp, t));
+}
+
+static int
+ws_listener_set_maxframe(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_set_size(l, &l->maxframe, buf, sz, t));
+}
+
+static int
+ws_listener_get_maxframe(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_get_size(l, &l->maxframe, buf, szp, t));
+}
+
+static int
+ws_listener_set_fragsize(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_set_size(l, &l->fragsize, buf, sz, t));
+}
+
+static int
+ws_listener_get_fragsize(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_get_size(l, &l->fragsize, buf, szp, t));
+}
+
+static int
+ws_listener_set_recvmax(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_set_size(l, &l->recvmax, buf, sz, t));
+}
+
+static int
+ws_listener_get_recvmax(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_get_size(l, &l->recvmax, buf, szp, t));
+}
+
+static int
+ws_listener_set_res_headers(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ rv = ws_set_headers(&l->headers, buf);
+ nni_mtx_unlock(&l->mtx);
+ }
return (rv);
}
-int
-nni_ws_listener_get_tls(nni_ws_listener *l, nng_tls_config **tlsp)
+static int
+ws_listener_set_proto(void *arg, const void *buf, size_t sz, nni_type t)
{
- int rv;
+ nni_ws_listener *l = arg;
+ int rv;
+
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ char *ns;
+ if ((ns = nni_strdup(buf)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ nni_mtx_lock(&l->mtx);
+ if (l->proto != NULL) {
+ nni_strfree(l->proto);
+ }
+ l->proto = ns;
+ nni_mtx_unlock(&l->mtx);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_listener_get_proto(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
nni_mtx_lock(&l->mtx);
- rv = nni_http_server_get_tls(l->server, tlsp);
+ rv = nni_copyout_str(l->proto != NULL ? l->proto : "", buf, szp, t);
nni_mtx_unlock(&l->mtx);
return (rv);
}
-void
-nni_ws_listener_set_maxframe(nni_ws_listener *l, size_t maxframe)
+static int
+ws_listener_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t)
{
+ nni_ws_listener *l = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ l->isstream = !b;
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_get_url(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+
nni_mtx_lock(&l->mtx);
- l->maxframe = maxframe;
+ rv = nni_copyout_str(l->url->u_rawurl, buf, szp, t);
nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static const nni_option ws_listener_options[] = {
+ {
+ .o_name = NNI_OPT_WS_MSGMODE,
+ .o_set = ws_listener_set_msgmode,
+ },
+ {
+ .o_name = NNG_OPT_WS_RECVMAXFRAME,
+ .o_set = ws_listener_set_maxframe,
+ .o_get = ws_listener_get_maxframe,
+ },
+ {
+ .o_name = NNG_OPT_WS_SENDMAXFRAME,
+ .o_set = ws_listener_set_fragsize,
+ .o_get = ws_listener_get_fragsize,
+ },
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_set = ws_listener_set_recvmax,
+ .o_get = ws_listener_get_recvmax,
+ },
+ {
+ .o_name = NNG_OPT_WS_RESPONSE_HEADERS,
+ .o_set = ws_listener_set_res_headers,
+ // XXX: Get not implemented yet; likely of marginal value.
+ },
+ {
+ .o_name = NNG_OPT_WS_PROTOCOL,
+ .o_set = ws_listener_set_proto,
+ .o_get = ws_listener_get_proto,
+ },
+ {
+ .o_name = NNG_OPT_URL,
+ .o_get = ws_listener_get_url,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+ws_listener_set_header(nni_ws_listener *l, const char *name, const void *buf,
+ size_t sz, nni_type t)
+{
+ int rv;
+ name += strlen(NNG_OPT_WS_RESPONSE_HEADER);
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ rv = ws_set_header(&l->headers, name, buf);
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+
+ rv = nni_setopt(ws_listener_options, name, l, buf, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_http_server_setx(l->server, name, buf, sz, t);
+ }
+
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(name, NNG_OPT_WS_RESPONSE_HEADER)) {
+ rv = ws_listener_set_header(l, name, buf, sz, t);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_listener_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+
+ rv = nni_getopt(ws_listener_options, name, l, buf, szp, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_http_server_getx(l->server, name, buf, szp, t);
+ }
+ return (rv);
+}
+
+int
+nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url)
+{
+ nni_ws_listener *l;
+ int rv;
+ char * host;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&l->mtx);
+ nni_cv_init(&l->cv, &l->mtx);
+ nni_aio_list_init(&l->aios);
+
+ NNI_LIST_INIT(&l->pend, nni_ws, node);
+ NNI_LIST_INIT(&l->reply, nni_ws, node);
+
+ // make a private copy of the url structure.
+ if ((rv = nng_url_clone(&l->url, url)) != 0) {
+ ws_listener_free(l);
+ return (rv);
+ }
+
+ host = l->url->u_hostname;
+ if (strlen(host) == 0) {
+ host = NULL;
+ }
+ rv = nni_http_handler_init(&l->handler, url->u_path, ws_handler);
+ if (rv != 0) {
+ ws_listener_free(l);
+ return (rv);
+ }
+
+ if (((rv = nni_http_handler_set_host(l->handler, host)) != 0) ||
+ ((rv = nni_http_handler_set_data(l->handler, l, 0)) != 0) ||
+ ((rv = nni_http_server_init(&l->server, url)) != 0)) {
+ ws_listener_free(l);
+ return (rv);
+ }
+
+ l->fragsize = WS_DEF_MAXTXFRAME;
+ l->maxframe = WS_DEF_MAXRXFRAME;
+ l->recvmax = WS_DEF_RECVMAX;
+ l->isstream = false;
+ l->ops.sl_free = ws_listener_free;
+ l->ops.sl_close = ws_listener_close;
+ l->ops.sl_accept = ws_listener_accept;
+ l->ops.sl_listen = ws_listener_listen;
+ l->ops.sl_setx = ws_listener_setx;
+ l->ops.sl_getx = ws_listener_getx;
+ *wslp = (void *) l;
+ return (0);
}
void
@@ -1846,7 +2080,7 @@ ws_conn_cb(void *arg)
nni_cv_wake(&d->cv);
}
nni_mtx_unlock(&d->mtx);
- nni_ws_fini(ws);
+ ws_reap(ws);
} else {
nni_mtx_unlock(&d->mtx);
}
@@ -1861,7 +2095,7 @@ ws_conn_cb(void *arg)
// This request was canceled for some reason.
nni_http_conn_fini(http);
nni_mtx_unlock(&ws->mtx);
- nni_ws_fini(ws);
+ ws_reap(ws);
return;
}
@@ -1908,13 +2142,14 @@ err:
if (req != NULL) {
nni_http_req_free(req);
}
- nni_ws_fini(ws);
+ ws_reap(ws);
}
-void
-nni_ws_dialer_fini(nni_ws_dialer *d)
+static void
+ws_dialer_free(void *arg)
{
- ws_header *hdr;
+ nni_ws_dialer *d = arg;
+ ws_header * hdr;
nni_mtx_lock(&d->mtx);
while (!nni_list_empty(&d->wspend)) {
@@ -1933,65 +2168,18 @@ nni_ws_dialer_fini(nni_ws_dialer *d)
nni_http_client_fini(d->client);
}
if (d->url) {
- nni_url_free(d->url);
+ nng_url_free(d->url);
}
nni_cv_fini(&d->cv);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}
-int
-nni_ws_dialer_init(nni_ws_dialer **dp, nni_url *url)
-{
- nni_ws_dialer *d;
- int rv;
-
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
- return (NNG_ENOMEM);
- }
- NNI_LIST_INIT(&d->headers, ws_header, node);
- NNI_LIST_INIT(&d->wspend, nni_ws, node);
- nni_mtx_init(&d->mtx);
- nni_cv_init(&d->cv, &d->mtx);
-
- if ((rv = nni_url_clone(&d->url, url)) != 0) {
- nni_ws_dialer_fini(d);
- return (rv);
- }
-
- if ((rv = nni_http_client_init(&d->client, url)) != 0) {
- nni_ws_dialer_fini(d);
- return (rv);
- }
- d->maxframe = 0;
- *dp = d;
- return (0);
-}
-
-int
-nni_ws_dialer_set_tls(nni_ws_dialer *d, nng_tls_config *tls)
-{
- int rv;
- nni_mtx_lock(&d->mtx);
- rv = nni_http_client_set_tls(d->client, tls);
- nni_mtx_unlock(&d->mtx);
- return (rv);
-}
-
-int
-nni_ws_dialer_get_tls(nni_ws_dialer *d, nng_tls_config **tlsp)
-{
- int rv;
- nni_mtx_lock(&d->mtx);
- rv = nni_http_client_get_tls(d->client, tlsp);
- nni_mtx_unlock(&d->mtx);
- return (rv);
-}
-
-void
-nni_ws_dialer_close(nni_ws_dialer *d)
+static void
+ws_dialer_close(void *arg)
{
- nni_ws *ws;
+ nni_ws_dialer *d = arg;
+ nni_ws * ws;
nni_mtx_lock(&d->mtx);
if (d->closed) {
nni_mtx_unlock(&d->mtx);
@@ -2005,24 +2193,6 @@ nni_ws_dialer_close(nni_ws_dialer *d)
nni_mtx_unlock(&d->mtx);
}
-int
-nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto)
-{
- int rv = 0;
- char *ns;
- nni_mtx_lock(&d->mtx);
- if ((ns = nni_strdup(proto)) == NULL) {
- rv = NNG_ENOMEM;
- } else {
- if (d->proto != NULL) {
- nni_strfree(d->proto);
- }
- d->proto = ns;
- }
- nni_mtx_unlock(&d->mtx);
- return (rv);
-}
-
static void
ws_dial_cancel(nni_aio *aio, void *arg, int rv)
{
@@ -2038,11 +2208,12 @@ ws_dial_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&ws->mtx);
}
-void
-nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
+static void
+ws_dialer_dial(void *arg, nni_aio *aio)
{
- nni_ws *ws;
- int rv;
+ nni_ws_dialer *d = arg;
+ nni_ws * ws;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -2055,72 +2226,275 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
if (d->closed) {
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
- ws_fini(ws);
+ ws_reap(ws);
return;
}
if ((rv = nni_aio_schedule(aio, ws_dial_cancel, ws)) != 0) {
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
- ws_fini(ws);
+ ws_reap(ws);
return;
}
ws->dialer = d;
ws->useraio = aio;
ws->server = false;
ws->maxframe = d->maxframe;
+ ws->isstream = d->isstream;
nni_list_append(&d->wspend, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
}
static int
-ws_set_header(nni_list *l, const char *n, const char *v)
+ws_dialer_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t)
{
- ws_header *hdr;
- char * nv;
+ nni_ws_dialer *d = arg;
+ int rv;
+ bool b;
- if ((nv = nni_strdup(v)) == NULL) {
- return (NNG_ENOMEM);
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->isstream = !b;
+ nni_mtx_unlock(&d->mtx);
}
+ return (rv);
+}
- NNI_LIST_FOREACH (l, hdr) {
- if (nni_strcasecmp(hdr->name, n) == 0) {
- nni_strfree(hdr->value);
- hdr->value = nv;
- return (0);
- }
- }
+static int
+ws_dialer_set_size(
+ nni_ws_dialer *d, size_t *valp, const void *buf, size_t sz, nni_type t)
+{
+ size_t val;
+ int rv;
- if ((hdr = NNI_ALLOC_STRUCT(hdr)) == NULL) {
- nni_strfree(nv);
- return (NNG_ENOMEM);
- }
- if ((hdr->name = nni_strdup(n)) == NULL) {
- nni_strfree(nv);
- NNI_FREE_STRUCT(hdr);
- return (NNG_ENOMEM);
+ // Max size is limited to 4 GB, but you really never want to have
+ // to have a larger value. If you think you need that, you're doing it
+ // wrong. You *can* set the size to 0 for unlimited.
+ if ((rv = nni_copyin_size(&val, buf, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ *valp = val;
+ nni_mtx_unlock(&d->mtx);
}
- hdr->value = nv;
- nni_list_append(l, hdr);
- return (0);
+ return (rv);
}
-int
-nni_ws_dialer_header(nni_ws_dialer *d, const char *n, const char *v)
+static int
+ws_dialer_get_size(
+ nni_ws_dialer *d, size_t *valp, void *buf, size_t *szp, nni_type t)
{
- int rv;
+ size_t val;
nni_mtx_lock(&d->mtx);
- rv = ws_set_header(&d->headers, n, v);
+ val = *valp;
nni_mtx_unlock(&d->mtx);
+ return (nni_copyout_size(val, buf, szp, t));
+}
+
+static int
+ws_dialer_set_maxframe(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_set_size(d, &d->maxframe, buf, sz, t));
+}
+
+static int
+ws_dialer_get_maxframe(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_get_size(d, &d->maxframe, buf, szp, t));
+}
+
+static int
+ws_dialer_set_fragsize(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_set_size(d, &d->fragsize, buf, sz, t));
+}
+
+static int
+ws_dialer_get_fragsize(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_get_size(d, &d->fragsize, buf, szp, t));
+}
+
+static int
+ws_dialer_set_recvmax(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_set_size(d, &d->recvmax, buf, sz, t));
+}
+
+static int
+ws_dialer_get_recvmax(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_get_size(d, &d->recvmax, buf, szp, t));
+}
+
+static int
+ws_dialer_set_req_headers(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ rv = ws_set_headers(&d->headers, buf);
+ nni_mtx_unlock(&d->mtx);
+ }
return (rv);
}
-void
-nni_ws_dialer_set_maxframe(nni_ws_dialer *d, size_t maxframe)
+static int
+ws_dialer_set_proto(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ char *ns;
+ if ((ns = nni_strdup(buf)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ nni_mtx_lock(&d->mtx);
+ if (d->proto != NULL) {
+ nni_strfree(d->proto);
+ }
+ d->proto = ns;
+ nni_mtx_unlock(&d->mtx);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_dialer_get_proto(void *arg, void *buf, size_t *szp, nni_type t)
{
+ nni_ws_dialer *d = arg;
+ int rv;
nni_mtx_lock(&d->mtx);
- d->maxframe = maxframe;
+ rv = nni_copyout_str(d->proto != NULL ? d->proto : "", buf, szp, t);
nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
+static const nni_option ws_dialer_options[] = {
+ {
+ .o_name = NNI_OPT_WS_MSGMODE,
+ .o_set = ws_dialer_set_msgmode,
+ },
+ {
+ .o_name = NNG_OPT_WS_RECVMAXFRAME,
+ .o_set = ws_dialer_set_maxframe,
+ .o_get = ws_dialer_get_maxframe,
+ },
+ {
+ .o_name = NNG_OPT_WS_SENDMAXFRAME,
+ .o_set = ws_dialer_set_fragsize,
+ .o_get = ws_dialer_get_fragsize,
+ },
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_set = ws_dialer_set_recvmax,
+ .o_get = ws_dialer_get_recvmax,
+ },
+ {
+ .o_name = NNG_OPT_WS_REQUEST_HEADERS,
+ .o_set = ws_dialer_set_req_headers,
+ // XXX: Get not implemented yet; likely of marginal value.
+ },
+ {
+ .o_name = NNG_OPT_WS_PROTOCOL,
+ .o_set = ws_dialer_set_proto,
+ .o_get = ws_dialer_get_proto,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+ws_dialer_set_header(
+ nni_ws_dialer *d, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ int rv;
+ name += strlen(NNG_OPT_WS_REQUEST_HEADER);
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ rv = ws_set_header(&d->headers, name, buf);
+ nni_mtx_unlock(&d->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_dialer_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+
+ rv = nni_setopt(ws_dialer_options, name, d, buf, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_http_client_setx(d->client, name, buf, sz, t);
+ }
+
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(name, NNG_OPT_WS_REQUEST_HEADER)) {
+ rv = ws_dialer_set_header(d, name, buf, sz, t);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_dialer_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+
+ rv = nni_getopt(ws_dialer_options, name, d, buf, szp, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_http_client_getx(d->client, name, buf, szp, t);
+ }
+ return (rv);
+}
+
+int
+nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
+{
+ nni_ws_dialer *d;
+ int rv;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&d->headers, ws_header, node);
+ NNI_LIST_INIT(&d->wspend, nni_ws, node);
+ nni_mtx_init(&d->mtx);
+ nni_cv_init(&d->cv, &d->mtx);
+
+ if ((rv = nng_url_clone(&d->url, url)) != 0) {
+ ws_dialer_free(d);
+ return (rv);
+ }
+
+ if ((rv = nni_http_client_init(&d->client, url)) != 0) {
+ ws_dialer_free(d);
+ return (rv);
+ }
+ d->isstream = true;
+ d->recvmax = WS_DEF_RECVMAX;
+ d->maxframe = WS_DEF_MAXRXFRAME;
+ d->fragsize = WS_DEF_MAXTXFRAME;
+
+ d->ops.sd_free = ws_dialer_free;
+ d->ops.sd_close = ws_dialer_close;
+ d->ops.sd_dial = ws_dialer_dial;
+ d->ops.sd_setx = ws_dialer_setx;
+ d->ops.sd_getx = ws_dialer_getx;
+ *dp = (void *) d;
+ return (0);
}
// Dialer does not get a hook chance, as it can examine the request
@@ -2130,3 +2504,293 @@ nni_ws_dialer_set_maxframe(nni_ws_dialer *d, size_t maxframe)
// The implementation will send periodic PINGs, and respond with
// PONGs.
+
+static void
+ws_str_free(void *arg)
+{
+ nni_ws *ws = arg;
+ nni_reap(&ws->reap, ws_fini, ws);
+}
+
+static void
+ws_str_close(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
+}
+
+static void
+ws_str_send(void *arg, nni_aio *aio)
+{
+ nni_ws * ws = arg;
+ int rv;
+ ws_frame *frame;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if (!ws->isstream) {
+ nni_msg *msg;
+ unsigned niov;
+ nni_iov iov[2];
+ if ((msg = nni_aio_get_msg(aio)) == NULL) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ return;
+ }
+ niov = 0;
+ if (nng_msg_header_len(msg) > 0) {
+ iov[niov].iov_len = nni_msg_header_len(msg);
+ iov[niov].iov_buf = nni_msg_header(msg);
+ niov++;
+ }
+ iov[niov].iov_len = nni_msg_len(msg);
+ iov[niov].iov_buf = nni_msg_body(msg);
+ niov++;
+
+ // Scribble into the iov for now.
+ nni_aio_set_iov(aio, niov, iov);
+ }
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ nni_aio_finish_error(aio, NNG_ENOMEM);
+ return;
+ }
+ frame->aio = aio;
+ if ((rv = ws_frame_prep_tx(ws, frame)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ ws_frame_fini(frame);
+ return;
+ }
+
+ nni_mtx_lock(&ws->mtx);
+
+ if (ws->closed) {
+ nni_mtx_unlock(&ws->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ ws_frame_fini(frame);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ nni_aio_finish_error(aio, rv);
+ ws_frame_fini(frame);
+ return;
+ }
+ nni_aio_set_prov_extra(aio, 0, frame);
+ nni_list_append(&ws->sendq, aio);
+ nni_list_append(&ws->txq, frame);
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_str_recv(void *arg, nng_aio *aio)
+{
+ nni_ws *ws = arg;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&ws->mtx);
+ if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&ws->recvq, aio);
+ if (nni_list_first(&ws->recvq) == aio) {
+ ws_read_finish_msg(ws);
+ }
+ ws_start_read(ws);
+
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static int
+ws_get_request_headers(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ nni_mtx_lock(&ws->mtx);
+ if (ws->reqhdrs == NULL) {
+ ws->reqhdrs = nni_http_req_headers(ws->req);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return (nni_copyout_str(ws->reqhdrs, buf, szp, t));
+}
+
+static int
+ws_get_response_headers(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ nni_mtx_lock(&ws->mtx);
+ if (ws->reshdrs == NULL) {
+ ws->reshdrs = nni_http_res_headers(ws->res);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return (nni_copyout_str(ws->reshdrs, buf, szp, t));
+}
+
+static int
+ws_get_request_uri(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ return (nni_copyout_str(nni_http_req_get_uri(ws->req), buf, szp, t));
+}
+
+static const nni_option ws_options[] = {
+ {
+ .o_name = NNG_OPT_WS_REQUEST_HEADERS,
+ .o_get = ws_get_request_headers,
+ },
+ {
+ .o_name = NNG_OPT_WS_RESPONSE_HEADERS,
+ .o_get = ws_get_response_headers,
+ },
+ {
+ .o_name = NNG_OPT_WS_REQUEST_URI,
+ .o_get = ws_get_request_uri,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+ws_str_setx(void *arg, const char *nm, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws *ws = arg;
+ int rv;
+
+ // Headers can only be set.
+ nni_mtx_lock(&ws->mtx);
+ if (ws->closed) {
+ nni_mtx_unlock(&ws->mtx);
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ rv = nni_http_conn_setopt(ws->http, nm, buf, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_setopt(ws_options, nm, ws, buf, sz, t);
+ }
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(nm, NNG_OPT_WS_REQUEST_HEADER) ||
+ startswith(nm, NNG_OPT_WS_RESPONSE_HEADER)) {
+ return (NNG_EREADONLY);
+ }
+ }
+
+ return (rv);
+}
+
+static int
+ws_get_req_header(
+ nni_ws *ws, const char *nm, void *buf, size_t *szp, nni_type t)
+{
+ const char *s;
+ nm += strlen(NNG_OPT_WS_REQUEST_HEADER);
+ s = nni_http_req_get_header(ws->req, nm);
+ if (s == NULL) {
+ return (NNG_ENOENT);
+ }
+ return (nni_copyout_str(s, buf, szp, t));
+}
+
+static int
+ws_get_res_header(
+ nni_ws *ws, const char *nm, void *buf, size_t *szp, nni_type t)
+{
+ const char *s;
+ nm += strlen(NNG_OPT_WS_RESPONSE_HEADER);
+ s = nni_http_res_get_header(ws->res, nm);
+ if (s == NULL) {
+ return (NNG_ENOENT);
+ }
+ return (nni_copyout_str(s, buf, szp, t));
+}
+
+static int
+ws_str_getx(void *arg, const char *nm, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ int rv;
+
+ nni_mtx_lock(&ws->mtx);
+ if (ws->closed) {
+ nni_mtx_unlock(&ws->mtx);
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ rv = nni_http_conn_getopt(ws->http, nm, buf, szp, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_getopt(ws_options, nm, ws, buf, szp, t);
+ }
+ // Check for generic headers...
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(nm, NNG_OPT_WS_REQUEST_HEADER)) {
+ rv = ws_get_req_header(ws, nm, buf, szp, t);
+ } else if (startswith(nm, NNG_OPT_WS_RESPONSE_HEADER)) {
+ rv = ws_get_res_header(ws, nm, buf, szp, t);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_check_size(const void *buf, size_t sz, nni_type t)
+{
+ return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t));
+}
+
+static const nni_chkoption ws_chkopts[] = {
+ {
+ .o_name = NNG_OPT_WS_SENDMAXFRAME,
+ .o_check = ws_check_size,
+ },
+ {
+ .o_name = NNG_OPT_WS_RECVMAXFRAME,
+ .o_check = ws_check_size,
+ },
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_check = ws_check_size,
+ },
+ {
+ .o_name = NNG_OPT_WS_PROTOCOL,
+ .o_check = ws_check_string,
+ },
+ {
+ .o_name = NNG_OPT_WS_REQUEST_HEADERS,
+ .o_check = ws_check_string,
+ },
+ {
+ .o_name = NNG_OPT_WS_RESPONSE_HEADERS,
+ .o_check = ws_check_string,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+int
+nni_ws_checkopt(const char *name, const void *data, size_t sz, nni_type t)
+{
+ int rv;
+
+ rv = nni_chkopt(ws_chkopts, name, data, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_stream_checkopt("tcp", name, data, sz, t);
+ }
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_stream_checkopt("tls+tcp", name, data, sz, t);
+ }
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(name, NNG_OPT_WS_REQUEST_HEADER) ||
+ startswith(name, NNG_OPT_WS_RESPONSE_HEADER)) {
+ rv = ws_check_string(data, sz, t);
+ }
+ }
+ // Potentially, add checks for header options.
+ return (rv);
+}
diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h
index 88b4bfb4..bbc58d30 100644
--- a/src/supplemental/websocket/websocket.h
+++ b/src/supplemental/websocket/websocket.h
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -18,7 +18,10 @@ typedef struct nni_ws nni_ws;
typedef struct nni_ws_listener nni_ws_listener;
typedef struct nni_ws_dialer nni_ws_dialer;
-typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *);
+// Internal option, not for normal use (at present). This sets the
+// dialer/listener into message mode. This is used by the SP transport.
+// This is a boolean.
+#define NNI_OPT_WS_MSGMODE "ws:msgmode"
// Specify URL as ws://[<host>][:port][/path]
// If host is missing, INADDR_ANY is assumed. If port is missing,
@@ -26,44 +29,10 @@ typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *);
// on INADDR_ANY port 80, with path "/". For connect side, INADDR_ANY
// makes no sense. (TBD: return NNG_EADDRINVAL, or try loopback?)
-extern int nni_ws_listener_init(nni_ws_listener **, nni_url *);
-extern void nni_ws_listener_fini(nni_ws_listener *);
-extern void nni_ws_listener_close(nni_ws_listener *);
-extern int nni_ws_listener_proto(nni_ws_listener *, const char *);
-extern int nni_ws_listener_listen(nni_ws_listener *);
-extern void nni_ws_listener_accept(nni_ws_listener *, nng_aio *);
-extern void nni_ws_listener_hook(
- nni_ws_listener *, nni_ws_listen_hook, void *);
-extern int nni_ws_listener_set_tls(nni_ws_listener *, nng_tls_config *);
-extern int nni_ws_listener_get_tls(nni_ws_listener *, nng_tls_config **s);
-extern void nni_ws_listener_set_maxframe(nni_ws_listener *, size_t);
-
-extern int nni_ws_dialer_init(nni_ws_dialer **, nni_url *);
-extern void nni_ws_dialer_fini(nni_ws_dialer *);
-extern void nni_ws_dialer_close(nni_ws_dialer *);
-extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *);
-extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *);
-extern void nni_ws_dialer_set_maxframe(nni_ws_dialer *, size_t);
-extern void nni_ws_dialer_dial(nni_ws_dialer *, nng_aio *);
-extern int nni_ws_dialer_set_tls(nni_ws_dialer *, nng_tls_config *);
-extern int nni_ws_dialer_get_tls(nni_ws_dialer *, nng_tls_config **);
-
-// Dialer does not get a hook chance, as it can examine the request and reply
-// after dial is done; this is not a 3-way handshake, so the dialer does
-// not confirm the server's response at the HTTP level. (It can still issue
-// a websocket close).
-
-extern void nni_ws_send_msg(nni_ws *, nng_aio *);
-extern void nni_ws_recv_msg(nni_ws *, nng_aio *);
-extern void nni_ws_close(nni_ws *);
-extern void nni_ws_close_error(nni_ws *, uint16_t);
-extern void nni_ws_fini(nni_ws *);
-extern const char *nni_ws_response_headers(nni_ws *);
-extern const char *nni_ws_request_headers(nni_ws *);
-extern int nni_ws_getopt(nni_ws *, const char *, void *, size_t *, nni_type);
-extern int nni_ws_setopt(
- nni_ws *, const char *, const void *, size_t, nni_type);
-
-// The implementation will send periodic PINGs, and respond with PONGs.
+// Much of the websocket API is still "private", meeaning you should not
+// rely upon it being around.
+extern int nni_ws_listener_alloc(nng_stream_listener **, const nni_url *);
+extern int nni_ws_dialer_alloc(nng_stream_dialer **, const nni_url *);
+extern int nni_ws_checkopt(const char *, const void *, size_t, nni_type);
#endif // NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H