aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental/websocket/websocket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/supplemental/websocket/websocket.c')
-rw-r--r--src/supplemental/websocket/websocket.c177
1 files changed, 87 insertions, 90 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 8e75490b..ad4ce196 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -47,7 +47,7 @@ struct nni_ws {
nni_aio * httpaio;
nni_aio * connaio; // connect aio
nni_aio * useraio; // user aio, during HTTP negotiation
- nni_http * http;
+ nni_http_conn * http;
nni_http_req * req;
nni_http_res * res;
char * reqhdrs;
@@ -411,10 +411,10 @@ ws_close_cb(void *arg)
// we are done, and its time to abort everything else.
nni_mtx_lock(&ws->mtx);
- nni_http_close(ws->http);
- nni_aio_cancel(ws->txaio, NNG_ECLOSED);
- nni_aio_cancel(ws->rxaio, NNG_ECLOSED);
- nni_aio_cancel(ws->httpaio, NNG_ECLOSED);
+ nni_http_conn_close(ws->http);
+ nni_aio_abort(ws->txaio, NNG_ECLOSED);
+ nni_aio_abort(ws->rxaio, NNG_ECLOSED);
+ nni_aio_abort(ws->httpaio, NNG_ECLOSED);
// This list (receive) should be empty.
while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
@@ -463,8 +463,8 @@ ws_close(nni_ws *ws, uint16_t code)
// pending connect request.
if (!ws->closed) {
// ABORT connection negotiation.
- nni_aio_cancel(ws->connaio, NNG_ECLOSED);
- nni_aio_cancel(ws->httpaio, NNG_ECLOSED);
+ nni_aio_abort(ws->connaio, NNG_ECLOSED);
+ nni_aio_abort(ws->httpaio, NNG_ECLOSED);
ws_send_close(ws, code);
}
@@ -484,6 +484,8 @@ ws_start_write(nni_ws *ws)
{
ws_frame *frame;
ws_msg * wm;
+ nni_iov iov[2];
+ int niov;
if ((ws->txframe != NULL) || (!ws->ready)) {
return; // busy
@@ -498,21 +500,23 @@ ws_start_write(nni_ws *ws)
NNI_ASSERT(frame != NULL);
// Push it out.
- ws->txframe = frame;
- ws->txaio->a_niov = frame->len > 0 ? 2 : 1;
- ws->txaio->a_iov[0].iov_len = frame->hlen;
- ws->txaio->a_iov[0].iov_buf = frame->head;
+ ws->txframe = frame;
+ niov = 1;
+ iov[0].iov_len = frame->hlen;
+ iov[0].iov_buf = frame->head;
if (frame->len > 0) {
- ws->txaio->a_iov[1].iov_len = frame->len;
- ws->txaio->a_iov[1].iov_buf = frame->buf;
+ niov++;
+ iov[1].iov_len = frame->len;
+ iov[1].iov_buf = frame->buf;
}
+ nni_aio_set_iov(ws->txaio, niov, iov);
nni_http_write_full(ws->http, ws->txaio);
}
static void
ws_cancel_close(nni_aio *aio, int rv)
{
- nni_ws *ws = aio->a_prov_data;
+ nni_ws *ws = nni_aio_get_prov_data(aio);
nni_mtx_lock(&ws->mtx);
if (ws->wclose) {
ws->wclose = false;
@@ -567,7 +571,7 @@ ws_write_cb(void *arg)
}
ws->closed = true;
- nni_http_close(ws->http);
+ nni_http_conn_close(ws->http);
nni_mtx_unlock(&ws->mtx);
return;
}
@@ -598,11 +602,11 @@ ws_write_cancel(nni_aio *aio, int rv)
// Is this aio active? We can tell by looking at the
// active tx frame.
- wm = aio->a_prov_data;
+ wm = nni_aio_get_prov_data(aio);
ws = wm->ws;
nni_mtx_lock(&ws->mtx);
if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) {
- nni_aio_cancel(ws->txaio, rv);
+ nni_aio_abort(ws->txaio, rv);
// We will wait for callback on the txaio to finish aio.
} else if (nni_list_active(&ws->txmsgs, wm)) {
// If scheduled, just need to remove node and complete it.
@@ -731,6 +735,7 @@ ws_start_read(nni_ws *ws)
ws_frame *frame;
ws_msg * wm;
nni_aio * aio;
+ nni_iov iov;
if ((ws->rxframe != NULL) || ws->closed) {
return; // already reading or closed
@@ -755,10 +760,10 @@ ws_start_read(nni_ws *ws)
frame->len = 0;
ws->rxframe = frame;
- aio = ws->rxaio;
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = 2; // We want the first two bytes.
- aio->a_iov[0].iov_buf = frame->head;
+ aio = ws->rxaio;
+ iov.iov_len = 2; // We want the first two bytes.
+ iov.iov_buf = frame->head;
+ nni_aio_set_iov(aio, 1, &iov);
nni_http_read_full(ws->http, aio);
}
@@ -891,9 +896,10 @@ ws_read_cb(void *arg)
// If we didn't read the full header yet, then read
// the rest of it.
if (frame->hlen != 2) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_buf = frame->head + 2;
- aio->a_iov[0].iov_len = frame->hlen - 2;
+ nni_iov iov;
+ iov.iov_buf = frame->head + 2;
+ iov.iov_len = frame->hlen - 2;
+ nni_aio_set_iov(aio, 1, &iov);
nni_http_read_full(ws->http, aio);
nni_mtx_unlock(&ws->mtx);
return;
@@ -954,6 +960,8 @@ ws_read_cb(void *arg)
// If we expected data, then ask for it.
if (frame->len != 0) {
+ nni_iov iov;
+
// Short frames can avoid an alloc
if (frame->len < 126) {
frame->buf = frame->sdata;
@@ -968,9 +976,9 @@ ws_read_cb(void *arg)
frame->bufsz = frame->len;
}
- aio->a_niov = 1;
- aio->a_iov[0].iov_buf = frame->buf;
- aio->a_iov[0].iov_len = frame->len;
+ iov.iov_buf = frame->buf;
+ iov.iov_len = frame->len;
+ nni_aio_set_iov(aio, 1, &iov);
nni_http_read_full(ws->http, aio);
nni_mtx_unlock(&ws->mtx);
return;
@@ -988,13 +996,13 @@ ws_read_cb(void *arg)
static void
ws_read_cancel(nni_aio *aio, int rv)
{
- ws_msg *wm = aio->a_prov_data;
+ ws_msg *wm = nni_aio_get_prov_data(aio);
nni_ws *ws = wm->ws;
nni_mtx_lock(&ws->mtx);
if (wm == nni_list_first(&ws->rxmsgs)) {
// Cancellation will percolate back up.
- nni_aio_cancel(ws->rxaio, rv);
+ nni_aio_abort(ws->rxaio, rv);
} else if (nni_list_active(&ws->rxmsgs, wm)) {
nni_list_remove(&ws->rxmsgs, wm);
ws_msg_fini(wm);
@@ -1124,13 +1132,13 @@ ws_fini(void *arg)
nni_mtx_unlock(&ws->mtx);
if (ws->http) {
- nni_http_fini(ws->http);
+ nni_http_conn_fini(ws->http);
}
if (ws->req) {
- nni_http_req_fini(ws->req);
+ nni_http_req_free(ws->req);
}
if (ws->res) {
- nni_http_res_fini(ws->res);
+ nni_http_res_free(ws->res);
}
nni_strfree(ws->reqhdrs);
@@ -1201,7 +1209,7 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
// If we have no response structure, then this was completion of the
// send of the request. Prepare an empty response, and read it.
if (ws->res == NULL) {
- if ((rv = nni_http_res_init(&ws->res)) != 0) {
+ if ((rv = nni_http_res_alloc(&ws->res)) != 0) {
goto err;
}
nni_http_read_res(ws->http, ws->res, ws->httpaio);
@@ -1211,17 +1219,17 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
status = nni_http_res_get_status(ws->res);
switch (status) {
- case NNI_HTTP_STATUS_SWITCHING:
+ case NNG_HTTP_STATUS_SWITCHING:
break;
- case NNI_HTTP_STATUS_FORBIDDEN:
- case NNI_HTTP_STATUS_UNAUTHORIZED:
+ case NNG_HTTP_STATUS_FORBIDDEN:
+ case NNG_HTTP_STATUS_UNAUTHORIZED:
rv = NNG_EPERM;
goto err;
- case NNI_HTTP_STATUS_NOT_FOUND:
- case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED:
+ case NNG_HTTP_STATUS_NOT_FOUND:
+ case NNG_HTTP_STATUS_METHOD_NOT_ALLOWED:
rv = NNG_ECONNREFUSED; // Treat these as refusals.
goto err;
- case NNI_HTTP_STATUS_BAD_REQUEST:
+ case NNG_HTTP_STATUS_BAD_REQUEST:
default:
// Perhaps we should use NNG_ETRANERR...
rv = NNG_EPROTO;
@@ -1360,36 +1368,30 @@ ws_handler(nni_aio *aio)
{
nni_ws_listener * l;
nni_ws * ws;
- nni_http * http;
+ nni_http_conn * conn;
nni_http_req * req;
nni_http_res * res;
nni_http_handler *h;
- nni_http_ctx * ctx;
const char * ptr;
const char * proto;
uint16_t status;
int rv;
char key[29];
- req = nni_aio_get_input(aio, 0);
- h = nni_aio_get_input(aio, 1);
- ctx = nni_aio_get_input(aio, 2);
- l = nni_http_handler_get_data(h, 0);
-
- if ((rv = nni_http_ctx_stream(ctx, &http)) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
+ req = nni_aio_get_input(aio, 0);
+ h = nni_aio_get_input(aio, 1);
+ conn = nni_aio_get_input(aio, 2);
+ l = nni_http_handler_get_data(h);
// Now check the headers, etc.
if (strcmp(nni_http_req_get_version(req), "HTTP/1.1") != 0) {
- status = NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP;
+ status = NNG_HTTP_STATUS_HTTP_VERSION_NOT_SUPP;
goto err;
}
if (strcmp(nni_http_req_get_method(req), "GET") != 0) {
// HEAD request. We can't really deal with it.
- status = NNI_HTTP_STATUS_BAD_REQUEST;
+ status = NNG_HTTP_STATUS_BAD_REQUEST;
goto err;
}
@@ -1399,7 +1401,7 @@ ws_handler(nni_aio *aio)
if ((((ptr = GETH("Content-Length")) != NULL) && (atoi(ptr) > 0)) ||
(((ptr = GETH("Transfer-Encoding")) != NULL) &&
(nni_strcasestr(ptr, "chunked") != NULL))) {
- status = NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE;
+ status = NNG_HTTP_STATUS_PAYLOAD_TOO_LARGE;
goto err;
}
@@ -1410,13 +1412,13 @@ ws_handler(nni_aio *aio)
(!ws_contains_word(ptr, "upgrade")) ||
((ptr = GETH("Sec-WebSocket-Version")) == NULL) ||
(strcmp(ptr, "13") != 0)) {
- status = NNI_HTTP_STATUS_BAD_REQUEST;
+ status = NNG_HTTP_STATUS_BAD_REQUEST;
goto err;
}
if (((ptr = GETH("Sec-WebSocket-Key")) == NULL) ||
(ws_make_accept(ptr, key) != 0)) {
- status = NNI_HTTP_STATUS_BAD_REQUEST;
+ status = NNG_HTTP_STATUS_BAD_REQUEST;
goto err;
}
@@ -1427,51 +1429,50 @@ ws_handler(nni_aio *aio)
proto = GETH("Sec-WebSocket-Protocol");
if (proto == NULL) {
if (l->proto != NULL) {
- status = NNI_HTTP_STATUS_BAD_REQUEST;
+ status = NNG_HTTP_STATUS_BAD_REQUEST;
goto err;
}
} else if ((l->proto == NULL) ||
(!ws_contains_word(l->proto, proto))) {
- status = NNI_HTTP_STATUS_BAD_REQUEST;
+ status = NNG_HTTP_STATUS_BAD_REQUEST;
goto err;
}
- if ((rv = nni_http_res_init(&res)) != 0) {
+ if ((rv = nni_http_res_alloc(&res)) != 0) {
// Give a chance to reply to client.
- status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
goto err;
}
- if (nni_http_res_set_status(
- res, NNI_HTTP_STATUS_SWITCHING, "Switching Protocols") != 0) {
- nni_http_res_fini(res);
- status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ if (nni_http_res_set_status(res, NNG_HTTP_STATUS_SWITCHING) != 0) {
+ nni_http_res_free(res);
+ status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
goto err;
}
if ((SETH("Connection", "Upgrade") != 0) ||
(SETH("Upgrade", "websocket") != 0) ||
(SETH("Sec-WebSocket-Accept", key) != 0)) {
- status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
- nni_http_res_fini(res);
+ status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_free(res);
goto err;
}
if ((proto != NULL) && (SETH("Sec-WebSocket-Protocol", proto) != 0)) {
- status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
- nni_http_res_fini(res);
+ status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_free(res);
goto err;
}
if (l->hookfn != NULL) {
rv = l->hookfn(l->hookarg, req, res);
if (rv != 0) {
- nni_http_res_fini(res);
+ nni_http_res_free(res);
nni_aio_finish_error(aio, rv);
return;
}
if (nni_http_res_get_status(res) !=
- NNI_HTTP_STATUS_SWITCHING) {
+ NNG_HTTP_STATUS_SWITCHING) {
// The hook has decided to give back a different
// reply and we are not upgrading anymore. For
// example the Origin might not be permitted, or
@@ -1479,7 +1480,7 @@ ws_handler(nni_aio *aio)
// (Note that the hook can also give back various
// other headers, but it would be bad for it to
// alter the websocket mandated headers.)
- nni_http_req_fini(req);
+ nni_http_req_free(req);
nni_aio_set_output(aio, 0, res);
nni_aio_finish(aio, 0, 0);
return;
@@ -1492,12 +1493,12 @@ ws_handler(nni_aio *aio)
// We are good to go, provided we can get the websocket struct,
// and send the reply.
if ((rv = ws_init(&ws)) != 0) {
- nni_http_req_fini(req);
- nni_http_res_fini(res);
- status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_req_free(req);
+ nni_http_res_free(res);
+ status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
goto err;
}
- ws->http = http;
+ ws->http = conn;
ws->req = req;
ws->res = res;
ws->mode = NNI_EP_MODE_LISTEN;
@@ -1506,14 +1507,14 @@ ws_handler(nni_aio *aio)
nni_list_append(&l->reply, ws);
nni_aio_set_data(ws->httpaio, 0, l);
- nni_http_write_res(http, res, ws->httpaio);
- (void) nni_http_hijack(ctx);
+ nni_http_write_res(conn, res, ws->httpaio);
+ (void) nni_http_hijack(conn);
nni_aio_set_output(aio, 0, NULL);
nni_aio_finish(aio, 0, 0);
return;
err:
- if ((rv = nni_http_res_init_error(&res, status)) != 0) {
+ if ((rv = nni_http_res_alloc_error(&res, status)) != 0) {
nni_aio_finish_error(aio, rv);
} else {
nni_aio_set_output(aio, 0, res);
@@ -1588,7 +1589,7 @@ nni_ws_listener_proto(nni_ws_listener *l, const char *proto)
static void
ws_accept_cancel(nni_aio *aio, int rv)
{
- nni_ws_listener *l = aio->a_prov_data;
+ nni_ws_listener *l = nni_aio_get_prov_data(aio);
nni_mtx_lock(&l->mtx);
if (nni_aio_list_active(aio)) {
@@ -1723,7 +1724,7 @@ ws_conn_cb(void *arg)
nni_ws_dialer *d;
nni_ws * ws;
nni_aio * uaio;
- nni_http * http;
+ nni_http_conn *http;
nni_http_req * req = NULL;
int rv;
uint8_t raw[16];
@@ -1757,7 +1758,7 @@ ws_conn_cb(void *arg)
nni_aio_set_output(ws->connaio, 0, NULL);
if (uaio == NULL) {
// This request was canceled for some reason.
- nni_http_fini(http);
+ nni_http_conn_fini(http);
nni_mtx_unlock(&ws->mtx);
nni_ws_fini(ws);
return;
@@ -1770,11 +1771,7 @@ ws_conn_cb(void *arg)
wskey[24] = '\0';
#define SETH(h, v) nni_http_req_set_header(req, h, v)
- if ((rv != 0) || ((rv = nni_http_req_init(&req)) != 0) ||
- ((rv = nni_http_req_set_uri(req, d->url->u_rawpath)) != 0) ||
- ((rv = nni_http_req_set_version(req, "HTTP/1.1")) != 0) ||
- ((rv = nni_http_req_set_method(req, "GET")) != 0) ||
- ((rv = SETH("Host", d->url->u_host)) != 0) ||
+ if ((rv != 0) || ((rv = nni_http_req_alloc(&req, d->url)) != 0) ||
((rv = SETH("Upgrade", "websocket")) != 0) ||
((rv = SETH("Connection", "Upgrade")) != 0) ||
((rv = SETH("Sec-WebSocket-Key", wskey)) != 0) ||
@@ -1805,10 +1802,10 @@ err:
nni_aio_finish_error(uaio, rv);
nni_mtx_unlock(&ws->mtx);
if (http != NULL) {
- nni_http_fini(http);
+ nni_http_conn_fini(http);
}
if (req != NULL) {
- nni_http_req_fini(req);
+ nni_http_req_free(req);
}
nni_ws_fini(ws);
}
@@ -1920,12 +1917,12 @@ nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto)
static void
ws_dial_cancel(nni_aio *aio, int rv)
{
- nni_ws *ws = aio->a_prov_data;
+ nni_ws *ws = nni_aio_get_prov_data(aio);
nni_mtx_lock(&ws->mtx);
if (aio == ws->useraio) {
- nni_aio_cancel(ws->connaio, rv);
- nni_aio_cancel(ws->httpaio, rv);
+ nni_aio_abort(ws->connaio, rv);
+ nni_aio_abort(ws->httpaio, rv);
ws->useraio = NULL;
nni_aio_finish_error(aio, rv);
}