aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-10-26 00:09:23 -0700
committerGarrett D'Amore <garrett@damore.org>2020-10-26 21:17:06 -0700
commitcaefd5cb745e2d1e8454dcda6753262f95812de4 (patch)
treef5d4b081670b19a8b979ae4a5eae23b91daa1126 /src/supplemental
parent4e7205cedb5f631d0fbbe72dbf89b5b9205a6260 (diff)
downloadnng-caefd5cb745e2d1e8454dcda6753262f95812de4.tar.gz
nng-caefd5cb745e2d1e8454dcda6753262f95812de4.tar.bz2
nng-caefd5cb745e2d1e8454dcda6753262f95812de4.zip
fixes #914 websocket stream mode should support TEXT
This adds new options, NNG_OPT_WS_SEND_TEXT and NNG_OPT_WS_RECV_TEXT that permit communication with WebSocket peers that insist on using TEXT frames (stream mode only). The support is limited, as NNG does no validation of the frame contents to check for UTF-8 compliance.
Diffstat (limited to 'src/supplemental')
-rw-r--r--src/supplemental/websocket/websocket.c229
-rw-r--r--src/supplemental/websocket/websocket_test.c152
2 files changed, 349 insertions, 32 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index df4a0834..44117f7b 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -28,7 +28,7 @@ typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *);
// We have chosen to be a bit more stringent in the size of the frames that
// we send, while we more generously allow larger incoming frames. These
// may be tuned by options.
-#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only)
+#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only)
#define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv)
#define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send)
@@ -54,6 +54,8 @@ struct nni_ws {
bool wclose;
bool isstream;
bool inmsg;
+ bool send_text;
+ bool recv_text;
nni_mtx mtx;
nni_list sendq;
nni_list recvq;
@@ -92,6 +94,8 @@ struct nni_ws_listener {
bool started;
bool closed;
bool isstream;
+ bool send_text;
+ bool recv_text;
nni_http_handler * handler;
nni_ws_listen_hook hookfn;
void * hookarg;
@@ -119,6 +123,8 @@ struct nni_ws_dialer {
nni_list wspend; // ws structures still negotiating
bool closed;
bool isstream;
+ bool send_text;
+ bool recv_text;
nni_list headers; // request headers
size_t maxframe;
size_t fragsize;
@@ -455,7 +461,7 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame)
return (NNG_ENOMEM);
}
frame->asize = frame->len;
- frame->buf = frame->adata;
+ frame->buf = frame->adata;
}
buf = frame->buf;
@@ -474,7 +480,11 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame)
if (nni_aio_count(aio) == 0) {
// This is the first frame.
- frame->op = WS_BINARY;
+ if (ws->send_text) {
+ frame->op = WS_TEXT;
+ } else {
+ frame->op = WS_BINARY;
+ }
} else {
frame->op = WS_CONT;
}
@@ -944,6 +954,12 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
ws->rxframe = NULL;
nni_list_append(&ws->rxq, frame);
break;
+ case WS_TEXT:
+ if (!ws->recv_text) {
+ // No support for text mode at present.
+ ws_close(ws, WS_CLOSE_UNSUPP_FORMAT);
+ }
+ // FALLTHROUGH
case WS_BINARY:
if (ws->inmsg) {
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
@@ -955,10 +971,6 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
ws->rxframe = NULL;
nni_list_append(&ws->rxq, frame);
break;
- case WS_TEXT:
- // No support for text mode at present.
- ws_close(ws, WS_CLOSE_UNSUPP_FORMAT);
- return;
case WS_PING:
if (frame->len > 125) {
@@ -1120,7 +1132,7 @@ ws_read_cb(void *arg)
return;
}
frame->asize = frame->len;
- frame->buf = frame->adata;
+ frame->buf = frame->adata;
}
iov.iov_buf = frame->buf;
@@ -1624,14 +1636,16 @@ ws_handler(nni_aio *aio)
status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
goto err;
}
- ws->http = conn;
- ws->req = req;
- ws->res = res;
- ws->server = true;
- ws->maxframe = l->maxframe;
- ws->fragsize = l->fragsize;
- ws->recvmax = l->recvmax;
- ws->isstream = l->isstream;
+ ws->http = conn;
+ ws->req = req;
+ ws->res = res;
+ ws->server = true;
+ ws->maxframe = l->maxframe;
+ ws->fragsize = l->fragsize;
+ ws->recvmax = l->recvmax;
+ ws->isstream = l->isstream;
+ ws->recv_text = l->recv_text;
+ ws->send_text = l->send_text;
nni_list_append(&l->reply, ws);
nni_aio_set_data(ws->httpaio, 0, l);
@@ -1907,6 +1921,58 @@ ws_listener_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
+ws_listener_set_recv_text(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ l->recv_text = b;
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_set_send_text(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ l->send_text = b;
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_bool(l->recv_text, buf, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static int
+ws_listener_get_send_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_bool(l->send_text, buf, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static int
ws_listener_get_url(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_ws_listener *l = arg;
@@ -1953,6 +2019,16 @@ static const nni_option ws_listener_options[] = {
.o_get = ws_listener_get_url,
},
{
+ .o_name = NNG_OPT_WS_RECV_TEXT,
+ .o_set = ws_listener_set_recv_text,
+ .o_get = ws_listener_get_recv_text,
+ },
+ {
+ .o_name = NNG_OPT_WS_SEND_TEXT,
+ .o_set = ws_listener_set_send_text,
+ .o_get = ws_listener_get_send_text,
+ },
+ {
.o_name = NULL,
},
};
@@ -2245,11 +2321,13 @@ ws_dialer_dial(void *arg, nni_aio *aio)
ws_reap(ws);
return;
}
- ws->dialer = d;
- ws->useraio = aio;
- ws->server = false;
- ws->maxframe = d->maxframe;
- ws->isstream = d->isstream;
+ ws->dialer = d;
+ ws->useraio = aio;
+ ws->server = false;
+ ws->maxframe = d->maxframe;
+ ws->isstream = d->isstream;
+ ws->recv_text = d->recv_text;
+ ws->send_text = d->send_text;
nni_list_append(&d->wspend, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
@@ -2271,6 +2349,36 @@ ws_dialer_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
+ws_dialer_set_recv_text(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->recv_text = b;
+ nni_mtx_unlock(&d->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_dialer_set_send_text(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->send_text = b;
+ nni_mtx_unlock(&d->mtx);
+ }
+ return (rv);
+}
+
+static int
ws_dialer_set_size(
nni_ws_dialer *d, size_t *valp, const void *buf, size_t sz, nni_type t)
{
@@ -2388,6 +2496,28 @@ ws_dialer_get_proto(void *arg, void *buf, size_t *szp, nni_type t)
return (rv);
}
+static int
+ws_dialer_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+ nni_mtx_lock(&d->mtx);
+ rv = nni_copyout_bool(d->recv_text, buf, szp, t);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
+static int
+ws_dialer_get_send_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+ nni_mtx_lock(&d->mtx);
+ rv = nni_copyout_bool(d->send_text, buf, szp, t);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
static const nni_option ws_dialer_options[] = {
{
.o_name = NNI_OPT_WS_MSGMODE,
@@ -2419,6 +2549,17 @@ static const nni_option ws_dialer_options[] = {
.o_get = ws_dialer_get_proto,
},
{
+ .o_name = NNG_OPT_WS_RECV_TEXT,
+ .o_set = ws_dialer_set_recv_text,
+ .o_get = ws_dialer_get_recv_text,
+ },
+ {
+ .o_name = NNG_OPT_WS_SEND_TEXT,
+ .o_set = ws_dialer_set_send_text,
+ .o_get = ws_dialer_get_send_text,
+ },
+
+ {
.o_name = NULL,
},
};
@@ -2649,6 +2790,30 @@ ws_get_request_uri(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_str(nni_http_req_get_uri(ws->req), buf, szp, t));
}
+static int
+ws_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ bool b;
+ nni_mtx_lock(&ws->mtx);
+ b = ws->recv_text;
+ nni_mtx_unlock(&ws->mtx);
+
+ return (nni_copyout_bool(b, buf, szp, t));
+}
+
+static int
+ws_get_send_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ bool b;
+ nni_mtx_lock(&ws->mtx);
+ b = ws->send_text;
+ nni_mtx_unlock(&ws->mtx);
+
+ return (nni_copyout_bool(b, buf, szp, t));
+}
+
static const nni_option ws_options[] = {
{
.o_name = NNG_OPT_WS_REQUEST_HEADERS,
@@ -2663,6 +2828,14 @@ static const nni_option ws_options[] = {
.o_get = ws_get_request_uri,
},
{
+ .o_name = NNG_OPT_WS_RECV_TEXT,
+ .o_get = ws_get_recv_text,
+ },
+ {
+ .o_name = NNG_OPT_WS_SEND_TEXT,
+ .o_get = ws_get_send_text,
+ },
+ {
.o_name = NULL,
},
};
@@ -2753,6 +2926,12 @@ ws_check_size(const void *buf, size_t sz, nni_type t)
return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t));
}
+static int
+ws_check_bool(const void *buf, size_t sz, nni_type t)
+{
+ return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t));
+}
+
static const nni_chkoption ws_chkopts[] = {
{
.o_name = NNG_OPT_WS_SENDMAXFRAME,
@@ -2779,6 +2958,14 @@ static const nni_chkoption ws_chkopts[] = {
.o_check = ws_check_string,
},
{
+ .o_name = NNG_OPT_WS_RECV_TEXT,
+ .o_check = ws_check_bool,
+ },
+ {
+ .o_name = NNG_OPT_WS_SEND_TEXT,
+ .o_check = ws_check_bool,
+ },
+ {
.o_name = NULL,
},
};
diff --git a/src/supplemental/websocket/websocket_test.c b/src/supplemental/websocket/websocket_test.c
index 9d99f2c2..9a298142 100644
--- a/src/supplemental/websocket/websocket_test.c
+++ b/src/supplemental/websocket/websocket_test.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -175,24 +175,24 @@ test_websocket_conn_props(void)
TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_LOCADDR, &sa1));
TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_REMADDR, &sa2));
TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x",
- sa1.s_family, sa2.s_family);
+ sa1.s_family, sa2.s_family);
TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr,
- "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr),
- testutil_htonl(sa2.s_in.sa_addr));
+ "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr),
+ testutil_htonl(sa2.s_in.sa_addr));
TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port,
- "ports match %u == %u", testutil_htons(sa1.s_in.sa_port),
- testutil_htons(sa2.s_in.sa_port));
+ "ports match %u == %u", testutil_htons(sa1.s_in.sa_port),
+ testutil_htons(sa2.s_in.sa_port));
TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_REMADDR, &sa1));
TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_LOCADDR, &sa2));
TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x",
- sa1.s_family, sa2.s_family);
+ sa1.s_family, sa2.s_family);
TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr,
- "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr),
- testutil_htonl(sa2.s_in.sa_addr));
+ "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr),
+ testutil_htonl(sa2.s_in.sa_addr));
TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port,
- "ports match %u == %u", testutil_htons(sa1.s_in.sa_port),
- testutil_htons(sa2.s_in.sa_port));
+ "ports match %u == %u", testutil_htons(sa1.s_in.sa_port),
+ testutil_htons(sa2.s_in.sa_port));
on = true;
TEST_NNG_PASS(nng_stream_set_bool(c1, NNG_OPT_TCP_NODELAY, on));
@@ -234,6 +234,135 @@ test_websocket_conn_props(void)
nng_stream_dialer_free(d);
}
+void
+test_websocket_text_mode(void)
+{
+ nng_stream_dialer * d = NULL;
+ nng_stream_listener *l = NULL;
+ nng_aio * daio = NULL;
+ nng_aio * laio = NULL;
+ nng_aio * aio1 = NULL;
+ nng_aio * aio2 = NULL;
+ nng_stream * c1 = NULL;
+ nng_stream * c2 = NULL;
+ char uri[64];
+ char txb[5];
+ char rxb[5];
+ bool on;
+ uint16_t port = testutil_next_port();
+ nng_iov iov;
+
+ (void) snprintf(uri, sizeof(uri), "ws://127.0.0.1:%d/test", port);
+
+ TEST_NNG_PASS(nng_aio_alloc(&daio, NULL, NULL));
+ TEST_NNG_PASS(nng_aio_alloc(&laio, NULL, NULL));
+ TEST_NNG_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+ TEST_NNG_PASS(nng_aio_alloc(&aio2, NULL, NULL));
+ nng_aio_set_timeout(daio, 5000); // 5 seconds
+ nng_aio_set_timeout(laio, 5000);
+ nng_aio_set_timeout(aio1, 5000);
+ nng_aio_set_timeout(aio2, 5000);
+
+ TEST_NNG_PASS(nng_stream_listener_alloc(&l, uri));
+ TEST_NNG_PASS(nng_stream_dialer_alloc(&d, uri));
+
+ on = true;
+ TEST_NNG_PASS(nng_stream_dialer_set_bool(d, NNG_OPT_WS_SEND_TEXT, on));
+ TEST_NNG_PASS(
+ nng_stream_listener_set_bool(l, NNG_OPT_WS_RECV_TEXT, on));
+
+ TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == true);
+ TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == false);
+
+
+ TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == false);
+ TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == true);
+
+ on = false;
+ TEST_NNG_PASS(nng_stream_dialer_set_bool(d, NNG_OPT_WS_RECV_TEXT, on));
+ TEST_NNG_PASS(
+ nng_stream_listener_set_bool(l, NNG_OPT_WS_SEND_TEXT, on));
+ TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == false);
+ TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == false);
+
+ TEST_NNG_PASS(nng_stream_listener_listen(l));
+ nng_stream_dialer_dial(d, daio);
+ nng_stream_listener_accept(l, laio);
+
+ nng_aio_wait(laio);
+ nng_aio_wait(daio);
+
+ TEST_NNG_PASS(nng_aio_result(laio));
+ TEST_NNG_PASS(nng_aio_result(daio));
+ c1 = nng_aio_get_output(laio, 0);
+ c2 = nng_aio_get_output(daio, 0);
+ TEST_CHECK(c1 != NULL);
+ TEST_CHECK(c2 != NULL);
+
+ TEST_NNG_PASS(nng_stream_get_bool(c1, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == false);
+ TEST_NNG_PASS(nng_stream_get_bool(c1, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == true);
+ TEST_NNG_PASS(
+ nng_stream_listener_set_bool(l, NNG_OPT_WS_RECV_TEXT, on));
+
+ TEST_NNG_PASS(nng_stream_get_bool(c2, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == true);
+ TEST_NNG_PASS(nng_stream_get_bool(c2, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == false);
+
+ memcpy(txb, "PING", 5);
+ iov.iov_buf = txb;
+ iov.iov_len = 5;
+ nng_aio_set_iov(aio1, 1, &iov);
+ nng_stream_send(c1, aio1);
+ iov.iov_buf = rxb;
+ iov.iov_len = 5;
+ nng_aio_set_iov(aio2, 1, &iov);
+ nng_stream_recv(c2, aio2);
+
+ nng_aio_wait(aio1);
+ nng_aio_wait(aio2);
+ TEST_NNG_PASS(nng_aio_result(aio1));
+ TEST_NNG_PASS(nng_aio_result(aio2));
+ TEST_ASSERT(memcmp(rxb, txb, 5) == 0);
+
+ memset(rxb, 0, 5);
+ memcpy(txb, "PONG", 5);
+
+ iov.iov_buf = txb;
+ iov.iov_len = 5;
+ nng_aio_set_iov(aio2, 1, &iov);
+ nng_stream_send(c2, aio2);
+ iov.iov_buf = rxb;
+ iov.iov_len = 5;
+ nng_aio_set_iov(aio1, 1, &iov);
+ nng_stream_recv(c1, aio1);
+
+ nng_aio_wait(aio1);
+ nng_aio_wait(aio2);
+ TEST_NNG_PASS(nng_aio_result(aio1));
+ TEST_NNG_PASS(nng_aio_result(aio2));
+ TEST_ASSERT(memcmp(rxb, txb, 5) == 0);
+
+ nng_stream_close(c1);
+ nng_stream_free(c1);
+ nng_stream_close(c2);
+ nng_stream_free(c2);
+ nng_aio_free(aio1);
+ nng_aio_free(aio2);
+ nng_aio_free(daio);
+ nng_aio_free(laio);
+ nng_stream_listener_free(l);
+ nng_stream_dialer_free(d);
+}
+
typedef struct recv_state {
nng_stream * c;
int total;
@@ -410,5 +539,6 @@ TEST_LIST = {
{ "websocket stream wildcard", test_websocket_wildcard },
{ "websocket conn properties", test_websocket_conn_props },
{ "websocket fragmentation", test_websocket_fragmentation },
+ { "websocket text mode", test_websocket_text_mode },
{ NULL, NULL },
};