diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-03-30 14:04:08 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-03-30 14:04:08 -0700 |
| commit | 8b6ba86a60276fb79d6769c94f0674511157a9df (patch) | |
| tree | 2526d02f43e68c30fbb46ff5e64f94a7d3dfefe4 | |
| parent | ddb8267c4d85b8d2290fc7fd062c85502fb98978 (diff) | |
| download | nng-8b6ba86a60276fb79d6769c94f0674511157a9df.tar.gz nng-8b6ba86a60276fb79d6769c94f0674511157a9df.tar.bz2 nng-8b6ba86a60276fb79d6769c94f0674511157a9df.zip | |
fixes #315 WebSocket message handling errors
This also gives a performance benefit to WebSocket, by making
the completion logic run synchronously.
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 211 |
1 files changed, 129 insertions, 82 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index d66d4fda..6932ce70 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -39,6 +39,8 @@ struct nni_ws { nni_mtx mtx; nni_list txmsgs; nni_list rxmsgs; + nni_list sendq; + nni_list recvq; ws_frame * txframe; ws_frame * rxframe; nni_aio * txaio; // physical aios @@ -134,8 +136,9 @@ struct ws_msg { nni_list frames; nni_list_node node; nni_ws * ws; - nni_msg * msg; nni_aio * aio; + uint8_t * buf; + size_t bufsz; }; static void ws_send_close(nni_ws *ws, uint16_t code); @@ -213,10 +216,10 @@ ws_msg_fini(ws_msg *wm) nni_list_remove(&wm->frames, frame); ws_frame_fini(frame); } - - if (wm->msg != NULL) { - nni_msg_free(wm->msg); + if (wm->bufsz != 0) { + nni_free(wm->buf, wm->bufsz); } + NNI_FREE_STRUCT(wm); } @@ -268,6 +271,8 @@ ws_msg_init_control( if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { return (NNG_ENOMEM); } + wm->buf = NULL; + wm->bufsz = 0; if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { ws_msg_fini(wm); @@ -309,27 +314,23 @@ ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio) uint8_t *buf; uint8_t op; - // If the message has a header, move it to front of body. Most of - // the time this will not cause a reallocation (there should be - // headroom). Doing this simplifies our framing, and avoids sending - // tiny frames for headers. - if ((len = nni_msg_header_len(msg)) != 0) { - int rv; - buf = nni_msg_header(msg); - if ((rv = nni_msg_insert(msg, buf, len)) != 0) { - return (rv); - } - nni_msg_header_clear(msg); - } - if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { return (NNG_ENOMEM); } NNI_LIST_INIT(&wm->frames, ws_frame, node); - len = nni_msg_len(msg); - buf = nni_msg_body(msg); - op = WS_BINARY; // to start -- no support for sending TEXT frames + len = nni_msg_len(msg) + nni_msg_header_len(msg); + wm->bufsz = len; + if ((wm->buf = nni_alloc(len)) == NULL) { + NNI_FREE_STRUCT(wm); + return (NNG_ENOMEM); + } + buf = wm->buf; + memcpy(buf, nni_msg_header(msg), nni_msg_header_len(msg)); + memcpy(buf + nni_msg_header_len(msg), nni_msg_body(msg), + nni_msg_len(msg)); + + op = WS_BINARY; // to start -- no support for sending TEXT frames // do ... while because we want at least one frame (even for empty // messages.) Headers get their own frame, if present. Best bet @@ -379,7 +380,6 @@ ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio) } while (len); - wm->msg = msg; wm->aio = aio; wm->ws = ws; *wmp = wm; @@ -428,6 +428,7 @@ ws_close_cb(void *arg) while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { nni_list_remove(&ws->txmsgs, wm); if (wm->aio) { + nni_aio_list_remove(wm->aio); nni_aio_finish_error(wm->aio, NNG_ECLOSED); } ws_msg_fini(wm); @@ -546,9 +547,9 @@ ws_write_cb(void *arg) // No other messages may succeed.. while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { nni_list_remove(&ws->txmsgs, wm); - if (wm->aio != NULL) { - nni_aio_set_msg(wm->aio, NULL); - nni_aio_finish_error(wm->aio, NNG_ECLOSED); + if ((aio = wm->aio) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); } ws_msg_fini(wm); } @@ -567,6 +568,7 @@ ws_write_cb(void *arg) ws_msg_fini(wm); if (aio != NULL) { + nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } @@ -579,18 +581,33 @@ ws_write_cb(void *arg) // good frame, was it the last nni_list_remove(&wm->frames, frame); ws_frame_fini(frame); - if (nni_list_empty(&wm->frames)) { - nni_list_remove(&ws->txmsgs, wm); - ws_msg_fini(wm); - if (aio != NULL) { - nni_aio_finish(aio, 0, 0); - } + + // if we still have more frames to transmit, then schedule it. + if (!nni_list_empty(&wm->frames)) { + ws_start_write(ws); + nni_mtx_unlock(&ws->mtx); + return; } + if (aio != NULL) { + nni_aio_list_remove(aio); + } + nni_list_remove(&ws->txmsgs, wm); + // Write the next frame. ws_start_write(ws); - nni_mtx_unlock(&ws->mtx); + + // discard while not holding lock (just deallocations) + ws_msg_fini(wm); + + if (aio != NULL) { + nng_msg *msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + nni_aio_set_synch(aio); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + } } static void @@ -602,9 +619,14 @@ ws_write_cancel(nni_aio *aio, int rv) // Is this aio active? We can tell by looking at the // active tx frame. - wm = nni_aio_get_prov_data(aio); - ws = wm->ws; + ws = nni_aio_get_prov_data(aio); + nni_mtx_lock(&ws->mtx); + wm = nni_aio_get_prov_extra(aio, 0); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&ws->mtx); + return; + } if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) { nni_aio_abort(ws->txaio, rv); // We will wait for callback on the txaio to finish aio. @@ -709,21 +731,20 @@ nni_ws_send_msg(nni_ws *ws, nni_aio *aio) } nni_mtx_lock(&ws->mtx); - nni_aio_set_msg(aio, NULL); - if (ws->closed) { - ws_msg_fini(wm); - if (nni_aio_start(aio, NULL, NULL) == 0) { - nni_aio_finish_error(aio, NNG_ECLOSED); - } + if (nni_aio_start(aio, ws_write_cancel, ws) != 0) { nni_mtx_unlock(&ws->mtx); + ws_msg_fini(wm); return; } - if (nni_aio_start(aio, ws_write_cancel, wm) != 0) { + if (ws->closed) { nni_mtx_unlock(&ws->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); ws_msg_fini(wm); return; } + nni_aio_set_prov_extra(aio, 0, wm); + nni_list_append(&ws->sendq, aio); nni_list_append(&ws->txmsgs, wm); ws_start_write(ws); nni_mtx_unlock(&ws->mtx); @@ -768,7 +789,7 @@ ws_start_read(nni_ws *ws) } static void -ws_read_frame_cb(nni_ws *ws, ws_frame *frame) +ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp) { ws_msg *wm = nni_list_first(&ws->rxmsgs); @@ -832,36 +853,19 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame) // we have to look at the msg, since we might have got a // control frame. if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) { - size_t len = 0; - nni_msg *msg; - uint8_t *body; - int rv; - nni_list_remove(&ws->rxmsgs, wm); - NNI_LIST_FOREACH (&wm->frames, frame) { - len += frame->len; - } - if ((rv = nni_msg_alloc(&msg, len)) != 0) { - nni_aio_finish_error(wm->aio, rv); - ws_msg_fini(wm); - ws_close(ws, WS_CLOSE_INTERNAL); - return; - } - body = nni_msg_body(msg); - NNI_LIST_FOREACH (&wm->frames, frame) { - memcpy(body, frame->buf, frame->len); - body += frame->len; - } - nni_aio_finish_msg(wm->aio, msg); - wm->aio = NULL; - ws_msg_fini(wm); + nni_aio_list_remove(wm->aio); + *wmp = wm; + } else { + *wmp = NULL; } } static void ws_read_cb(void *arg) { - nni_ws * ws = arg; + nni_ws * ws = arg; + ws_msg * wm; nni_aio * aio = ws->rxaio; ws_frame *frame; int rv; @@ -988,24 +992,61 @@ ws_read_cb(void *arg) // At this point, we have a complete frame. ws_unmask_frame(frame); // idempotent - ws_read_frame_cb(ws, frame); + wm = NULL; + ws_read_frame_cb(ws, frame, &wm); ws_start_read(ws); nni_mtx_unlock(&ws->mtx); + + // Got a good message, so we have to do the work to send it up. + if (wm != NULL) { + size_t len = 0; + nni_msg *msg; + nni_aio *aio; + uint8_t *body; + int rv; + + aio = wm->aio; + + NNI_LIST_FOREACH (&wm->frames, frame) { + len += frame->len; + } + if ((rv = nni_msg_alloc(&msg, len)) != 0) { + nni_aio_finish_error(aio, rv); + ws_msg_fini(wm); + nni_ws_close_error(ws, WS_CLOSE_INTERNAL); + return; + } + body = nni_msg_body(msg); + NNI_LIST_FOREACH (&wm->frames, frame) { + memcpy(body, frame->buf, frame->len); + body += frame->len; + } + nni_aio_set_msg(wm->aio, msg); + nni_aio_set_synch(aio); + nni_aio_finish(wm->aio, 0, nni_msg_len(msg)); + ws_msg_fini(wm); + } } static void ws_read_cancel(nni_aio *aio, int rv) { - ws_msg *wm = nni_aio_get_prov_data(aio); - nni_ws *ws = wm->ws; + nni_ws *ws = nni_aio_get_prov_data(aio); + ws_msg *wm; nni_mtx_lock(&ws->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&ws->mtx); + return; + } + wm = nni_aio_get_prov_extra(aio, 0); if (wm == nni_list_first(&ws->rxmsgs)) { // Cancellation will percolate back up. nni_aio_abort(ws->rxaio, rv); } else if (nni_list_active(&ws->rxmsgs, wm)) { nni_list_remove(&ws->rxmsgs, wm); ws_msg_fini(wm); + nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&ws->mtx); @@ -1016,15 +1057,17 @@ nni_ws_recv_msg(nni_ws *ws, nni_aio *aio) { ws_msg *wm; int rv; - nni_mtx_lock(&ws->mtx); + if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) { if (nni_aio_start(aio, NULL, NULL)) { nni_aio_finish_error(aio, rv); } - nni_mtx_unlock(&ws->mtx); return; } - if (nni_aio_start(aio, ws_read_cancel, wm) == 0) { + nni_mtx_lock(&ws->mtx); + if (nni_aio_start(aio, ws_read_cancel, ws) == 0) { + nni_aio_set_prov_extra(aio, 0, wm); + nni_list_append(&ws->recvq, aio); nni_list_append(&ws->rxmsgs, wm); ws_start_read(ws); } @@ -1207,8 +1250,9 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) goto err; } - // If we have no response structure, then this was completion of the - // send of the request. Prepare an empty response, and read it. + // If we have no response structure, then this was completion + // of the send of the request. Prepare an empty response, and + // read it. if (ws->res == NULL) { if ((rv = nni_http_res_alloc(&ws->res)) != 0) { goto err; @@ -1310,6 +1354,8 @@ ws_init(nni_ws **wsp) nni_mtx_init(&ws->mtx); NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node); NNI_LIST_INIT(&ws->txmsgs, ws_msg, node); + nni_aio_list_init(&ws->sendq); + nni_aio_list_init(&ws->recvq); if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) || ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) || @@ -1425,9 +1471,9 @@ ws_handler(nni_aio *aio) } // If the client has requested a specific subprotocol, then - // we need to try to match it to what the handler says we support. - // (If no suitable option is found in the handler, we fail the - // request.) + // we need to try to match it to what the handler says we + // support. (If no suitable option is found in the handler, we + // fail the request.) proto = GETH("Sec-WebSocket-Protocol"); if (proto == NULL) { if (l->proto != NULL) { @@ -1475,13 +1521,14 @@ ws_handler(nni_aio *aio) if (nni_http_res_get_status(res) != NNG_HTTP_STATUS_SWITCHING) { - // The hook has decided to give back a different - // reply and we are not upgrading anymore. For - // example the Origin might not be permitted, or - // another level of authentication may be required. - // (Note that the hook can also give back various - // other headers, but it would be bad for it to - // alter the websocket mandated headers.) + // The hook has decided to give back a + // different reply and we are not upgrading + // anymore. For example the Origin might not + // be permitted, or another level of + // authentication may be required. (Note that + // the hook can also give back various other + // headers, but it would be bad for it to alter + // the websocket mandated headers.) nni_http_req_free(req); nni_aio_set_output(aio, 0, res); nni_aio_finish(aio, 0, 0); |
