diff options
| author | Garrett D'Amore <garrett@damore.org> | 2023-12-17 19:03:29 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2023-12-17 19:09:30 -0800 |
| commit | 8ff9663c06a18d6c7fe0605de679948d3c4de9d7 (patch) | |
| tree | 3a5e1884816c7e693eadc83a0f45ddac167c8494 /src/supplemental/websocket/websocket.c | |
| parent | 8ccc10bf6a7ce305e5197e8eaf931ac7dc8612c0 (diff) | |
| download | nng-8ff9663c06a18d6c7fe0605de679948d3c4de9d7.tar.gz nng-8ff9663c06a18d6c7fe0605de679948d3c4de9d7.tar.bz2 nng-8ff9663c06a18d6c7fe0605de679948d3c4de9d7.zip | |
fixes #1735 websocket should send, and wait for, WS_CLOSE frames on shutdown
fixes #1733 deadlock in websocket listener close
Diffstat (limited to 'src/supplemental/websocket/websocket.c')
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 185 |
1 files changed, 99 insertions, 86 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index d1c9c8d5..2ef152f9 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -28,7 +28,7 @@ typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *); // We have chosen to be a bit more stringent in the size of the frames that // we send, while we more generously allow larger incoming frames. These // may be tuned by options. -#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only) +#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only) #define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv) #define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send) @@ -40,8 +40,8 @@ typedef struct ws_frame ws_frame; typedef struct ws_header { nni_list_node node; - char * name; - char * value; + char *name; + char *value; } ws_header; struct nni_ws { @@ -49,7 +49,8 @@ struct nni_ws { nni_list_node node; nni_reap_node reap; bool server; - bool closed; + bool closed; // received a close, or initiated a close + bool peer_closed; // we received a close frame bool ready; bool wclose; bool isstream; @@ -61,44 +62,44 @@ struct nni_ws { nni_list recvq; nni_list txq; nni_list rxq; - ws_frame * txframe; - ws_frame * rxframe; - nni_aio * txaio; // physical aios - nni_aio * rxaio; - nni_aio * closeaio; - nni_aio * httpaio; - nni_aio * connaio; // connect aio - nni_aio * useraio; // user aio, during HTTP negotiation - nni_http_conn * http; - nni_http_req * req; - nni_http_res * res; - char * reqhdrs; - char * reshdrs; + ws_frame *txframe; + ws_frame *rxframe; + nni_aio *txaio; // physical aios + nni_aio *rxaio; + nni_aio *closeaio; // used for lingering/draining close + nni_aio *httpaio; + nni_aio *connaio; // connect aio + nni_aio *useraio; // user aio, during HTTP negotiation + nni_http_conn *http; + nni_http_req *req; + nni_http_res *res; + char *reqhdrs; + char *reshdrs; size_t maxframe; size_t fragsize; size_t recvmax; // largest message size nni_ws_listener *listener; - nni_ws_dialer * dialer; + nni_ws_dialer *dialer; }; struct nni_ws_listener { nng_stream_listener ops; - nni_http_server * server; - char * proto; + nni_http_server *server; + char *proto; nni_mtx mtx; nni_cv cv; nni_list pend; nni_list reply; nni_list aios; - nng_url * url; + nng_url *url; bool started; bool closed; bool isstream; bool send_text; bool recv_text; - nni_http_handler * handler; + nni_http_handler *handler; nni_ws_listen_hook hookfn; - void * hookarg; + void *hookarg; nni_list headers; // response headers size_t maxframe; size_t fragsize; @@ -113,13 +114,13 @@ struct nni_ws_listener { // requests when we already have connects negotiating.) struct nni_ws_dialer { nng_stream_dialer ops; - nni_http_req * req; - nni_http_res * res; - nni_http_client * client; + nni_http_req *req; + nni_http_res *res; + nni_http_client *client; nni_mtx mtx; nni_cv cv; - char * proto; - nng_url * url; + char *proto; + nng_url *url; nni_list wspend; // ws structures still negotiating bool closed; bool isstream; @@ -163,9 +164,9 @@ struct ws_frame { bool final; bool masked; size_t asize; // allocated size - uint8_t * adata; - uint8_t * buf; - nng_aio * aio; + uint8_t *adata; + uint8_t *buf; + nng_aio *aio; }; static void ws_send_close(nni_ws *ws, uint16_t code); @@ -201,7 +202,7 @@ static int ws_set_header_ext(nni_list *l, const char *n, const char *v, bool strip_dups) { ws_header *hdr; - char * nv; + char *nv; if ((nv = nni_strdup(v)) == NULL) { return (NNG_ENOMEM); @@ -240,11 +241,11 @@ ws_set_header(nni_list *l, const char *n, const char *v) static int ws_set_headers(nni_list *l, const char *str) { - char * dupstr; + char *dupstr; size_t duplen; - char * n; - char * v; - char * nl; + char *n; + char *v; + char *nl; int rv; if ((dupstr = nni_strdup(str)) == NULL) { @@ -518,7 +519,7 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame) static void ws_close_cb(void *arg) { - nni_ws * ws = arg; + nni_ws *ws = arg; ws_frame *frame; nni_aio_close(ws->txaio); @@ -614,9 +615,9 @@ ws_cancel_close(nni_aio *aio, void *arg, int rv) static void ws_write_cb(void *arg) { - nni_ws * ws = arg; + nni_ws *ws = arg; ws_frame *frame; - nni_aio * aio; + nni_aio *aio; int rv; nni_mtx_lock(&ws->mtx); @@ -641,9 +642,11 @@ ws_write_cb(void *arg) ws_frame_fini(frame); } } - if (ws->wclose) { - ws->wclose = false; - nni_aio_finish(ws->closeaio, 0, 0); + if (ws->peer_closed) { + if (ws->wclose) { // could assert this? + ws->wclose = false; + nni_aio_finish(ws->closeaio, 0, 0); + } } nni_mtx_unlock(&ws->mtx); return; @@ -651,6 +654,8 @@ ws_write_cb(void *arg) aio = frame->aio; if ((rv = nni_aio_result(ws->txaio)) != 0) { + // if tx fails, we can't send a close frame either + // we expect the caller to just close this connection frame->aio = NULL; if (aio != NULL) { nni_aio_list_remove(aio); @@ -704,7 +709,7 @@ ws_write_cb(void *arg) static void ws_write_cancel(nni_aio *aio, void *arg, int rv) { - nni_ws * ws = arg; + nni_ws *ws = arg; ws_frame *frame; // Is this aio active? We can tell by looking at the active tx frame. @@ -735,7 +740,7 @@ ws_send_close(nni_ws *ws, uint16_t code) ws_frame *frame; uint8_t buf[sizeof(uint16_t)]; int rv; - nni_aio * aio; + nni_aio *aio; NNI_PUT16(buf, code); @@ -788,7 +793,7 @@ static void ws_start_read(nni_ws *ws) { ws_frame *frame; - nni_aio * aio; + nni_aio *aio; nni_iov iov; if ((ws->rxframe != NULL) || ws->closed) { @@ -827,8 +832,8 @@ static void ws_read_finish_str(nni_ws *ws) { for (;;) { - nni_aio * aio; - nni_iov * iov; + nni_aio *aio; + nni_iov *iov; unsigned niov; ws_frame *frame; @@ -887,12 +892,12 @@ ws_read_finish_str(nni_ws *ws) static void ws_read_finish_msg(nni_ws *ws) { - nni_aio * aio; + nni_aio *aio; size_t len; ws_frame *frame; - nni_msg * msg; + nni_msg *msg; int rv; - uint8_t * body; + uint8_t *body; // If we have no data, no waiter, or have not received the complete // message yet, then there is nothing to do. @@ -990,8 +995,15 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame) ws_frame_fini(frame); break; case WS_CLOSE: - ws->closed = true; // no need to send close reply - ws_close(ws, 0); + // if we did not send a close frame yet, do so. + // (this might be a response to our close) + ws->peer_closed = true; + if (!ws->closed) { + ws_close(ws, WS_CLOSE_NORMAL_CLOSE); + } else { + ws->wclose = false; + nni_aio_finish(ws->closeaio, 0, 0); + } return; default: ws_close(ws, WS_CLOSE_PROTOCOL_ERR); @@ -1004,10 +1016,9 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame) static void ws_read_cb(void *arg) { - nni_ws * ws = arg; - nni_aio * aio = ws->rxaio; + nni_ws *ws = arg; + nni_aio *aio = ws->rxaio; ws_frame *frame; - int rv; nni_mtx_lock(&ws->mtx); if ((frame = ws->rxframe) == NULL) { @@ -1015,8 +1026,10 @@ ws_read_cb(void *arg) return; } - if ((rv = nni_aio_result(aio)) != 0) { - ws->closed = true; // do not send a close frame + if (nni_aio_result(aio) != 0) { + // on a read error, we assume the connection was + // abruptly closed, and we don't try to shut down nicely + ws->closed = true; ws_close(ws, 0); nni_mtx_unlock(&ws->mtx); return; @@ -1050,7 +1063,7 @@ ws_read_cb(void *arg) } // If we are returning from a read of additional data, then - // the buf will be set. Otherwise we need to determine + // the buf will be set. Otherwise, we need to determine // how much data to read. As our headers are complete, we take // this time to do some protocol checks -- no point in waiting // to read data. (Frame size check needs to be done first @@ -1100,7 +1113,7 @@ ws_read_cb(void *arg) } } - // Check for masking. (We don't actually do the unmask + // Check for masking. (We don't actually unmask // here, because we don't have data yet.) if (frame->masked) { memcpy(frame->mask, frame->head + frame->hlen - 4, 4); @@ -1176,9 +1189,9 @@ ws_close_error(nni_ws *ws, uint16_t code) static void ws_fini(void *arg) { - nni_ws * ws = arg; + nni_ws *ws = arg; ws_frame *frame; - nng_aio * aio; + nng_aio *aio; ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); @@ -1292,11 +1305,11 @@ static void ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) { nni_ws_dialer *d; - nni_aio * uaio; + nni_aio *uaio; int rv; uint16_t status; char wskey[29]; - const char * ptr; + const char *ptr; d = ws->dialer; nni_mtx_lock(&d->mtx); @@ -1402,7 +1415,7 @@ err: static void ws_http_cb(void *arg) { - nni_ws * ws = arg; + nni_ws *ws = arg; nni_aio *aio = ws->httpaio; if (ws->server) { @@ -1455,7 +1468,7 @@ static void ws_listener_free(void *arg) { nni_ws_listener *l = arg; - ws_header * hdr; + ws_header *hdr; ws_listener_close(l); @@ -1492,18 +1505,18 @@ ws_listener_free(void *arg) static void ws_handler(nni_aio *aio) { - nni_ws_listener * l; - nni_ws * ws; - nni_http_conn * conn; - nni_http_req * req; - nni_http_res * res; + nni_ws_listener *l; + nni_ws *ws; + nni_http_conn *conn; + nni_http_req *req; + nni_http_res *res; nni_http_handler *h; - const char * ptr; - const char * proto; + const char *ptr; + const char *proto; uint16_t status; int rv; char key[29]; - ws_header * hdr; + ws_header *hdr; req = nni_aio_get_input(aio, 0); h = nni_aio_get_input(aio, 1); @@ -1695,7 +1708,7 @@ static void ws_listener_accept(void *arg, nni_aio *aio) { nni_ws_listener *l = arg; - nni_ws * ws; + nni_ws *ws; int rv; if (nni_aio_begin(aio) != 0) { @@ -1732,7 +1745,7 @@ static void ws_listener_close(void *arg) { nni_ws_listener *l = arg; - nni_ws * ws; + nni_ws *ws; nni_mtx_lock(&l->mtx); if (l->closed) { nni_mtx_unlock(&l->mtx); @@ -1741,7 +1754,7 @@ ws_listener_close(void *arg) l->closed = true; if (l->started) { nni_http_server_del_handler(l->server, l->handler); - nni_http_server_stop(l->server); + nni_http_server_close(l->server); l->started = false; } NNI_LIST_FOREACH (&l->pend, ws) { @@ -2100,7 +2113,7 @@ nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url) { nni_ws_listener *l; int rv; - char * host; + char *host; if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { return (NNG_ENOMEM); @@ -2153,14 +2166,14 @@ void ws_conn_cb(void *arg) { nni_ws_dialer *d; - nni_ws * ws; - nni_aio * uaio; + nni_ws *ws; + nni_aio *uaio; nni_http_conn *http; - nni_http_req * req = NULL; + nni_http_req *req = NULL; int rv; uint8_t raw[16]; char wskey[25]; - ws_header * hdr; + ws_header *hdr; ws = arg; @@ -2249,7 +2262,7 @@ static void ws_dialer_free(void *arg) { nni_ws_dialer *d = arg; - ws_header * hdr; + ws_header *hdr; nni_mtx_lock(&d->mtx); while (!nni_list_empty(&d->wspend)) { @@ -2279,7 +2292,7 @@ static void ws_dialer_close(void *arg) { nni_ws_dialer *d = arg; - nni_ws * ws; + nni_ws *ws; nni_mtx_lock(&d->mtx); if (d->closed) { nni_mtx_unlock(&d->mtx); @@ -2312,7 +2325,7 @@ static void ws_dialer_dial(void *arg, nni_aio *aio) { nni_ws_dialer *d = arg; - nni_ws * ws; + nni_ws *ws; int rv; if (nni_aio_begin(aio) != 0) { @@ -2684,7 +2697,7 @@ ws_str_close(void *arg) static void ws_str_send(void *arg, nni_aio *aio) { - nni_ws * ws = arg; + nni_ws *ws = arg; int rv; ws_frame *frame; |
