aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/supplemental/websocket/websocket.c229
-rw-r--r--src/supplemental/websocket/websocket_test.c152
2 files changed, 349 insertions, 32 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index df4a0834..44117f7b 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -28,7 +28,7 @@ typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *);
// We have chosen to be a bit more stringent in the size of the frames that
// we send, while we more generously allow larger incoming frames. These
// may be tuned by options.
-#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only)
+#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only)
#define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv)
#define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send)
@@ -54,6 +54,8 @@ struct nni_ws {
bool wclose;
bool isstream;
bool inmsg;
+ bool send_text;
+ bool recv_text;
nni_mtx mtx;
nni_list sendq;
nni_list recvq;
@@ -92,6 +94,8 @@ struct nni_ws_listener {
bool started;
bool closed;
bool isstream;
+ bool send_text;
+ bool recv_text;
nni_http_handler * handler;
nni_ws_listen_hook hookfn;
void * hookarg;
@@ -119,6 +123,8 @@ struct nni_ws_dialer {
nni_list wspend; // ws structures still negotiating
bool closed;
bool isstream;
+ bool send_text;
+ bool recv_text;
nni_list headers; // request headers
size_t maxframe;
size_t fragsize;
@@ -455,7 +461,7 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame)
return (NNG_ENOMEM);
}
frame->asize = frame->len;
- frame->buf = frame->adata;
+ frame->buf = frame->adata;
}
buf = frame->buf;
@@ -474,7 +480,11 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame)
if (nni_aio_count(aio) == 0) {
// This is the first frame.
- frame->op = WS_BINARY;
+ if (ws->send_text) {
+ frame->op = WS_TEXT;
+ } else {
+ frame->op = WS_BINARY;
+ }
} else {
frame->op = WS_CONT;
}
@@ -944,6 +954,12 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
ws->rxframe = NULL;
nni_list_append(&ws->rxq, frame);
break;
+ case WS_TEXT:
+ if (!ws->recv_text) {
+ // No support for text mode at present.
+ ws_close(ws, WS_CLOSE_UNSUPP_FORMAT);
+ }
+ // FALLTHROUGH
case WS_BINARY:
if (ws->inmsg) {
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
@@ -955,10 +971,6 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
ws->rxframe = NULL;
nni_list_append(&ws->rxq, frame);
break;
- case WS_TEXT:
- // No support for text mode at present.
- ws_close(ws, WS_CLOSE_UNSUPP_FORMAT);
- return;
case WS_PING:
if (frame->len > 125) {
@@ -1120,7 +1132,7 @@ ws_read_cb(void *arg)
return;
}
frame->asize = frame->len;
- frame->buf = frame->adata;
+ frame->buf = frame->adata;
}
iov.iov_buf = frame->buf;
@@ -1624,14 +1636,16 @@ ws_handler(nni_aio *aio)
status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
goto err;
}
- ws->http = conn;
- ws->req = req;
- ws->res = res;
- ws->server = true;
- ws->maxframe = l->maxframe;
- ws->fragsize = l->fragsize;
- ws->recvmax = l->recvmax;
- ws->isstream = l->isstream;
+ ws->http = conn;
+ ws->req = req;
+ ws->res = res;
+ ws->server = true;
+ ws->maxframe = l->maxframe;
+ ws->fragsize = l->fragsize;
+ ws->recvmax = l->recvmax;
+ ws->isstream = l->isstream;
+ ws->recv_text = l->recv_text;
+ ws->send_text = l->send_text;
nni_list_append(&l->reply, ws);
nni_aio_set_data(ws->httpaio, 0, l);
@@ -1907,6 +1921,58 @@ ws_listener_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
+ws_listener_set_recv_text(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ l->recv_text = b;
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_set_send_text(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ l->send_text = b;
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_bool(l->recv_text, buf, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static int
+ws_listener_get_send_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_copyout_bool(l->send_text, buf, szp, t);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static int
ws_listener_get_url(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_ws_listener *l = arg;
@@ -1953,6 +2019,16 @@ static const nni_option ws_listener_options[] = {
.o_get = ws_listener_get_url,
},
{
+ .o_name = NNG_OPT_WS_RECV_TEXT,
+ .o_set = ws_listener_set_recv_text,
+ .o_get = ws_listener_get_recv_text,
+ },
+ {
+ .o_name = NNG_OPT_WS_SEND_TEXT,
+ .o_set = ws_listener_set_send_text,
+ .o_get = ws_listener_get_send_text,
+ },
+ {
.o_name = NULL,
},
};
@@ -2245,11 +2321,13 @@ ws_dialer_dial(void *arg, nni_aio *aio)
ws_reap(ws);
return;
}
- ws->dialer = d;
- ws->useraio = aio;
- ws->server = false;
- ws->maxframe = d->maxframe;
- ws->isstream = d->isstream;
+ ws->dialer = d;
+ ws->useraio = aio;
+ ws->server = false;
+ ws->maxframe = d->maxframe;
+ ws->isstream = d->isstream;
+ ws->recv_text = d->recv_text;
+ ws->send_text = d->send_text;
nni_list_append(&d->wspend, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
@@ -2271,6 +2349,36 @@ ws_dialer_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
+ws_dialer_set_recv_text(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->recv_text = b;
+ nni_mtx_unlock(&d->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_dialer_set_send_text(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->send_text = b;
+ nni_mtx_unlock(&d->mtx);
+ }
+ return (rv);
+}
+
+static int
ws_dialer_set_size(
nni_ws_dialer *d, size_t *valp, const void *buf, size_t sz, nni_type t)
{
@@ -2388,6 +2496,28 @@ ws_dialer_get_proto(void *arg, void *buf, size_t *szp, nni_type t)
return (rv);
}
+static int
+ws_dialer_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+ nni_mtx_lock(&d->mtx);
+ rv = nni_copyout_bool(d->recv_text, buf, szp, t);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
+static int
+ws_dialer_get_send_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+ nni_mtx_lock(&d->mtx);
+ rv = nni_copyout_bool(d->send_text, buf, szp, t);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
static const nni_option ws_dialer_options[] = {
{
.o_name = NNI_OPT_WS_MSGMODE,
@@ -2419,6 +2549,17 @@ static const nni_option ws_dialer_options[] = {
.o_get = ws_dialer_get_proto,
},
{
+ .o_name = NNG_OPT_WS_RECV_TEXT,
+ .o_set = ws_dialer_set_recv_text,
+ .o_get = ws_dialer_get_recv_text,
+ },
+ {
+ .o_name = NNG_OPT_WS_SEND_TEXT,
+ .o_set = ws_dialer_set_send_text,
+ .o_get = ws_dialer_get_send_text,
+ },
+
+ {
.o_name = NULL,
},
};
@@ -2649,6 +2790,30 @@ ws_get_request_uri(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_str(nni_http_req_get_uri(ws->req), buf, szp, t));
}
+static int
+ws_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ bool b;
+ nni_mtx_lock(&ws->mtx);
+ b = ws->recv_text;
+ nni_mtx_unlock(&ws->mtx);
+
+ return (nni_copyout_bool(b, buf, szp, t));
+}
+
+static int
+ws_get_send_text(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ bool b;
+ nni_mtx_lock(&ws->mtx);
+ b = ws->send_text;
+ nni_mtx_unlock(&ws->mtx);
+
+ return (nni_copyout_bool(b, buf, szp, t));
+}
+
static const nni_option ws_options[] = {
{
.o_name = NNG_OPT_WS_REQUEST_HEADERS,
@@ -2663,6 +2828,14 @@ static const nni_option ws_options[] = {
.o_get = ws_get_request_uri,
},
{
+ .o_name = NNG_OPT_WS_RECV_TEXT,
+ .o_get = ws_get_recv_text,
+ },
+ {
+ .o_name = NNG_OPT_WS_SEND_TEXT,
+ .o_get = ws_get_send_text,
+ },
+ {
.o_name = NULL,
},
};
@@ -2753,6 +2926,12 @@ ws_check_size(const void *buf, size_t sz, nni_type t)
return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t));
}
+static int
+ws_check_bool(const void *buf, size_t sz, nni_type t)
+{
+ return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t));
+}
+
static const nni_chkoption ws_chkopts[] = {
{
.o_name = NNG_OPT_WS_SENDMAXFRAME,
@@ -2779,6 +2958,14 @@ static const nni_chkoption ws_chkopts[] = {
.o_check = ws_check_string,
},
{
+ .o_name = NNG_OPT_WS_RECV_TEXT,
+ .o_check = ws_check_bool,
+ },
+ {
+ .o_name = NNG_OPT_WS_SEND_TEXT,
+ .o_check = ws_check_bool,
+ },
+ {
.o_name = NULL,
},
};
diff --git a/src/supplemental/websocket/websocket_test.c b/src/supplemental/websocket/websocket_test.c
index 9d99f2c2..9a298142 100644
--- a/src/supplemental/websocket/websocket_test.c
+++ b/src/supplemental/websocket/websocket_test.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -175,24 +175,24 @@ test_websocket_conn_props(void)
TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_LOCADDR, &sa1));
TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_REMADDR, &sa2));
TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x",
- sa1.s_family, sa2.s_family);
+ sa1.s_family, sa2.s_family);
TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr,
- "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr),
- testutil_htonl(sa2.s_in.sa_addr));
+ "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr),
+ testutil_htonl(sa2.s_in.sa_addr));
TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port,
- "ports match %u == %u", testutil_htons(sa1.s_in.sa_port),
- testutil_htons(sa2.s_in.sa_port));
+ "ports match %u == %u", testutil_htons(sa1.s_in.sa_port),
+ testutil_htons(sa2.s_in.sa_port));
TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_REMADDR, &sa1));
TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_LOCADDR, &sa2));
TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x",
- sa1.s_family, sa2.s_family);
+ sa1.s_family, sa2.s_family);
TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr,
- "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr),
- testutil_htonl(sa2.s_in.sa_addr));
+ "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr),
+ testutil_htonl(sa2.s_in.sa_addr));
TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port,
- "ports match %u == %u", testutil_htons(sa1.s_in.sa_port),
- testutil_htons(sa2.s_in.sa_port));
+ "ports match %u == %u", testutil_htons(sa1.s_in.sa_port),
+ testutil_htons(sa2.s_in.sa_port));
on = true;
TEST_NNG_PASS(nng_stream_set_bool(c1, NNG_OPT_TCP_NODELAY, on));
@@ -234,6 +234,135 @@ test_websocket_conn_props(void)
nng_stream_dialer_free(d);
}
+void
+test_websocket_text_mode(void)
+{
+ nng_stream_dialer * d = NULL;
+ nng_stream_listener *l = NULL;
+ nng_aio * daio = NULL;
+ nng_aio * laio = NULL;
+ nng_aio * aio1 = NULL;
+ nng_aio * aio2 = NULL;
+ nng_stream * c1 = NULL;
+ nng_stream * c2 = NULL;
+ char uri[64];
+ char txb[5];
+ char rxb[5];
+ bool on;
+ uint16_t port = testutil_next_port();
+ nng_iov iov;
+
+ (void) snprintf(uri, sizeof(uri), "ws://127.0.0.1:%d/test", port);
+
+ TEST_NNG_PASS(nng_aio_alloc(&daio, NULL, NULL));
+ TEST_NNG_PASS(nng_aio_alloc(&laio, NULL, NULL));
+ TEST_NNG_PASS(nng_aio_alloc(&aio1, NULL, NULL));
+ TEST_NNG_PASS(nng_aio_alloc(&aio2, NULL, NULL));
+ nng_aio_set_timeout(daio, 5000); // 5 seconds
+ nng_aio_set_timeout(laio, 5000);
+ nng_aio_set_timeout(aio1, 5000);
+ nng_aio_set_timeout(aio2, 5000);
+
+ TEST_NNG_PASS(nng_stream_listener_alloc(&l, uri));
+ TEST_NNG_PASS(nng_stream_dialer_alloc(&d, uri));
+
+ on = true;
+ TEST_NNG_PASS(nng_stream_dialer_set_bool(d, NNG_OPT_WS_SEND_TEXT, on));
+ TEST_NNG_PASS(
+ nng_stream_listener_set_bool(l, NNG_OPT_WS_RECV_TEXT, on));
+
+ TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == true);
+ TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == false);
+
+
+ TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == false);
+ TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == true);
+
+ on = false;
+ TEST_NNG_PASS(nng_stream_dialer_set_bool(d, NNG_OPT_WS_RECV_TEXT, on));
+ TEST_NNG_PASS(
+ nng_stream_listener_set_bool(l, NNG_OPT_WS_SEND_TEXT, on));
+ TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == false);
+ TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == false);
+
+ TEST_NNG_PASS(nng_stream_listener_listen(l));
+ nng_stream_dialer_dial(d, daio);
+ nng_stream_listener_accept(l, laio);
+
+ nng_aio_wait(laio);
+ nng_aio_wait(daio);
+
+ TEST_NNG_PASS(nng_aio_result(laio));
+ TEST_NNG_PASS(nng_aio_result(daio));
+ c1 = nng_aio_get_output(laio, 0);
+ c2 = nng_aio_get_output(daio, 0);
+ TEST_CHECK(c1 != NULL);
+ TEST_CHECK(c2 != NULL);
+
+ TEST_NNG_PASS(nng_stream_get_bool(c1, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == false);
+ TEST_NNG_PASS(nng_stream_get_bool(c1, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == true);
+ TEST_NNG_PASS(
+ nng_stream_listener_set_bool(l, NNG_OPT_WS_RECV_TEXT, on));
+
+ TEST_NNG_PASS(nng_stream_get_bool(c2, NNG_OPT_WS_SEND_TEXT, &on));
+ TEST_ASSERT(on == true);
+ TEST_NNG_PASS(nng_stream_get_bool(c2, NNG_OPT_WS_RECV_TEXT, &on));
+ TEST_ASSERT(on == false);
+
+ memcpy(txb, "PING", 5);
+ iov.iov_buf = txb;
+ iov.iov_len = 5;
+ nng_aio_set_iov(aio1, 1, &iov);
+ nng_stream_send(c1, aio1);
+ iov.iov_buf = rxb;
+ iov.iov_len = 5;
+ nng_aio_set_iov(aio2, 1, &iov);
+ nng_stream_recv(c2, aio2);
+
+ nng_aio_wait(aio1);
+ nng_aio_wait(aio2);
+ TEST_NNG_PASS(nng_aio_result(aio1));
+ TEST_NNG_PASS(nng_aio_result(aio2));
+ TEST_ASSERT(memcmp(rxb, txb, 5) == 0);
+
+ memset(rxb, 0, 5);
+ memcpy(txb, "PONG", 5);
+
+ iov.iov_buf = txb;
+ iov.iov_len = 5;
+ nng_aio_set_iov(aio2, 1, &iov);
+ nng_stream_send(c2, aio2);
+ iov.iov_buf = rxb;
+ iov.iov_len = 5;
+ nng_aio_set_iov(aio1, 1, &iov);
+ nng_stream_recv(c1, aio1);
+
+ nng_aio_wait(aio1);
+ nng_aio_wait(aio2);
+ TEST_NNG_PASS(nng_aio_result(aio1));
+ TEST_NNG_PASS(nng_aio_result(aio2));
+ TEST_ASSERT(memcmp(rxb, txb, 5) == 0);
+
+ nng_stream_close(c1);
+ nng_stream_free(c1);
+ nng_stream_close(c2);
+ nng_stream_free(c2);
+ nng_aio_free(aio1);
+ nng_aio_free(aio2);
+ nng_aio_free(daio);
+ nng_aio_free(laio);
+ nng_stream_listener_free(l);
+ nng_stream_dialer_free(d);
+}
+
typedef struct recv_state {
nng_stream * c;
int total;
@@ -410,5 +539,6 @@ TEST_LIST = {
{ "websocket stream wildcard", test_websocket_wildcard },
{ "websocket conn properties", test_websocket_conn_props },
{ "websocket fragmentation", test_websocket_fragmentation },
+ { "websocket text mode", test_websocket_text_mode },
{ NULL, NULL },
};