summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-03-30 14:04:08 -0700
committerGarrett D'Amore <garrett@damore.org>2018-03-30 14:04:08 -0700
commit8b6ba86a60276fb79d6769c94f0674511157a9df (patch)
tree2526d02f43e68c30fbb46ff5e64f94a7d3dfefe4
parentddb8267c4d85b8d2290fc7fd062c85502fb98978 (diff)
downloadnng-8b6ba86a60276fb79d6769c94f0674511157a9df.tar.gz
nng-8b6ba86a60276fb79d6769c94f0674511157a9df.tar.bz2
nng-8b6ba86a60276fb79d6769c94f0674511157a9df.zip
fixes #315 WebSocket message handling errors
This also gives a performance benefit to WebSocket, by making the completion logic run synchronously.
-rw-r--r--src/supplemental/websocket/websocket.c211
1 files changed, 129 insertions, 82 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index d66d4fda..6932ce70 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -39,6 +39,8 @@ struct nni_ws {
nni_mtx mtx;
nni_list txmsgs;
nni_list rxmsgs;
+ nni_list sendq;
+ nni_list recvq;
ws_frame * txframe;
ws_frame * rxframe;
nni_aio * txaio; // physical aios
@@ -134,8 +136,9 @@ struct ws_msg {
nni_list frames;
nni_list_node node;
nni_ws * ws;
- nni_msg * msg;
nni_aio * aio;
+ uint8_t * buf;
+ size_t bufsz;
};
static void ws_send_close(nni_ws *ws, uint16_t code);
@@ -213,10 +216,10 @@ ws_msg_fini(ws_msg *wm)
nni_list_remove(&wm->frames, frame);
ws_frame_fini(frame);
}
-
- if (wm->msg != NULL) {
- nni_msg_free(wm->msg);
+ if (wm->bufsz != 0) {
+ nni_free(wm->buf, wm->bufsz);
}
+
NNI_FREE_STRUCT(wm);
}
@@ -268,6 +271,8 @@ ws_msg_init_control(
if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
return (NNG_ENOMEM);
}
+ wm->buf = NULL;
+ wm->bufsz = 0;
if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
ws_msg_fini(wm);
@@ -309,27 +314,23 @@ ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio)
uint8_t *buf;
uint8_t op;
- // If the message has a header, move it to front of body. Most of
- // the time this will not cause a reallocation (there should be
- // headroom). Doing this simplifies our framing, and avoids sending
- // tiny frames for headers.
- if ((len = nni_msg_header_len(msg)) != 0) {
- int rv;
- buf = nni_msg_header(msg);
- if ((rv = nni_msg_insert(msg, buf, len)) != 0) {
- return (rv);
- }
- nni_msg_header_clear(msg);
- }
-
if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
return (NNG_ENOMEM);
}
NNI_LIST_INIT(&wm->frames, ws_frame, node);
- len = nni_msg_len(msg);
- buf = nni_msg_body(msg);
- op = WS_BINARY; // to start -- no support for sending TEXT frames
+ len = nni_msg_len(msg) + nni_msg_header_len(msg);
+ wm->bufsz = len;
+ if ((wm->buf = nni_alloc(len)) == NULL) {
+ NNI_FREE_STRUCT(wm);
+ return (NNG_ENOMEM);
+ }
+ buf = wm->buf;
+ memcpy(buf, nni_msg_header(msg), nni_msg_header_len(msg));
+ memcpy(buf + nni_msg_header_len(msg), nni_msg_body(msg),
+ nni_msg_len(msg));
+
+ op = WS_BINARY; // to start -- no support for sending TEXT frames
// do ... while because we want at least one frame (even for empty
// messages.) Headers get their own frame, if present. Best bet
@@ -379,7 +380,6 @@ ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio)
} while (len);
- wm->msg = msg;
wm->aio = aio;
wm->ws = ws;
*wmp = wm;
@@ -428,6 +428,7 @@ ws_close_cb(void *arg)
while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
nni_list_remove(&ws->txmsgs, wm);
if (wm->aio) {
+ nni_aio_list_remove(wm->aio);
nni_aio_finish_error(wm->aio, NNG_ECLOSED);
}
ws_msg_fini(wm);
@@ -546,9 +547,9 @@ ws_write_cb(void *arg)
// No other messages may succeed..
while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
nni_list_remove(&ws->txmsgs, wm);
- if (wm->aio != NULL) {
- nni_aio_set_msg(wm->aio, NULL);
- nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ if ((aio = wm->aio) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
ws_msg_fini(wm);
}
@@ -567,6 +568,7 @@ ws_write_cb(void *arg)
ws_msg_fini(wm);
if (aio != NULL) {
+ nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
@@ -579,18 +581,33 @@ ws_write_cb(void *arg)
// good frame, was it the last
nni_list_remove(&wm->frames, frame);
ws_frame_fini(frame);
- if (nni_list_empty(&wm->frames)) {
- nni_list_remove(&ws->txmsgs, wm);
- ws_msg_fini(wm);
- if (aio != NULL) {
- nni_aio_finish(aio, 0, 0);
- }
+
+ // if we still have more frames to transmit, then schedule it.
+ if (!nni_list_empty(&wm->frames)) {
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+ return;
}
+ if (aio != NULL) {
+ nni_aio_list_remove(aio);
+ }
+ nni_list_remove(&ws->txmsgs, wm);
+
// Write the next frame.
ws_start_write(ws);
-
nni_mtx_unlock(&ws->mtx);
+
+ // discard while not holding lock (just deallocations)
+ ws_msg_fini(wm);
+
+ if (aio != NULL) {
+ nng_msg *msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_set_synch(aio);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+ nni_msg_free(msg);
+ }
}
static void
@@ -602,9 +619,14 @@ ws_write_cancel(nni_aio *aio, int rv)
// Is this aio active? We can tell by looking at the
// active tx frame.
- wm = nni_aio_get_prov_data(aio);
- ws = wm->ws;
+ ws = nni_aio_get_prov_data(aio);
+
nni_mtx_lock(&ws->mtx);
+ wm = nni_aio_get_prov_extra(aio, 0);
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) {
nni_aio_abort(ws->txaio, rv);
// We will wait for callback on the txaio to finish aio.
@@ -709,21 +731,20 @@ nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
}
nni_mtx_lock(&ws->mtx);
- nni_aio_set_msg(aio, NULL);
- if (ws->closed) {
- ws_msg_fini(wm);
- if (nni_aio_start(aio, NULL, NULL) == 0) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
+ if (nni_aio_start(aio, ws_write_cancel, ws) != 0) {
nni_mtx_unlock(&ws->mtx);
+ ws_msg_fini(wm);
return;
}
- if (nni_aio_start(aio, ws_write_cancel, wm) != 0) {
+ if (ws->closed) {
nni_mtx_unlock(&ws->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
ws_msg_fini(wm);
return;
}
+ nni_aio_set_prov_extra(aio, 0, wm);
+ nni_list_append(&ws->sendq, aio);
nni_list_append(&ws->txmsgs, wm);
ws_start_write(ws);
nni_mtx_unlock(&ws->mtx);
@@ -768,7 +789,7 @@ ws_start_read(nni_ws *ws)
}
static void
-ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
+ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp)
{
ws_msg *wm = nni_list_first(&ws->rxmsgs);
@@ -832,36 +853,19 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
// we have to look at the msg, since we might have got a
// control frame.
if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) {
- size_t len = 0;
- nni_msg *msg;
- uint8_t *body;
- int rv;
-
nni_list_remove(&ws->rxmsgs, wm);
- NNI_LIST_FOREACH (&wm->frames, frame) {
- len += frame->len;
- }
- if ((rv = nni_msg_alloc(&msg, len)) != 0) {
- nni_aio_finish_error(wm->aio, rv);
- ws_msg_fini(wm);
- ws_close(ws, WS_CLOSE_INTERNAL);
- return;
- }
- body = nni_msg_body(msg);
- NNI_LIST_FOREACH (&wm->frames, frame) {
- memcpy(body, frame->buf, frame->len);
- body += frame->len;
- }
- nni_aio_finish_msg(wm->aio, msg);
- wm->aio = NULL;
- ws_msg_fini(wm);
+ nni_aio_list_remove(wm->aio);
+ *wmp = wm;
+ } else {
+ *wmp = NULL;
}
}
static void
ws_read_cb(void *arg)
{
- nni_ws * ws = arg;
+ nni_ws * ws = arg;
+ ws_msg * wm;
nni_aio * aio = ws->rxaio;
ws_frame *frame;
int rv;
@@ -988,24 +992,61 @@ ws_read_cb(void *arg)
// At this point, we have a complete frame.
ws_unmask_frame(frame); // idempotent
- ws_read_frame_cb(ws, frame);
+ wm = NULL;
+ ws_read_frame_cb(ws, frame, &wm);
ws_start_read(ws);
nni_mtx_unlock(&ws->mtx);
+
+ // Got a good message, so we have to do the work to send it up.
+ if (wm != NULL) {
+ size_t len = 0;
+ nni_msg *msg;
+ nni_aio *aio;
+ uint8_t *body;
+ int rv;
+
+ aio = wm->aio;
+
+ NNI_LIST_FOREACH (&wm->frames, frame) {
+ len += frame->len;
+ }
+ if ((rv = nni_msg_alloc(&msg, len)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ ws_msg_fini(wm);
+ nni_ws_close_error(ws, WS_CLOSE_INTERNAL);
+ return;
+ }
+ body = nni_msg_body(msg);
+ NNI_LIST_FOREACH (&wm->frames, frame) {
+ memcpy(body, frame->buf, frame->len);
+ body += frame->len;
+ }
+ nni_aio_set_msg(wm->aio, msg);
+ nni_aio_set_synch(aio);
+ nni_aio_finish(wm->aio, 0, nni_msg_len(msg));
+ ws_msg_fini(wm);
+ }
}
static void
ws_read_cancel(nni_aio *aio, int rv)
{
- ws_msg *wm = nni_aio_get_prov_data(aio);
- nni_ws *ws = wm->ws;
+ nni_ws *ws = nni_aio_get_prov_data(aio);
+ ws_msg *wm;
nni_mtx_lock(&ws->mtx);
+ if (!nni_aio_list_active(aio)) {
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ wm = nni_aio_get_prov_extra(aio, 0);
if (wm == nni_list_first(&ws->rxmsgs)) {
// Cancellation will percolate back up.
nni_aio_abort(ws->rxaio, rv);
} else if (nni_list_active(&ws->rxmsgs, wm)) {
nni_list_remove(&ws->rxmsgs, wm);
ws_msg_fini(wm);
+ nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&ws->mtx);
@@ -1016,15 +1057,17 @@ nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
{
ws_msg *wm;
int rv;
- nni_mtx_lock(&ws->mtx);
+
if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
if (nni_aio_start(aio, NULL, NULL)) {
nni_aio_finish_error(aio, rv);
}
- nni_mtx_unlock(&ws->mtx);
return;
}
- if (nni_aio_start(aio, ws_read_cancel, wm) == 0) {
+ nni_mtx_lock(&ws->mtx);
+ if (nni_aio_start(aio, ws_read_cancel, ws) == 0) {
+ nni_aio_set_prov_extra(aio, 0, wm);
+ nni_list_append(&ws->recvq, aio);
nni_list_append(&ws->rxmsgs, wm);
ws_start_read(ws);
}
@@ -1207,8 +1250,9 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
goto err;
}
- // If we have no response structure, then this was completion of the
- // send of the request. Prepare an empty response, and read it.
+ // If we have no response structure, then this was completion
+ // of the send of the request. Prepare an empty response, and
+ // read it.
if (ws->res == NULL) {
if ((rv = nni_http_res_alloc(&ws->res)) != 0) {
goto err;
@@ -1310,6 +1354,8 @@ ws_init(nni_ws **wsp)
nni_mtx_init(&ws->mtx);
NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node);
NNI_LIST_INIT(&ws->txmsgs, ws_msg, node);
+ nni_aio_list_init(&ws->sendq);
+ nni_aio_list_init(&ws->recvq);
if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) ||
((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) ||
@@ -1425,9 +1471,9 @@ ws_handler(nni_aio *aio)
}
// If the client has requested a specific subprotocol, then
- // we need to try to match it to what the handler says we support.
- // (If no suitable option is found in the handler, we fail the
- // request.)
+ // we need to try to match it to what the handler says we
+ // support. (If no suitable option is found in the handler, we
+ // fail the request.)
proto = GETH("Sec-WebSocket-Protocol");
if (proto == NULL) {
if (l->proto != NULL) {
@@ -1475,13 +1521,14 @@ ws_handler(nni_aio *aio)
if (nni_http_res_get_status(res) !=
NNG_HTTP_STATUS_SWITCHING) {
- // The hook has decided to give back a different
- // reply and we are not upgrading anymore. For
- // example the Origin might not be permitted, or
- // another level of authentication may be required.
- // (Note that the hook can also give back various
- // other headers, but it would be bad for it to
- // alter the websocket mandated headers.)
+ // The hook has decided to give back a
+ // different reply and we are not upgrading
+ // anymore. For example the Origin might not
+ // be permitted, or another level of
+ // authentication may be required. (Note that
+ // the hook can also give back various other
+ // headers, but it would be bad for it to alter
+ // the websocket mandated headers.)
nni_http_req_free(req);
nni_aio_set_output(aio, 0, res);
nni_aio_finish(aio, 0, 0);