aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental/websocket/websocket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2023-12-17 19:03:29 -0800
committerGarrett D'Amore <garrett@damore.org>2023-12-17 19:09:30 -0800
commit8ff9663c06a18d6c7fe0605de679948d3c4de9d7 (patch)
tree3a5e1884816c7e693eadc83a0f45ddac167c8494 /src/supplemental/websocket/websocket.c
parent8ccc10bf6a7ce305e5197e8eaf931ac7dc8612c0 (diff)
downloadnng-8ff9663c06a18d6c7fe0605de679948d3c4de9d7.tar.gz
nng-8ff9663c06a18d6c7fe0605de679948d3c4de9d7.tar.bz2
nng-8ff9663c06a18d6c7fe0605de679948d3c4de9d7.zip
fixes #1735 websocket should send, and wait for, WS_CLOSE frames on shutdown
fixes #1733 deadlock in websocket listener close
Diffstat (limited to 'src/supplemental/websocket/websocket.c')
-rw-r--r--src/supplemental/websocket/websocket.c185
1 files changed, 99 insertions, 86 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index d1c9c8d5..2ef152f9 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -28,7 +28,7 @@ typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *);
// We have chosen to be a bit more stringent in the size of the frames that
// we send, while we more generously allow larger incoming frames. These
// may be tuned by options.
-#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only)
+#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only)
#define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv)
#define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send)
@@ -40,8 +40,8 @@ typedef struct ws_frame ws_frame;
typedef struct ws_header {
nni_list_node node;
- char * name;
- char * value;
+ char *name;
+ char *value;
} ws_header;
struct nni_ws {
@@ -49,7 +49,8 @@ struct nni_ws {
nni_list_node node;
nni_reap_node reap;
bool server;
- bool closed;
+ bool closed; // received a close, or initiated a close
+ bool peer_closed; // we received a close frame
bool ready;
bool wclose;
bool isstream;
@@ -61,44 +62,44 @@ struct nni_ws {
nni_list recvq;
nni_list txq;
nni_list rxq;
- ws_frame * txframe;
- ws_frame * rxframe;
- nni_aio * txaio; // physical aios
- nni_aio * rxaio;
- nni_aio * closeaio;
- nni_aio * httpaio;
- nni_aio * connaio; // connect aio
- nni_aio * useraio; // user aio, during HTTP negotiation
- nni_http_conn * http;
- nni_http_req * req;
- nni_http_res * res;
- char * reqhdrs;
- char * reshdrs;
+ ws_frame *txframe;
+ ws_frame *rxframe;
+ nni_aio *txaio; // physical aios
+ nni_aio *rxaio;
+ nni_aio *closeaio; // used for lingering/draining close
+ nni_aio *httpaio;
+ nni_aio *connaio; // connect aio
+ nni_aio *useraio; // user aio, during HTTP negotiation
+ nni_http_conn *http;
+ nni_http_req *req;
+ nni_http_res *res;
+ char *reqhdrs;
+ char *reshdrs;
size_t maxframe;
size_t fragsize;
size_t recvmax; // largest message size
nni_ws_listener *listener;
- nni_ws_dialer * dialer;
+ nni_ws_dialer *dialer;
};
struct nni_ws_listener {
nng_stream_listener ops;
- nni_http_server * server;
- char * proto;
+ nni_http_server *server;
+ char *proto;
nni_mtx mtx;
nni_cv cv;
nni_list pend;
nni_list reply;
nni_list aios;
- nng_url * url;
+ nng_url *url;
bool started;
bool closed;
bool isstream;
bool send_text;
bool recv_text;
- nni_http_handler * handler;
+ nni_http_handler *handler;
nni_ws_listen_hook hookfn;
- void * hookarg;
+ void *hookarg;
nni_list headers; // response headers
size_t maxframe;
size_t fragsize;
@@ -113,13 +114,13 @@ struct nni_ws_listener {
// requests when we already have connects negotiating.)
struct nni_ws_dialer {
nng_stream_dialer ops;
- nni_http_req * req;
- nni_http_res * res;
- nni_http_client * client;
+ nni_http_req *req;
+ nni_http_res *res;
+ nni_http_client *client;
nni_mtx mtx;
nni_cv cv;
- char * proto;
- nng_url * url;
+ char *proto;
+ nng_url *url;
nni_list wspend; // ws structures still negotiating
bool closed;
bool isstream;
@@ -163,9 +164,9 @@ struct ws_frame {
bool final;
bool masked;
size_t asize; // allocated size
- uint8_t * adata;
- uint8_t * buf;
- nng_aio * aio;
+ uint8_t *adata;
+ uint8_t *buf;
+ nng_aio *aio;
};
static void ws_send_close(nni_ws *ws, uint16_t code);
@@ -201,7 +202,7 @@ static int
ws_set_header_ext(nni_list *l, const char *n, const char *v, bool strip_dups)
{
ws_header *hdr;
- char * nv;
+ char *nv;
if ((nv = nni_strdup(v)) == NULL) {
return (NNG_ENOMEM);
@@ -240,11 +241,11 @@ ws_set_header(nni_list *l, const char *n, const char *v)
static int
ws_set_headers(nni_list *l, const char *str)
{
- char * dupstr;
+ char *dupstr;
size_t duplen;
- char * n;
- char * v;
- char * nl;
+ char *n;
+ char *v;
+ char *nl;
int rv;
if ((dupstr = nni_strdup(str)) == NULL) {
@@ -518,7 +519,7 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame)
static void
ws_close_cb(void *arg)
{
- nni_ws * ws = arg;
+ nni_ws *ws = arg;
ws_frame *frame;
nni_aio_close(ws->txaio);
@@ -614,9 +615,9 @@ ws_cancel_close(nni_aio *aio, void *arg, int rv)
static void
ws_write_cb(void *arg)
{
- nni_ws * ws = arg;
+ nni_ws *ws = arg;
ws_frame *frame;
- nni_aio * aio;
+ nni_aio *aio;
int rv;
nni_mtx_lock(&ws->mtx);
@@ -641,9 +642,11 @@ ws_write_cb(void *arg)
ws_frame_fini(frame);
}
}
- if (ws->wclose) {
- ws->wclose = false;
- nni_aio_finish(ws->closeaio, 0, 0);
+ if (ws->peer_closed) {
+ if (ws->wclose) { // could assert this?
+ ws->wclose = false;
+ nni_aio_finish(ws->closeaio, 0, 0);
+ }
}
nni_mtx_unlock(&ws->mtx);
return;
@@ -651,6 +654,8 @@ ws_write_cb(void *arg)
aio = frame->aio;
if ((rv = nni_aio_result(ws->txaio)) != 0) {
+ // if tx fails, we can't send a close frame either
+ // we expect the caller to just close this connection
frame->aio = NULL;
if (aio != NULL) {
nni_aio_list_remove(aio);
@@ -704,7 +709,7 @@ ws_write_cb(void *arg)
static void
ws_write_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ws * ws = arg;
+ nni_ws *ws = arg;
ws_frame *frame;
// Is this aio active? We can tell by looking at the active tx frame.
@@ -735,7 +740,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
ws_frame *frame;
uint8_t buf[sizeof(uint16_t)];
int rv;
- nni_aio * aio;
+ nni_aio *aio;
NNI_PUT16(buf, code);
@@ -788,7 +793,7 @@ static void
ws_start_read(nni_ws *ws)
{
ws_frame *frame;
- nni_aio * aio;
+ nni_aio *aio;
nni_iov iov;
if ((ws->rxframe != NULL) || ws->closed) {
@@ -827,8 +832,8 @@ static void
ws_read_finish_str(nni_ws *ws)
{
for (;;) {
- nni_aio * aio;
- nni_iov * iov;
+ nni_aio *aio;
+ nni_iov *iov;
unsigned niov;
ws_frame *frame;
@@ -887,12 +892,12 @@ ws_read_finish_str(nni_ws *ws)
static void
ws_read_finish_msg(nni_ws *ws)
{
- nni_aio * aio;
+ nni_aio *aio;
size_t len;
ws_frame *frame;
- nni_msg * msg;
+ nni_msg *msg;
int rv;
- uint8_t * body;
+ uint8_t *body;
// If we have no data, no waiter, or have not received the complete
// message yet, then there is nothing to do.
@@ -990,8 +995,15 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
ws_frame_fini(frame);
break;
case WS_CLOSE:
- ws->closed = true; // no need to send close reply
- ws_close(ws, 0);
+ // if we did not send a close frame yet, do so.
+ // (this might be a response to our close)
+ ws->peer_closed = true;
+ if (!ws->closed) {
+ ws_close(ws, WS_CLOSE_NORMAL_CLOSE);
+ } else {
+ ws->wclose = false;
+ nni_aio_finish(ws->closeaio, 0, 0);
+ }
return;
default:
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
@@ -1004,10 +1016,9 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
static void
ws_read_cb(void *arg)
{
- nni_ws * ws = arg;
- nni_aio * aio = ws->rxaio;
+ nni_ws *ws = arg;
+ nni_aio *aio = ws->rxaio;
ws_frame *frame;
- int rv;
nni_mtx_lock(&ws->mtx);
if ((frame = ws->rxframe) == NULL) {
@@ -1015,8 +1026,10 @@ ws_read_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(aio)) != 0) {
- ws->closed = true; // do not send a close frame
+ if (nni_aio_result(aio) != 0) {
+ // on a read error, we assume the connection was
+ // abruptly closed, and we don't try to shut down nicely
+ ws->closed = true;
ws_close(ws, 0);
nni_mtx_unlock(&ws->mtx);
return;
@@ -1050,7 +1063,7 @@ ws_read_cb(void *arg)
}
// If we are returning from a read of additional data, then
- // the buf will be set. Otherwise we need to determine
+ // the buf will be set. Otherwise, we need to determine
// how much data to read. As our headers are complete, we take
// this time to do some protocol checks -- no point in waiting
// to read data. (Frame size check needs to be done first
@@ -1100,7 +1113,7 @@ ws_read_cb(void *arg)
}
}
- // Check for masking. (We don't actually do the unmask
+ // Check for masking. (We don't actually unmask
// here, because we don't have data yet.)
if (frame->masked) {
memcpy(frame->mask, frame->head + frame->hlen - 4, 4);
@@ -1176,9 +1189,9 @@ ws_close_error(nni_ws *ws, uint16_t code)
static void
ws_fini(void *arg)
{
- nni_ws * ws = arg;
+ nni_ws *ws = arg;
ws_frame *frame;
- nng_aio * aio;
+ nng_aio *aio;
ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
@@ -1292,11 +1305,11 @@ static void
ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
{
nni_ws_dialer *d;
- nni_aio * uaio;
+ nni_aio *uaio;
int rv;
uint16_t status;
char wskey[29];
- const char * ptr;
+ const char *ptr;
d = ws->dialer;
nni_mtx_lock(&d->mtx);
@@ -1402,7 +1415,7 @@ err:
static void
ws_http_cb(void *arg)
{
- nni_ws * ws = arg;
+ nni_ws *ws = arg;
nni_aio *aio = ws->httpaio;
if (ws->server) {
@@ -1455,7 +1468,7 @@ static void
ws_listener_free(void *arg)
{
nni_ws_listener *l = arg;
- ws_header * hdr;
+ ws_header *hdr;
ws_listener_close(l);
@@ -1492,18 +1505,18 @@ ws_listener_free(void *arg)
static void
ws_handler(nni_aio *aio)
{
- nni_ws_listener * l;
- nni_ws * ws;
- nni_http_conn * conn;
- nni_http_req * req;
- nni_http_res * res;
+ nni_ws_listener *l;
+ nni_ws *ws;
+ nni_http_conn *conn;
+ nni_http_req *req;
+ nni_http_res *res;
nni_http_handler *h;
- const char * ptr;
- const char * proto;
+ const char *ptr;
+ const char *proto;
uint16_t status;
int rv;
char key[29];
- ws_header * hdr;
+ ws_header *hdr;
req = nni_aio_get_input(aio, 0);
h = nni_aio_get_input(aio, 1);
@@ -1695,7 +1708,7 @@ static void
ws_listener_accept(void *arg, nni_aio *aio)
{
nni_ws_listener *l = arg;
- nni_ws * ws;
+ nni_ws *ws;
int rv;
if (nni_aio_begin(aio) != 0) {
@@ -1732,7 +1745,7 @@ static void
ws_listener_close(void *arg)
{
nni_ws_listener *l = arg;
- nni_ws * ws;
+ nni_ws *ws;
nni_mtx_lock(&l->mtx);
if (l->closed) {
nni_mtx_unlock(&l->mtx);
@@ -1741,7 +1754,7 @@ ws_listener_close(void *arg)
l->closed = true;
if (l->started) {
nni_http_server_del_handler(l->server, l->handler);
- nni_http_server_stop(l->server);
+ nni_http_server_close(l->server);
l->started = false;
}
NNI_LIST_FOREACH (&l->pend, ws) {
@@ -2100,7 +2113,7 @@ nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url)
{
nni_ws_listener *l;
int rv;
- char * host;
+ char *host;
if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
return (NNG_ENOMEM);
@@ -2153,14 +2166,14 @@ void
ws_conn_cb(void *arg)
{
nni_ws_dialer *d;
- nni_ws * ws;
- nni_aio * uaio;
+ nni_ws *ws;
+ nni_aio *uaio;
nni_http_conn *http;
- nni_http_req * req = NULL;
+ nni_http_req *req = NULL;
int rv;
uint8_t raw[16];
char wskey[25];
- ws_header * hdr;
+ ws_header *hdr;
ws = arg;
@@ -2249,7 +2262,7 @@ static void
ws_dialer_free(void *arg)
{
nni_ws_dialer *d = arg;
- ws_header * hdr;
+ ws_header *hdr;
nni_mtx_lock(&d->mtx);
while (!nni_list_empty(&d->wspend)) {
@@ -2279,7 +2292,7 @@ static void
ws_dialer_close(void *arg)
{
nni_ws_dialer *d = arg;
- nni_ws * ws;
+ nni_ws *ws;
nni_mtx_lock(&d->mtx);
if (d->closed) {
nni_mtx_unlock(&d->mtx);
@@ -2312,7 +2325,7 @@ static void
ws_dialer_dial(void *arg, nni_aio *aio)
{
nni_ws_dialer *d = arg;
- nni_ws * ws;
+ nni_ws *ws;
int rv;
if (nni_aio_begin(aio) != 0) {
@@ -2684,7 +2697,7 @@ ws_str_close(void *arg)
static void
ws_str_send(void *arg, nni_aio *aio)
{
- nni_ws * ws = arg;
+ nni_ws *ws = arg;
int rv;
ws_frame *frame;