diff options
Diffstat (limited to 'src/supplemental')
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 229 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket_test.c | 152 |
2 files changed, 349 insertions, 32 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index df4a0834..44117f7b 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -28,7 +28,7 @@ typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *); // We have chosen to be a bit more stringent in the size of the frames that // we send, while we more generously allow larger incoming frames. These // may be tuned by options. -#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only) +#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only) #define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv) #define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send) @@ -54,6 +54,8 @@ struct nni_ws { bool wclose; bool isstream; bool inmsg; + bool send_text; + bool recv_text; nni_mtx mtx; nni_list sendq; nni_list recvq; @@ -92,6 +94,8 @@ struct nni_ws_listener { bool started; bool closed; bool isstream; + bool send_text; + bool recv_text; nni_http_handler * handler; nni_ws_listen_hook hookfn; void * hookarg; @@ -119,6 +123,8 @@ struct nni_ws_dialer { nni_list wspend; // ws structures still negotiating bool closed; bool isstream; + bool send_text; + bool recv_text; nni_list headers; // request headers size_t maxframe; size_t fragsize; @@ -455,7 +461,7 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame) return (NNG_ENOMEM); } frame->asize = frame->len; - frame->buf = frame->adata; + frame->buf = frame->adata; } buf = frame->buf; @@ -474,7 +480,11 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame) if (nni_aio_count(aio) == 0) { // This is the first frame. - frame->op = WS_BINARY; + if (ws->send_text) { + frame->op = WS_TEXT; + } else { + frame->op = WS_BINARY; + } } else { frame->op = WS_CONT; } @@ -944,6 +954,12 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame) ws->rxframe = NULL; nni_list_append(&ws->rxq, frame); break; + case WS_TEXT: + if (!ws->recv_text) { + // No support for text mode at present. + ws_close(ws, WS_CLOSE_UNSUPP_FORMAT); + } + // FALLTHROUGH case WS_BINARY: if (ws->inmsg) { ws_close(ws, WS_CLOSE_PROTOCOL_ERR); @@ -955,10 +971,6 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame) ws->rxframe = NULL; nni_list_append(&ws->rxq, frame); break; - case WS_TEXT: - // No support for text mode at present. - ws_close(ws, WS_CLOSE_UNSUPP_FORMAT); - return; case WS_PING: if (frame->len > 125) { @@ -1120,7 +1132,7 @@ ws_read_cb(void *arg) return; } frame->asize = frame->len; - frame->buf = frame->adata; + frame->buf = frame->adata; } iov.iov_buf = frame->buf; @@ -1624,14 +1636,16 @@ ws_handler(nni_aio *aio) status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; goto err; } - ws->http = conn; - ws->req = req; - ws->res = res; - ws->server = true; - ws->maxframe = l->maxframe; - ws->fragsize = l->fragsize; - ws->recvmax = l->recvmax; - ws->isstream = l->isstream; + ws->http = conn; + ws->req = req; + ws->res = res; + ws->server = true; + ws->maxframe = l->maxframe; + ws->fragsize = l->fragsize; + ws->recvmax = l->recvmax; + ws->isstream = l->isstream; + ws->recv_text = l->recv_text; + ws->send_text = l->send_text; nni_list_append(&l->reply, ws); nni_aio_set_data(ws->httpaio, 0, l); @@ -1907,6 +1921,58 @@ ws_listener_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t) } static int +ws_listener_set_recv_text(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + bool b; + + if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); + l->recv_text = b; + nni_mtx_unlock(&l->mtx); + } + return (rv); +} + +static int +ws_listener_set_send_text(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + bool b; + + if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); + l->send_text = b; + nni_mtx_unlock(&l->mtx); + } + return (rv); +} + +static int +ws_listener_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_bool(l->recv_text, buf, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); +} + +static int +ws_listener_get_send_text(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_copyout_bool(l->send_text, buf, szp, t); + nni_mtx_unlock(&l->mtx); + return (rv); +} + +static int ws_listener_get_url(void *arg, void *buf, size_t *szp, nni_type t) { nni_ws_listener *l = arg; @@ -1953,6 +2019,16 @@ static const nni_option ws_listener_options[] = { .o_get = ws_listener_get_url, }, { + .o_name = NNG_OPT_WS_RECV_TEXT, + .o_set = ws_listener_set_recv_text, + .o_get = ws_listener_get_recv_text, + }, + { + .o_name = NNG_OPT_WS_SEND_TEXT, + .o_set = ws_listener_set_send_text, + .o_get = ws_listener_get_send_text, + }, + { .o_name = NULL, }, }; @@ -2245,11 +2321,13 @@ ws_dialer_dial(void *arg, nni_aio *aio) ws_reap(ws); return; } - ws->dialer = d; - ws->useraio = aio; - ws->server = false; - ws->maxframe = d->maxframe; - ws->isstream = d->isstream; + ws->dialer = d; + ws->useraio = aio; + ws->server = false; + ws->maxframe = d->maxframe; + ws->isstream = d->isstream; + ws->recv_text = d->recv_text; + ws->send_text = d->send_text; nni_list_append(&d->wspend, ws); nni_http_client_connect(d->client, ws->connaio); nni_mtx_unlock(&d->mtx); @@ -2271,6 +2349,36 @@ ws_dialer_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t) } static int +ws_dialer_set_recv_text(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + bool b; + + if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->recv_text = b; + nni_mtx_unlock(&d->mtx); + } + return (rv); +} + +static int +ws_dialer_set_send_text(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + bool b; + + if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->send_text = b; + nni_mtx_unlock(&d->mtx); + } + return (rv); +} + +static int ws_dialer_set_size( nni_ws_dialer *d, size_t *valp, const void *buf, size_t sz, nni_type t) { @@ -2388,6 +2496,28 @@ ws_dialer_get_proto(void *arg, void *buf, size_t *szp, nni_type t) return (rv); } +static int +ws_dialer_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + nni_mtx_lock(&d->mtx); + rv = nni_copyout_bool(d->recv_text, buf, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); +} + +static int +ws_dialer_get_send_text(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + nni_mtx_lock(&d->mtx); + rv = nni_copyout_bool(d->send_text, buf, szp, t); + nni_mtx_unlock(&d->mtx); + return (rv); +} + static const nni_option ws_dialer_options[] = { { .o_name = NNI_OPT_WS_MSGMODE, @@ -2419,6 +2549,17 @@ static const nni_option ws_dialer_options[] = { .o_get = ws_dialer_get_proto, }, { + .o_name = NNG_OPT_WS_RECV_TEXT, + .o_set = ws_dialer_set_recv_text, + .o_get = ws_dialer_get_recv_text, + }, + { + .o_name = NNG_OPT_WS_SEND_TEXT, + .o_set = ws_dialer_set_send_text, + .o_get = ws_dialer_get_send_text, + }, + + { .o_name = NULL, }, }; @@ -2649,6 +2790,30 @@ ws_get_request_uri(void *arg, void *buf, size_t *szp, nni_type t) return (nni_copyout_str(nni_http_req_get_uri(ws->req), buf, szp, t)); } +static int +ws_get_recv_text(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + bool b; + nni_mtx_lock(&ws->mtx); + b = ws->recv_text; + nni_mtx_unlock(&ws->mtx); + + return (nni_copyout_bool(b, buf, szp, t)); +} + +static int +ws_get_send_text(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + bool b; + nni_mtx_lock(&ws->mtx); + b = ws->send_text; + nni_mtx_unlock(&ws->mtx); + + return (nni_copyout_bool(b, buf, szp, t)); +} + static const nni_option ws_options[] = { { .o_name = NNG_OPT_WS_REQUEST_HEADERS, @@ -2663,6 +2828,14 @@ static const nni_option ws_options[] = { .o_get = ws_get_request_uri, }, { + .o_name = NNG_OPT_WS_RECV_TEXT, + .o_get = ws_get_recv_text, + }, + { + .o_name = NNG_OPT_WS_SEND_TEXT, + .o_get = ws_get_send_text, + }, + { .o_name = NULL, }, }; @@ -2753,6 +2926,12 @@ ws_check_size(const void *buf, size_t sz, nni_type t) return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t)); } +static int +ws_check_bool(const void *buf, size_t sz, nni_type t) +{ + return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t)); +} + static const nni_chkoption ws_chkopts[] = { { .o_name = NNG_OPT_WS_SENDMAXFRAME, @@ -2779,6 +2958,14 @@ static const nni_chkoption ws_chkopts[] = { .o_check = ws_check_string, }, { + .o_name = NNG_OPT_WS_RECV_TEXT, + .o_check = ws_check_bool, + }, + { + .o_name = NNG_OPT_WS_SEND_TEXT, + .o_check = ws_check_bool, + }, + { .o_name = NULL, }, }; diff --git a/src/supplemental/websocket/websocket_test.c b/src/supplemental/websocket/websocket_test.c index 9d99f2c2..9a298142 100644 --- a/src/supplemental/websocket/websocket_test.c +++ b/src/supplemental/websocket/websocket_test.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -175,24 +175,24 @@ test_websocket_conn_props(void) TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_LOCADDR, &sa1)); TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_REMADDR, &sa2)); TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x", - sa1.s_family, sa2.s_family); + sa1.s_family, sa2.s_family); TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr, - "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr), - testutil_htonl(sa2.s_in.sa_addr)); + "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr), + testutil_htonl(sa2.s_in.sa_addr)); TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port, - "ports match %u == %u", testutil_htons(sa1.s_in.sa_port), - testutil_htons(sa2.s_in.sa_port)); + "ports match %u == %u", testutil_htons(sa1.s_in.sa_port), + testutil_htons(sa2.s_in.sa_port)); TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_REMADDR, &sa1)); TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_LOCADDR, &sa2)); TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x", - sa1.s_family, sa2.s_family); + sa1.s_family, sa2.s_family); TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr, - "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr), - testutil_htonl(sa2.s_in.sa_addr)); + "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr), + testutil_htonl(sa2.s_in.sa_addr)); TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port, - "ports match %u == %u", testutil_htons(sa1.s_in.sa_port), - testutil_htons(sa2.s_in.sa_port)); + "ports match %u == %u", testutil_htons(sa1.s_in.sa_port), + testutil_htons(sa2.s_in.sa_port)); on = true; TEST_NNG_PASS(nng_stream_set_bool(c1, NNG_OPT_TCP_NODELAY, on)); @@ -234,6 +234,135 @@ test_websocket_conn_props(void) nng_stream_dialer_free(d); } +void +test_websocket_text_mode(void) +{ + nng_stream_dialer * d = NULL; + nng_stream_listener *l = NULL; + nng_aio * daio = NULL; + nng_aio * laio = NULL; + nng_aio * aio1 = NULL; + nng_aio * aio2 = NULL; + nng_stream * c1 = NULL; + nng_stream * c2 = NULL; + char uri[64]; + char txb[5]; + char rxb[5]; + bool on; + uint16_t port = testutil_next_port(); + nng_iov iov; + + (void) snprintf(uri, sizeof(uri), "ws://127.0.0.1:%d/test", port); + + TEST_NNG_PASS(nng_aio_alloc(&daio, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&laio, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&aio2, NULL, NULL)); + nng_aio_set_timeout(daio, 5000); // 5 seconds + nng_aio_set_timeout(laio, 5000); + nng_aio_set_timeout(aio1, 5000); + nng_aio_set_timeout(aio2, 5000); + + TEST_NNG_PASS(nng_stream_listener_alloc(&l, uri)); + TEST_NNG_PASS(nng_stream_dialer_alloc(&d, uri)); + + on = true; + TEST_NNG_PASS(nng_stream_dialer_set_bool(d, NNG_OPT_WS_SEND_TEXT, on)); + TEST_NNG_PASS( + nng_stream_listener_set_bool(l, NNG_OPT_WS_RECV_TEXT, on)); + + TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_SEND_TEXT, &on)); + TEST_ASSERT(on == true); + TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_RECV_TEXT, &on)); + TEST_ASSERT(on == false); + + + TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_SEND_TEXT, &on)); + TEST_ASSERT(on == false); + TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_RECV_TEXT, &on)); + TEST_ASSERT(on == true); + + on = false; + TEST_NNG_PASS(nng_stream_dialer_set_bool(d, NNG_OPT_WS_RECV_TEXT, on)); + TEST_NNG_PASS( + nng_stream_listener_set_bool(l, NNG_OPT_WS_SEND_TEXT, on)); + TEST_NNG_PASS(nng_stream_listener_get_bool(l, NNG_OPT_WS_SEND_TEXT, &on)); + TEST_ASSERT(on == false); + TEST_NNG_PASS(nng_stream_dialer_get_bool(d, NNG_OPT_WS_RECV_TEXT, &on)); + TEST_ASSERT(on == false); + + TEST_NNG_PASS(nng_stream_listener_listen(l)); + nng_stream_dialer_dial(d, daio); + nng_stream_listener_accept(l, laio); + + nng_aio_wait(laio); + nng_aio_wait(daio); + + TEST_NNG_PASS(nng_aio_result(laio)); + TEST_NNG_PASS(nng_aio_result(daio)); + c1 = nng_aio_get_output(laio, 0); + c2 = nng_aio_get_output(daio, 0); + TEST_CHECK(c1 != NULL); + TEST_CHECK(c2 != NULL); + + TEST_NNG_PASS(nng_stream_get_bool(c1, NNG_OPT_WS_SEND_TEXT, &on)); + TEST_ASSERT(on == false); + TEST_NNG_PASS(nng_stream_get_bool(c1, NNG_OPT_WS_RECV_TEXT, &on)); + TEST_ASSERT(on == true); + TEST_NNG_PASS( + nng_stream_listener_set_bool(l, NNG_OPT_WS_RECV_TEXT, on)); + + TEST_NNG_PASS(nng_stream_get_bool(c2, NNG_OPT_WS_SEND_TEXT, &on)); + TEST_ASSERT(on == true); + TEST_NNG_PASS(nng_stream_get_bool(c2, NNG_OPT_WS_RECV_TEXT, &on)); + TEST_ASSERT(on == false); + + memcpy(txb, "PING", 5); + iov.iov_buf = txb; + iov.iov_len = 5; + nng_aio_set_iov(aio1, 1, &iov); + nng_stream_send(c1, aio1); + iov.iov_buf = rxb; + iov.iov_len = 5; + nng_aio_set_iov(aio2, 1, &iov); + nng_stream_recv(c2, aio2); + + nng_aio_wait(aio1); + nng_aio_wait(aio2); + TEST_NNG_PASS(nng_aio_result(aio1)); + TEST_NNG_PASS(nng_aio_result(aio2)); + TEST_ASSERT(memcmp(rxb, txb, 5) == 0); + + memset(rxb, 0, 5); + memcpy(txb, "PONG", 5); + + iov.iov_buf = txb; + iov.iov_len = 5; + nng_aio_set_iov(aio2, 1, &iov); + nng_stream_send(c2, aio2); + iov.iov_buf = rxb; + iov.iov_len = 5; + nng_aio_set_iov(aio1, 1, &iov); + nng_stream_recv(c1, aio1); + + nng_aio_wait(aio1); + nng_aio_wait(aio2); + TEST_NNG_PASS(nng_aio_result(aio1)); + TEST_NNG_PASS(nng_aio_result(aio2)); + TEST_ASSERT(memcmp(rxb, txb, 5) == 0); + + nng_stream_close(c1); + nng_stream_free(c1); + nng_stream_close(c2); + nng_stream_free(c2); + nng_aio_free(aio1); + nng_aio_free(aio2); + nng_aio_free(daio); + nng_aio_free(laio); + nng_stream_listener_free(l); + nng_stream_dialer_free(d); +} + typedef struct recv_state { nng_stream * c; int total; @@ -410,5 +539,6 @@ TEST_LIST = { { "websocket stream wildcard", test_websocket_wildcard }, { "websocket conn properties", test_websocket_conn_props }, { "websocket fragmentation", test_websocket_fragmentation }, + { "websocket text mode", test_websocket_text_mode }, { NULL, NULL }, }; |
