diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-01-21 22:40:10 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-02-16 19:22:27 -0800 |
| commit | 5cf750697624d4fd63cfe26921209d7c30e1a2d2 (patch) | |
| tree | bf11695e5f1ec5e400c87da0cc6ff23935a2eeff /src/supplemental | |
| parent | ca655b9db689ee0e655248b1a9f166b8db6cc984 (diff) | |
| download | nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.gz nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.bz2 nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.zip | |
fixes #872 create unified nng_stream API
This is a major change, and includes changes to use a polymorphic
stream API for all transports. There have been related bugs fixed
along the way. Additionally the man pages have changed.
The old non-polymorphic APIs are removed now. This is a breaking
change, but the old APIs were never part of any released public API.
Diffstat (limited to 'src/supplemental')
| -rw-r--r-- | src/supplemental/http/http_api.h | 18 | ||||
| -rw-r--r-- | src/supplemental/http/http_client.c | 181 | ||||
| -rw-r--r-- | src/supplemental/http/http_conn.c | 125 | ||||
| -rw-r--r-- | src/supplemental/http/http_server.c | 192 | ||||
| -rw-r--r-- | src/supplemental/ipc/CMakeLists.txt | 16 | ||||
| -rw-r--r-- | src/supplemental/ipc/ipc.c | 138 | ||||
| -rw-r--r-- | src/supplemental/tcp/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | src/supplemental/tcp/tcp.c | 444 | ||||
| -rw-r--r-- | src/supplemental/tls/mbedtls/tls.c | 363 | ||||
| -rw-r--r-- | src/supplemental/tls/none/tls.c | 92 | ||||
| -rw-r--r-- | src/supplemental/tls/tls_api.h | 52 | ||||
| -rw-r--r-- | src/supplemental/tls/tls_common.c | 754 | ||||
| -rw-r--r-- | src/supplemental/websocket/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/supplemental/websocket/stub.c | 40 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 2066 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.h | 53 |
16 files changed, 2621 insertions, 1921 deletions
diff --git a/src/supplemental/http/http_api.h b/src/supplemental/http/http_api.h index 569e8532..45738318 100644 --- a/src/supplemental/http/http_api.h +++ b/src/supplemental/http/http_api.h @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -96,9 +96,7 @@ extern void *nni_http_conn_get_ctx(nni_http_conn *); // These initialization functions create stream for HTTP transactions. // They should only be used by the server or client HTTP implementations, // and are not for use by other code. -extern int nni_http_conn_init_tcp(nni_http_conn **, nni_tcp_conn *); -extern int nni_http_conn_init_tls( - nni_http_conn **, struct nng_tls_config *, nni_tcp_conn *); +extern int nni_http_conn_init(nni_http_conn **, nng_stream *); extern void nni_http_conn_close(nni_http_conn *); extern void nni_http_conn_fini(nni_http_conn *); @@ -207,6 +205,11 @@ extern int nni_http_server_set_tls(nni_http_server *, struct nng_tls_config *); extern int nni_http_server_get_tls( nni_http_server *, struct nng_tls_config **); +extern int nni_http_server_setx( + nni_http_server *, const char *, const void *, size_t, nni_type); +extern int nni_http_server_getx( + nni_http_server *, const char *, void *, size_t *, nni_type); + // nni_http_server_start starts listening on the supplied port. extern int nni_http_server_start(nni_http_server *); @@ -350,6 +353,11 @@ extern int nni_http_client_set_tls(nni_http_client *, struct nng_tls_config *); extern int nni_http_client_get_tls( nni_http_client *, struct nng_tls_config **); +extern int nni_http_client_setx( + nni_http_client *, const char *, const void *, size_t, nni_type); +extern int nni_http_client_getx( + nni_http_client *, const char *, void *, size_t *, nni_type); + extern void nni_http_client_connect(nni_http_client *, nni_aio *); // nni_http_transact_conn is used to perform a round-trip exchange (i.e. a diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index 798cbe14..c35dcaa8 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -1,6 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -14,25 +15,20 @@ #include <string.h> #include "core/nng_impl.h" -#include "nng/supplemental/tls/tls.h" #include "supplemental/tls/tls_api.h" +#include <nng/supplemental/tls/tls.h> + #include "http_api.h" static nni_mtx http_txn_lk; struct nng_http_client { - nni_list aios; - nni_mtx mtx; - bool closed; - bool resolving; - nng_tls_config *tls; - nni_aio * aio; - nng_sockaddr sa; - nni_tcp_dialer *dialer; - char * host; - char * port; - nni_url * url; + nni_list aios; + nni_mtx mtx; + bool closed; + nni_aio * aio; + nng_stream_dialer *dialer; }; static void @@ -43,9 +39,7 @@ http_dial_start(nni_http_client *c) if ((aio = nni_list_first(&c->aios)) == NULL) { return; } - c->resolving = true; - nni_aio_set_input(c->aio, 0, &c->sa); - nni_tcp_resolv(c->host, c->port, NNG_AF_UNSPEC, 0, c->aio); + nng_stream_dialer_dial(c->dialer, c->aio); } static void @@ -54,7 +48,7 @@ http_dial_cb(void *arg) nni_http_client *c = arg; nni_aio * aio; int rv; - nni_tcp_conn * tcp; + nng_stream * stream; nni_http_conn * conn; nni_mtx_lock(&c->mtx); @@ -63,9 +57,9 @@ http_dial_cb(void *arg) if ((aio = nni_list_first(&c->aios)) == NULL) { // User abandoned request, and no residuals left. nni_mtx_unlock(&c->mtx); - if ((rv == 0) && !c->resolving) { - tcp = nni_aio_get_output(c->aio, 0); - nni_tcp_conn_fini(tcp); + if (rv == 0) { + stream = nni_aio_get_output(c->aio, 0); + nng_stream_free(stream); } return; } @@ -78,28 +72,16 @@ http_dial_cb(void *arg) return; } - if (c->resolving) { - // This was a DNS lookup -- advance to normal TCP connect. - c->resolving = false; - nni_tcp_dialer_dial(c->dialer, &c->sa, c->aio); - nni_mtx_unlock(&c->mtx); - return; - } - nni_aio_list_remove(aio); - tcp = nni_aio_get_output(c->aio, 0); - NNI_ASSERT(tcp != NULL); + stream = nni_aio_get_output(c->aio, 0); + NNI_ASSERT(stream != NULL); - if (c->tls != NULL) { - rv = nni_http_conn_init_tls(&conn, c->tls, tcp); - } else { - rv = nni_http_conn_init_tcp(&conn, tcp); - } + rv = nni_http_conn_init(&conn, stream); http_dial_start(c); nni_mtx_unlock(&c->mtx); if (rv != 0) { - // the conn_init function will have already discard tcp. + // the conn_init function will have already discard stream. nni_aio_finish_error(aio, rv); return; } @@ -112,16 +94,8 @@ void nni_http_client_fini(nni_http_client *c) { nni_aio_fini(c->aio); - nni_tcp_dialer_fini(c->dialer); + nng_stream_dialer_free(c->dialer); nni_mtx_fini(&c->mtx); -#ifdef NNG_SUPP_TLS - if (c->tls != NULL) { - nni_tls_config_fini(c->tls); - } -#endif - nni_strfree(c->host); - nni_strfree(c->port); - NNI_FREE_STRUCT(c); } @@ -130,59 +104,37 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) { int rv; nni_http_client *c; + nng_url myurl; + + // Rewrite URLs to either TLS or TCP. + memcpy(&myurl, url, sizeof(myurl)); + if ((strcmp(url->u_scheme, "http") == 0) || + (strcmp(url->u_scheme, "ws") == 0)) { + myurl.u_scheme = "tcp"; + } else if ((strcmp(url->u_scheme, "https") == 0) || + (strcmp(url->u_scheme, "wss") == 0)) { + myurl.u_scheme = "tls+tcp"; + } else { + return (NNG_EADDRINVAL); + } if (strlen(url->u_hostname) == 0) { // We require a valid hostname. return (NNG_EADDRINVAL); } - if ((strcmp(url->u_scheme, "http") != 0) && -#ifdef NNG_SUPP_TLS - (strcmp(url->u_scheme, "https") != 0) && - (strcmp(url->u_scheme, "wss") != 0) && -#endif - (strcmp(url->u_scheme, "ws") != 0)) { - return (NNG_EADDRINVAL); - } if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&c->mtx); nni_aio_list_init(&c->aios); - if (((c->host = nni_strdup(url->u_hostname)) == NULL) || - ((strlen(url->u_port) != 0) && - ((c->port = nni_strdup(url->u_port)) == NULL))) { - nni_http_client_fini(c); - return (NNG_ENOMEM); - } - -#ifdef NNG_SUPP_TLS - if ((strcmp(url->u_scheme, "https") == 0) || - (strcmp(url->u_scheme, "wss") == 0)) { - rv = nni_tls_config_init(&c->tls, NNG_TLS_MODE_CLIENT); - if (rv != 0) { - nni_http_client_fini(c); - return (rv); - } - // Take the server name right from the client URL. We only - // consider the name, as the port is never part of the - // certificate. - rv = nng_tls_config_server_name(c->tls, url->u_hostname); - if (rv != 0) { - nni_http_client_fini(c); - return (rv); - } - // Note that the application has to supply the location of - // certificates. We could probably use a default based - // on environment or common locations used by OpenSSL, but - // as there is no way to *unload* the cert file, lets not - // do that. (We might want to consider a mode to reset.) + if ((rv = nng_stream_dialer_alloc_url(&c->dialer, &myurl)) != 0) { + nni_http_client_fini(c); + return (rv); } -#endif - if (((rv = nni_tcp_dialer_init(&c->dialer)) != 0) || - ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0)) { + if ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0) { nni_http_client_fini(c); return (rv); } @@ -192,46 +144,37 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url) } int -nni_http_client_set_tls(nni_http_client *c, struct nng_tls_config *tls) +nni_http_client_set_tls(nni_http_client *c, nng_tls_config *tls) { -#ifdef NNG_SUPP_TLS - struct nng_tls_config *old; - nni_mtx_lock(&c->mtx); - old = c->tls; - c->tls = tls; - if (tls != NULL) { - nni_tls_config_hold(tls); - } - nni_mtx_unlock(&c->mtx); - if (old != NULL) { - nni_tls_config_fini(old); - } - return (0); -#else - NNI_ARG_UNUSED(c); - NNI_ARG_UNUSED(tls); - return (NNG_EINVAL); -#endif + int rv; + rv = nni_stream_dialer_setx(c->dialer, NNG_OPT_TLS_CONFIG, &tls, + sizeof(tls), NNI_TYPE_POINTER); + return (rv); } int -nni_http_client_get_tls(nni_http_client *c, struct nng_tls_config **tlsp) +nni_http_client_get_tls(nni_http_client *c, nng_tls_config **tlsp) { -#ifdef NNG_SUPP_TLS - nni_mtx_lock(&c->mtx); - if (c->tls == NULL) { - nni_mtx_unlock(&c->mtx); - return (NNG_EINVAL); - } - nni_tls_config_hold(c->tls); - *tlsp = c->tls; - nni_mtx_unlock(&c->mtx); - return (0); -#else - NNI_ARG_UNUSED(c); - NNI_ARG_UNUSED(tlsp); - return (NNG_ENOTSUP); -#endif + size_t sz = sizeof(*tlsp); + int rv; + rv = nni_stream_dialer_getx( + c->dialer, NNG_OPT_TLS_CONFIG, tlsp, &sz, NNI_TYPE_POINTER); + return (rv); +} + +int +nni_http_client_setx(nni_http_client *c, const char *name, const void *buf, + size_t sz, nni_type t) +{ + // We have no local options, but we just pass them straight through. + return (nni_stream_dialer_setx(c->dialer, name, buf, sz, t)); +} + +int +nni_http_client_getx( + nni_http_client *c, const char *name, void *buf, size_t *szp, nni_type t) +{ + return (nni_stream_dialer_getx(c->dialer, name, buf, szp, t)); } static void diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 7c6159cd..1fc2c34e 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -40,25 +40,6 @@ enum write_flavor { HTTP_WR_RES, }; -typedef void (*http_read_fn)(void *, nni_aio *); -typedef void (*http_write_fn)(void *, nni_aio *); -typedef void (*http_close_fn)(void *); -typedef void (*http_fini_fn)(void *); -typedef int (*http_addr_fn)(void *, nni_sockaddr *); -typedef int (*http_getopt_fn)( - void *, const char *, void *, size_t *, nni_type); -typedef int (*http_setopt_fn)( - void *, const char *, const void *, size_t, nni_type); - -typedef struct { - http_read_fn h_read; - http_write_fn h_write; - http_getopt_fn h_getopt; - http_setopt_fn h_setopt; - http_close_fn h_close; - http_fini_fn h_fini; -} http_tran; - #define SET_RD_FLAVOR(aio, f) \ nni_aio_set_prov_extra(aio, 0, ((void *) (intptr_t)(f))) #define GET_RD_FLAVOR(aio) (int) ((intptr_t) nni_aio_get_prov_extra(aio, 0)) @@ -67,17 +48,11 @@ typedef struct { #define GET_WR_FLAVOR(aio) (int) ((intptr_t) nni_aio_get_prov_extra(aio, 0)) struct nng_http_conn { - void * sock; - http_read_fn rd; - http_write_fn wr; - http_setopt_fn setopt; - http_getopt_fn getopt; - http_close_fn close; - http_fini_fn fini; - void * ctx; - bool closed; - nni_list rdq; // high level http read requests - nni_list wrq; // high level http write requests + nng_stream *sock; + void * ctx; + bool closed; + nni_list rdq; // high level http read requests + nni_list wrq; // high level http write requests nni_aio *rd_uaio; // user aio for read nni_aio *wr_uaio; // user aio for write @@ -138,7 +113,7 @@ http_close(nni_http_conn *conn) } if (conn->sock != NULL) { - conn->close(conn->sock); + nng_stream_close(conn->sock); } } @@ -204,7 +179,7 @@ http_rd_buf(nni_http_conn *conn, nni_aio *aio) // to get *any* data for a partial RAW read.) nni_aio_set_data(conn->rd_aio, 1, NULL); nni_aio_set_iov(conn->rd_aio, niov, iov); - conn->rd(conn->sock, conn->rd_aio); + nng_stream_recv(conn->sock, conn->rd_aio); return (NNG_EAGAIN); case HTTP_RD_REQ: @@ -220,7 +195,7 @@ http_rd_buf(nni_http_conn *conn, nni_aio *aio) iov1.iov_len = conn->rd_bufsz - conn->rd_put; nni_aio_set_iov(conn->rd_aio, 1, &iov1); nni_aio_set_data(conn->rd_aio, 1, aio); - conn->rd(conn->sock, conn->rd_aio); + nng_stream_recv(conn->sock, conn->rd_aio); } return (rv); @@ -237,7 +212,7 @@ http_rd_buf(nni_http_conn *conn, nni_aio *aio) iov1.iov_len = conn->rd_bufsz - conn->rd_put; nni_aio_set_iov(conn->rd_aio, 1, &iov1); nni_aio_set_data(conn->rd_aio, 1, aio); - conn->rd(conn->sock, conn->rd_aio); + nng_stream_recv(conn->sock, conn->rd_aio); } return (rv); @@ -254,7 +229,7 @@ http_rd_buf(nni_http_conn *conn, nni_aio *aio) iov1.iov_len = conn->rd_bufsz - conn->rd_put; nni_aio_set_iov(conn->rd_aio, 1, &iov1); nni_aio_set_data(conn->rd_aio, 1, aio); - conn->rd(conn->sock, conn->rd_aio); + nng_stream_recv(conn->sock, conn->rd_aio); } return (rv); } @@ -428,7 +403,7 @@ http_wr_start(nni_http_conn *conn) nni_aio_get_iov(aio, &niov, &iov); nni_aio_set_iov(conn->wr_aio, niov, iov); - conn->wr(conn->sock, conn->wr_aio); + nng_stream_send(conn->sock, conn->wr_aio); } static void @@ -475,7 +450,7 @@ http_wr_cb(void *arg) if (nni_aio_iov_count(aio) > 0) { // We have more to transmit - start another and leave // (we will get called again when it is done). - conn->wr(conn->sock, aio); + nng_stream_send(conn->sock, aio); nni_mtx_unlock(&conn->mtx); return; } @@ -680,7 +655,7 @@ nni_http_conn_getopt( if (conn->closed) { rv = NNG_ECLOSED; } else { - rv = conn->getopt(conn->sock, name, buf, szp, t); + rv = nni_stream_getx(conn->sock, name, buf, szp, t); } nni_mtx_unlock(&conn->mtx); return (rv); @@ -695,7 +670,7 @@ nni_http_conn_setopt(nni_http_conn *conn, const char *name, const void *buf, if (conn->closed) { rv = NNG_ECLOSED; } else { - rv = conn->setopt(conn->sock, name, buf, sz, t); + rv = nni_stream_setx(conn->sock, name, buf, sz, t); } nni_mtx_unlock(&conn->mtx); return (rv); @@ -709,8 +684,8 @@ nni_http_conn_fini(nni_http_conn *conn) nni_mtx_lock(&conn->mtx); http_close(conn); - if ((conn->sock != NULL) && (conn->fini != NULL)) { - conn->fini(conn->sock); + if (conn->sock != NULL) { + nng_stream_free(conn->sock); conn->sock = NULL; } nni_mtx_unlock(&conn->mtx); @@ -723,7 +698,7 @@ nni_http_conn_fini(nni_http_conn *conn) } static int -http_init(nni_http_conn **connp, http_tran *tran, void *data) +http_init(nni_http_conn **connp, nng_stream *data) { nni_http_conn *conn; int rv; @@ -747,73 +722,19 @@ http_init(nni_http_conn **connp, http_tran *tran, void *data) return (rv); } - conn->sock = data; - conn->rd = tran->h_read; - conn->wr = tran->h_write; - conn->close = tran->h_close; - conn->fini = tran->h_fini; - conn->getopt = tran->h_getopt; - conn->setopt = tran->h_setopt; + conn->sock = data; *connp = conn; return (0); } -static http_tran http_tcp_ops = { - .h_read = (http_read_fn) nni_tcp_conn_recv, - .h_write = (http_write_fn) nni_tcp_conn_send, - .h_close = (http_close_fn) nni_tcp_conn_close, - .h_fini = (http_fini_fn) nni_tcp_conn_fini, - .h_getopt = (http_getopt_fn) nni_tcp_conn_getopt, - .h_setopt = (http_setopt_fn) nni_tcp_conn_setopt, -}; - int -nni_http_conn_init_tcp(nni_http_conn **connp, nni_tcp_conn *tcp) +nni_http_conn_init(nni_http_conn **connp, nng_stream *stream) { int rv; - if ((rv = http_init(connp, &http_tcp_ops, tcp)) != 0) { - nni_tcp_conn_fini(tcp); - } - return (rv); -} - -#ifdef NNG_SUPP_TLS -static http_tran http_tls_ops = { - .h_read = (http_read_fn) nni_tls_recv, - .h_write = (http_write_fn) nni_tls_send, - .h_close = (http_close_fn) nni_tls_close, - .h_fini = (http_fini_fn) nni_tls_fini, - .h_getopt = (http_getopt_fn) nni_tls_getopt, - .h_setopt = (http_setopt_fn) nni_tls_setopt, -}; - -int -nni_http_conn_init_tls( - nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp) -{ - nni_tls *tls; - int rv; - - if ((rv = nni_tls_init(&tls, cfg, tcp)) != 0) { - nni_tcp_conn_fini(tcp); - return (rv); - } - - if ((rv = http_init(connp, &http_tls_ops, tls)) != 0) { - nni_tls_fini(tls); + if ((rv = http_init(connp, stream)) != 0) { + nng_stream_free(stream); } return (rv); } -#else -int -nni_http_conn_init_tls( - nni_http_conn **connp, struct nng_tls_config *cfg, nni_tcp_conn *tcp) -{ - NNI_ARG_UNUSED(connp); - NNI_ARG_UNUSED(cfg); - nni_tcp_conn_fini(tcp); - return (NNG_ENOTSUP); -} -#endif // NNG_SUPP_TLS diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index a6343d87..939273b7 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -1,7 +1,8 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 QXSoftware <lh563566994@126.com> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -68,22 +69,21 @@ typedef struct http_error { } http_error; struct nng_http_server { - nng_sockaddr addr; - nni_list_node node; - int refcnt; - int starts; - nni_list handlers; - nni_list conns; - nni_mtx mtx; - bool closed; - nng_tls_config * tls; - nni_aio * accaio; - nni_tcp_listener *listener; - char * port; - char * hostname; - nni_list errors; - nni_mtx errors_mtx; - nni_reap_item reap; + nng_sockaddr addr; + nni_list_node node; + int refcnt; + int starts; + nni_list handlers; + nni_list conns; + nni_mtx mtx; + bool closed; + nni_aio * accaio; + nng_stream_listener *listener; + char * port; + char * hostname; + nni_list errors; + nni_mtx errors_mtx; + nni_reap_item reap; }; int @@ -720,13 +720,13 @@ http_sconn_cbdone(void *arg) } static int -http_sconn_init(http_sconn **scp, nni_http_server *s, nni_tcp_conn *tcp) +http_sconn_init(http_sconn **scp, nng_stream *stream) { http_sconn *sc; int rv; if ((sc = NNI_ALLOC_STRUCT(sc)) == NULL) { - nni_tcp_conn_fini(tcp); + nng_stream_free(stream); return (NNG_ENOMEM); } @@ -741,11 +741,7 @@ http_sconn_init(http_sconn **scp, nni_http_server *s, nni_tcp_conn *tcp) return (rv); } - if (s->tls != NULL) { - rv = nni_http_conn_init_tls(&sc->conn, s->tls, tcp); - } else { - rv = nni_http_conn_init_tcp(&sc->conn, tcp); - } + rv = nni_http_conn_init(&sc->conn, stream); if (rv != 0) { http_sconn_close(sc); return (rv); @@ -760,7 +756,7 @@ http_server_acccb(void *arg) { nni_http_server *s = arg; nni_aio * aio = s->accaio; - nni_tcp_conn * tcp; + nng_stream * stream; http_sconn * sc; int rv; @@ -768,22 +764,22 @@ http_server_acccb(void *arg) if ((rv = nni_aio_result(aio)) != 0) { if (!s->closed) { // try again? - nni_tcp_listener_accept(s->listener, s->accaio); + nng_stream_listener_accept(s->listener, s->accaio); } nni_mtx_unlock(&s->mtx); return; } - tcp = nni_aio_get_output(aio, 0); + stream = nni_aio_get_output(aio, 0); if (s->closed) { // If we're closing, then reject this one. - nni_tcp_conn_fini(tcp); + nng_stream_free(stream); nni_mtx_unlock(&s->mtx); return; } - if (http_sconn_init(&sc, s, tcp) != 0) { - // The TCP structure is already cleaned up. + if (http_sconn_init(&sc, stream) != 0) { + // The stream structure is already cleaned up. // Start another accept attempt. - nni_tcp_listener_accept(s->listener, s->accaio); + nng_stream_listener_accept(s->listener, s->accaio); nni_mtx_unlock(&s->mtx); return; } @@ -792,7 +788,7 @@ http_server_acccb(void *arg) sc->handler = NULL; nni_http_read_req(sc->conn, sc->req, sc->rxaio); - nni_tcp_listener_accept(s->listener, s->accaio); + nng_stream_listener_accept(s->listener, s->accaio); nni_mtx_unlock(&s->mtx); } @@ -812,20 +808,13 @@ http_server_fini(nni_http_server *s) nni_mtx_unlock(&s->mtx); return; } - if (s->listener != NULL) { - nni_tcp_listener_fini(s->listener); - } + nng_stream_listener_free(s->listener); while ((h = nni_list_first(&s->handlers)) != NULL) { nni_list_remove(&s->handlers, h); h->refcnt--; nni_http_handler_fini(h); } nni_mtx_unlock(&s->mtx); -#ifdef NNG_SUPP_TLS - if (s->tls != NULL) { - nni_tls_config_fini(s->tls); - } -#endif nni_mtx_lock(&s->errors_mtx); while ((epage = nni_list_first(&s->errors)) != NULL) { nni_list_remove(&s->errors, epage); @@ -847,16 +836,20 @@ http_server_init(nni_http_server **serverp, const nni_url *url) { nni_http_server *s; int rv; - nni_aio * aio; - - if ((strcmp(url->u_scheme, "http") != 0) && -#ifdef NNG_SUPP_TLS - (strcmp(url->u_scheme, "https") != 0) && - (strcmp(url->u_scheme, "wss") != 0) && -#endif - (strcmp(url->u_scheme, "ws") != 0)) { + nng_url myurl; + + // Rewrite URLs to either TLS or TCP. + memcpy(&myurl, url, sizeof(myurl)); + if ((strcmp(url->u_scheme, "http") == 0) || + (strcmp(url->u_scheme, "ws") == 0)) { + myurl.u_scheme = "tcp"; + } else if ((strcmp(url->u_scheme, "https") == 0) || + (strcmp(url->u_scheme, "wss") == 0)) { + myurl.u_scheme = "tls+tcp"; + } else { return (NNG_EADDRINVAL); } + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } @@ -884,34 +877,11 @@ http_server_init(nni_http_server **serverp, const nni_url *url) return (NNG_ENOMEM); } -#ifdef NNG_SUPP_TLS - if ((strcmp(url->u_scheme, "https") == 0) || - (strcmp(url->u_scheme, "wss") == 0)) { - rv = nni_tls_config_init(&s->tls, NNG_TLS_MODE_SERVER); - if (rv != 0) { - http_server_fini(s); - return (rv); - } - } -#endif - - // Do the DNS lookup *now*. This means that this is - // synchronous, but it should be fast, since it should either - // resolve as a number, or resolve locally, without having to - // hit up DNS. - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - http_server_fini(s); - return (rv); - } - nni_aio_set_input(aio, 0, &s->addr); - nni_tcp_resolv(s->hostname, s->port, NNG_AF_UNSPEC, true, aio); - nni_aio_wait(aio); - rv = nni_aio_result(aio); - nni_aio_fini(aio); - if (rv != 0) { + if ((rv = nng_stream_listener_alloc_url(&s->listener, &myurl)) != 0) { http_server_fini(s); return (rv); } + s->refcnt = 1; *serverp = s; return (0); @@ -950,15 +920,10 @@ static int http_server_start(nni_http_server *s) { int rv; - if ((rv = nni_tcp_listener_init(&s->listener)) != 0) { - return (rv); - } - if ((rv = nni_tcp_listener_listen(s->listener, &s->addr)) != 0) { - nni_tcp_listener_fini(s->listener); - s->listener = NULL; + if ((rv = nng_stream_listener_listen(s->listener)) != 0) { return (rv); } - nni_tcp_listener_accept(s->listener, s->accaio); + nng_stream_listener_accept(s->listener, s->accaio); return (0); } @@ -992,7 +957,7 @@ http_server_stop(nni_http_server *s) // Close the TCP endpoint that is listening. if (s->listener) { - nni_tcp_listener_close(s->listener); + nng_stream_listener_close(s->listener); } // Stopping the server is a hard stop -- it aborts any work @@ -1764,50 +1729,37 @@ nni_http_handler_init_static(nni_http_handler **hpp, const char *uri, } int -nni_http_server_set_tls(nni_http_server *s, nng_tls_config *tcfg) +nni_http_server_set_tls(nni_http_server *s, nng_tls_config *tls) { -#ifdef NNG_SUPP_TLS - nng_tls_config *old; - nni_mtx_lock(&s->mtx); - if (s->starts) { - nni_mtx_unlock(&s->mtx); - return (NNG_EBUSY); - } - old = s->tls; - s->tls = tcfg; - if (tcfg) { - nni_tls_config_hold(tcfg); - } - nni_mtx_unlock(&s->mtx); - if (old) { - nni_tls_config_fini(old); - } - return (0); -#else - NNI_ARG_UNUSED(s); - NNI_ARG_UNUSED(tcfg); - return (NNG_ENOTSUP); -#endif + int rv; + rv = nni_stream_listener_setx(s->listener, NNG_OPT_TLS_CONFIG, &tls, + sizeof(tls), NNI_TYPE_POINTER); + return (rv); } int -nni_http_server_get_tls(nni_http_server *s, nng_tls_config **tp) +nni_http_server_get_tls(nni_http_server *s, nng_tls_config **tlsp) { -#ifdef NNG_SUPP_TLS - nni_mtx_lock(&s->mtx); - if (s->tls == NULL) { - nni_mtx_unlock(&s->mtx); - return (NNG_EINVAL); - } - nni_tls_config_hold(s->tls); - *tp = s->tls; - nni_mtx_unlock(&s->mtx); - return (0); -#else - NNI_ARG_UNUSED(s); - NNI_ARG_UNUSED(tp); - return (NNG_ENOTSUP); -#endif + size_t sz = sizeof(*tlsp); + int rv; + rv = nni_stream_listener_getx( + s->listener, NNG_OPT_TLS_CONFIG, tlsp, &sz, NNI_TYPE_POINTER); + return (rv); +} + +int +nni_http_server_setx(nni_http_server *s, const char *name, const void *buf, + size_t sz, nni_type t) +{ + // We have no local options, but we just pass them straight through. + return (nni_stream_listener_setx(s->listener, name, buf, sz, t)); +} + +int +nni_http_server_getx( + nni_http_server *s, const char *name, void *buf, size_t *szp, nni_type t) +{ + return (nni_stream_listener_getx(s->listener, name, buf, szp, t)); } void diff --git a/src/supplemental/ipc/CMakeLists.txt b/src/supplemental/ipc/CMakeLists.txt deleted file mode 100644 index 3bc0e4de..00000000 --- a/src/supplemental/ipc/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ -# -# Copyright 2018 Capitar IT Group BV <info@capitar.com> -# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -# Copyright 2018 Devolutions <info@devolutions.net> -# -# This software is supplied under the terms of the MIT License, a -# copy of which should be located in the distribution where this -# file was obtained (LICENSE.txt). A copy of the license may also be -# found online at https://opensource.org/licenses/MIT. -# - -set(_SRCS supplemental/ipc/ipc.c - ${PROJECT_SOURCE_DIR}/include/nng/supplemental/ipc/ipc.h -) - -set(NNG_SRCS ${NNG_SRCS} ${_SRCS} PARENT_SCOPE) diff --git a/src/supplemental/ipc/ipc.c b/src/supplemental/ipc/ipc.c deleted file mode 100644 index cf78dfbd..00000000 --- a/src/supplemental/ipc/ipc.c +++ /dev/null @@ -1,138 +0,0 @@ -// -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <stddef.h> -#include <stdint.h> - -#include <nng/nng.h> -#include <nng/supplemental/ipc/ipc.h> - -#include "core/nng_impl.h" - -// This is our "public" IPC API. This allows applications to access -// basic IPC functions, using our AIO framework. Most applications will -// not need this. - -// We treat nng_ipc as nni_ipc_conn, nng_ipc_dialer as nni_ipc_dialer, -// and nng_ipc_listener as nni_ipc_listener. We cast through void to -// provide isolation of the names in a way that makes the compiler happy. -// It turns out we can pretty much just wrap the platform API for IPC that -// we have already created. - -void -nng_ipc_close(nng_ipc *ipc) -{ - nni_ipc_conn_close((void *) ipc); -} - -void -nng_ipc_free(nng_ipc *ipc) -{ - nni_ipc_conn_fini((void *) ipc); -} - -void -nng_ipc_send(nng_ipc *ipc, nng_aio *aio) -{ - nni_ipc_conn_send((void *) ipc, aio); -} - -void -nng_ipc_recv(nng_ipc *ipc, nng_aio *aio) -{ - nni_ipc_conn_recv((void *) ipc, aio); -} - -int -nng_ipc_setopt(nng_ipc *ipc, const char *name, const void *val, size_t sz) -{ - return ( - nni_ipc_conn_setopt((void *) ipc, name, val, sz, NNI_TYPE_OPAQUE)); -} - -int -nng_ipc_getopt(nng_ipc *ipc, const char *name, void *val, size_t *szp) -{ - return (nni_ipc_conn_getopt( - (void *) ipc, name, val, szp, NNI_TYPE_OPAQUE)); -} - -int -nng_ipc_dialer_alloc(nng_ipc_dialer **dp) -{ - nni_ipc_dialer *d; - int rv; - - if ((rv = nni_init()) != 0) { - return (rv); - } - if ((rv = nni_ipc_dialer_init(&d)) == 0) { - *dp = (void *) d; - } - return (rv); -} - -void -nng_ipc_dialer_close(nng_ipc_dialer *d) -{ - nni_ipc_dialer_close((void *) d); -} - -void -nng_ipc_dialer_free(nng_ipc_dialer *d) -{ - nni_ipc_dialer_fini((void *) d); -} - -void -nng_ipc_dialer_dial(nng_ipc_dialer *d, const nng_sockaddr *sa, nng_aio *aio) -{ - nni_ipc_dialer_dial((void *) d, sa, aio); -} - -int -nng_ipc_listener_alloc(nng_ipc_listener **lp) -{ - nni_ipc_listener *l; - int rv; - - if ((rv = nni_init()) != 0) { - return (rv); - } - if ((rv = nni_ipc_listener_init(&l)) == 0) { - *lp = (void *) l; - } - return (rv); -} - -void -nng_ipc_listener_close(nng_ipc_listener *l) -{ - nni_ipc_listener_close((void *) l); -} - -void -nng_ipc_listener_free(nng_ipc_listener *l) -{ - nni_ipc_listener_fini((void *) l); -} - -int -nng_ipc_listener_listen(nng_ipc_listener *l, const nng_sockaddr *sa) -{ - return (nni_ipc_listener_listen((void *) l, sa)); -} - -void -nng_ipc_listener_accept(nng_ipc_listener *l, nng_aio *aio) -{ - nni_ipc_listener_accept((void *) l, aio); -} diff --git a/src/supplemental/tcp/CMakeLists.txt b/src/supplemental/tcp/CMakeLists.txt index ef82b098..09f917f8 100644 --- a/src/supplemental/tcp/CMakeLists.txt +++ b/src/supplemental/tcp/CMakeLists.txt @@ -1,6 +1,6 @@ # # Copyright 2018 Capitar IT Group BV <info@capitar.com> -# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -8,8 +8,6 @@ # found online at https://opensource.org/licenses/MIT. # -set(_SRCS supplemental/tcp/tcp.c - ${PROJECT_SOURCE_DIR}/include/nng/supplemental/tcp/tcp.h -) +set(_SRCS supplemental/tcp/tcp.c) set(NNG_SRCS ${NNG_SRCS} ${_SRCS} PARENT_SCOPE) diff --git a/src/supplemental/tcp/tcp.c b/src/supplemental/tcp/tcp.c index b8410b16..3ec396b8 100644 --- a/src/supplemental/tcp/tcp.c +++ b/src/supplemental/tcp/tcp.c @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -11,144 +11,428 @@ #include <stddef.h> #include <stdint.h> +#include <string.h> #include <nng/nng.h> -#include <nng/supplemental/tcp/tcp.h> #include "core/nng_impl.h" +#include "core/tcp.h" -// This is our "public" TCP API. This allows applications to access -// basic TCP functions, using our AIO framework. Most applications will -// not need this. +typedef struct { + nng_stream_dialer ops; + char * host; + char * port; + int af; // address family + bool closed; + nng_sockaddr sa; + nni_tcp_dialer * d; // platform dialer implementation + nni_aio * resaio; // resolver aio + nni_aio * conaio; // platform connection aio + nni_list resaios; + nni_list conaios; + nni_mtx mtx; +} tcp_dialer; -// We treat nng_tcp as nni_tcp_conn, nng_tcp_dialer as nni_tcp_dialer, -// and nng_tcp_listener as nni_tcp_listener. We cast through void to -// provide isolation of the names in a way that makes the compiler happy. -// It turns out we can pretty much just wrap the platform API for TCP that -// we have already created. - -void -nng_tcp_close(nng_tcp *tcp) +static void +tcp_dial_cancel(nni_aio *aio, void *arg, int rv) { - nni_tcp_conn_close((void *) tcp); -} + tcp_dialer *d = arg; -void -nng_tcp_free(nng_tcp *tcp) -{ - nni_tcp_conn_fini((void *) tcp); + nni_mtx_lock(&d->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + + if (nni_list_empty(&d->conaios)) { + nni_aio_abort(d->conaio, NNG_ECANCELED); + } + if (nni_list_empty(&d->resaios)) { + nni_aio_abort(d->resaio, NNG_ECANCELED); + } + } + nni_mtx_unlock(&d->mtx); } -void -nng_tcp_send(nng_tcp *tcp, nng_aio *aio) +static void +tcp_dial_res_cb(void *arg) { - nni_tcp_conn_send((void *) tcp, aio); + tcp_dialer *d = arg; + nni_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + if (d->closed || ((aio = nni_list_first(&d->resaios)) == NULL)) { + // ignore this. + while ((aio = nni_list_first(&d->resaios)) != NULL) { + nni_list_remove(&d->resaios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&d->mtx); + return; + } + + nni_list_remove(&d->resaios, aio); + + if ((rv = nni_aio_result(d->resaio)) != 0) { + nni_aio_finish_error(aio, rv); + } else { + nng_sockaddr sa; + nni_aio_get_sockaddr(d->resaio, &sa); + nni_aio_set_sockaddr(aio, &sa); + nni_list_append(&d->conaios, aio); + if (nni_list_first(&d->conaios) == aio) { + nni_aio_set_sockaddr(d->conaio, &sa); + nni_tcp_dial(d->d, d->conaio); + } + } + + if (!nni_list_empty(&d->resaios)) { + nni_tcp_resolv(d->host, d->port, d->af, 0, d->resaio); + } + nni_mtx_unlock(&d->mtx); } -void -nng_tcp_recv(nng_tcp *tcp, nng_aio *aio) +static void +tcp_dial_con_cb(void *arg) { - nni_tcp_conn_recv((void *) tcp, aio); + tcp_dialer *d = arg; + nng_aio * aio; + int rv; + + nni_mtx_lock(&d->mtx); + rv = nni_aio_result(d->conaio); + if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) { + if (rv == 0) { + // Make sure we discard the underlying connection. + nng_stream_free(nni_aio_get_output(d->conaio, 0)); + nni_aio_set_output(d->conaio, 0, NULL); + } + nni_mtx_unlock(&d->mtx); + return; + } + nni_list_remove(&d->conaios, aio); + if (rv != 0) { + nni_aio_finish_error(aio, rv); + } else { + nni_aio_set_output(aio, 0, nni_aio_get_output(d->conaio, 0)); + nni_aio_finish(aio, 0, 0); + } + + if ((aio = nni_list_first(&d->conaios)) != NULL) { + nng_sockaddr sa; + nni_aio_get_sockaddr(aio, &sa); + nni_aio_set_sockaddr(d->conaio, &sa); + nni_tcp_dial(d->d, d->conaio); + } + nni_mtx_unlock(&d->mtx); } -int -nng_tcp_getopt(nng_tcp *tcp, const char *name, void *buf, size_t *szp) +static void +tcp_dialer_close(void *arg) { - return (nni_tcp_conn_getopt( - (void *) tcp, name, buf, szp, NNI_TYPE_OPAQUE)); + tcp_dialer *d = arg; + nni_aio * aio; + nni_mtx_lock(&d->mtx); + d->closed = true; + while ((aio = nni_list_first(&d->resaios)) != NULL) { + nni_list_remove(&d->resaios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + while ((aio = nni_list_first(&d->conaios)) != NULL) { + nni_list_remove(&d->conaios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_tcp_dialer_close(d->d); + nni_mtx_unlock(&d->mtx); } -int -nng_tcp_setopt(nng_tcp *tcp, const char *name, const void *buf, size_t sz) +static void +tcp_dialer_free(void *arg) { - return ( - nni_tcp_conn_setopt((void *) tcp, name, buf, sz, NNI_TYPE_OPAQUE)); + tcp_dialer *d = arg; + + if (d == NULL) { + return; + } + + if (d->d != NULL) { + nni_tcp_dialer_close(d->d); + nni_tcp_dialer_fini(d->d); + } + nni_strfree(d->host); + nni_strfree(d->port); + nni_aio_fini(d->resaio); + nni_aio_fini(d->conaio); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); } -int -nng_tcp_dialer_alloc(nng_tcp_dialer **dp) +static void +tcp_dialer_dial(void *arg, nng_aio *aio) { - nni_tcp_dialer *d; - int rv; - - if ((rv = nni_init()) != 0) { - return (rv); + tcp_dialer *d = arg; + int rv; + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&d->mtx); + if (d->closed) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; } - if ((rv = nni_tcp_dialer_init(&d)) == 0) { - *dp = (void *) d; + if (d->host != NULL) { + nni_list_append(&d->resaios, aio); + if (nni_list_first(&d->resaios) == aio) { + nni_tcp_resolv(d->host, d->port, d->af, 0, d->resaio); + } + } else { + nni_list_append(&d->conaios, aio); + if (nni_list_first(&d->conaios) == aio) { + nni_aio_set_sockaddr(d->conaio, &d->sa); + nni_tcp_dial(d->d, d->conaio); + } } - return (rv); + nni_mtx_unlock(&d->mtx); } -void -nng_tcp_dialer_close(nng_tcp_dialer *d) +static int +tcp_dialer_getx( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - nni_tcp_dialer_close((void *) d); + tcp_dialer *d = arg; + return (nni_tcp_dialer_getopt(d->d, name, buf, szp, t)); } -void -nng_tcp_dialer_free(nng_tcp_dialer *d) +static int +tcp_dialer_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) { - nni_tcp_dialer_fini((void *) d); + tcp_dialer *d = arg; + return (nni_tcp_dialer_setopt(d->d, name, buf, sz, t)); } -void -nng_tcp_dialer_dial(nng_tcp_dialer *d, const nng_sockaddr *sa, nng_aio *aio) +static int +tcp_dialer_alloc(tcp_dialer **dp) { - nni_tcp_dialer_dial((void *) d, sa, aio); + int rv; + tcp_dialer *d; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&d->mtx); + nni_aio_list_init(&d->resaios); + nni_aio_list_init(&d->conaios); + + if (((rv = nni_aio_init(&d->resaio, tcp_dial_res_cb, d)) != 0) || + ((rv = nni_aio_init(&d->conaio, tcp_dial_con_cb, d)) != 0) || + ((rv = nni_tcp_dialer_init(&d->d)) != 0)) { + tcp_dialer_free(d); + return (rv); + } + + d->ops.sd_close = tcp_dialer_close; + d->ops.sd_free = tcp_dialer_free; + d->ops.sd_dial = tcp_dialer_dial; + d->ops.sd_getx = tcp_dialer_getx; + d->ops.sd_setx = tcp_dialer_setx; + + *dp = d; + return (0); } int -nng_tcp_listener_alloc(nng_tcp_listener **lp) +nni_tcp_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) { - nni_tcp_listener *l; - int rv; + tcp_dialer *d; + int rv; + const char *p; if ((rv = nni_init()) != 0) { return (rv); } - if ((rv = nni_tcp_listener_init(&l)) == 0) { - *lp = (void *) l; + + if ((rv = tcp_dialer_alloc(&d)) != 0) { + return (rv); } - return (rv); + + if (((p = url->u_port) == NULL) || (strlen(p) == 0)) { + p = nni_url_default_port(url->u_scheme); + } + + if ((strlen(p) == 0) || (strlen(url->u_hostname) == 0)) { + // Dialer needs both a destination hostname and port. + tcp_dialer_free(d); + return (NNG_EADDRINVAL); + } + + if (strchr(url->u_scheme, '4') != NULL) { + d->af = NNG_AF_INET; + } else if (strchr(url->u_scheme, '6') != NULL) { + d->af = NNG_AF_INET6; + } else { + d->af = NNG_AF_UNSPEC; + } + + if (((d->host = nng_strdup(url->u_hostname)) == NULL) || + ((d->port = nng_strdup(p)) == NULL)) { + tcp_dialer_free(d); + return (NNG_ENOMEM); + } + + *dp = (void *) d; + return (0); } -void -nng_tcp_listener_close(nng_tcp_listener *l) +typedef struct { + nng_stream_listener ops; + nni_tcp_listener * l; + nng_sockaddr sa; +} tcp_listener; + +static void +tcp_listener_close(void *arg) { - nni_tcp_listener_close((void *) l); + tcp_listener *l = arg; + nni_tcp_listener_close(l->l); } -void -nng_tcp_listener_free(nng_tcp_listener *l) +static void +tcp_listener_free(void *arg) { - nni_tcp_listener_fini((void *) l); + tcp_listener *l = arg; + nni_tcp_listener_fini(l->l); + NNI_FREE_STRUCT(l); } -int -nng_tcp_listener_listen(nng_tcp_listener *l, const nng_sockaddr *sa) +static int +tcp_listener_listen(void *arg) +{ + tcp_listener *l = arg; + return (nni_tcp_listener_listen(l->l, &l->sa)); +} + +static void +tcp_listener_accept(void *arg, nng_aio *aio) { - return (nni_tcp_listener_listen((void *) l, sa)); + tcp_listener *l = arg; + nni_tcp_listener_accept(l->l, aio); } -void -nng_tcp_listener_accept(nng_tcp_listener *l, nng_aio *aio) +static int +tcp_listener_getx( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - nni_tcp_listener_accept((void *) l, aio); + tcp_listener *l = arg; + return (nni_tcp_listener_getopt(l->l, name, buf, szp, t)); +} + +static int +tcp_listener_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + tcp_listener *l = arg; + return (nni_tcp_listener_setopt(l->l, name, buf, sz, t)); +} + +static int +tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa) +{ + tcp_listener *l; + int rv; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_tcp_listener_init(&l->l)) != 0) { + NNI_FREE_STRUCT(l); + return (rv); + } + l->sa = *sa; + + l->ops.sl_free = tcp_listener_free; + l->ops.sl_close = tcp_listener_close; + l->ops.sl_listen = tcp_listener_listen; + l->ops.sl_accept = tcp_listener_accept; + l->ops.sl_getx = tcp_listener_getx; + l->ops.sl_setx = tcp_listener_setx; + + *lp = (void *) l; + return (0); } int -nng_tcp_listener_getopt( - nng_tcp_listener *l, const char *name, void *buf, size_t *szp) +nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url) { - return (nni_tcp_listener_getopt( - (void *) l, name, buf, szp, NNI_TYPE_OPAQUE)); + nni_aio * aio; + int af; + int rv; + nng_sockaddr sa; + const char * h; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if (strchr(url->u_scheme, '4') != NULL) { + af = NNG_AF_INET; + } else if (strchr(url->u_scheme, '6') != NULL) { + af = NNG_AF_INET6; + } else { + af = NNG_AF_UNSPEC; + } + + if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) { + return (rv); + } + + h = url->u_hostname; + + // Wildcard special case, which means bind to INADDR_ANY. + if ((h != NULL) && ((strcmp(h, "*") == 0) || (strlen(h) == 0))) { + h = NULL; + } + nni_tcp_resolv(h, url->u_port, af, 1, aio); + nni_aio_wait(aio); + + if ((rv = nni_aio_result(aio)) != 0) { + nni_aio_fini(aio); + return (rv); + } + nni_aio_get_sockaddr(aio, &sa); + nni_aio_fini(aio); + + return (tcp_listener_alloc_addr(lp, &sa)); } +static int +tcp_check_bool(const void *val, size_t sz, nni_type t) +{ + return (nni_copyin_bool(NULL, val, sz, t)); +} + +static const nni_chkoption tcp_chkopts[] = { + { + .o_name = NNG_OPT_TCP_KEEPALIVE, + .o_check = tcp_check_bool, + }, + { + .o_name = NNG_OPT_TCP_NODELAY, + .o_check = tcp_check_bool, + }, + { + .o_name = NULL, + }, +}; + int -nng_tcp_listener_setopt( - nng_tcp_listener *l, const char *name, const void *buf, size_t sz) +nni_tcp_checkopt(const char *name, const void *data, size_t sz, nni_type t) { - return (nni_tcp_listener_setopt( - (void *) l, name, buf, sz, NNI_TYPE_OPAQUE)); -}
\ No newline at end of file + return (nni_chkopt(tcp_chkopts, name, data, sz, t)); +} diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index 9f1e8f83..d3816747 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -28,10 +28,9 @@ #include "mbedtls/ssl.h" #include "core/nng_impl.h" +#include "core/tcp.h" #include "supplemental/tls/tls_api.h" -#include <nng/supplemental/tls/tls.h> - // Implementation note. This implementation buffers data between the TLS // encryption layer (mbedTLS) and the underlying TCP socket. As a result, // there may be some additional latency caused by buffer draining and @@ -59,8 +58,9 @@ typedef struct nni_tls_certkey { nni_list_node node; } nni_tls_certkey; -struct nni_tls { - nni_tcp_conn * tcp; +typedef struct { + nni_tls_common com; + nng_stream * tcp; mbedtls_ssl_context ctx; nng_tls_config * cfg; // kept so we can release it nni_mtx lk; @@ -81,7 +81,7 @@ struct nni_tls { nni_list sends; // upper side sends nni_list recvs; // upper recv aios nni_aio * handshake; // handshake aio (upper) -}; +} tls; struct nng_tls_config { mbedtls_ssl_config cfg_ctx; @@ -100,18 +100,18 @@ struct nng_tls_config { nni_list certkeys; }; -static void nni_tls_send_cb(void *); -static void nni_tls_recv_cb(void *); +static void tls_send_cb(void *); +static void tls_recv_cb(void *); -static void nni_tls_do_send(nni_tls *); -static void nni_tls_do_recv(nni_tls *); -static void nni_tls_do_handshake(nni_tls *); +static void tls_do_send(tls *); +static void tls_do_recv(tls *); +static void tls_do_handshake(tls *); -static int nni_tls_net_send(void *, const unsigned char *, size_t); -static int nni_tls_net_recv(void *, unsigned char *, size_t); +static int tls_net_send(void *, const unsigned char *, size_t); +static int tls_net_recv(void *, unsigned char *, size_t); static void -nni_tls_dbg(void *ctx, int level, const char *file, int line, const char *s) +tls_dbg(void *ctx, int level, const char *file, int line, const char *s) { char buf[128]; NNI_ARG_UNUSED(ctx); @@ -121,7 +121,7 @@ nni_tls_dbg(void *ctx, int level, const char *file, int line, const char *s) } static int -nni_tls_get_entropy(void *arg, unsigned char *buf, size_t len) +tls_get_entropy(void *arg, unsigned char *buf, size_t len) { NNI_ARG_UNUSED(arg); while (len) { @@ -137,7 +137,7 @@ nni_tls_get_entropy(void *arg, unsigned char *buf, size_t len) } static int -nni_tls_random(void *arg, unsigned char *buf, size_t sz) +tls_random(void *arg, unsigned char *buf, size_t sz) { #ifdef NNG_TLS_USE_CTR_DRBG int rv; @@ -149,7 +149,7 @@ nni_tls_random(void *arg, unsigned char *buf, size_t sz) nni_mtx_unlock(&cfg->rng_lk); return (rv); #else - return (nni_tls_get_entropy(arg, buf, sz)); + return (tls_get_entropy(arg, buf, sz)); #endif } @@ -230,15 +230,15 @@ nni_tls_config_init(nng_tls_config **cpp, enum nng_tls_mode mode) #ifdef NNG_TLS_USE_CTR_DRBG mbedtls_ctr_drbg_init(&cfg->rng_ctx); rv = mbedtls_ctr_drbg_seed( - &cfg->rng_ctx, nni_tls_get_entropy, NULL, NULL, 0); + &cfg->rng_ctx, tls_get_entropy, NULL, NULL, 0); if (rv != 0) { nni_tls_config_fini(cfg); return (rv); } #endif - mbedtls_ssl_conf_rng(&cfg->cfg_ctx, nni_tls_random, cfg); + mbedtls_ssl_conf_rng(&cfg->cfg_ctx, tls_random, cfg); - mbedtls_ssl_conf_dbg(&cfg->cfg_ctx, nni_tls_dbg, cfg); + mbedtls_ssl_conf_dbg(&cfg->cfg_ctx, tls_dbg, cfg); *cpp = cfg; return (0); @@ -252,41 +252,12 @@ nni_tls_config_hold(nng_tls_config *cfg) nni_mtx_unlock(&cfg->lk); } -void -nni_tls_fini(nni_tls *tp) -{ - // Shut it all down first. - if (tp != NULL) { - if (tp->tcp) { - nni_tcp_conn_close(tp->tcp); - } - nni_aio_stop(tp->tcp_send); - nni_aio_stop(tp->tcp_recv); - - // And finalize / free everything. - if (tp->tcp) { - nni_tcp_conn_fini(tp->tcp); - } - nni_aio_fini(tp->tcp_send); - nni_aio_fini(tp->tcp_recv); - mbedtls_ssl_free(&tp->ctx); - nni_mtx_fini(&tp->lk); - nni_free(tp->recvbuf, NNG_TLS_MAX_RECV_SIZE); - nni_free(tp->sendbuf, NNG_TLS_MAX_RECV_SIZE); - if (tp->cfg != NULL) { - // release the hold we got on it - nni_tls_config_fini(tp->cfg); - } - NNI_FREE_STRUCT(tp); - } -} - -// nni_tls_mkerr converts an mbed error to an NNG error. In all cases +// tls_mkerr converts an mbed error to an NNG error. In all cases // we just encode with NNG_ETRANERR. static struct { int tls; int nng; -} nni_tls_errs[] = { +} tls_errs[] = { { MBEDTLS_ERR_SSL_NO_CLIENT_CERTIFICATE, NNG_EPEERAUTH }, { MBEDTLS_ERR_SSL_CA_CHAIN_REQUIRED, NNG_EPEERAUTH }, { MBEDTLS_ERR_SSL_PEER_VERIFY_FAILED, NNG_EPEERAUTH }, @@ -300,60 +271,74 @@ static struct { }; static int -nni_tls_mkerr(int err) +tls_mkerr(int err) { - for (int i = 0; nni_tls_errs[i].tls != 0; i++) { - if (nni_tls_errs[i].tls == err) { - return (nni_tls_errs[i].nng); + for (int i = 0; tls_errs[i].tls != 0; i++) { + if (tls_errs[i].tls == err) { + return (tls_errs[i].nng); } } return (NNG_ECRYPTO); } -int -nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) +// The common code should call this only after it has released +// it's upper layer stuff. +static void +tls_free(void *arg) { - nni_tls *tp; - int rv; - bool on = true; + tls *tls = arg; + + // Shut it all down first. + if (tls != NULL) { + if (tls->tcp != NULL) { + nng_stream_close(tls->tcp); + } + nni_aio_stop(tls->tcp_send); + nni_aio_stop(tls->tcp_recv); - // During the handshake, disable Nagle to shorten the - // negotiation. Once things are set up the caller can - // re-enable Nagle if so desired. - (void) nni_tcp_conn_setopt( - tcp, NNG_OPT_TCP_NODELAY, &on, sizeof(on), NNI_TYPE_BOOL); + nni_aio_fini(tls->com.aio); + nng_tls_config_free(tls->com.cfg); - if ((tp = NNI_ALLOC_STRUCT(tp)) == NULL) { - return (NNG_ENOMEM); + // And finalize / free everything. + nng_stream_free(tls->tcp); + nni_aio_fini(tls->tcp_send); + nni_aio_fini(tls->tcp_recv); + mbedtls_ssl_free(&tls->ctx); + if (tls->recvbuf != NULL) { + nni_free(tls->recvbuf, NNG_TLS_MAX_RECV_SIZE); + } + if (tls->sendbuf != NULL) { + nni_free(tls->sendbuf, NNG_TLS_MAX_RECV_SIZE); + } + nni_mtx_fini(&tls->lk); + NNI_FREE_STRUCT(tls); } +} + +int +nni_tls_start(nng_stream *arg, nng_stream *tcp) +{ + tls * tp = (void *) arg; + nng_tls_config *cfg = tp->com.cfg; + int rv; + if ((tp->recvbuf = nni_zalloc(NNG_TLS_MAX_RECV_SIZE)) == NULL) { - NNI_FREE_STRUCT(tp); return (NNG_ENOMEM); } if ((tp->sendbuf = nni_zalloc(NNG_TLS_MAX_SEND_SIZE)) == NULL) { - nni_free(tp->sendbuf, NNG_TLS_MAX_RECV_SIZE); - NNI_FREE_STRUCT(tp); return (NNG_ENOMEM); } nni_mtx_lock(&cfg->lk); // No more changes allowed to config. cfg->active = true; - cfg->refcnt++; - tp->cfg = cfg; nni_mtx_unlock(&cfg->lk); - nni_aio_list_init(&tp->sends); - nni_aio_list_init(&tp->recvs); - nni_mtx_init(&tp->lk); mbedtls_ssl_init(&tp->ctx); - mbedtls_ssl_set_bio( - &tp->ctx, tp, nni_tls_net_send, nni_tls_net_recv, NULL); + mbedtls_ssl_set_bio(&tp->ctx, tp, tls_net_send, tls_net_recv, NULL); if ((rv = mbedtls_ssl_setup(&tp->ctx, &cfg->cfg_ctx)) != 0) { - rv = nni_tls_mkerr(rv); - nni_tls_fini(tp); - return (rv); + return (tls_mkerr(rv)); } if (cfg->server_name) { @@ -362,25 +347,23 @@ nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) tp->tcp = tcp; - if (((rv = nni_aio_init(&tp->tcp_send, nni_tls_send_cb, tp)) != 0) || - ((rv = nni_aio_init(&tp->tcp_recv, nni_tls_recv_cb, tp)) != 0)) { - nni_tls_fini(tp); + if (((rv = nni_aio_init(&tp->tcp_send, tls_send_cb, tp)) != 0) || + ((rv = nni_aio_init(&tp->tcp_recv, tls_recv_cb, tp)) != 0)) { return (rv); } nni_mtx_lock(&tp->lk); // Kick off a handshake operation. - nni_tls_do_handshake(tp); + tls_do_handshake(tp); nni_mtx_unlock(&tp->lk); - *tpp = tp; return (0); } static void -nni_tls_cancel(nni_aio *aio, void *arg, int rv) +tls_cancel(nni_aio *aio, void *arg, int rv) { - nni_tls *tp = arg; + tls *tp = arg; nni_mtx_lock(&tp->lk); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); @@ -389,33 +372,16 @@ nni_tls_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&tp->lk); } +// tls_send_cb is called when the underlying TCP send completes. static void -nni_tls_fail(nni_tls *tp, int rv) -{ - nni_aio *aio; - tp->tls_closed = true; - nni_tcp_conn_close(tp->tcp); - tp->tcp_closed = true; - while ((aio = nni_list_first(&tp->recvs)) != NULL) { - nni_list_remove(&tp->recvs, aio); - nni_aio_finish_error(aio, rv); - } - while ((aio = nni_list_first(&tp->sends)) != NULL) { - nni_list_remove(&tp->recvs, aio); - nni_aio_finish_error(aio, rv); - } -} - -// nni_tls_send_cb is called when the underlying TCP send completes. -static void -nni_tls_send_cb(void *ctx) +tls_send_cb(void *ctx) { - nni_tls *tp = ctx; + tls * tp = ctx; nni_aio *aio = tp->tcp_send; nni_mtx_lock(&tp->lk); if (nni_aio_result(aio) != 0) { - nni_tcp_conn_close(tp->tcp); + nng_stream_close(tp->tcp); tp->tcp_closed = true; } else { size_t n = nni_aio_count(aio); @@ -428,25 +394,24 @@ nni_tls_send_cb(void *ctx) iov.iov_len = tp->sendlen; nni_aio_set_iov(aio, 1, &iov); nni_aio_set_timeout(aio, NNG_DURATION_INFINITE); - nni_tcp_conn_send(tp->tcp, aio); + nng_stream_send(tp->tcp, aio); nni_mtx_unlock(&tp->lk); return; } tp->sendoff = 0; tp->sending = false; } - if (!tp->hsdone) { - nni_tls_do_handshake(tp); - } + + tls_do_handshake(tp); if (tp->hsdone) { - nni_tls_do_send(tp); - nni_tls_do_recv(tp); + tls_do_send(tp); + tls_do_recv(tp); } nni_mtx_unlock(&tp->lk); } static void -nni_tls_recv_start(nni_tls *tp) +tls_recv_start(tls *tp) { nni_aio *aio; nni_iov iov; @@ -467,13 +432,13 @@ nni_tls_recv_start(nni_tls *tp) iov.iov_len = NNG_TLS_MAX_RECV_SIZE; nni_aio_set_iov(aio, 1, &iov); nni_aio_set_timeout(tp->tcp_recv, NNG_DURATION_INFINITE); - nni_tcp_conn_recv(tp->tcp, aio); + nng_stream_recv(tp->tcp, aio); } static void -nni_tls_recv_cb(void *ctx) +tls_recv_cb(void *ctx) { - nni_tls *tp = ctx; + tls * tp = ctx; nni_aio *aio = tp->tcp_recv; nni_mtx_lock(&tp->lk); @@ -481,7 +446,7 @@ nni_tls_recv_cb(void *ctx) if (nni_aio_result(aio) != 0) { // Close the underlying TCP channel, but permit data we // already received to continue to be received. - nni_tcp_conn_close(tp->tcp); + nng_stream_close(tp->tcp); tp->tcp_closed = true; } else { NNI_ASSERT(tp->recvlen == 0); @@ -492,12 +457,10 @@ nni_tls_recv_cb(void *ctx) // If we were closed (above), the upper layer will detect and // react properly. Otherwise the upper layer will consume // data. - if (!tp->hsdone) { - nni_tls_do_handshake(tp); - } + tls_do_handshake(tp); if (tp->hsdone) { - nni_tls_do_recv(tp); - nni_tls_do_send(tp); + tls_do_recv(tp); + tls_do_send(tp); } nni_mtx_unlock(&tp->lk); @@ -511,10 +474,10 @@ nni_tls_recv_cb(void *ctx) // ridiculous over queueing. This is always called with the pipe // lock held, and never blocks. static int -nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len) +tls_net_send(void *ctx, const unsigned char *buf, size_t len) { - nni_tls *tp = ctx; - nni_iov iov; + tls * tp = ctx; + nni_iov iov; if (len > NNG_TLS_MAX_SEND_SIZE) { len = NNG_TLS_MAX_SEND_SIZE; @@ -538,22 +501,24 @@ nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len) iov.iov_len = len; nni_aio_set_iov(tp->tcp_send, 1, &iov); nni_aio_set_timeout(tp->tcp_send, NNG_DURATION_INFINITE); - nni_tcp_conn_send(tp->tcp, tp->tcp_send); + nng_stream_send(tp->tcp, tp->tcp_send); return (len); } static int -nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len) +tls_net_recv(void *ctx, unsigned char *buf, size_t len) { - nni_tls *tp = ctx; + tls *tp = ctx; // We should already be running with the pipe lock held, // as we are running in that context. - if (tp->tcp_closed && tp->recvlen == 0) { - return (MBEDTLS_ERR_NET_RECV_FAILED); - } if (tp->recvlen == 0) { + if (tp->tcp_closed) { + // The underlying TCP transport has closed, and we + // have no more data in our receive buffer. + return (MBEDTLS_ERR_NET_RECV_FAILED); + } len = MBEDTLS_ERR_SSL_WANT_READ; } else { if (len > tp->recvlen) { @@ -564,16 +529,15 @@ nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len) tp->recvlen -= len; } - nni_tls_recv_start(tp); + tls_recv_start(tp); return ((int) len); } -// nni_tls_send is the exported send function. It has a similar -// calling convention as the platform TCP pipe. -void -nni_tls_send(nni_tls *tp, nni_aio *aio) +static void +tls_send(void *arg, nni_aio *aio) { - int rv; + int rv; + tls *tp = arg; if (nni_aio_begin(aio) != 0) { return; @@ -584,20 +548,21 @@ nni_tls_send(nni_tls *tp, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((rv = nni_aio_schedule(aio, nni_tls_cancel, tp)) != 0) { + if ((rv = nni_aio_schedule(aio, tls_cancel, tp)) != 0) { nni_mtx_unlock(&tp->lk); nni_aio_finish_error(aio, rv); return; } nni_list_append(&tp->sends, aio); - nni_tls_do_send(tp); + tls_do_send(tp); nni_mtx_unlock(&tp->lk); } -void -nni_tls_recv(nni_tls *tp, nni_aio *aio) +static void +tls_recv(void *arg, nni_aio *aio) { - int rv; + int rv; + tls *tp = arg; if (nni_aio_begin(aio) != 0) { return; @@ -608,22 +573,22 @@ nni_tls_recv(nni_tls *tp, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((rv = nni_aio_schedule(aio, nni_tls_cancel, tp)) != 0) { + if ((rv = nni_aio_schedule(aio, tls_cancel, tp)) != 0) { nni_mtx_unlock(&tp->lk); nni_aio_finish_error(aio, rv); return; } nni_list_append(&tp->recvs, aio); - nni_tls_do_recv(tp); + tls_do_recv(tp); nni_mtx_unlock(&tp->lk); } static int tls_get_verified(void *arg, void *buf, size_t *szp, nni_type t) { - nni_tls *tp = arg; - bool v = (mbedtls_ssl_get_verify_result(&tp->ctx) == 0); + tls *tp = arg; + bool v = (mbedtls_ssl_get_verify_result(&tp->ctx) == 0); return (nni_copyout_bool(v, buf, szp, t)); } @@ -638,26 +603,28 @@ static const nni_option tls_options[] = { }, }; -int -nni_tls_setopt( - nni_tls *tp, const char *name, const void *buf, size_t sz, nni_type t) +static int +tls_setx(void *arg, const char *name, const void *buf, size_t sz, nni_type t) { - int rv; + tls * tp = arg; + int rv; + nng_stream *tcp; - if ((rv = nni_tcp_conn_setopt(tp->tcp, name, buf, sz, t)) != - NNG_ENOTSUP) { + tcp = (tp != NULL) ? tp->tcp : NULL; + + if ((rv = nni_stream_setx(tcp, name, buf, sz, t)) != NNG_ENOTSUP) { return (rv); } return (nni_setopt(tls_options, name, tp, buf, sz, t)); } -int -nni_tls_getopt( - nni_tls *tp, const char *name, void *buf, size_t *szp, nni_type t) +static int +tls_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - int rv; + tls *tp = arg; + int rv; - if ((rv = nni_tcp_conn_getopt(tp->tcp, name, buf, szp, t)) != + if ((rv = nni_stream_getx(tp->tcp, name, buf, szp, t)) != NNG_ENOTSUP) { return (rv); } @@ -665,11 +632,12 @@ nni_tls_getopt( } static void -nni_tls_do_handshake(nni_tls *tp) +tls_do_handshake(tls *tp) { - int rv; + int rv; + nni_aio *aio; - if (tp->tls_closed) { + if (tp->hsdone || tp->tls_closed) { return; } rv = mbedtls_ssl_handshake(&tp->ctx); @@ -686,15 +654,24 @@ nni_tls_do_handshake(nni_tls *tp) default: // some other error occurred, this causes us to tear it down - nni_tls_fail(tp, nni_tls_mkerr(rv)); + nng_stream_close(tp->tcp); + tp->tls_closed = true; + tp->tcp_closed = true; + rv = tls_mkerr(rv); + + while (((aio = nni_list_first(&tp->recvs)) != NULL) || + ((aio = nni_list_first(&tp->sends)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } } } -// nni_tls_do_send is called to try to send more data if we have not +// tls_do_send is called to try to send more data if we have not // yet completed the I/O. It also completes any transactions that // *have* completed. It must be called with the lock held. static void -nni_tls_do_send(nni_tls *tp) +tls_do_send(tls *tp) { nni_aio *aio; @@ -732,7 +709,7 @@ nni_tls_do_send(nni_tls *tp) // Want better diagnostics. nni_aio_list_remove(aio); if (n < 0) { - nni_aio_finish_error(aio, nni_tls_mkerr(n)); + nni_aio_finish_error(aio, tls_mkerr(n)); } else { nni_aio_finish(aio, 0, n); } @@ -740,7 +717,7 @@ nni_tls_do_send(nni_tls *tp) } static void -nni_tls_do_recv(nni_tls *tp) +tls_do_recv(tls *tp) { nni_aio *aio; @@ -777,16 +754,17 @@ nni_tls_do_recv(nni_tls *tp) nni_aio_list_remove(aio); if (n < 0) { - nni_aio_finish_error(aio, nni_tls_mkerr(n)); + nni_aio_finish_error(aio, tls_mkerr(n)); } else { nni_aio_finish(aio, 0, n); } } } -void -nni_tls_close(nni_tls *tp) +static void +tls_close(void *arg) { + tls * tp = arg; nni_aio *aio; nni_aio_close(tp->tcp_send); @@ -795,12 +773,8 @@ nni_tls_close(nni_tls *tp) nni_mtx_lock(&tp->lk); tp->tls_closed = true; - while ((aio = nni_list_first(&tp->sends)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - - while ((aio = nni_list_first(&tp->recvs)) != NULL) { + while (((aio = nni_list_first(&tp->sends)) != NULL) || + ((aio = nni_list_first(&tp->recvs)) != NULL)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } @@ -813,11 +787,36 @@ nni_tls_close(nni_tls *tp) // connection at this point. (void) mbedtls_ssl_close_notify(&tp->ctx); } else { - nni_tcp_conn_close(tp->tcp); + nng_stream_close(tp->tcp); } nni_mtx_unlock(&tp->lk); } +// This allocates a TLS structure, that can be used by the caller. +// The reason we have this API is so that the base structure can be +// embedded in the parent structure. +int +nni_tls_alloc(nng_stream **tlsp) +{ + tls *tp; + + if ((tp = NNI_ALLOC_STRUCT(tp)) == NULL) { + return (NNG_ENOMEM); + } + nni_aio_list_init(&tp->sends); + nni_aio_list_init(&tp->recvs); + nni_mtx_init(&tp->lk); + tp->com.ops.s_close = tls_close; + tp->com.ops.s_free = tls_free; + tp->com.ops.s_send = tls_send; + tp->com.ops.s_recv = tls_recv; + tp->com.ops.s_getx = tls_getx; + tp->com.ops.s_setx = tls_setx; + + *tlsp = (void *) tp; + return (0); +} + int nng_tls_config_server_name(nng_tls_config *cfg, const char *name) { @@ -882,14 +881,14 @@ nng_tls_config_ca_chain( pem = (const uint8_t *) certs; len = strlen(certs) + 1; if ((rv = mbedtls_x509_crt_parse(&cfg->ca_certs, pem, len)) != 0) { - rv = nni_tls_mkerr(rv); + rv = tls_mkerr(rv); goto err; } if (crl != NULL) { pem = (const uint8_t *) crl; len = strlen(crl) + 1; if ((rv = mbedtls_x509_crl_parse(&cfg->crl, pem, len)) != 0) { - rv = nni_tls_mkerr(rv); + rv = tls_mkerr(rv); goto err; } } @@ -919,7 +918,7 @@ nng_tls_config_own_cert( pem = (const uint8_t *) cert; len = strlen(cert) + 1; if ((rv = mbedtls_x509_crt_parse(&ck->crt, pem, len)) != 0) { - rv = nni_tls_mkerr(rv); + rv = tls_mkerr(rv); goto err; } @@ -928,7 +927,7 @@ nng_tls_config_own_cert( rv = mbedtls_pk_parse_key(&ck->key, pem, len, (const uint8_t *) pass, pass != NULL ? strlen(pass) : 0); if (rv != 0) { - rv = nni_tls_mkerr(rv); + rv = tls_mkerr(rv); goto err; } @@ -941,7 +940,7 @@ nng_tls_config_own_cert( rv = mbedtls_ssl_conf_own_cert(&cfg->cfg_ctx, &ck->crt, &ck->key); if (rv != 0) { nni_mtx_unlock(&cfg->lk); - rv = nni_tls_mkerr(rv); + rv = tls_mkerr(rv); goto err; } diff --git a/src/supplemental/tls/none/tls.c b/src/supplemental/tls/none/tls.c index 257bb6b1..a1e70b73 100644 --- a/src/supplemental/tls/none/tls.c +++ b/src/supplemental/tls/none/tls.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 Devolutions <info@devolutions.net> // @@ -20,8 +20,6 @@ #include "core/nng_impl.h" #include "supplemental/tls/tls_api.h" -#include <nng/supplemental/tls/tls.h> - void nni_tls_config_fini(nng_tls_config *cfg) { @@ -42,68 +40,6 @@ nni_tls_config_hold(nng_tls_config *cfg) NNI_ARG_UNUSED(cfg); } -void -nni_tls_fini(nni_tls *tp) -{ - NNI_ARG_UNUSED(tp); -} - -int -nni_tls_init(nni_tls **tpp, nng_tls_config *cfg, nni_tcp_conn *tcp) -{ - NNI_ARG_UNUSED(tpp); - NNI_ARG_UNUSED(cfg); - NNI_ARG_UNUSED(tcp); - - return (NNG_ENOTSUP); -} - -// nni_tls_send is the exported send function. It has a similar -// calling convention as the platform TCP pipe. -void -nni_tls_send(nni_tls *tp, nni_aio *aio) -{ - NNI_ARG_UNUSED(tp); - nni_aio_finish_error(aio, NNG_ENOTSUP); -} - -void -nni_tls_recv(nni_tls *tp, nni_aio *aio) -{ - NNI_ARG_UNUSED(tp); - nni_aio_finish_error(aio, NNG_ENOTSUP); -} - -void -nni_tls_close(nni_tls *tp) -{ - NNI_ARG_UNUSED(tp); -} - -int -nni_tls_getopt( - nni_tls *tp, const char *name, void *buf, size_t *szp, nni_type t) -{ - NNI_ARG_UNUSED(tp); - NNI_ARG_UNUSED(name); - NNI_ARG_UNUSED(buf); - NNI_ARG_UNUSED(szp); - NNI_ARG_UNUSED(t); - return (NNG_ENOTSUP); -} - -int -nni_tls_setopt( - nni_tls *tp, const char *name, const void *buf, size_t sz, nni_type t) -{ - NNI_ARG_UNUSED(tp); - NNI_ARG_UNUSED(name); - NNI_ARG_UNUSED(buf); - NNI_ARG_UNUSED(sz); - NNI_ARG_UNUSED(t); - return (NNG_ENOTSUP); -} - int nng_tls_config_server_name(nng_tls_config *cfg, const char *name) { @@ -190,3 +126,29 @@ nng_tls_config_free(nng_tls_config *cfg) { NNI_ARG_UNUSED(cfg); } + +int +nni_tls_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + return (NNG_ENOTSUP); +} + +int +nni_tls_listener_alloc(nng_stream_listener **lp, const nng_url *url) +{ + NNI_ARG_UNUSED(lp); + NNI_ARG_UNUSED(url); + return (NNG_ENOTSUP); +} + +int +nni_tls_checkopt(const char *nm, const void *buf, size_t sz, nni_type t) +{ + NNI_ARG_UNUSED(nm); + NNI_ARG_UNUSED(buf); + NNI_ARG_UNUSED(sz); + NNI_ARG_UNUSED(t); + return (NNG_ENOTSUP); +}
\ No newline at end of file diff --git a/src/supplemental/tls/tls_api.h b/src/supplemental/tls/tls_api.h index 22dd68a0..4e6146b1 100644 --- a/src/supplemental/tls/tls_api.h +++ b/src/supplemental/tls/tls_api.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -12,12 +12,28 @@ #ifndef NNG_SUPPLEMENTAL_TLS_TLS_API_H #define NNG_SUPPLEMENTAL_TLS_TLS_API_H -#include <stdbool.h> - #include <nng/supplemental/tls/tls.h> -// nni_tls represents the context for a single TLS stream. -typedef struct nni_tls nni_tls; +// This nni_tls_common structure represents the "base" structure for +// an implementation to extend. One of these must be the first member +// of the implementation specific TLS stream struct. +typedef struct { + nng_stream ops; + nni_aio * aio; // system aio for connect/accept + nni_aio * uaio; // user aio for connect/accept + nng_tls_config *cfg; +} nni_tls_common; + +// The implementation supplies this function to create the TLS connection +// object. All fields will be zeroed. +extern int nni_tls_alloc(nng_stream **); +extern int nni_tls_dialer_alloc(nng_stream_dialer **, const nng_url *); +extern int nni_tls_listener_alloc(nng_stream_listener **, const nng_url *); +extern int nni_tls_checkopt(const char *, const void *, size_t, nni_type); + +// nni_tls_start is called by the common TLS dialer/listener completions +// to start the TLS stream activity. This may also do allocations, etc. +extern int nni_tls_start(nng_stream *, nng_stream *); // nni_tls_config_init creates a new TLS configuration object. // The object is created with a reference count of one. @@ -34,30 +50,4 @@ extern void nni_tls_config_fini(nng_tls_config *); // the configuration object is created with a hold on it. extern void nni_tls_config_hold(nng_tls_config *); -extern int nni_tls_init(nni_tls **, nng_tls_config *, nni_tcp_conn *); -extern void nni_tls_close(nni_tls *); -extern void nni_tls_fini(nni_tls *); -extern void nni_tls_send(nni_tls *, nng_aio *); -extern void nni_tls_recv(nni_tls *, nng_aio *); - -extern int nni_tls_setopt( - nni_tls *, const char *, const void *, size_t, nni_type); -extern int nni_tls_getopt(nni_tls *, const char *, void *, size_t *, nni_type); - -extern int nni_tls_set( - nng_tls *, const char *, const void *, size_t, nni_type); -extern int nni_tls_get(nng_tls *, const char *, void *, size_t *, nni_type); - -extern int nni_tls_dialer_setopt( - nng_tls_dialer *, const char *, const void *, size_t, nni_type); - -extern int nni_tls_dialer_getopt( - nng_tls_dialer *, const char *, void *, size_t *, nni_type); - -extern int nni_tls_listener_setopt( - nng_tls_listener *, const char *, const void *, size_t, nni_type); - -extern int nni_tls_listener_getopt( - nng_tls_listener *, const char *, void *, size_t *, nni_type); - #endif // NNG_SUPPLEMENTAL_TLS_TLS_API_H diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c index f93ca0ba..990d3add 100644 --- a/src/supplemental/tls/tls_common.c +++ b/src/supplemental/tls/tls_common.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -15,9 +15,9 @@ #include <string.h> #include "core/nng_impl.h" +#include "core/tcp.h" #include "supplemental/tls/tls_api.h" -#include <nng/supplemental/tcp/tcp.h> #include <nng/supplemental/tls/tls.h> // This file contains common code for TLS, and is only compiled if we @@ -25,126 +25,29 @@ // parts of TLS support that are invariant relative to different TLS // libraries, such as dialer and listener support. -struct nng_tls_s { - nni_tls * c; - nni_aio * aio; // system aio for connect/accept - nni_aio * uaio; // user aio for connect/accept - nng_tls_config *cfg; -}; - -// We use a union and share an "endpoint" for both dialers and listeners. -// This allows us to reuse the bulk of the code for things like option -// handlers for both dialers and listeners. -typedef union tls_tcp_ep_u { - nni_tcp_dialer * d; - nni_tcp_listener *l; -} tls_tcp_ep; - -typedef struct nng_tls_ep_s { - tls_tcp_ep tcp; - nng_tls_config *cfg; - nni_mtx lk; -} tls_ep; - -void -nng_tls_close(nng_tls *tls) -{ - nni_tls_close(tls->c); -} +typedef struct { + nng_stream_dialer ops; + nng_stream_dialer *d; // underlying TCP dialer + nng_tls_config * cfg; + nni_mtx lk; // protects the config +} tls_dialer; -void -nng_tls_free(nng_tls *tls) -{ - if (tls != NULL) { - nni_tls_fini(tls->c); - nni_aio_fini(tls->aio); - nng_tls_config_free(tls->cfg); - NNI_FREE_STRUCT(tls); - } -} - -void -nng_tls_send(nng_tls *tls, nng_aio *aio) -{ - nni_tls_send(tls->c, aio); -} - -void -nng_tls_recv(nng_tls *tls, nng_aio *aio) -{ - nni_tls_recv(tls->c, aio); -} - -int -nni_tls_get(nng_tls *tls, const char *name, void *buf, size_t *szp, nni_type t) -{ - return (nni_tls_getopt(tls->c, name, buf, szp, t)); -} - -int -nni_tls_set( - nng_tls *tls, const char *name, const void *buf, size_t sz, nni_type t) -{ - return (nni_tls_setopt(tls->c, name, buf, sz, t)); -} - -int -nng_tls_getopt(nng_tls *tls, const char *name, void *buf, size_t *szp) -{ - return (nni_tls_getopt(tls->c, name, buf, szp, NNI_TYPE_OPAQUE)); -} - -int -nng_tls_setopt(nng_tls *tls, const char *name, const void *buf, size_t sz) -{ - return (nni_tls_setopt(tls->c, name, buf, sz, NNI_TYPE_OPAQUE)); -} - -int -nng_tls_dialer_alloc(nng_tls_dialer **dp) -{ - tls_ep *ep; - int rv; - - if ((rv = nni_init()) != 0) { - return (rv); - } - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&ep->lk); - - if ((rv = nni_tcp_dialer_init(&ep->tcp.d)) != 0) { - nni_mtx_fini(&ep->lk); - NNI_FREE_STRUCT(ep); - return (rv); - } - if ((rv = nng_tls_config_alloc(&ep->cfg, NNG_TLS_MODE_CLIENT)) != 0) { - nni_tcp_dialer_fini(ep->tcp.d); - nni_mtx_fini(&ep->lk); - NNI_FREE_STRUCT(ep); - return (rv); - } - *dp = (void *) ep; - return (rv); -} - -void -nng_tls_dialer_close(nng_tls_dialer *d) +static void +tls_dialer_close(void *arg) { - tls_ep *ep = (void *) d; - nni_tcp_dialer_close(ep->tcp.d); + tls_dialer *d = arg; + nng_stream_dialer_close(d->d); } -void -nng_tls_dialer_free(nng_tls_dialer *d) +static void +tls_dialer_free(void *arg) { - tls_ep *ep = (void *) d; - if (ep != NULL) { - nni_tcp_dialer_fini(ep->tcp.d); - nng_tls_config_free(ep->cfg); - nni_mtx_fini(&ep->lk); - NNI_FREE_STRUCT(ep); + tls_dialer *d; + if ((d = arg) != NULL) { + nng_stream_dialer_free(d->d); + nng_tls_config_free(d->cfg); + nni_mtx_fini(&d->lk); + NNI_FREE_STRUCT(d); } } @@ -154,28 +57,28 @@ nng_tls_dialer_free(nng_tls_dialer *d) static void tls_conn_cb(void *arg) { - nng_tls * tls = arg; - nni_tcp_conn *tcp; - int rv; + nng_stream * tls = arg; + nni_tls_common *com = arg; + nng_stream * tcp; + int rv; - if ((rv = nni_aio_result(tls->aio)) != 0) { - nni_aio_finish_error(tls->uaio, rv); - nng_tls_free(tls); + if ((rv = nni_aio_result(com->aio)) != 0) { + nni_aio_finish_error(com->uaio, rv); + nng_stream_free(tls); return; } - tcp = nni_aio_get_output(tls->aio, 0); + tcp = nni_aio_get_output(com->aio, 0); - rv = nni_tls_init(&tls->c, tls->cfg, tcp); - if (rv != 0) { - nni_aio_finish_error(tls->uaio, rv); - nni_tcp_conn_fini(tcp); - nng_tls_free(tls); + if ((rv = nni_tls_start(tls, tcp)) != 0) { + nni_aio_finish_error(com->uaio, rv); + nng_stream_free(tcp); + nng_stream_free(tls); return; } - nni_aio_set_output(tls->uaio, 0, tls); - nni_aio_finish(tls->uaio, 0, 0); + nni_aio_set_output(com->uaio, 0, tls); + nni_aio_finish(com->uaio, 0, 0); } // Dialer cancel is called when the user has indicated that they no longer @@ -183,49 +86,52 @@ tls_conn_cb(void *arg) static void tls_conn_cancel(nni_aio *aio, void *arg, int rv) { - nng_tls *tls = arg; - NNI_ASSERT(tls->uaio == aio); + nni_tls_common *com = arg; + NNI_ASSERT(com->uaio == aio); // Just pass this down. If the connection is already done, this // will have no effect. - nni_aio_abort(tls->aio, rv); + nni_aio_abort(com->aio, rv); } -void -nng_tls_dialer_dial(nng_tls_dialer *d, const nng_sockaddr *sa, nng_aio *aio) +static void +tls_dialer_dial(void *arg, nng_aio *aio) { - int rv; - nng_tls *tls; - tls_ep * ep = (void *) d; + tls_dialer * d = arg; + int rv; + nng_stream * tls; + nni_tls_common *com; if (nni_aio_begin(aio) != 0) { return; } - if ((tls = NNI_ALLOC_STRUCT(tls)) == NULL) { - nni_aio_finish_error(aio, NNG_ENOMEM); + if ((rv = nni_tls_alloc(&tls)) != 0) { + nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_aio_init(&tls->aio, tls_conn_cb, tls)) != 0) { + + com = (void *) tls; + if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) { nni_aio_finish_error(aio, rv); - NNI_FREE_STRUCT(tls); + nng_stream_free(tls); return; } - tls->uaio = aio; + com->uaio = aio; // Save a copy of the TLS configuration. This way we don't have // to ensure that the dialer outlives the connection, because the // only shared data is the configuration which is reference counted. - nni_mtx_lock(&ep->lk); - tls->cfg = ep->cfg; - nng_tls_config_hold(tls->cfg); - nni_mtx_unlock(&ep->lk); + nni_mtx_lock(&d->lk); + com->cfg = d->cfg; + nng_tls_config_hold(com->cfg); + nni_mtx_unlock(&d->lk); if ((rv = nni_aio_schedule(aio, tls_conn_cancel, tls)) != 0) { nni_aio_finish_error(aio, rv); - nng_tls_free(tls); + nng_stream_free(tls); return; } - nni_tcp_dialer_dial(ep->tcp.d, sa, tls->aio); + nng_stream_dialer_dial(d->d, com->aio); } static int @@ -241,11 +147,12 @@ tls_check_string(const void *v, size_t sz, nni_opt_type t) } static int -tls_ep_set_config(void *arg, const void *buf, size_t sz, nni_type t) +tls_dialer_set_config(void *arg, const void *buf, size_t sz, nni_type t) { int rv; nng_tls_config *cfg; - tls_ep * ep; + tls_dialer * d = arg; + nng_tls_config *old; if ((rv = nni_copyin_ptr((void **) &cfg, buf, sz, t)) != 0) { return (rv); @@ -253,308 +160,523 @@ tls_ep_set_config(void *arg, const void *buf, size_t sz, nni_type t) if (cfg == NULL) { return (NNG_EINVAL); } - if ((ep = arg) != NULL) { - nng_tls_config *old; - - nni_mtx_lock(&ep->lk); - old = ep->cfg; - nng_tls_config_hold(cfg); - ep->cfg = cfg; - nni_mtx_unlock(&ep->lk); - if (old != NULL) { - nng_tls_config_free(old); - } + nni_mtx_lock(&d->lk); + old = d->cfg; + nng_tls_config_hold(cfg); + d->cfg = cfg; + nni_mtx_unlock(&d->lk); + if (old != NULL) { + nng_tls_config_free(old); } return (0); } static int -tls_ep_get_config(void *arg, void *buf, size_t *szp, nni_type t) +tls_dialer_get_config(void *arg, void *buf, size_t *szp, nni_type t) { - tls_ep * ep = arg; + tls_dialer * d = arg; nng_tls_config *cfg; int rv; - nni_mtx_lock(&ep->lk); - if ((cfg = ep->cfg) != NULL) { + nni_mtx_lock(&d->lk); + if ((cfg = d->cfg) != NULL) { nng_tls_config_hold(cfg); } if ((rv = nni_copyout_ptr(cfg, buf, szp, t)) != 0) { nng_tls_config_free(cfg); } - nni_mtx_unlock(&ep->lk); + nni_mtx_unlock(&d->lk); return (rv); } static int -tls_ep_set_server_name(void *arg, const void *buf, size_t sz, nni_type t) +tls_dialer_set_server_name(void *arg, const void *buf, size_t sz, nni_type t) { - tls_ep *ep = arg; - int rv; - if ((rv = tls_check_string(buf, sz, t)) != 0) { - return (rv); - } - if ((ep = arg) != NULL) { - nni_mtx_lock(&ep->lk); - rv = nng_tls_config_server_name(ep->cfg, buf); - nni_mtx_unlock(&ep->lk); + tls_dialer *d = arg; + int rv; + if ((rv = tls_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&d->lk); + rv = nng_tls_config_server_name(d->cfg, buf); + nni_mtx_unlock(&d->lk); } return (rv); } static int -tls_ep_set_auth_mode(void *arg, const void *buf, size_t sz, nni_type t) +tls_dialer_set_auth_mode(void *arg, const void *buf, size_t sz, nni_type t) { - int mode; - int rv; - tls_ep *ep; + int mode; + int rv; + tls_dialer *d = arg; rv = nni_copyin_int(&mode, buf, sz, NNG_TLS_AUTH_MODE_NONE, NNG_TLS_AUTH_MODE_REQUIRED, t); - if ((rv == 0) && ((ep = arg) != NULL)) { - nni_mtx_lock(&ep->lk); - rv = nng_tls_config_auth_mode(ep->cfg, mode); - nni_mtx_unlock(&ep->lk); + if (rv == 0) { + nni_mtx_lock(&d->lk); + rv = nng_tls_config_auth_mode(d->cfg, mode); + nni_mtx_unlock(&d->lk); } return (rv); } static int -tls_ep_set_ca_file(void *arg, const void *buf, size_t sz, nni_opt_type t) +tls_dialer_set_ca_file(void *arg, const void *buf, size_t sz, nni_opt_type t) { - tls_ep *ep; - int rv; + tls_dialer *d = arg; + int rv; - if (((rv = tls_check_string(buf, sz, t)) == 0) && - ((ep = arg) != NULL)) { - nni_mtx_lock(&ep->lk); - rv = nng_tls_config_ca_file(ep->cfg, buf); - nni_mtx_unlock(&ep->lk); + if ((rv = tls_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&d->lk); + rv = nng_tls_config_ca_file(d->cfg, buf); + nni_mtx_unlock(&d->lk); } return (rv); } static int -tls_ep_set_cert_key_file(void *arg, const void *buf, size_t sz, nni_opt_type t) +tls_dialer_set_cert_key_file( + void *arg, const void *buf, size_t sz, nni_opt_type t) { - tls_ep *ep; - int rv; + tls_dialer *d = arg; + int rv; - if (((rv = tls_check_string(buf, sz, t)) == 0) && - ((ep = arg) != NULL)) { - nni_mtx_lock(&ep->lk); - rv = nng_tls_config_cert_key_file(ep->cfg, buf, NULL); - nni_mtx_unlock(&ep->lk); + if ((rv = tls_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&d->lk); + rv = nng_tls_config_cert_key_file(d->cfg, buf, NULL); + nni_mtx_unlock(&d->lk); } return (rv); } -static const nni_option tls_ep_opts[] = { +static const nni_option tls_dialer_opts[] = { { .o_name = NNG_OPT_TLS_CONFIG, - .o_get = tls_ep_get_config, - .o_set = tls_ep_set_config, + .o_get = tls_dialer_get_config, + .o_set = tls_dialer_set_config, }, { .o_name = NNG_OPT_TLS_SERVER_NAME, - .o_set = tls_ep_set_server_name, + .o_set = tls_dialer_set_server_name, }, { .o_name = NNG_OPT_TLS_CA_FILE, - .o_set = tls_ep_set_ca_file, + .o_set = tls_dialer_set_ca_file, }, { .o_name = NNG_OPT_TLS_CERT_KEY_FILE, - .o_set = tls_ep_set_cert_key_file, + .o_set = tls_dialer_set_cert_key_file, }, { .o_name = NNG_OPT_TLS_AUTH_MODE, - .o_set = tls_ep_set_auth_mode, + .o_set = tls_dialer_set_auth_mode, }, { .o_name = NULL, }, }; -// private version of getopt and setopt take the type -int -nni_tls_dialer_getopt( - nng_tls_dialer *d, const char *name, void *buf, size_t *szp, nni_type t) +static int +tls_dialer_getx( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - int rv; - tls_ep *ep = (void *) d; + tls_dialer *d = arg; + int rv; - rv = nni_tcp_dialer_getopt(ep->tcp.d, name, buf, szp, t); + rv = nni_stream_dialer_getx(d->d, name, buf, szp, t); if (rv == NNG_ENOTSUP) { - rv = nni_getopt(tls_ep_opts, name, ep, buf, szp, t); + rv = nni_getopt(tls_dialer_opts, name, d, buf, szp, t); } return (rv); } -int -nni_tls_dialer_setopt(nng_tls_dialer *d, const char *name, const void *buf, - size_t sz, nni_type t) +static int +tls_dialer_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) { - int rv; - tls_ep *ep = (void *) d; + tls_dialer *d = arg; + int rv; - rv = nni_tcp_dialer_setopt( - ep != NULL ? ep->tcp.d : NULL, name, buf, sz, t); + rv = nni_stream_dialer_setx(d->d, name, buf, sz, t); if (rv == NNG_ENOTSUP) { - rv = nni_setopt(tls_ep_opts, name, ep, buf, sz, t); + rv = nni_setopt(tls_dialer_opts, name, d, buf, sz, t); } return (rv); } -// public versions of option handlers here - -int -nng_tls_dialer_getopt( - nng_tls_dialer *d, const char *name, void *buf, size_t *szp) -{ - return (nni_tls_dialer_getopt(d, name, buf, szp, NNI_TYPE_OPAQUE)); -} - int -nng_tls_dialer_setopt( - nng_tls_dialer *d, const char *name, const void *buf, size_t sz) +nni_tls_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) { - return (nni_tls_dialer_setopt(d, name, buf, sz, NNI_TYPE_OPAQUE)); -} + tls_dialer *d; + int rv; + nng_url myurl; -void -nng_tls_listener_close(nng_tls_listener *l) -{ - tls_ep *ep = (void *) l; - nni_tcp_listener_close(ep->tcp.l); -} - -void -nng_tls_listener_free(nng_tls_listener *l) -{ - tls_ep *ep = (void *) l; - if (ep != NULL) { - nng_tls_listener_close(l); - nng_tls_config_free(ep->cfg); - nni_mtx_fini(&ep->lk); - NNI_FREE_STRUCT(ep); - } -} - -int -nng_tls_listener_alloc(nng_tls_listener **lp) -{ - tls_ep *ep; - int rv; + memcpy(&myurl, url, sizeof(myurl)); + myurl.u_scheme = url->u_scheme + strlen("tls+"); if ((rv = nni_init()) != 0) { return (rv); } - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&ep->lk); + nni_mtx_init(&d->lk); - if ((rv = nni_tcp_listener_init(&ep->tcp.l)) != 0) { - nni_mtx_fini(&ep->lk); - NNI_FREE_STRUCT(ep); + if ((rv = nng_stream_dialer_alloc_url(&d->d, &myurl)) != 0) { + nni_mtx_fini(&d->lk); + NNI_FREE_STRUCT(d); return (rv); } - if ((rv = nng_tls_config_alloc(&ep->cfg, NNG_TLS_MODE_SERVER)) != 0) { - nni_tcp_listener_fini(ep->tcp.l); - nni_mtx_fini(&ep->lk); - NNI_FREE_STRUCT(ep); + if ((rv = nng_tls_config_alloc(&d->cfg, NNG_TLS_MODE_CLIENT)) != 0) { + nng_stream_dialer_free(d->d); + nni_mtx_fini(&d->lk); + NNI_FREE_STRUCT(d); return (rv); } - *lp = (void *) ep; - return (0); + + // Set the expected outbound hostname + nng_tls_config_server_name(d->cfg, url->u_hostname); + + d->ops.sd_close = tls_dialer_close; + d->ops.sd_free = tls_dialer_free; + d->ops.sd_dial = tls_dialer_dial; + d->ops.sd_getx = tls_dialer_getx; + d->ops.sd_setx = tls_dialer_setx; + *dp = (void *) d; + return (rv); } -int -nng_tls_listener_listen(nng_tls_listener *l, const nng_sockaddr *sa) +typedef struct { + nng_stream_listener ops; + nng_stream_listener *l; + nng_tls_config * cfg; + nni_mtx lk; +} tls_listener; + +static void +tls_listener_close(void *arg) { - tls_ep *ep = (void *) l; - return (nni_tcp_listener_listen(ep->tcp.l, sa)); + tls_listener *l = arg; + nng_stream_listener_close(l->l); } -void -nng_tls_listener_accept(nng_tls_listener *l, nng_aio *aio) +static void +tls_listener_free(void *arg) { - int rv; - nng_tls *tls; - tls_ep * ep = (void *) l; + tls_listener *l; + if ((l = arg) != NULL) { + tls_listener_close(l); + nng_tls_config_free(l->cfg); + nni_mtx_fini(&l->lk); + NNI_FREE_STRUCT(l); + } +} + +static int +tls_listener_listen(void *arg) +{ + tls_listener *l = arg; + return (nng_stream_listener_listen(l->l)); +} + +static void +tls_listener_accept(void *arg, nng_aio *aio) +{ + tls_listener * l = arg; + int rv; + nng_stream * tls; + nni_tls_common *com; if (nni_aio_begin(aio) != 0) { return; } - if ((tls = NNI_ALLOC_STRUCT(tls)) == NULL) { - nni_aio_finish_error(aio, NNG_ENOMEM); + if ((rv = nni_tls_alloc(&tls)) != 0) { + nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_aio_init(&tls->aio, tls_conn_cb, tls)) != 0) { + com = (void *) tls; + if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) { nni_aio_finish_error(aio, rv); - NNI_FREE_STRUCT(tls); + nng_stream_free(tls); return; } - tls->uaio = aio; + com->uaio = aio; // Save a copy of the TLS configuration. This way we don't have // to ensure that the dialer outlives the connection, because the // only shared data is the configuration which is reference counted. - nni_mtx_lock(&ep->lk); - tls->cfg = ep->cfg; - nng_tls_config_hold(tls->cfg); - nni_mtx_unlock(&ep->lk); + nni_mtx_lock(&l->lk); + com->cfg = l->cfg; + nng_tls_config_hold(com->cfg); + nni_mtx_unlock(&l->lk); if ((rv = nni_aio_schedule(aio, tls_conn_cancel, tls)) != 0) { nni_aio_finish_error(aio, rv); - nng_tls_free(tls); + nng_stream_free(tls); return; } - nni_tcp_listener_accept(ep->tcp.l, tls->aio); + nng_stream_listener_accept(l->l, com->aio); } -int -nni_tls_listener_getopt( - nng_tls_listener *l, const char *name, void *buf, size_t *szp, nni_type t) +static int +tls_listener_set_config(void *arg, const void *buf, size_t sz, nni_type t) { - int rv; - tls_ep *ep = (void *) l; + int rv; + nng_tls_config *cfg; + tls_listener * l = arg; + nng_tls_config *old; - rv = nni_tcp_listener_getopt(ep->tcp.l, name, buf, szp, t); - if (rv == NNG_ENOTSUP) { - rv = nni_getopt(tls_ep_opts, name, ep, buf, szp, t); + if ((rv = nni_copyin_ptr((void **) &cfg, buf, sz, t)) != 0) { + return (rv); + } + if (cfg == NULL) { + return (NNG_EINVAL); + } + + nni_mtx_lock(&l->lk); + old = l->cfg; + nng_tls_config_hold(cfg); + l->cfg = cfg; + nni_mtx_unlock(&l->lk); + if (old != NULL) { + nng_tls_config_free(old); + } + return (0); +} + +static int +tls_listener_get_config(void *arg, void *buf, size_t *szp, nni_type t) +{ + tls_listener * l = arg; + nng_tls_config *cfg; + int rv; + nni_mtx_lock(&l->lk); + if ((cfg = l->cfg) != NULL) { + nng_tls_config_hold(cfg); + } + if ((rv = nni_copyout_ptr(cfg, buf, szp, t)) != 0) { + nng_tls_config_free(cfg); } + nni_mtx_unlock(&l->lk); return (rv); } -int -nni_tls_listener_setopt(nng_tls_listener *l, const char *name, const void *buf, - size_t sz, nni_type t) +static int +tls_listener_set_server_name(void *arg, const void *buf, size_t sz, nni_type t) +{ + tls_listener *l = arg; + int rv; + if ((rv = tls_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&l->lk); + rv = nng_tls_config_server_name(l->cfg, buf); + nni_mtx_unlock(&l->lk); + } + return (rv); +} + +static int +tls_listener_set_auth_mode(void *arg, const void *buf, size_t sz, nni_type t) { - int rv; - tls_ep *ep = (void *) l; + int mode; + int rv; + tls_listener *l = arg; - rv = nni_tcp_listener_setopt( - ep != NULL ? ep->tcp.l : NULL, name, buf, sz, t); + rv = nni_copyin_int(&mode, buf, sz, NNG_TLS_AUTH_MODE_NONE, + NNG_TLS_AUTH_MODE_REQUIRED, t); + if (rv == 0) { + nni_mtx_lock(&l->lk); + rv = nng_tls_config_auth_mode(l->cfg, mode); + nni_mtx_unlock(&l->lk); + } + return (rv); +} + +static int +tls_listener_set_ca_file(void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + tls_listener *l = arg; + int rv; + + if ((rv = tls_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&l->lk); + rv = nng_tls_config_ca_file(l->cfg, buf); + nni_mtx_unlock(&l->lk); + } + return (rv); +} + +static int +tls_listener_set_cert_key_file( + void *arg, const void *buf, size_t sz, nni_opt_type t) +{ + tls_listener *l = arg; + int rv; + + if ((rv = tls_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&l->lk); + rv = nng_tls_config_cert_key_file(l->cfg, buf, NULL); + nni_mtx_unlock(&l->lk); + } + return (rv); +} + +static const nni_option tls_listener_opts[] = { + { + .o_name = NNG_OPT_TLS_CONFIG, + .o_get = tls_listener_get_config, + .o_set = tls_listener_set_config, + }, + { + .o_name = NNG_OPT_TLS_SERVER_NAME, + .o_set = tls_listener_set_server_name, + }, + { + .o_name = NNG_OPT_TLS_CA_FILE, + .o_set = tls_listener_set_ca_file, + }, + { + .o_name = NNG_OPT_TLS_CERT_KEY_FILE, + .o_set = tls_listener_set_cert_key_file, + }, + { + .o_name = NNG_OPT_TLS_AUTH_MODE, + .o_set = tls_listener_set_auth_mode, + }, + { + .o_name = NULL, + }, +}; + +static int +tls_listener_getx( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + int rv; + tls_listener *l = arg; + + rv = nni_stream_listener_getx(l->l, name, buf, szp, t); if (rv == NNG_ENOTSUP) { - rv = nni_setopt(tls_ep_opts, name, ep, buf, sz, t); + rv = nni_getopt(tls_listener_opts, name, l, buf, szp, t); } return (rv); } -// public versions of option handlers here +static int +tls_listener_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + int rv; + tls_listener *l = arg; + + rv = nni_stream_listener_setx(l->l, name, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_setopt(tls_listener_opts, name, l, buf, sz, t); + } + return (rv); +} int -nng_tls_listener_getopt( - nng_tls_listener *l, const char *name, void *buf, size_t *szp) +nni_tls_listener_alloc(nng_stream_listener **lp, const nng_url *url) { - return (nni_tls_listener_getopt(l, name, buf, szp, NNI_TYPE_OPAQUE)); + tls_listener *l; + int rv; + nng_url myurl; + + memcpy(&myurl, url, sizeof(myurl)); + myurl.u_scheme = url->u_scheme + strlen("tls+"); + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->lk); + + if ((rv = nng_stream_listener_alloc_url(&l->l, &myurl)) != 0) { + nni_mtx_fini(&l->lk); + NNI_FREE_STRUCT(l); + return (rv); + } + if ((rv = nng_tls_config_alloc(&l->cfg, NNG_TLS_MODE_SERVER)) != 0) { + nng_stream_listener_free(l->l); + nni_mtx_fini(&l->lk); + NNI_FREE_STRUCT(l); + return (rv); + } + l->ops.sl_free = tls_listener_free; + l->ops.sl_close = tls_listener_close; + l->ops.sl_accept = tls_listener_accept; + l->ops.sl_listen = tls_listener_listen; + l->ops.sl_getx = tls_listener_getx; + l->ops.sl_setx = tls_listener_setx; + *lp = (void *) l; + return (0); } +// The following checks exist for socket configuration, when we need to +// configure an option on a socket before any transport is configured +// underneath. + +static int +tls_check_config(const void *buf, size_t sz, nni_type t) +{ + int rv; + nng_tls_config *cfg; + + if ((rv = nni_copyin_ptr((void **) &cfg, buf, sz, t)) != 0) { + return (rv); + } + if (cfg == NULL) { + return (NNG_EINVAL); + } + return (0); +} + +static int +tls_check_auth_mode(const void *buf, size_t sz, nni_type t) +{ + int mode; + int rv; + + rv = nni_copyin_int(&mode, buf, sz, NNG_TLS_AUTH_MODE_NONE, + NNG_TLS_AUTH_MODE_REQUIRED, t); + return (rv); +} + +static const nni_chkoption tls_chkopts[] = { + { + .o_name = NNG_OPT_TLS_CONFIG, + .o_check = tls_check_config, + }, + { + .o_name = NNG_OPT_TLS_SERVER_NAME, + .o_check = tls_check_string, + }, + { + .o_name = NNG_OPT_TLS_CA_FILE, + .o_check = tls_check_string, + }, + { + .o_name = NNG_OPT_TLS_CERT_KEY_FILE, + .o_check = tls_check_string, + }, + { + .o_name = NNG_OPT_TLS_AUTH_MODE, + .o_check = tls_check_auth_mode, + }, + { + .o_name = NULL, + }, +}; + int -nng_tls_listener_setopt( - nng_tls_listener *l, const char *name, const void *buf, size_t sz) +nni_tls_checkopt(const char *name, const void *data, size_t sz, nni_type t) { - return (nni_tls_listener_setopt(l, name, buf, sz, NNI_TYPE_OPAQUE)); + int rv; + + rv = nni_chkopt(tls_chkopts, name, data, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_stream_checkopt("tcp", name, data, sz, t); + } + return (rv); } diff --git a/src/supplemental/websocket/CMakeLists.txt b/src/supplemental/websocket/CMakeLists.txt index 22ee955d..3b102c80 100644 --- a/src/supplemental/websocket/CMakeLists.txt +++ b/src/supplemental/websocket/CMakeLists.txt @@ -12,5 +12,7 @@ if (NNG_SUPP_WEBSOCKET) set(_SRCS supplemental/websocket/websocket.c supplemental/websocket/websocket.h) +else() + set(_SRCS supplemental/websocket/stubs.c) endif() set(NNG_SRCS ${NNG_SRCS} ${_SRCS} PARENT_SCOPE) diff --git a/src/supplemental/websocket/stub.c b/src/supplemental/websocket/stub.c new file mode 100644 index 00000000..94e7f1b5 --- /dev/null +++ b/src/supplemental/websocket/stub.c @@ -0,0 +1,40 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Devolutions <info@devolutions.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +// This stub file exists to support configuration of the stream subsystem +// when websocket support is unconfigured. + +int +nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + return (NNG_ENOTSUP); +} + +int +nni_ws_listener_alloc(nng_stream_listener **lp, const nng_url *url) +{ + NNI_ARG_UNUSED(lp); + NNI_ARG_UNUSED(url); + return (NNG_ENOTSUP); +} + +int +nni_ws_checkopt(const char *name, const void *data, size_t sz, nni_type t) +{ + NNI_ARG_UNUSED(name); + NNI_ARG_UNUSED(data); + NNI_ARG_UNUSED(sz); + NNI_ARG_UNUSED(t); + return (NNG_ENOTSUP); +} diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 3d3a68cb..4cecf430 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -18,11 +18,25 @@ #include "supplemental/http/http_api.h" #include "supplemental/sha1/sha1.h" +#include <nng/transport/ws/websocket.h> + #include "websocket.h" +// This should be removed or handled differently in the future. +typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *); + +// We have chosen to be a bit more stringent in the size of the frames that +// we send, while we more generously allow larger incoming frames. These +// may be tuned by options. +#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only) +#define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv) +#define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send) + +// Alias for checking the prefix of a string. +#define startswith(s, t) (strncmp(s, t, strlen(t)) == 0) + // Pre-defined types for some prototypes. These are from other subsystems. typedef struct ws_frame ws_frame; -typedef struct ws_msg ws_msg; typedef struct ws_header { nni_list_node node; @@ -31,17 +45,20 @@ typedef struct ws_header { } ws_header; struct nni_ws { + nng_stream ops; nni_list_node node; nni_reap_item reap; bool server; bool closed; bool ready; bool wclose; + bool isstream; + bool inmsg; nni_mtx mtx; - nni_list txmsgs; - nni_list rxmsgs; nni_list sendq; nni_list recvq; + nni_list txq; + nni_list rxq; ws_frame * txframe; ws_frame * rxframe; nni_aio * txaio; // physical aios @@ -57,26 +74,31 @@ struct nni_ws { char * reshdrs; size_t maxframe; size_t fragsize; + size_t recvmax; // largest message size nni_ws_listener *listener; nni_ws_dialer * dialer; }; struct nni_ws_listener { - nni_http_server * server; - char * proto; - nni_mtx mtx; - nni_cv cv; - nni_list pend; - nni_list reply; - nni_list aios; - nni_url * url; - bool started; - bool closed; - nni_http_handler * handler; - nni_ws_listen_hook hookfn; - void * hookarg; - nni_list headers; // response headers - size_t maxframe; + nng_stream_listener ops; + nni_http_server * server; + char * proto; + nni_mtx mtx; + nni_cv cv; + nni_list pend; + nni_list reply; + nni_list aios; + nng_url * url; + bool started; + bool closed; + bool isstream; + nni_http_handler * handler; + nni_ws_listen_hook hookfn; + void * hookarg; + nni_list headers; // response headers + size_t maxframe; + size_t fragsize; + size_t recvmax; // largest message size }; // The dialer tracks user aios in two lists. The first list is for aios @@ -86,18 +108,21 @@ struct nni_ws_listener { // completion of an earlier connection. (We don't want to establish // requests when we already have connects negotiating.) struct nni_ws_dialer { - nni_http_req * req; - nni_http_res * res; - nni_http_client *client; - nni_mtx mtx; - nni_cv cv; - char * proto; - nni_url * url; - nni_list wspend; // ws structures still negotiating - bool closed; - nng_sockaddr sa; - nni_list headers; // request headers - size_t maxframe; + nng_stream_dialer ops; + nni_http_req * req; + nni_http_res * res; + nni_http_client * client; + nni_mtx mtx; + nni_cv cv; + char * proto; + nng_url * url; + nni_list wspend; // ws structures still negotiating + bool closed; + bool isstream; + nni_list headers; // request headers + size_t maxframe; + size_t fragsize; + size_t recvmax; }; typedef enum ws_type { @@ -133,16 +158,7 @@ struct ws_frame { bool masked; size_t bufsz; // allocated size uint8_t * buf; - ws_msg * wmsg; -}; - -struct ws_msg { - nni_list frames; - nni_list_node node; - nni_ws * ws; - nni_aio * aio; - uint8_t * buf; - size_t bufsz; + nng_aio * aio; }; static void ws_send_close(nni_ws *ws, uint16_t code); @@ -150,6 +166,127 @@ static void ws_conn_cb(void *); static void ws_close_cb(void *); static void ws_read_cb(void *); static void ws_write_cb(void *); +static void ws_close_error(nni_ws *ws, uint16_t code); + +static void ws_str_free(void *); +static void ws_str_close(void *); +static void ws_str_send(void *, nng_aio *); +static void ws_str_recv(void *, nng_aio *); +static int ws_str_getx(void *, const char *, void *, size_t *, nni_type); +static int ws_str_setx(void *, const char *, const void *, size_t, nni_type); + +static void ws_listener_close(void *); +static void ws_listener_free(void *); + +static int +ws_check_string(const void *v, size_t sz, nni_opt_type t) +{ + if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) { + return (NNG_EBADTYPE); + } + if (nni_strnlen(v, sz) >= sz) { + return (NNG_EINVAL); + } + return (0); +} + +static int +ws_set_header_ext(nni_list *l, const char *n, const char *v, bool strip_dups) +{ + ws_header *hdr; + char * nv; + + if ((nv = nni_strdup(v)) == NULL) { + return (NNG_ENOMEM); + } + + if (strip_dups) { + NNI_LIST_FOREACH (l, hdr) { + if (nni_strcasecmp(hdr->name, n) == 0) { + nni_strfree(hdr->value); + hdr->value = nv; + return (0); + } + } + } + + if ((hdr = NNI_ALLOC_STRUCT(hdr)) == NULL) { + nni_strfree(nv); + return (NNG_ENOMEM); + } + if ((hdr->name = nni_strdup(n)) == NULL) { + nni_strfree(nv); + NNI_FREE_STRUCT(hdr); + return (NNG_ENOMEM); + } + hdr->value = nv; + nni_list_append(l, hdr); + return (0); +} + +static int +ws_set_header(nni_list *l, const char *n, const char *v) +{ + return (ws_set_header_ext(l, n, v, true)); +} + +static int +ws_set_headers(nni_list *l, const char *str) +{ + char * dupstr; + size_t duplen; + char * n; + char * v; + char * nl; + int rv; + + if ((dupstr = nni_strdup(str)) == NULL) { + return (NNG_ENOMEM); + } + duplen = strlen(dupstr) + 1; // so we can free it later + + n = dupstr; + for (;;) { + if ((v = strchr(n, ':')) == NULL) { + // Note that this also means that if + // a bare word is present, we ignore it. + break; + } + *v = '\0'; + v++; + while (*v == ' ') { + // Skip leading whitespace. Not strictly + // necessary, but still a good idea. + v++; + } + nl = v; + // Find the end of the line -- should be CRLF, but can + // also be unterminated or just LF if user + while ((*nl != '\0') && (*nl != '\r') && (*nl != '\n')) { + nl++; + } + while ((*nl == '\r') || (*nl == '\n')) { + *nl = '\0'; + nl++; + } + + // Note that this can lead to a partial failure. As this + // is most likely ENOMEM, don't worry too much about it. + // This method does *not* eliminate duplicates. + if ((rv = ws_set_header_ext(l, n, v, false)) != 0) { + goto done; + } + + // Advance to the next name. + n = nl; + } + + rv = 0; + +done: + nni_free(dupstr, duplen); + return (rv); +} // This looks, case independently for a word in a list, which is either // space or comma separated. @@ -204,30 +341,13 @@ ws_make_accept(const char *key, char *accept) static void ws_frame_fini(ws_frame *frame) { - if (frame->bufsz) { + if (frame->bufsz != 0) { nni_free(frame->buf, frame->bufsz); } NNI_FREE_STRUCT(frame); } static void -ws_msg_fini(ws_msg *wm) -{ - ws_frame *frame; - - NNI_ASSERT(!nni_list_node_active(&wm->node)); - while ((frame = nni_list_first(&wm->frames)) != NULL) { - nni_list_remove(&wm->frames, frame); - ws_frame_fini(frame); - } - if (wm->bufsz != 0) { - nni_free(wm->buf, wm->bufsz); - } - - NNI_FREE_STRUCT(wm); -} - -static void ws_mask_frame(ws_frame *frame) { uint32_t r; @@ -263,31 +383,19 @@ ws_unmask_frame(ws_frame *frame) static int ws_msg_init_control( - ws_msg **wmp, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len) + ws_frame **framep, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len) { - ws_msg * wm; ws_frame *frame; if (len > 125) { return (NNG_EINVAL); } - if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { - return (NNG_ENOMEM); - } - wm->buf = NULL; - wm->bufsz = 0; - if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { - ws_msg_fini(wm); return (NNG_ENOMEM); } - NNI_LIST_INIT(&wm->frames, ws_frame, node); memcpy(frame->sdata, buf, len); - - nni_list_append(&wm->frames, frame); - frame->wmsg = wm; frame->len = len; frame->final = true; frame->op = op; @@ -303,114 +411,100 @@ ws_msg_init_control( ws_mask_frame(frame); } - wm->aio = NULL; - wm->ws = ws; - *wmp = wm; + *framep = frame; return (0); } static int -ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio) +ws_frame_prep_tx(nni_ws *ws, ws_frame *frame) { - ws_msg * wm; + nng_aio *aio = frame->aio; + nni_iov *iov; + unsigned niov; size_t len; - size_t maxfrag = ws->fragsize; // make this tunable. (1MB default) uint8_t *buf; - uint8_t op; - - if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { - return (NNG_ENOMEM); - } - NNI_LIST_INIT(&wm->frames, ws_frame, node); - - len = nni_msg_len(msg) + nni_msg_header_len(msg); - wm->bufsz = len; - if ((wm->buf = nni_alloc(len)) == NULL) { - NNI_FREE_STRUCT(wm); - return (NNG_ENOMEM); - } - buf = wm->buf; - memcpy(buf, nni_msg_header(msg), nni_msg_header_len(msg)); - memcpy(buf + nni_msg_header_len(msg), nni_msg_body(msg), - nni_msg_len(msg)); - - op = WS_BINARY; // to start -- no support for sending TEXT frames - - // do ... while because we want at least one frame (even for empty - // messages.) Headers get their own frame, if present. Best bet - // is to try not to have a header when coming here. - do { - ws_frame *frame; - if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { - ws_msg_fini(wm); + // Figure out how much we need for the entire aio. + frame->len = 0; + nni_aio_get_iov(aio, &niov, &iov); + for (unsigned i = 0; i < niov; i++) { + frame->len += iov[i].iov_len; + } + + if ((frame->len > ws->fragsize) && (ws->fragsize > 0)) { + // Limit it to a single frame per policy (fragsize), as needed. + frame->len = ws->fragsize; + // For stream mode, we constrain ourselves to one frame + // per message. Submitter may see a partial transmit, and + // should resubmit as needed. For message mode, we will + // continue to resubmit. + frame->final = ws->isstream ? true : false; + } else { + // It all fits in this frame (which might not be the first), + // so we're done. + frame->final = true; + } + // Potentially allocate space for the data if we need to. + // Note that an empty message is legal. + if ((frame->bufsz < frame->len) && (frame->len > 0)) { + frame->buf = nni_alloc(frame->len); + if (frame->buf == NULL) { return (NNG_ENOMEM); } - nni_list_append(&wm->frames, frame); - frame->wmsg = wm; - frame->len = len > maxfrag ? maxfrag : len; - frame->buf = buf; - frame->op = op; - - buf += frame->len; - len -= frame->len; - op = WS_CONT; - - if (len == 0) { - frame->final = true; - } - frame->head[0] = frame->op; - frame->hlen = 2; - if (frame->final) { - frame->head[0] |= 0x80; // final frame bit - } - if (frame->len < 126) { - frame->head[1] = frame->len & 0x7f; - } else if (frame->len < 65536) { - frame->head[1] = 126; - NNI_PUT16(frame->head + 2, (frame->len & 0xffff)); - frame->hlen += 2; - } else { - frame->head[1] = 127; - NNI_PUT64(frame->head + 2, (uint64_t) frame->len); - frame->hlen += 8; - } + } + buf = frame->buf; - if (ws->server) { - frame->masked = false; - } else { - ws_mask_frame(frame); + // Now copy the data into the frame. + len = frame->len; + while (len != 0) { + size_t n = len; + if (n > iov->iov_len) { + n = iov->iov_len; } + memcpy(buf, iov->iov_buf, n); + iov++; + len -= n; + buf += n; + } - } while (len); - - wm->aio = aio; - wm->ws = ws; - *wmp = wm; - return (0); -} + if (nni_aio_count(aio) == 0) { + // This is the first frame. + frame->op = WS_BINARY; + } else { + frame->op = WS_CONT; + } -static int -ws_msg_init_rx(ws_msg **wmp, nni_ws *ws, nni_aio *aio) -{ - ws_msg *wm; + // Populate the frame header. + frame->head[0] = frame->op; + frame->hlen = 2; + if (frame->final) { + frame->head[0] |= 0x80; // final frame bit + } + if (frame->len < 126) { + frame->head[1] = frame->len & 0x7f; + } else if (frame->len < 65536) { + frame->head[1] = 126; + NNI_PUT16(frame->head + 2, (frame->len & 0xffff)); + frame->hlen += 2; + } else { + frame->head[1] = 127; + NNI_PUT64(frame->head + 2, (uint64_t) frame->len); + frame->hlen += 8; + } - if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { - return (NNG_ENOMEM); + // If we are on the client, then we need to mask the frame. + frame->masked = false; + if (!ws->server) { + ws_mask_frame(frame); } - NNI_LIST_INIT(&wm->frames, ws_frame, node); - wm->aio = aio; - wm->ws = ws; - *wmp = wm; return (0); } static void ws_close_cb(void *arg) { - nni_ws * ws = arg; - ws_msg * wm; - nni_aio *aio; + nni_ws * ws = arg; + ws_frame *frame; nni_aio_close(ws->txaio); nni_aio_close(ws->rxaio); @@ -422,31 +516,13 @@ ws_close_cb(void *arg) nni_http_conn_close(ws->http); - // This list (receive) should be empty. - while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { - nni_list_remove(&ws->rxmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - ws_msg_fini(wm); - } - - while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { - nni_list_remove(&ws->txmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); + while ((frame = nni_list_first(&ws->txq)) != NULL) { + nni_list_remove(&ws->txq, frame); + if (frame->aio != NULL) { + nni_aio_list_remove(frame->aio); + nni_aio_finish_error(frame->aio, NNG_ECLOSED); } - ws_msg_fini(wm); - } - ws->txframe = NULL; - - if (ws->rxframe != NULL) { - ws_frame_fini(ws->rxframe); - ws->rxframe = NULL; + ws_frame_fini(frame); } // Any txframe should have been killed with its wmsg. @@ -456,19 +532,13 @@ ws_close_cb(void *arg) static void ws_close(nni_ws *ws, uint16_t code) { - ws_msg *wm; + nng_aio *aio; // Receive stuff gets aborted always. No further receives // once we get a close. - while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { - nni_aio *aio; - nni_list_remove(&ws->rxmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - ws_msg_fini(wm); + while ((aio = nni_list_first(&ws->recvq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); } // If were closing "gracefully", then don't abort in-flight @@ -487,7 +557,6 @@ static void ws_start_write(nni_ws *ws) { ws_frame *frame; - ws_msg * wm; nni_iov iov[2]; int niov; @@ -495,13 +564,11 @@ ws_start_write(nni_ws *ws) return; // busy } - if ((wm = nni_list_first(&ws->txmsgs)) == NULL) { - // Nothing to send. - return; + if ((frame = nni_list_first(&ws->txq)) == NULL) { + return; // nothing to send } - - frame = nni_list_first(&wm->frames); NNI_ASSERT(frame != NULL); + nni_list_remove(&ws->txq, frame); // Push it out. ws->txframe = frame; @@ -534,7 +601,6 @@ ws_write_cb(void *arg) { nni_ws * ws = arg; ws_frame *frame; - ws_msg * wm; nni_aio * aio; int rv; @@ -545,17 +611,20 @@ ws_write_cb(void *arg) return; } ws->txframe = NULL; + if (frame->op == WS_CLOSE) { // If this was a close frame, we are done. // No other messages may succeed.. - while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { - nni_list_remove(&ws->txmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; + ws->txframe = NULL; + ws_frame_fini(frame); + while ((frame = nni_list_first(&ws->txq)) != NULL) { + nni_list_remove(&ws->txq, frame); + if ((aio = frame->aio) != NULL) { + frame->aio = NULL; nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); + ws_frame_fini(frame); } - ws_msg_fini(wm); } if (ws->wclose) { ws->wclose = false; @@ -565,54 +634,55 @@ ws_write_cb(void *arg) return; } - wm = frame->wmsg; - aio = wm->aio; - + aio = frame->aio; if ((rv = nni_aio_result(ws->txaio)) != 0) { - - nni_list_remove(&ws->txmsgs, wm); - ws_msg_fini(wm); + frame->aio = NULL; if (aio != NULL) { - wm->aio = NULL; nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } - + ws_frame_fini(frame); ws->closed = true; nni_http_conn_close(ws->http); nni_mtx_unlock(&ws->mtx); return; } - // good frame, was it the last - nni_list_remove(&wm->frames, frame); - ws_frame_fini(frame); - - // if we still have more frames to transmit, then schedule it. - if (!nni_list_empty(&wm->frames)) { - ws_start_write(ws); - nni_mtx_unlock(&ws->mtx); - return; + if (aio != NULL) { + nni_aio_iov_advance(aio, frame->len); + nni_aio_bump_count(aio, frame->len); + if (frame->final) { + frame->aio = NULL; + nni_aio_list_remove(aio); + } else { + // Clear the aio so that we won't attempt to finish + // it outside the lock + aio = NULL; + } } - if (aio != NULL) { - wm->aio = NULL; - nni_aio_list_remove(aio); + if (frame->final) { + ws_frame_fini(frame); + } else { + // This one cannot fail here, since we only do allocation + // at initial scheduling. + ws_frame_prep_tx(ws, frame); + // Schedule at end. This permits other frames to interleave. + nni_list_append(&ws->txq, frame); } - nni_list_remove(&ws->txmsgs, wm); - // Write the next frame. ws_start_write(ws); nni_mtx_unlock(&ws->mtx); - // discard while not holding lock (just deallocations) - ws_msg_fini(wm); - + // We attempt to finish the operation synchronously, outside the lock. if (aio != NULL) { - nng_msg *msg = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); - nni_msg_free(msg); + nni_msg *msg; + // Successful send, don't leak the message! + if ((msg = nni_aio_get_msg(aio)) != NULL) { + nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + } + nni_aio_finish_synch(aio, 0, nni_aio_count(aio)); } } @@ -620,7 +690,6 @@ static void ws_write_cancel(nni_aio *aio, void *arg, int rv) { nni_ws * ws = arg; - ws_msg * wm; ws_frame *frame; // Is this aio active? We can tell by looking at the active tx frame. @@ -630,17 +699,17 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&ws->mtx); return; } - wm = nni_aio_get_prov_extra(aio, 0); - if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) { + frame = nni_aio_get_prov_extra(aio, 0); + if (frame == ws->txframe) { nni_aio_abort(ws->txaio, rv); // We will wait for callback on the txaio to finish aio. - } else if (nni_list_active(&ws->txmsgs, wm)) { + } else { // If scheduled, just need to remove node and complete it. - nni_list_remove(&ws->txmsgs, wm); - wm->aio = NULL; + nni_list_remove(&ws->txq, frame); + frame->aio = NULL; nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); - ws_msg_fini(wm); + ws_frame_fini(frame); } nni_mtx_unlock(&ws->mtx); } @@ -648,10 +717,10 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv) static void ws_send_close(nni_ws *ws, uint16_t code) { - ws_msg * wm; - uint8_t buf[sizeof(uint16_t)]; - int rv; - nni_aio *aio; + ws_frame *frame; + uint8_t buf[sizeof(uint16_t)]; + int rv; + nni_aio * aio; NNI_PUT16(buf, code); @@ -665,125 +734,45 @@ ws_send_close(nni_ws *ws, uint16_t code) return; } ws->wclose = true; - rv = ws_msg_init_control(&wm, ws, WS_CLOSE, buf, sizeof(buf)); + rv = ws_msg_init_control(&frame, ws, WS_CLOSE, buf, sizeof(buf)); if (rv != 0) { ws->wclose = false; nni_aio_finish_error(aio, rv); return; } - // Close frames get priority! if ((rv = nni_aio_schedule(aio, ws_cancel_close, ws)) != 0) { ws->wclose = false; nni_aio_finish_error(aio, rv); + ws_frame_fini(frame); return; } - nni_list_prepend(&ws->txmsgs, wm); + // This gets inserted at the head. + nni_list_prepend(&ws->txq, frame); ws_start_write(ws); } static void ws_send_control(nni_ws *ws, uint8_t op, uint8_t *buf, size_t len) { - ws_msg *wm; + ws_frame *frame; // Note that we do not care if this works or not. So no AIO needed. if ((ws->closed) || - (ws_msg_init_control(&wm, ws, op, buf, len) != 0)) { + (ws_msg_init_control(&frame, ws, op, buf, len) != 0)) { return; } // Control frames at head of list. (Note that this may preempt // the close frame or other ping/pong requests. Oh well.) - nni_list_prepend(&ws->txmsgs, wm); + nni_list_prepend(&ws->txq, frame); ws_start_write(ws); } -static const nni_option ws_options[] = { - { - .o_name = NULL, - }, -}; - -int -nni_ws_getopt(nni_ws *ws, const char *name, void *buf, size_t *szp, nni_type t) -{ - int rv; - - nni_mtx_lock(&ws->mtx); - if (ws->closed) { - nni_mtx_unlock(&ws->mtx); - return (NNG_ECLOSED); - } - rv = nni_http_conn_getopt(ws->http, name, buf, szp, t); - if (rv == NNG_ENOTSUP) { - rv = nni_getopt(ws_options, name, ws, buf, szp, t); - } - nni_mtx_unlock(&ws->mtx); - return (rv); -} - -int -nni_ws_setopt( - nni_ws *ws, const char *name, const void *buf, size_t sz, nni_type t) -{ - int rv; - - nni_mtx_lock(&ws->mtx); - if (ws->closed) { - nni_mtx_unlock(&ws->mtx); - return (NNG_ECLOSED); - } - rv = nni_http_conn_setopt(ws->http, name, buf, sz, t); - if (rv == NNG_ENOTSUP) { - rv = nni_setopt(ws_options, name, ws, buf, sz, t); - } - nni_mtx_unlock(&ws->mtx); - return (rv); -} - -void -nni_ws_send_msg(nni_ws *ws, nni_aio *aio) -{ - ws_msg * wm; - nni_msg *msg; - int rv; - - msg = nni_aio_get_msg(aio); - - if (nni_aio_begin(aio) != 0) { - return; - } - if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) { - nni_aio_finish_error(aio, rv); - return; - } - - nni_mtx_lock(&ws->mtx); - if (ws->closed) { - nni_mtx_unlock(&ws->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - ws_msg_fini(wm); - return; - } - if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) { - nni_mtx_unlock(&ws->mtx); - nni_aio_finish_error(aio, rv); - ws_msg_fini(wm); - return; - } - nni_aio_set_prov_extra(aio, 0, wm); - nni_list_append(&ws->sendq, aio); - nni_list_append(&ws->txmsgs, wm); - ws_start_write(ws); - nni_mtx_unlock(&ws->mtx); -} - static void ws_start_read(nni_ws *ws) { ws_frame *frame; - ws_msg * wm; nni_aio * aio; nni_iov iov; @@ -791,18 +780,18 @@ ws_start_read(nni_ws *ws) return; // already reading or closed } - if ((wm = nni_list_first(&ws->rxmsgs)) == NULL) { - return; // no body expecting a message. + // If nobody is waiting for recv, and we already have a data + // frame, stop reading. This keeps us from buffering infinitely. + if (nni_list_empty(&ws->recvq) && !nni_list_empty(&ws->rxq)) { + return; } if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { - nni_list_remove(&ws->rxmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; + if ((aio = nni_list_first(&ws->recvq)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ENOMEM); } - ws_msg_fini(wm); + ws_close(ws, WS_CLOSE_INTERNAL); return; } @@ -820,34 +809,143 @@ ws_start_read(nni_ws *ws) } static void -ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp, nni_aio **aiop) +ws_read_finish_str(nni_ws *ws) { - ws_msg *wm = nni_list_first(&ws->rxmsgs); + for (;;) { + nni_aio * aio; + nni_iov * iov; + unsigned niov; + ws_frame *frame; - switch (frame->op) { - case WS_CONT: - if (wm == NULL) { - ws_close(ws, WS_CLOSE_GOING_AWAY); + if ((aio = nni_list_first(&ws->recvq)) == NULL) { return; } - if (nni_list_empty(&wm->frames)) { + + if ((frame = nni_list_first(&ws->rxq)) == NULL) { + return; + } + + // Discard 0 length frames -- in stream mode they are not used. + if (frame->len == 0) { + nni_list_remove(&ws->rxq, frame); + ws_frame_fini(frame); + continue; + } + + // We are completing this aio one way or the other. + nni_aio_list_remove(aio); + nni_aio_get_iov(aio, &niov, &iov); + + while ((frame != NULL) && (niov != 0)) { + size_t n; + + if ((n = frame->len) > iov->iov_len) { + // This eats the entire iov. + n = iov->iov_len; + } + iov->iov_buf = ((uint8_t *) iov->iov_buf) + n; + iov->iov_len -= n; + if (iov->iov_len == 0) { + iov++; + niov--; + } + + if (frame->len == n) { + nni_list_remove(&ws->rxq, frame); + ws_frame_fini(frame); + frame = nni_list_first(&ws->rxq); + } else { + frame->len -= n; + frame->buf += n; + } + + nni_aio_bump_count(aio, n); + } + + nni_aio_finish(aio, 0, nni_aio_count(aio)); + } +} + +static void +ws_read_finish_msg(nni_ws *ws) +{ + nni_aio * aio; + size_t len; + ws_frame *frame; + nni_msg * msg; + int rv; + uint8_t * body; + + // If we have no data, no waiter, or have not received the complete + // message yet, then there is nothing to do. + if (ws->inmsg || nni_list_empty(&ws->rxq) || + ((aio = nni_list_first(&ws->recvq)) == NULL)) { + return; + } + + // At this point, we have both a complete message in the queue (and + // there should not be any frames other than the for the message), + // and a waiting reader. + len = 0; + NNI_LIST_FOREACH (&ws->rxq, frame) { + len += frame->len; + } + + nni_aio_list_remove(aio); + + if ((rv = nni_msg_alloc(&msg, len)) != 0) { + nni_aio_finish_error(aio, rv); + ws_close_error(ws, WS_CLOSE_INTERNAL); + return; + } + body = nni_msg_body(msg); + while ((frame = nni_list_first(&ws->rxq)) != NULL) { + nni_list_remove(&ws->rxq, frame); + memcpy(body, frame->buf, frame->len); + body += frame->len; + ws_frame_fini(frame); + } + + nni_aio_set_msg(aio, msg); + nni_aio_bump_count(aio, nni_msg_len(msg)); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} + +static void +ws_read_finish(nni_ws *ws) +{ + if (ws->isstream) { + ws_read_finish_str(ws); + } else { + ws_read_finish_msg(ws); + } +} + +static void +ws_read_frame_cb(nni_ws *ws, ws_frame *frame) +{ + switch (frame->op) { + case WS_CONT: + if (!ws->inmsg) { ws_close(ws, WS_CLOSE_PROTOCOL_ERR); return; } + if (frame->final) { + ws->inmsg = false; + } ws->rxframe = NULL; - nni_list_append(&wm->frames, frame); + nni_list_append(&ws->rxq, frame); break; case WS_BINARY: - if (wm == NULL) { - ws_close(ws, WS_CLOSE_GOING_AWAY); - return; - } - if (!nni_list_empty(&wm->frames)) { + if (ws->inmsg) { ws_close(ws, WS_CLOSE_PROTOCOL_ERR); return; } + if (!frame->final) { + ws->inmsg = true; + } ws->rxframe = NULL; - nni_list_append(&wm->frames, frame); + nni_list_append(&ws->rxq, frame); break; case WS_TEXT: // No support for text mode at present. @@ -880,23 +978,13 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp, nni_aio **aiop) return; } - // If this was the last (final) frame, then complete it. But - // we have to look at the msg, since we might have got a - // control frame. - if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) { - nni_list_remove(&ws->rxmsgs, wm); - *wmp = wm; - *aiop = wm->aio; - nni_aio_list_remove(wm->aio); - wm->aio = NULL; - } + ws_read_finish(ws); } static void ws_read_cb(void *arg) { - nni_ws * ws = arg; - ws_msg * wm; + nni_ws * ws = arg; nni_aio * aio = ws->rxaio; ws_frame *frame; int rv; @@ -976,6 +1064,21 @@ ws_read_cb(void *arg) nni_mtx_unlock(&ws->mtx); return; } + // For message mode, also check to make sure that the overall + // length of the message has not exceeded our recvmax. + // (Protect against an infinite stream of small messages!) + if ((!ws->isstream) && (ws->recvmax > 0)) { + size_t totlen = frame->len; + ws_frame *fr2; + NNI_LIST_FOREACH (&ws->rxq, fr2) { + totlen += fr2->len; + } + if (totlen > ws->recvmax) { + ws_close(ws, WS_CLOSE_TOO_BIG); + nni_mtx_unlock(&ws->mtx); + return; + } + } // Check for masking. (We don't actually do the unmask // here, because we don't have data yet.) @@ -1023,134 +1126,40 @@ ws_read_cb(void *arg) // At this point, we have a complete frame. ws_unmask_frame(frame); // idempotent - wm = NULL; - aio = NULL; - ws_read_frame_cb(ws, frame, &wm, &aio); + ws_read_frame_cb(ws, frame); ws_start_read(ws); nni_mtx_unlock(&ws->mtx); - - // Got a good message, so we have to do the work to send it up. - if (wm != NULL) { - size_t len = 0; - nni_msg *msg; - uint8_t *body; - int rv; - - NNI_LIST_FOREACH (&wm->frames, frame) { - len += frame->len; - } - if ((rv = nni_msg_alloc(&msg, len)) != 0) { - nni_aio_finish_error(aio, rv); - ws_msg_fini(wm); - nni_ws_close_error(ws, WS_CLOSE_INTERNAL); - return; - } - body = nni_msg_body(msg); - NNI_LIST_FOREACH (&wm->frames, frame) { - memcpy(body, frame->buf, frame->len); - body += frame->len; - } - nni_aio_set_msg(aio, msg); - nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); - ws_msg_fini(wm); - } } static void ws_read_cancel(nni_aio *aio, void *arg, int rv) { nni_ws *ws = arg; - ws_msg *wm; nni_mtx_lock(&ws->mtx); - if (!nni_aio_list_active(aio)) { - nni_mtx_unlock(&ws->mtx); - return; - } - wm = nni_aio_get_prov_extra(aio, 0); - if (wm == nni_list_first(&ws->rxmsgs)) { - // Cancellation will percolate back up. - nni_aio_abort(ws->rxaio, rv); - } else if (nni_list_active(&ws->rxmsgs, wm)) { - nni_list_remove(&ws->rxmsgs, wm); - ws_msg_fini(wm); + if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&ws->mtx); } -void -nni_ws_recv_msg(nni_ws *ws, nni_aio *aio) -{ - ws_msg *wm; - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) { - nni_aio_finish_error(aio, rv); - return; - } - nni_mtx_lock(&ws->mtx); - if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) { - nni_mtx_unlock(&ws->mtx); - ws_msg_fini(wm); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_set_prov_extra(aio, 0, wm); - nni_list_append(&ws->recvq, aio); - nni_list_append(&ws->rxmsgs, wm); - ws_start_read(ws); - - nni_mtx_unlock(&ws->mtx); -} - -void -nni_ws_close_error(nni_ws *ws, uint16_t code) +static void +ws_close_error(nni_ws *ws, uint16_t code) { nni_mtx_lock(&ws->mtx); ws_close(ws, code); nni_mtx_unlock(&ws->mtx); } -void -nni_ws_close(nni_ws *ws) -{ - nni_ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); -} - -const char * -nni_ws_request_headers(nni_ws *ws) -{ - nni_mtx_lock(&ws->mtx); - if (ws->reqhdrs == NULL) { - ws->reqhdrs = nni_http_req_headers(ws->req); - } - nni_mtx_unlock(&ws->mtx); - return (ws->reqhdrs); -} - -const char * -nni_ws_response_headers(nni_ws *ws) -{ - nni_mtx_lock(&ws->mtx); - if (ws->reshdrs == NULL) { - ws->reshdrs = nni_http_res_headers(ws->res); - } - nni_mtx_unlock(&ws->mtx); - return (ws->reshdrs); -} - static void ws_fini(void *arg) { - nni_ws *ws = arg; - ws_msg *wm; + nni_ws * ws = arg; + ws_frame *frame; + nng_aio * aio; - nni_ws_close(ws); + ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); // Give a chance for the close frame to drain. if (ws->closeaio) { @@ -1175,25 +1184,29 @@ ws_fini(void *arg) } nni_mtx_lock(&ws->mtx); - while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { - nni_list_remove(&ws->rxmsgs, wm); - if (wm->aio) { - nni_aio_finish_error(wm->aio, NNG_ECLOSED); - } - ws_msg_fini(wm); + while ((frame = nni_list_first(&ws->rxq)) != NULL) { + nni_list_remove(&ws->rxq, frame); + ws_frame_fini(frame); } - while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { - nni_list_remove(&ws->txmsgs, wm); - if (wm->aio) { - nni_aio_finish_error(wm->aio, NNG_ECLOSED); - } - ws_msg_fini(wm); + while ((frame = nni_list_first(&ws->txq)) != NULL) { + nni_list_remove(&ws->txq, frame); + ws_frame_fini(frame); } - if (ws->rxframe) { + if (ws->rxframe != NULL) { ws_frame_fini(ws->rxframe); } + if (ws->txframe != NULL) { + ws_frame_fini(ws->txframe); + } + + while (((aio = nni_list_first(&ws->recvq)) != NULL) || + ((aio = nni_list_first(&ws->sendq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&ws->mtx); if (ws->http) { @@ -1217,8 +1230,8 @@ ws_fini(void *arg) NNI_FREE_STRUCT(ws); } -void -nni_ws_fini(nni_ws *ws) +static void +ws_reap(nni_ws *ws) { nni_reap(&ws->reap, ws_fini, ws); } @@ -1232,8 +1245,8 @@ ws_http_cb_listener(nni_ws *ws, nni_aio *aio) nni_mtx_lock(&l->mtx); nni_list_remove(&l->reply, ws); if (nni_aio_result(aio) != 0) { - nni_ws_fini(ws); nni_mtx_unlock(&l->mtx); + ws_reap(ws); return; } ws->ready = true; @@ -1320,14 +1333,14 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) (!ws_contains_word(ptr, "upgrade")) || ((ptr = GETH("Upgrade")) == NULL) || (strcmp(ptr, "websocket") != 0)) { - nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); + ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); rv = NNG_EPROTO; goto err; } if (d->proto != NULL) { if (((ptr = GETH("Sec-WebSocket-Protocol")) == NULL) || (!ws_contains_word(d->proto, ptr))) { - nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); + ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); rv = NNG_EPROTO; goto err; } @@ -1358,7 +1371,7 @@ err: } nni_mtx_unlock(&d->mtx); - nni_ws_fini(ws); + ws_reap(ws); } static void @@ -1384,8 +1397,8 @@ ws_init(nni_ws **wsp) return (NNG_ENOMEM); } nni_mtx_init(&ws->mtx); - NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node); - NNI_LIST_INIT(&ws->txmsgs, ws_msg, node); + NNI_LIST_INIT(&ws->rxq, ws_frame, node); + NNI_LIST_INIT(&ws->txq, ws_frame, node); nni_aio_list_init(&ws->sendq); nni_aio_list_init(&ws->recvq); @@ -1401,17 +1414,25 @@ ws_init(nni_ws **wsp) nni_aio_set_timeout(ws->closeaio, 100); nni_aio_set_timeout(ws->httpaio, 2000); + ws->ops.s_close = ws_str_close; + ws->ops.s_free = ws_str_free; + ws->ops.s_send = ws_str_send; + ws->ops.s_recv = ws_str_recv; + ws->ops.s_getx = ws_str_getx; + ws->ops.s_setx = ws_str_setx; + ws->fragsize = 1 << 20; // we won't send a frame larger than this *wsp = ws; return (0); } -void -nni_ws_listener_fini(nni_ws_listener *l) +static void +ws_listener_free(void *arg) { - ws_header *hdr; + nni_ws_listener *l = arg; + ws_header * hdr; - nni_ws_listener_close(l); + ws_listener_close(l); nni_mtx_lock(&l->mtx); while (!nni_list_empty(&l->reply)) { @@ -1437,7 +1458,7 @@ nni_ws_listener_fini(nni_ws_listener *l) NNI_FREE_STRUCT(hdr); } if (l->url) { - nni_url_free(l->url); + nng_url_free(l->url); } NNI_FREE_STRUCT(l); } @@ -1456,6 +1477,7 @@ ws_handler(nni_aio *aio) uint16_t status; int rv; char key[29]; + ws_header * hdr; req = nni_aio_get_input(aio, 0); h = nni_aio_get_input(aio, 1); @@ -1542,6 +1564,20 @@ ws_handler(nni_aio *aio) goto err; } + // Set any user supplied headers. This is better than using a hook + // for most things, because it is loads easier. + NNI_LIST_FOREACH (&l->headers, hdr) { + if (SETH(hdr->name, hdr->value) != 0) { + status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; + nni_http_res_free(res); + goto err; + } + } + + // The hook function gives us the ability to intercept the HTTP + // response altogether. Its best not to do this unless you really + // need to, because it's much more complex. But if you want to set + // up an HTTP Authorization handler this might be the only choice. if (l->hookfn != NULL) { rv = l->hookfn(l->hookarg, req, res); if (rv != 0) { @@ -1583,8 +1619,9 @@ ws_handler(nni_aio *aio) ws->res = res; ws->server = true; ws->maxframe = l->maxframe; - - // XXX: Inherit fragmentation? (Frag is limited for now). + ws->fragsize = l->fragsize; + ws->recvmax = l->recvmax; + ws->isstream = l->isstream; nni_list_append(&l->reply, ws); nni_aio_set_data(ws->httpaio, 0, l); @@ -1603,71 +1640,6 @@ err: } } -int -nni_ws_listener_init(nni_ws_listener **wslp, nni_url *url) -{ - nni_ws_listener *l; - int rv; - char * host; - - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&l->mtx); - nni_cv_init(&l->cv, &l->mtx); - nni_aio_list_init(&l->aios); - - NNI_LIST_INIT(&l->pend, nni_ws, node); - NNI_LIST_INIT(&l->reply, nni_ws, node); - - // make a private copy of the url structure. - if ((rv = nni_url_clone(&l->url, url)) != 0) { - nni_ws_listener_fini(l); - return (rv); - } - - host = l->url->u_hostname; - if (strlen(host) == 0) { - host = NULL; - } - rv = nni_http_handler_init(&l->handler, url->u_path, ws_handler); - if (rv != 0) { - nni_ws_listener_fini(l); - return (rv); - } - - if (((rv = nni_http_handler_set_host(l->handler, host)) != 0) || - ((rv = nni_http_handler_set_data(l->handler, l, 0)) != 0) || - ((rv = nni_http_server_init(&l->server, url)) != 0)) { - nni_ws_listener_fini(l); - return (rv); - } - - l->maxframe = 0; - *wslp = l; - return (0); -} - -int -nni_ws_listener_proto(nni_ws_listener *l, const char *proto) -{ - int rv = 0; - char *ns; - nni_mtx_lock(&l->mtx); - if (l->started) { - rv = NNG_EBUSY; - } else if ((ns = nni_strdup(proto)) == NULL) { - rv = NNG_ENOMEM; - } else { - if (l->proto != NULL) { - nni_strfree(l->proto); - } - l->proto = ns; - } - nni_mtx_unlock(&l->mtx); - return (rv); -} - static void ws_accept_cancel(nni_aio *aio, void *arg, int rv) { @@ -1681,11 +1653,12 @@ ws_accept_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&l->mtx); } -void -nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) +static void +ws_listener_accept(void *arg, nni_aio *aio) { - nni_ws *ws; - int rv; + nni_ws_listener *l = arg; + nni_ws * ws; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -1717,10 +1690,11 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) nni_mtx_unlock(&l->mtx); } -void -nni_ws_listener_close(nni_ws_listener *l) +static void +ws_listener_close(void *arg) { - nni_ws *ws; + nni_ws_listener *l = arg; + nni_ws * ws; nni_mtx_lock(&l->mtx); if (l->closed) { nni_mtx_unlock(&l->mtx); @@ -1733,18 +1707,30 @@ nni_ws_listener_close(nni_ws_listener *l) l->started = false; } NNI_LIST_FOREACH (&l->pend, ws) { - nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY); + ws_close_error(ws, WS_CLOSE_GOING_AWAY); } NNI_LIST_FOREACH (&l->reply, ws) { - nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY); + ws_close_error(ws, WS_CLOSE_GOING_AWAY); } nni_mtx_unlock(&l->mtx); } -int -nni_ws_listener_listen(nni_ws_listener *l) +// XXX: Consider replacing this with an option. +void +nni_ws_listener_hook( + nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg) { - int rv; + nni_mtx_lock(&l->mtx); + l->hookfn = hookfn; + l->hookarg = hookarg; + nni_mtx_unlock(&l->mtx); +} + +static int +ws_listener_listen(void *arg) +{ + nni_ws_listener *l = arg; + int rv; nni_mtx_lock(&l->mtx); if (l->closed) { @@ -1777,42 +1763,290 @@ nni_ws_listener_listen(nni_ws_listener *l) return (0); } -void -nni_ws_listener_hook( - nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg) +static int +ws_listener_set_size( + nni_ws_listener *l, size_t *valp, const void *buf, size_t sz, nni_type t) { - nni_mtx_lock(&l->mtx); - l->hookfn = hookfn; - l->hookarg = hookarg; - nni_mtx_unlock(&l->mtx); + size_t val; + int rv; + + // Max size is limited to 4 GB, but you really never want to have + // to have a larger value. If you think you need that, you're doing it + // wrong. You *can* set the size to 0 for unlimited. + if ((rv = nni_copyin_size(&val, buf, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&l->mtx); + *valp = val; + nni_mtx_unlock(&l->mtx); + } + return (rv); } -int -nni_ws_listener_set_tls(nni_ws_listener *l, nng_tls_config *tls) +static int +ws_listener_get_size( + nni_ws_listener *l, size_t *valp, void *buf, size_t *szp, nni_type t) { - int rv; + size_t val; nni_mtx_lock(&l->mtx); - rv = nni_http_server_set_tls(l->server, tls); + val = *valp; nni_mtx_unlock(&l->mtx); + return (nni_copyout_size(val, buf, szp, t)); +} + +static int +ws_listener_set_maxframe(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_set_size(l, &l->maxframe, buf, sz, t)); +} + +static int +ws_listener_get_maxframe(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_get_size(l, &l->maxframe, buf, szp, t)); +} + +static int +ws_listener_set_fragsize(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_set_size(l, &l->fragsize, buf, sz, t)); +} + +static int +ws_listener_get_fragsize(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_get_size(l, &l->fragsize, buf, szp, t)); +} + +static int +ws_listener_set_recvmax(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_set_size(l, &l->recvmax, buf, sz, t)); +} + +static int +ws_listener_get_recvmax(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_get_size(l, &l->recvmax, buf, szp, t)); +} + +static int +ws_listener_set_res_headers(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + + if ((rv = ws_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); + rv = ws_set_headers(&l->headers, buf); + nni_mtx_unlock(&l->mtx); + } return (rv); } -int -nni_ws_listener_get_tls(nni_ws_listener *l, nng_tls_config **tlsp) +static int +ws_listener_set_proto(void *arg, const void *buf, size_t sz, nni_type t) { - int rv; + nni_ws_listener *l = arg; + int rv; + + if ((rv = ws_check_string(buf, sz, t)) == 0) { + char *ns; + if ((ns = nni_strdup(buf)) == NULL) { + rv = NNG_ENOMEM; + } else { + nni_mtx_lock(&l->mtx); + if (l->proto != NULL) { + nni_strfree(l->proto); + } + l->proto = ns; + nni_mtx_unlock(&l->mtx); + } + } + return (rv); +} + +static int +ws_listener_get_proto(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; nni_mtx_lock(&l->mtx); - rv = nni_http_server_get_tls(l->server, tlsp); + rv = nni_copyout_str(l->proto != NULL ? l->proto : "", buf, szp, t); nni_mtx_unlock(&l->mtx); return (rv); } -void -nni_ws_listener_set_maxframe(nni_ws_listener *l, size_t maxframe) +static int +ws_listener_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t) { + nni_ws_listener *l = arg; + int rv; + bool b; + + if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); + l->isstream = !b; + nni_mtx_unlock(&l->mtx); + } + return (rv); +} + +static int +ws_listener_get_url(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); - l->maxframe = maxframe; + rv = nni_copyout_str(l->url->u_rawurl, buf, szp, t); nni_mtx_unlock(&l->mtx); + return (rv); +} + +static const nni_option ws_listener_options[] = { + { + .o_name = NNI_OPT_WS_MSGMODE, + .o_set = ws_listener_set_msgmode, + }, + { + .o_name = NNG_OPT_WS_RECVMAXFRAME, + .o_set = ws_listener_set_maxframe, + .o_get = ws_listener_get_maxframe, + }, + { + .o_name = NNG_OPT_WS_SENDMAXFRAME, + .o_set = ws_listener_set_fragsize, + .o_get = ws_listener_get_fragsize, + }, + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_set = ws_listener_set_recvmax, + .o_get = ws_listener_get_recvmax, + }, + { + .o_name = NNG_OPT_WS_RESPONSE_HEADERS, + .o_set = ws_listener_set_res_headers, + // XXX: Get not implemented yet; likely of marginal value. + }, + { + .o_name = NNG_OPT_WS_PROTOCOL, + .o_set = ws_listener_set_proto, + .o_get = ws_listener_get_proto, + }, + { + .o_name = NNG_OPT_URL, + .o_get = ws_listener_get_url, + }, + { + .o_name = NULL, + }, +}; + +static int +ws_listener_set_header(nni_ws_listener *l, const char *name, const void *buf, + size_t sz, nni_type t) +{ + int rv; + name += strlen(NNG_OPT_WS_RESPONSE_HEADER); + if ((rv = ws_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); + rv = ws_set_header(&l->headers, name, buf); + nni_mtx_unlock(&l->mtx); + } + return (rv); +} + +static int +ws_listener_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + + rv = nni_setopt(ws_listener_options, name, l, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_http_server_setx(l->server, name, buf, sz, t); + } + + if (rv == NNG_ENOTSUP) { + if (startswith(name, NNG_OPT_WS_RESPONSE_HEADER)) { + rv = ws_listener_set_header(l, name, buf, sz, t); + } + } + return (rv); +} + +static int +ws_listener_getx( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + + rv = nni_getopt(ws_listener_options, name, l, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_http_server_getx(l->server, name, buf, szp, t); + } + return (rv); +} + +int +nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url) +{ + nni_ws_listener *l; + int rv; + char * host; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->mtx); + nni_cv_init(&l->cv, &l->mtx); + nni_aio_list_init(&l->aios); + + NNI_LIST_INIT(&l->pend, nni_ws, node); + NNI_LIST_INIT(&l->reply, nni_ws, node); + + // make a private copy of the url structure. + if ((rv = nng_url_clone(&l->url, url)) != 0) { + ws_listener_free(l); + return (rv); + } + + host = l->url->u_hostname; + if (strlen(host) == 0) { + host = NULL; + } + rv = nni_http_handler_init(&l->handler, url->u_path, ws_handler); + if (rv != 0) { + ws_listener_free(l); + return (rv); + } + + if (((rv = nni_http_handler_set_host(l->handler, host)) != 0) || + ((rv = nni_http_handler_set_data(l->handler, l, 0)) != 0) || + ((rv = nni_http_server_init(&l->server, url)) != 0)) { + ws_listener_free(l); + return (rv); + } + + l->fragsize = WS_DEF_MAXTXFRAME; + l->maxframe = WS_DEF_MAXRXFRAME; + l->recvmax = WS_DEF_RECVMAX; + l->isstream = false; + l->ops.sl_free = ws_listener_free; + l->ops.sl_close = ws_listener_close; + l->ops.sl_accept = ws_listener_accept; + l->ops.sl_listen = ws_listener_listen; + l->ops.sl_setx = ws_listener_setx; + l->ops.sl_getx = ws_listener_getx; + *wslp = (void *) l; + return (0); } void @@ -1846,7 +2080,7 @@ ws_conn_cb(void *arg) nni_cv_wake(&d->cv); } nni_mtx_unlock(&d->mtx); - nni_ws_fini(ws); + ws_reap(ws); } else { nni_mtx_unlock(&d->mtx); } @@ -1861,7 +2095,7 @@ ws_conn_cb(void *arg) // This request was canceled for some reason. nni_http_conn_fini(http); nni_mtx_unlock(&ws->mtx); - nni_ws_fini(ws); + ws_reap(ws); return; } @@ -1908,13 +2142,14 @@ err: if (req != NULL) { nni_http_req_free(req); } - nni_ws_fini(ws); + ws_reap(ws); } -void -nni_ws_dialer_fini(nni_ws_dialer *d) +static void +ws_dialer_free(void *arg) { - ws_header *hdr; + nni_ws_dialer *d = arg; + ws_header * hdr; nni_mtx_lock(&d->mtx); while (!nni_list_empty(&d->wspend)) { @@ -1933,65 +2168,18 @@ nni_ws_dialer_fini(nni_ws_dialer *d) nni_http_client_fini(d->client); } if (d->url) { - nni_url_free(d->url); + nng_url_free(d->url); } nni_cv_fini(&d->cv); nni_mtx_fini(&d->mtx); NNI_FREE_STRUCT(d); } -int -nni_ws_dialer_init(nni_ws_dialer **dp, nni_url *url) -{ - nni_ws_dialer *d; - int rv; - - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { - return (NNG_ENOMEM); - } - NNI_LIST_INIT(&d->headers, ws_header, node); - NNI_LIST_INIT(&d->wspend, nni_ws, node); - nni_mtx_init(&d->mtx); - nni_cv_init(&d->cv, &d->mtx); - - if ((rv = nni_url_clone(&d->url, url)) != 0) { - nni_ws_dialer_fini(d); - return (rv); - } - - if ((rv = nni_http_client_init(&d->client, url)) != 0) { - nni_ws_dialer_fini(d); - return (rv); - } - d->maxframe = 0; - *dp = d; - return (0); -} - -int -nni_ws_dialer_set_tls(nni_ws_dialer *d, nng_tls_config *tls) -{ - int rv; - nni_mtx_lock(&d->mtx); - rv = nni_http_client_set_tls(d->client, tls); - nni_mtx_unlock(&d->mtx); - return (rv); -} - -int -nni_ws_dialer_get_tls(nni_ws_dialer *d, nng_tls_config **tlsp) -{ - int rv; - nni_mtx_lock(&d->mtx); - rv = nni_http_client_get_tls(d->client, tlsp); - nni_mtx_unlock(&d->mtx); - return (rv); -} - -void -nni_ws_dialer_close(nni_ws_dialer *d) +static void +ws_dialer_close(void *arg) { - nni_ws *ws; + nni_ws_dialer *d = arg; + nni_ws * ws; nni_mtx_lock(&d->mtx); if (d->closed) { nni_mtx_unlock(&d->mtx); @@ -2005,24 +2193,6 @@ nni_ws_dialer_close(nni_ws_dialer *d) nni_mtx_unlock(&d->mtx); } -int -nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto) -{ - int rv = 0; - char *ns; - nni_mtx_lock(&d->mtx); - if ((ns = nni_strdup(proto)) == NULL) { - rv = NNG_ENOMEM; - } else { - if (d->proto != NULL) { - nni_strfree(d->proto); - } - d->proto = ns; - } - nni_mtx_unlock(&d->mtx); - return (rv); -} - static void ws_dial_cancel(nni_aio *aio, void *arg, int rv) { @@ -2038,11 +2208,12 @@ ws_dial_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&ws->mtx); } -void -nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) +static void +ws_dialer_dial(void *arg, nni_aio *aio) { - nni_ws *ws; - int rv; + nni_ws_dialer *d = arg; + nni_ws * ws; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -2055,72 +2226,275 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) if (d->closed) { nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); - ws_fini(ws); + ws_reap(ws); return; } if ((rv = nni_aio_schedule(aio, ws_dial_cancel, ws)) != 0) { nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); - ws_fini(ws); + ws_reap(ws); return; } ws->dialer = d; ws->useraio = aio; ws->server = false; ws->maxframe = d->maxframe; + ws->isstream = d->isstream; nni_list_append(&d->wspend, ws); nni_http_client_connect(d->client, ws->connaio); nni_mtx_unlock(&d->mtx); } static int -ws_set_header(nni_list *l, const char *n, const char *v) +ws_dialer_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t) { - ws_header *hdr; - char * nv; + nni_ws_dialer *d = arg; + int rv; + bool b; - if ((nv = nni_strdup(v)) == NULL) { - return (NNG_ENOMEM); + if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->isstream = !b; + nni_mtx_unlock(&d->mtx); } + return (rv); +} - NNI_LIST_FOREACH (l, hdr) { - if (nni_strcasecmp(hdr->name, n) == 0) { - nni_strfree(hdr->value); - hdr->value = nv; - return (0); - } - } +static int +ws_dialer_set_size( + nni_ws_dialer *d, size_t *valp, const void *buf, size_t sz, nni_type t) +{ + size_t val; + int rv; - if ((hdr = NNI_ALLOC_STRUCT(hdr)) == NULL) { - nni_strfree(nv); - return (NNG_ENOMEM); - } - if ((hdr->name = nni_strdup(n)) == NULL) { - nni_strfree(nv); - NNI_FREE_STRUCT(hdr); - return (NNG_ENOMEM); + // Max size is limited to 4 GB, but you really never want to have + // to have a larger value. If you think you need that, you're doing it + // wrong. You *can* set the size to 0 for unlimited. + if ((rv = nni_copyin_size(&val, buf, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&d->mtx); + *valp = val; + nni_mtx_unlock(&d->mtx); } - hdr->value = nv; - nni_list_append(l, hdr); - return (0); + return (rv); } -int -nni_ws_dialer_header(nni_ws_dialer *d, const char *n, const char *v) +static int +ws_dialer_get_size( + nni_ws_dialer *d, size_t *valp, void *buf, size_t *szp, nni_type t) { - int rv; + size_t val; nni_mtx_lock(&d->mtx); - rv = ws_set_header(&d->headers, n, v); + val = *valp; nni_mtx_unlock(&d->mtx); + return (nni_copyout_size(val, buf, szp, t)); +} + +static int +ws_dialer_set_maxframe(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_set_size(d, &d->maxframe, buf, sz, t)); +} + +static int +ws_dialer_get_maxframe(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_get_size(d, &d->maxframe, buf, szp, t)); +} + +static int +ws_dialer_set_fragsize(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_set_size(d, &d->fragsize, buf, sz, t)); +} + +static int +ws_dialer_get_fragsize(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_get_size(d, &d->fragsize, buf, szp, t)); +} + +static int +ws_dialer_set_recvmax(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_set_size(d, &d->recvmax, buf, sz, t)); +} + +static int +ws_dialer_get_recvmax(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_get_size(d, &d->recvmax, buf, szp, t)); +} + +static int +ws_dialer_set_req_headers(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + + if ((rv = ws_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + rv = ws_set_headers(&d->headers, buf); + nni_mtx_unlock(&d->mtx); + } return (rv); } -void -nni_ws_dialer_set_maxframe(nni_ws_dialer *d, size_t maxframe) +static int +ws_dialer_set_proto(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + + if ((rv = ws_check_string(buf, sz, t)) == 0) { + char *ns; + if ((ns = nni_strdup(buf)) == NULL) { + rv = NNG_ENOMEM; + } else { + nni_mtx_lock(&d->mtx); + if (d->proto != NULL) { + nni_strfree(d->proto); + } + d->proto = ns; + nni_mtx_unlock(&d->mtx); + } + } + return (rv); +} + +static int +ws_dialer_get_proto(void *arg, void *buf, size_t *szp, nni_type t) { + nni_ws_dialer *d = arg; + int rv; nni_mtx_lock(&d->mtx); - d->maxframe = maxframe; + rv = nni_copyout_str(d->proto != NULL ? d->proto : "", buf, szp, t); nni_mtx_unlock(&d->mtx); + return (rv); +} + +static const nni_option ws_dialer_options[] = { + { + .o_name = NNI_OPT_WS_MSGMODE, + .o_set = ws_dialer_set_msgmode, + }, + { + .o_name = NNG_OPT_WS_RECVMAXFRAME, + .o_set = ws_dialer_set_maxframe, + .o_get = ws_dialer_get_maxframe, + }, + { + .o_name = NNG_OPT_WS_SENDMAXFRAME, + .o_set = ws_dialer_set_fragsize, + .o_get = ws_dialer_get_fragsize, + }, + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_set = ws_dialer_set_recvmax, + .o_get = ws_dialer_get_recvmax, + }, + { + .o_name = NNG_OPT_WS_REQUEST_HEADERS, + .o_set = ws_dialer_set_req_headers, + // XXX: Get not implemented yet; likely of marginal value. + }, + { + .o_name = NNG_OPT_WS_PROTOCOL, + .o_set = ws_dialer_set_proto, + .o_get = ws_dialer_get_proto, + }, + { + .o_name = NULL, + }, +}; + +static int +ws_dialer_set_header( + nni_ws_dialer *d, const char *name, const void *buf, size_t sz, nni_type t) +{ + int rv; + name += strlen(NNG_OPT_WS_REQUEST_HEADER); + if ((rv = ws_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + rv = ws_set_header(&d->headers, name, buf); + nni_mtx_unlock(&d->mtx); + } + return (rv); +} + +static int +ws_dialer_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + + rv = nni_setopt(ws_dialer_options, name, d, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_http_client_setx(d->client, name, buf, sz, t); + } + + if (rv == NNG_ENOTSUP) { + if (startswith(name, NNG_OPT_WS_REQUEST_HEADER)) { + rv = ws_dialer_set_header(d, name, buf, sz, t); + } + } + return (rv); +} + +static int +ws_dialer_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + + rv = nni_getopt(ws_dialer_options, name, d, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_http_client_getx(d->client, name, buf, szp, t); + } + return (rv); +} + +int +nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) +{ + nni_ws_dialer *d; + int rv; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&d->headers, ws_header, node); + NNI_LIST_INIT(&d->wspend, nni_ws, node); + nni_mtx_init(&d->mtx); + nni_cv_init(&d->cv, &d->mtx); + + if ((rv = nng_url_clone(&d->url, url)) != 0) { + ws_dialer_free(d); + return (rv); + } + + if ((rv = nni_http_client_init(&d->client, url)) != 0) { + ws_dialer_free(d); + return (rv); + } + d->isstream = true; + d->recvmax = WS_DEF_RECVMAX; + d->maxframe = WS_DEF_MAXRXFRAME; + d->fragsize = WS_DEF_MAXTXFRAME; + + d->ops.sd_free = ws_dialer_free; + d->ops.sd_close = ws_dialer_close; + d->ops.sd_dial = ws_dialer_dial; + d->ops.sd_setx = ws_dialer_setx; + d->ops.sd_getx = ws_dialer_getx; + *dp = (void *) d; + return (0); } // Dialer does not get a hook chance, as it can examine the request @@ -2130,3 +2504,293 @@ nni_ws_dialer_set_maxframe(nni_ws_dialer *d, size_t maxframe) // The implementation will send periodic PINGs, and respond with // PONGs. + +static void +ws_str_free(void *arg) +{ + nni_ws *ws = arg; + nni_reap(&ws->reap, ws_fini, ws); +} + +static void +ws_str_close(void *arg) +{ + nni_ws *ws = arg; + ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); +} + +static void +ws_str_send(void *arg, nni_aio *aio) +{ + nni_ws * ws = arg; + int rv; + ws_frame *frame; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (!ws->isstream) { + nni_msg *msg; + unsigned niov; + nni_iov iov[2]; + if ((msg = nni_aio_get_msg(aio)) == NULL) { + nni_aio_finish_error(aio, NNG_EINVAL); + return; + } + niov = 0; + if (nng_msg_header_len(msg) > 0) { + iov[niov].iov_len = nni_msg_header_len(msg); + iov[niov].iov_buf = nni_msg_header(msg); + niov++; + } + iov[niov].iov_len = nni_msg_len(msg); + iov[niov].iov_buf = nni_msg_body(msg); + niov++; + + // Scribble into the iov for now. + nni_aio_set_iov(aio, niov, iov); + } + + if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { + nni_aio_finish_error(aio, NNG_ENOMEM); + return; + } + frame->aio = aio; + if ((rv = ws_frame_prep_tx(ws, frame)) != 0) { + nni_aio_finish_error(aio, rv); + ws_frame_fini(frame); + return; + } + + nni_mtx_lock(&ws->mtx); + + if (ws->closed) { + nni_mtx_unlock(&ws->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + ws_frame_fini(frame); + return; + } + if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) { + nni_mtx_unlock(&ws->mtx); + nni_aio_finish_error(aio, rv); + ws_frame_fini(frame); + return; + } + nni_aio_set_prov_extra(aio, 0, frame); + nni_list_append(&ws->sendq, aio); + nni_list_append(&ws->txq, frame); + ws_start_write(ws); + nni_mtx_unlock(&ws->mtx); +} + +static void +ws_str_recv(void *arg, nng_aio *aio) +{ + nni_ws *ws = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ws->mtx); + if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) { + nni_mtx_unlock(&ws->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&ws->recvq, aio); + if (nni_list_first(&ws->recvq) == aio) { + ws_read_finish_msg(ws); + } + ws_start_read(ws); + + nni_mtx_unlock(&ws->mtx); +} + +static int +ws_get_request_headers(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + nni_mtx_lock(&ws->mtx); + if (ws->reqhdrs == NULL) { + ws->reqhdrs = nni_http_req_headers(ws->req); + } + nni_mtx_unlock(&ws->mtx); + return (nni_copyout_str(ws->reqhdrs, buf, szp, t)); +} + +static int +ws_get_response_headers(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + nni_mtx_lock(&ws->mtx); + if (ws->reshdrs == NULL) { + ws->reshdrs = nni_http_res_headers(ws->res); + } + nni_mtx_unlock(&ws->mtx); + return (nni_copyout_str(ws->reshdrs, buf, szp, t)); +} + +static int +ws_get_request_uri(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + return (nni_copyout_str(nni_http_req_get_uri(ws->req), buf, szp, t)); +} + +static const nni_option ws_options[] = { + { + .o_name = NNG_OPT_WS_REQUEST_HEADERS, + .o_get = ws_get_request_headers, + }, + { + .o_name = NNG_OPT_WS_RESPONSE_HEADERS, + .o_get = ws_get_response_headers, + }, + { + .o_name = NNG_OPT_WS_REQUEST_URI, + .o_get = ws_get_request_uri, + }, + { + .o_name = NULL, + }, +}; + +static int +ws_str_setx(void *arg, const char *nm, const void *buf, size_t sz, nni_type t) +{ + nni_ws *ws = arg; + int rv; + + // Headers can only be set. + nni_mtx_lock(&ws->mtx); + if (ws->closed) { + nni_mtx_unlock(&ws->mtx); + return (NNG_ECLOSED); + } + nni_mtx_unlock(&ws->mtx); + rv = nni_http_conn_setopt(ws->http, nm, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_setopt(ws_options, nm, ws, buf, sz, t); + } + if (rv == NNG_ENOTSUP) { + if (startswith(nm, NNG_OPT_WS_REQUEST_HEADER) || + startswith(nm, NNG_OPT_WS_RESPONSE_HEADER)) { + return (NNG_EREADONLY); + } + } + + return (rv); +} + +static int +ws_get_req_header( + nni_ws *ws, const char *nm, void *buf, size_t *szp, nni_type t) +{ + const char *s; + nm += strlen(NNG_OPT_WS_REQUEST_HEADER); + s = nni_http_req_get_header(ws->req, nm); + if (s == NULL) { + return (NNG_ENOENT); + } + return (nni_copyout_str(s, buf, szp, t)); +} + +static int +ws_get_res_header( + nni_ws *ws, const char *nm, void *buf, size_t *szp, nni_type t) +{ + const char *s; + nm += strlen(NNG_OPT_WS_RESPONSE_HEADER); + s = nni_http_res_get_header(ws->res, nm); + if (s == NULL) { + return (NNG_ENOENT); + } + return (nni_copyout_str(s, buf, szp, t)); +} + +static int +ws_str_getx(void *arg, const char *nm, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + int rv; + + nni_mtx_lock(&ws->mtx); + if (ws->closed) { + nni_mtx_unlock(&ws->mtx); + return (NNG_ECLOSED); + } + nni_mtx_unlock(&ws->mtx); + rv = nni_http_conn_getopt(ws->http, nm, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_getopt(ws_options, nm, ws, buf, szp, t); + } + // Check for generic headers... + if (rv == NNG_ENOTSUP) { + if (startswith(nm, NNG_OPT_WS_REQUEST_HEADER)) { + rv = ws_get_req_header(ws, nm, buf, szp, t); + } else if (startswith(nm, NNG_OPT_WS_RESPONSE_HEADER)) { + rv = ws_get_res_header(ws, nm, buf, szp, t); + } + } + return (rv); +} + +static int +ws_check_size(const void *buf, size_t sz, nni_type t) +{ + return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t)); +} + +static const nni_chkoption ws_chkopts[] = { + { + .o_name = NNG_OPT_WS_SENDMAXFRAME, + .o_check = ws_check_size, + }, + { + .o_name = NNG_OPT_WS_RECVMAXFRAME, + .o_check = ws_check_size, + }, + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_check = ws_check_size, + }, + { + .o_name = NNG_OPT_WS_PROTOCOL, + .o_check = ws_check_string, + }, + { + .o_name = NNG_OPT_WS_REQUEST_HEADERS, + .o_check = ws_check_string, + }, + { + .o_name = NNG_OPT_WS_RESPONSE_HEADERS, + .o_check = ws_check_string, + }, + { + .o_name = NULL, + }, +}; + +int +nni_ws_checkopt(const char *name, const void *data, size_t sz, nni_type t) +{ + int rv; + + rv = nni_chkopt(ws_chkopts, name, data, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_stream_checkopt("tcp", name, data, sz, t); + } + if (rv == NNG_ENOTSUP) { + rv = nni_stream_checkopt("tls+tcp", name, data, sz, t); + } + if (rv == NNG_ENOTSUP) { + if (startswith(name, NNG_OPT_WS_REQUEST_HEADER) || + startswith(name, NNG_OPT_WS_RESPONSE_HEADER)) { + rv = ws_check_string(data, sz, t); + } + } + // Potentially, add checks for header options. + return (rv); +} diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h index 88b4bfb4..bbc58d30 100644 --- a/src/supplemental/websocket/websocket.h +++ b/src/supplemental/websocket/websocket.h @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -18,7 +18,10 @@ typedef struct nni_ws nni_ws; typedef struct nni_ws_listener nni_ws_listener; typedef struct nni_ws_dialer nni_ws_dialer; -typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *); +// Internal option, not for normal use (at present). This sets the +// dialer/listener into message mode. This is used by the SP transport. +// This is a boolean. +#define NNI_OPT_WS_MSGMODE "ws:msgmode" // Specify URL as ws://[<host>][:port][/path] // If host is missing, INADDR_ANY is assumed. If port is missing, @@ -26,44 +29,10 @@ typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *); // on INADDR_ANY port 80, with path "/". For connect side, INADDR_ANY // makes no sense. (TBD: return NNG_EADDRINVAL, or try loopback?) -extern int nni_ws_listener_init(nni_ws_listener **, nni_url *); -extern void nni_ws_listener_fini(nni_ws_listener *); -extern void nni_ws_listener_close(nni_ws_listener *); -extern int nni_ws_listener_proto(nni_ws_listener *, const char *); -extern int nni_ws_listener_listen(nni_ws_listener *); -extern void nni_ws_listener_accept(nni_ws_listener *, nng_aio *); -extern void nni_ws_listener_hook( - nni_ws_listener *, nni_ws_listen_hook, void *); -extern int nni_ws_listener_set_tls(nni_ws_listener *, nng_tls_config *); -extern int nni_ws_listener_get_tls(nni_ws_listener *, nng_tls_config **s); -extern void nni_ws_listener_set_maxframe(nni_ws_listener *, size_t); - -extern int nni_ws_dialer_init(nni_ws_dialer **, nni_url *); -extern void nni_ws_dialer_fini(nni_ws_dialer *); -extern void nni_ws_dialer_close(nni_ws_dialer *); -extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *); -extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *); -extern void nni_ws_dialer_set_maxframe(nni_ws_dialer *, size_t); -extern void nni_ws_dialer_dial(nni_ws_dialer *, nng_aio *); -extern int nni_ws_dialer_set_tls(nni_ws_dialer *, nng_tls_config *); -extern int nni_ws_dialer_get_tls(nni_ws_dialer *, nng_tls_config **); - -// Dialer does not get a hook chance, as it can examine the request and reply -// after dial is done; this is not a 3-way handshake, so the dialer does -// not confirm the server's response at the HTTP level. (It can still issue -// a websocket close). - -extern void nni_ws_send_msg(nni_ws *, nng_aio *); -extern void nni_ws_recv_msg(nni_ws *, nng_aio *); -extern void nni_ws_close(nni_ws *); -extern void nni_ws_close_error(nni_ws *, uint16_t); -extern void nni_ws_fini(nni_ws *); -extern const char *nni_ws_response_headers(nni_ws *); -extern const char *nni_ws_request_headers(nni_ws *); -extern int nni_ws_getopt(nni_ws *, const char *, void *, size_t *, nni_type); -extern int nni_ws_setopt( - nni_ws *, const char *, const void *, size_t, nni_type); - -// The implementation will send periodic PINGs, and respond with PONGs. +// Much of the websocket API is still "private", meeaning you should not +// rely upon it being around. +extern int nni_ws_listener_alloc(nng_stream_listener **, const nni_url *); +extern int nni_ws_dialer_alloc(nng_stream_dialer **, const nni_url *); +extern int nni_ws_checkopt(const char *, const void *, size_t, nni_type); #endif // NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H |
