diff options
Diffstat (limited to 'src/supplemental/websocket')
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 177 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.h | 20 |
2 files changed, 95 insertions, 102 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 8e75490b..ad4ce196 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -47,7 +47,7 @@ struct nni_ws { nni_aio * httpaio; nni_aio * connaio; // connect aio nni_aio * useraio; // user aio, during HTTP negotiation - nni_http * http; + nni_http_conn * http; nni_http_req * req; nni_http_res * res; char * reqhdrs; @@ -411,10 +411,10 @@ ws_close_cb(void *arg) // we are done, and its time to abort everything else. nni_mtx_lock(&ws->mtx); - nni_http_close(ws->http); - nni_aio_cancel(ws->txaio, NNG_ECLOSED); - nni_aio_cancel(ws->rxaio, NNG_ECLOSED); - nni_aio_cancel(ws->httpaio, NNG_ECLOSED); + nni_http_conn_close(ws->http); + nni_aio_abort(ws->txaio, NNG_ECLOSED); + nni_aio_abort(ws->rxaio, NNG_ECLOSED); + nni_aio_abort(ws->httpaio, NNG_ECLOSED); // This list (receive) should be empty. while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { @@ -463,8 +463,8 @@ ws_close(nni_ws *ws, uint16_t code) // pending connect request. if (!ws->closed) { // ABORT connection negotiation. - nni_aio_cancel(ws->connaio, NNG_ECLOSED); - nni_aio_cancel(ws->httpaio, NNG_ECLOSED); + nni_aio_abort(ws->connaio, NNG_ECLOSED); + nni_aio_abort(ws->httpaio, NNG_ECLOSED); ws_send_close(ws, code); } @@ -484,6 +484,8 @@ ws_start_write(nni_ws *ws) { ws_frame *frame; ws_msg * wm; + nni_iov iov[2]; + int niov; if ((ws->txframe != NULL) || (!ws->ready)) { return; // busy @@ -498,21 +500,23 @@ ws_start_write(nni_ws *ws) NNI_ASSERT(frame != NULL); // Push it out. - ws->txframe = frame; - ws->txaio->a_niov = frame->len > 0 ? 2 : 1; - ws->txaio->a_iov[0].iov_len = frame->hlen; - ws->txaio->a_iov[0].iov_buf = frame->head; + ws->txframe = frame; + niov = 1; + iov[0].iov_len = frame->hlen; + iov[0].iov_buf = frame->head; if (frame->len > 0) { - ws->txaio->a_iov[1].iov_len = frame->len; - ws->txaio->a_iov[1].iov_buf = frame->buf; + niov++; + iov[1].iov_len = frame->len; + iov[1].iov_buf = frame->buf; } + nni_aio_set_iov(ws->txaio, niov, iov); nni_http_write_full(ws->http, ws->txaio); } static void ws_cancel_close(nni_aio *aio, int rv) { - nni_ws *ws = aio->a_prov_data; + nni_ws *ws = nni_aio_get_prov_data(aio); nni_mtx_lock(&ws->mtx); if (ws->wclose) { ws->wclose = false; @@ -567,7 +571,7 @@ ws_write_cb(void *arg) } ws->closed = true; - nni_http_close(ws->http); + nni_http_conn_close(ws->http); nni_mtx_unlock(&ws->mtx); return; } @@ -598,11 +602,11 @@ ws_write_cancel(nni_aio *aio, int rv) // Is this aio active? We can tell by looking at the // active tx frame. - wm = aio->a_prov_data; + wm = nni_aio_get_prov_data(aio); ws = wm->ws; nni_mtx_lock(&ws->mtx); if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) { - nni_aio_cancel(ws->txaio, rv); + nni_aio_abort(ws->txaio, rv); // We will wait for callback on the txaio to finish aio. } else if (nni_list_active(&ws->txmsgs, wm)) { // If scheduled, just need to remove node and complete it. @@ -731,6 +735,7 @@ ws_start_read(nni_ws *ws) ws_frame *frame; ws_msg * wm; nni_aio * aio; + nni_iov iov; if ((ws->rxframe != NULL) || ws->closed) { return; // already reading or closed @@ -755,10 +760,10 @@ ws_start_read(nni_ws *ws) frame->len = 0; ws->rxframe = frame; - aio = ws->rxaio; - aio->a_niov = 1; - aio->a_iov[0].iov_len = 2; // We want the first two bytes. - aio->a_iov[0].iov_buf = frame->head; + aio = ws->rxaio; + iov.iov_len = 2; // We want the first two bytes. + iov.iov_buf = frame->head; + nni_aio_set_iov(aio, 1, &iov); nni_http_read_full(ws->http, aio); } @@ -891,9 +896,10 @@ ws_read_cb(void *arg) // If we didn't read the full header yet, then read // the rest of it. if (frame->hlen != 2) { - aio->a_niov = 1; - aio->a_iov[0].iov_buf = frame->head + 2; - aio->a_iov[0].iov_len = frame->hlen - 2; + nni_iov iov; + iov.iov_buf = frame->head + 2; + iov.iov_len = frame->hlen - 2; + nni_aio_set_iov(aio, 1, &iov); nni_http_read_full(ws->http, aio); nni_mtx_unlock(&ws->mtx); return; @@ -954,6 +960,8 @@ ws_read_cb(void *arg) // If we expected data, then ask for it. if (frame->len != 0) { + nni_iov iov; + // Short frames can avoid an alloc if (frame->len < 126) { frame->buf = frame->sdata; @@ -968,9 +976,9 @@ ws_read_cb(void *arg) frame->bufsz = frame->len; } - aio->a_niov = 1; - aio->a_iov[0].iov_buf = frame->buf; - aio->a_iov[0].iov_len = frame->len; + iov.iov_buf = frame->buf; + iov.iov_len = frame->len; + nni_aio_set_iov(aio, 1, &iov); nni_http_read_full(ws->http, aio); nni_mtx_unlock(&ws->mtx); return; @@ -988,13 +996,13 @@ ws_read_cb(void *arg) static void ws_read_cancel(nni_aio *aio, int rv) { - ws_msg *wm = aio->a_prov_data; + ws_msg *wm = nni_aio_get_prov_data(aio); nni_ws *ws = wm->ws; nni_mtx_lock(&ws->mtx); if (wm == nni_list_first(&ws->rxmsgs)) { // Cancellation will percolate back up. - nni_aio_cancel(ws->rxaio, rv); + nni_aio_abort(ws->rxaio, rv); } else if (nni_list_active(&ws->rxmsgs, wm)) { nni_list_remove(&ws->rxmsgs, wm); ws_msg_fini(wm); @@ -1124,13 +1132,13 @@ ws_fini(void *arg) nni_mtx_unlock(&ws->mtx); if (ws->http) { - nni_http_fini(ws->http); + nni_http_conn_fini(ws->http); } if (ws->req) { - nni_http_req_fini(ws->req); + nni_http_req_free(ws->req); } if (ws->res) { - nni_http_res_fini(ws->res); + nni_http_res_free(ws->res); } nni_strfree(ws->reqhdrs); @@ -1201,7 +1209,7 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) // If we have no response structure, then this was completion of the // send of the request. Prepare an empty response, and read it. if (ws->res == NULL) { - if ((rv = nni_http_res_init(&ws->res)) != 0) { + if ((rv = nni_http_res_alloc(&ws->res)) != 0) { goto err; } nni_http_read_res(ws->http, ws->res, ws->httpaio); @@ -1211,17 +1219,17 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) status = nni_http_res_get_status(ws->res); switch (status) { - case NNI_HTTP_STATUS_SWITCHING: + case NNG_HTTP_STATUS_SWITCHING: break; - case NNI_HTTP_STATUS_FORBIDDEN: - case NNI_HTTP_STATUS_UNAUTHORIZED: + case NNG_HTTP_STATUS_FORBIDDEN: + case NNG_HTTP_STATUS_UNAUTHORIZED: rv = NNG_EPERM; goto err; - case NNI_HTTP_STATUS_NOT_FOUND: - case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED: + case NNG_HTTP_STATUS_NOT_FOUND: + case NNG_HTTP_STATUS_METHOD_NOT_ALLOWED: rv = NNG_ECONNREFUSED; // Treat these as refusals. goto err; - case NNI_HTTP_STATUS_BAD_REQUEST: + case NNG_HTTP_STATUS_BAD_REQUEST: default: // Perhaps we should use NNG_ETRANERR... rv = NNG_EPROTO; @@ -1360,36 +1368,30 @@ ws_handler(nni_aio *aio) { nni_ws_listener * l; nni_ws * ws; - nni_http * http; + nni_http_conn * conn; nni_http_req * req; nni_http_res * res; nni_http_handler *h; - nni_http_ctx * ctx; const char * ptr; const char * proto; uint16_t status; int rv; char key[29]; - req = nni_aio_get_input(aio, 0); - h = nni_aio_get_input(aio, 1); - ctx = nni_aio_get_input(aio, 2); - l = nni_http_handler_get_data(h, 0); - - if ((rv = nni_http_ctx_stream(ctx, &http)) != 0) { - nni_aio_finish_error(aio, rv); - return; - } + req = nni_aio_get_input(aio, 0); + h = nni_aio_get_input(aio, 1); + conn = nni_aio_get_input(aio, 2); + l = nni_http_handler_get_data(h); // Now check the headers, etc. if (strcmp(nni_http_req_get_version(req), "HTTP/1.1") != 0) { - status = NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP; + status = NNG_HTTP_STATUS_HTTP_VERSION_NOT_SUPP; goto err; } if (strcmp(nni_http_req_get_method(req), "GET") != 0) { // HEAD request. We can't really deal with it. - status = NNI_HTTP_STATUS_BAD_REQUEST; + status = NNG_HTTP_STATUS_BAD_REQUEST; goto err; } @@ -1399,7 +1401,7 @@ ws_handler(nni_aio *aio) if ((((ptr = GETH("Content-Length")) != NULL) && (atoi(ptr) > 0)) || (((ptr = GETH("Transfer-Encoding")) != NULL) && (nni_strcasestr(ptr, "chunked") != NULL))) { - status = NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE; + status = NNG_HTTP_STATUS_PAYLOAD_TOO_LARGE; goto err; } @@ -1410,13 +1412,13 @@ ws_handler(nni_aio *aio) (!ws_contains_word(ptr, "upgrade")) || ((ptr = GETH("Sec-WebSocket-Version")) == NULL) || (strcmp(ptr, "13") != 0)) { - status = NNI_HTTP_STATUS_BAD_REQUEST; + status = NNG_HTTP_STATUS_BAD_REQUEST; goto err; } if (((ptr = GETH("Sec-WebSocket-Key")) == NULL) || (ws_make_accept(ptr, key) != 0)) { - status = NNI_HTTP_STATUS_BAD_REQUEST; + status = NNG_HTTP_STATUS_BAD_REQUEST; goto err; } @@ -1427,51 +1429,50 @@ ws_handler(nni_aio *aio) proto = GETH("Sec-WebSocket-Protocol"); if (proto == NULL) { if (l->proto != NULL) { - status = NNI_HTTP_STATUS_BAD_REQUEST; + status = NNG_HTTP_STATUS_BAD_REQUEST; goto err; } } else if ((l->proto == NULL) || (!ws_contains_word(l->proto, proto))) { - status = NNI_HTTP_STATUS_BAD_REQUEST; + status = NNG_HTTP_STATUS_BAD_REQUEST; goto err; } - if ((rv = nni_http_res_init(&res)) != 0) { + if ((rv = nni_http_res_alloc(&res)) != 0) { // Give a chance to reply to client. - status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; goto err; } - if (nni_http_res_set_status( - res, NNI_HTTP_STATUS_SWITCHING, "Switching Protocols") != 0) { - nni_http_res_fini(res); - status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + if (nni_http_res_set_status(res, NNG_HTTP_STATUS_SWITCHING) != 0) { + nni_http_res_free(res); + status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; goto err; } if ((SETH("Connection", "Upgrade") != 0) || (SETH("Upgrade", "websocket") != 0) || (SETH("Sec-WebSocket-Accept", key) != 0)) { - status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; - nni_http_res_fini(res); + status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; + nni_http_res_free(res); goto err; } if ((proto != NULL) && (SETH("Sec-WebSocket-Protocol", proto) != 0)) { - status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; - nni_http_res_fini(res); + status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; + nni_http_res_free(res); goto err; } if (l->hookfn != NULL) { rv = l->hookfn(l->hookarg, req, res); if (rv != 0) { - nni_http_res_fini(res); + nni_http_res_free(res); nni_aio_finish_error(aio, rv); return; } if (nni_http_res_get_status(res) != - NNI_HTTP_STATUS_SWITCHING) { + NNG_HTTP_STATUS_SWITCHING) { // The hook has decided to give back a different // reply and we are not upgrading anymore. For // example the Origin might not be permitted, or @@ -1479,7 +1480,7 @@ ws_handler(nni_aio *aio) // (Note that the hook can also give back various // other headers, but it would be bad for it to // alter the websocket mandated headers.) - nni_http_req_fini(req); + nni_http_req_free(req); nni_aio_set_output(aio, 0, res); nni_aio_finish(aio, 0, 0); return; @@ -1492,12 +1493,12 @@ ws_handler(nni_aio *aio) // We are good to go, provided we can get the websocket struct, // and send the reply. if ((rv = ws_init(&ws)) != 0) { - nni_http_req_fini(req); - nni_http_res_fini(res); - status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + nni_http_req_free(req); + nni_http_res_free(res); + status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; goto err; } - ws->http = http; + ws->http = conn; ws->req = req; ws->res = res; ws->mode = NNI_EP_MODE_LISTEN; @@ -1506,14 +1507,14 @@ ws_handler(nni_aio *aio) nni_list_append(&l->reply, ws); nni_aio_set_data(ws->httpaio, 0, l); - nni_http_write_res(http, res, ws->httpaio); - (void) nni_http_hijack(ctx); + nni_http_write_res(conn, res, ws->httpaio); + (void) nni_http_hijack(conn); nni_aio_set_output(aio, 0, NULL); nni_aio_finish(aio, 0, 0); return; err: - if ((rv = nni_http_res_init_error(&res, status)) != 0) { + if ((rv = nni_http_res_alloc_error(&res, status)) != 0) { nni_aio_finish_error(aio, rv); } else { nni_aio_set_output(aio, 0, res); @@ -1588,7 +1589,7 @@ nni_ws_listener_proto(nni_ws_listener *l, const char *proto) static void ws_accept_cancel(nni_aio *aio, int rv) { - nni_ws_listener *l = aio->a_prov_data; + nni_ws_listener *l = nni_aio_get_prov_data(aio); nni_mtx_lock(&l->mtx); if (nni_aio_list_active(aio)) { @@ -1723,7 +1724,7 @@ ws_conn_cb(void *arg) nni_ws_dialer *d; nni_ws * ws; nni_aio * uaio; - nni_http * http; + nni_http_conn *http; nni_http_req * req = NULL; int rv; uint8_t raw[16]; @@ -1757,7 +1758,7 @@ ws_conn_cb(void *arg) nni_aio_set_output(ws->connaio, 0, NULL); if (uaio == NULL) { // This request was canceled for some reason. - nni_http_fini(http); + nni_http_conn_fini(http); nni_mtx_unlock(&ws->mtx); nni_ws_fini(ws); return; @@ -1770,11 +1771,7 @@ ws_conn_cb(void *arg) wskey[24] = '\0'; #define SETH(h, v) nni_http_req_set_header(req, h, v) - if ((rv != 0) || ((rv = nni_http_req_init(&req)) != 0) || - ((rv = nni_http_req_set_uri(req, d->url->u_rawpath)) != 0) || - ((rv = nni_http_req_set_version(req, "HTTP/1.1")) != 0) || - ((rv = nni_http_req_set_method(req, "GET")) != 0) || - ((rv = SETH("Host", d->url->u_host)) != 0) || + if ((rv != 0) || ((rv = nni_http_req_alloc(&req, d->url)) != 0) || ((rv = SETH("Upgrade", "websocket")) != 0) || ((rv = SETH("Connection", "Upgrade")) != 0) || ((rv = SETH("Sec-WebSocket-Key", wskey)) != 0) || @@ -1805,10 +1802,10 @@ err: nni_aio_finish_error(uaio, rv); nni_mtx_unlock(&ws->mtx); if (http != NULL) { - nni_http_fini(http); + nni_http_conn_fini(http); } if (req != NULL) { - nni_http_req_fini(req); + nni_http_req_free(req); } nni_ws_fini(ws); } @@ -1920,12 +1917,12 @@ nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto) static void ws_dial_cancel(nni_aio *aio, int rv) { - nni_ws *ws = aio->a_prov_data; + nni_ws *ws = nni_aio_get_prov_data(aio); nni_mtx_lock(&ws->mtx); if (aio == ws->useraio) { - nni_aio_cancel(ws->connaio, rv); - nni_aio_cancel(ws->httpaio, rv); + nni_aio_abort(ws->connaio, rv); + nni_aio_abort(ws->httpaio, rv); ws->useraio = NULL; nni_aio_finish_error(aio, rv); } diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h index 2cabb9cc..546f41a9 100644 --- a/src/supplemental/websocket/websocket.h +++ b/src/supplemental/websocket/websocket.h @@ -13,15 +13,11 @@ #include <stdbool.h> -// Pre-defined types for some prototypes. These are from other subsystems. -typedef struct nni_http_req nni_http_req; -typedef struct nni_http_res nni_http_res; - typedef struct nni_ws nni_ws; typedef struct nni_ws_listener nni_ws_listener; typedef struct nni_ws_dialer nni_ws_dialer; -typedef int (*nni_ws_listen_hook)(void *, nni_http_req *, nni_http_res *); +typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *); // Specify URL as ws://[<host>][:port][/path] // If host is missing, INADDR_ANY is assumed. If port is missing, @@ -34,7 +30,7 @@ extern void nni_ws_listener_fini(nni_ws_listener *); extern void nni_ws_listener_close(nni_ws_listener *); extern int nni_ws_listener_proto(nni_ws_listener *, const char *); extern int nni_ws_listener_listen(nni_ws_listener *); -extern void nni_ws_listener_accept(nni_ws_listener *, nni_aio *); +extern void nni_ws_listener_accept(nni_ws_listener *, nng_aio *); extern void nni_ws_listener_hook( nni_ws_listener *, nni_ws_listen_hook, void *); extern int nni_ws_listener_set_tls(nni_ws_listener *, nng_tls_config *); @@ -45,7 +41,7 @@ extern void nni_ws_dialer_fini(nni_ws_dialer *); extern void nni_ws_dialer_close(nni_ws_dialer *); extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *); extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *); -extern void nni_ws_dialer_dial(nni_ws_dialer *, nni_aio *); +extern void nni_ws_dialer_dial(nni_ws_dialer *, nng_aio *); extern int nni_ws_dialer_set_tls(nni_ws_dialer *, nng_tls_config *); extern int nni_ws_dialer_get_tls(nni_ws_dialer *, nng_tls_config **); @@ -54,10 +50,10 @@ extern int nni_ws_dialer_get_tls(nni_ws_dialer *, nng_tls_config **); // not confirm the server's response at the HTTP level. (It can still issue // a websocket close). -extern void nni_ws_send_msg(nni_ws *, nni_aio *); -extern void nni_ws_recv_msg(nni_ws *, nni_aio *); -extern nni_http_res *nni_ws_response(nni_ws *); -extern nni_http_req *nni_ws_request(nni_ws *); +extern void nni_ws_send_msg(nni_ws *, nng_aio *); +extern void nni_ws_recv_msg(nni_ws *, nng_aio *); +extern nng_http_res *nni_ws_response(nni_ws *); +extern nng_http_req *nni_ws_request(nni_ws *); extern int nni_ws_sock_addr(nni_ws *, nni_sockaddr *); extern int nni_ws_peer_addr(nni_ws *, nni_sockaddr *); extern void nni_ws_close(nni_ws *); @@ -69,4 +65,4 @@ extern bool nni_ws_tls_verified(nni_ws *); // The implementation will send periodic PINGs, and respond with PONGs. -#endif // NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H
\ No newline at end of file +#endif // NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H |
