diff options
Diffstat (limited to 'src/supplemental/websocket')
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 329 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.h | 6 |
2 files changed, 166 insertions, 169 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index aca09749..fe4f002f 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -30,28 +30,32 @@ typedef struct ws_header { } ws_header; struct nni_ws { - nni_list_node node; - nni_reap_item reap; - int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN - bool closed; - bool ready; - bool wclose; - nni_mtx mtx; - nni_list txmsgs; - nni_list rxmsgs; - ws_frame * txframe; - ws_frame * rxframe; - nni_aio * txaio; // physical aios - nni_aio * rxaio; - nni_aio * closeaio; - nni_aio * httpaio; // server only, HTTP reply pending - nni_http * http; - nni_http_req *req; - nni_http_res *res; - char * reqhdrs; - char * reshdrs; - size_t maxframe; - size_t fragsize; + nni_list_node node; + nni_reap_item reap; + int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN + bool closed; + bool ready; + bool wclose; + nni_mtx mtx; + nni_list txmsgs; + nni_list rxmsgs; + ws_frame * txframe; + ws_frame * rxframe; + nni_aio * txaio; // physical aios + nni_aio * rxaio; + nni_aio * closeaio; + nni_aio * httpaio; + nni_aio * connaio; // connect aio + nni_aio * useraio; // user aio, during HTTP negotiation + nni_http * http; + nni_http_req * req; + nni_http_res * res; + char * reqhdrs; + char * reshdrs; + size_t maxframe; + size_t fragsize; + nni_ws_listener *listener; + nni_ws_dialer * dialer; }; struct nni_ws_listener { @@ -83,12 +87,9 @@ struct nni_ws_dialer { nni_http_res * res; nni_http_client *client; nni_mtx mtx; - nni_aio * conaio; char * proto; nni_url * url; - nni_list conaios; // user aios waiting for connect. - nni_list httpaios; // user aios waiting for HTTP nego. - bool started; + nni_list wspend; // ws structures still negotiating bool closed; nng_sockaddr sa; nni_list headers; // request headers @@ -139,6 +140,10 @@ struct ws_msg { }; static void ws_send_close(nni_ws *ws, uint16_t code); +static void ws_conn_cb(void *); +static void ws_close_cb(void *); +static void ws_read_cb(void *); +static void ws_write_cb(void *); // This looks, case independently for a word in a list, which is either // space or comma separated. @@ -455,9 +460,23 @@ ws_close(nni_ws *ws, uint16_t code) // If were closing "gracefully", then don't abort in-flight // stuff yet. Note that reads should have stopped already. + // However, we *do* abort any inflight HTTP negotiation, or + // pending connect request. if (!ws->closed) { + // ABORT connection negotiation. + nni_aio_cancel(ws->connaio, NNG_ECLOSED); + nni_aio_cancel(ws->httpaio, NNG_ECLOSED); ws_send_close(ws, code); - return; + } + + if (nni_list_node_active(&ws->node)) { + nni_ws_dialer *d; + + if ((d = ws->dialer) != NULL) { + nni_mtx_lock(&d->mtx); + nni_list_node_remove(&ws->node); + nni_mtx_unlock(&d->mtx); + } } } @@ -514,7 +533,12 @@ ws_write_cb(void *arg) nni_mtx_lock(&ws->mtx); - if (ws->txframe->op == WS_CLOSE) { + if ((frame = ws->txframe) == NULL) { + nni_mtx_unlock(&ws->mtx); + return; + } + ws->txframe = NULL; + if (frame->op == WS_CLOSE) { // If this was a close frame, we are done. // No other messages may succeed.. while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { @@ -533,10 +557,8 @@ ws_write_cb(void *arg) return; } - frame = ws->txframe; - wm = frame->wmsg; - aio = wm->aio; - ws->txframe = NULL; + wm = frame->wmsg; + aio = wm->aio; if ((rv = nni_aio_result(ws->txaio)) != 0) { @@ -603,7 +625,7 @@ ws_send_close(nni_ws *ws, uint16_t code) NNI_PUT16(buf, code); - if (ws->closed) { + if (ws->closed || !ws->ready) { return; } ws->closed = true; @@ -1067,6 +1089,7 @@ ws_fini(void *arg) nni_aio_stop(ws->txaio); nni_aio_stop(ws->closeaio); nni_aio_stop(ws->httpaio); + nni_aio_stop(ws->connaio); nni_mtx_lock(&ws->mtx); while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { @@ -1090,6 +1113,9 @@ ws_fini(void *arg) } nni_mtx_unlock(&ws->mtx); + if (ws->http) { + nni_http_fini(ws->http); + } if (ws->req) { nni_http_req_fini(ws->req); } @@ -1099,11 +1125,11 @@ ws_fini(void *arg) nni_strfree(ws->reqhdrs); nni_strfree(ws->reshdrs); - nni_http_fini(ws->http); nni_aio_fini(ws->rxaio); nni_aio_fini(ws->txaio); nni_aio_fini(ws->closeaio); nni_aio_fini(ws->httpaio); + nni_aio_fini(ws->connaio); nni_mtx_fini(&ws->mtx); NNI_FREE_STRUCT(ws); } @@ -1150,10 +1176,10 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) char wskey[29]; const char * ptr; - d = nni_aio_get_data(aio, 0); + d = ws->dialer; + uaio = ws->useraio; nni_mtx_lock(&d->mtx); - uaio = nni_list_first(&d->httpaios); NNI_ASSERT(uaio != NULL); // We have two steps. In step 1, we just sent the request, // and need to retrieve the reply. In step two we have @@ -1222,16 +1248,18 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) #undef GETH // At this point, we are in business! - ws->ready = true; - nni_aio_list_remove(uaio); + nni_list_remove(&d->wspend, ws); + ws->ready = true; + ws->useraio = NULL; nni_aio_finish_pipe(uaio, ws); nni_mtx_unlock(&d->mtx); return; err: - nni_aio_list_remove(uaio); + nni_list_remove(&d->wspend, ws); + ws->useraio = NULL; nni_aio_finish_error(uaio, rv); - nni_ws_fini(ws); nni_mtx_unlock(&d->mtx); + nni_ws_fini(ws); } static void @@ -1251,7 +1279,7 @@ ws_http_cb(void *arg) } static int -ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res) +ws_init(nni_ws **wsp) { nni_ws *ws; int rv; @@ -1266,8 +1294,9 @@ ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res) if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) || ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) || ((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) || - ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0)) { - nni_ws_fini(ws); + ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0) || + ((rv = nni_aio_init(&ws->connaio, ws_conn_cb, ws)) != 0)) { + ws_fini(ws); return (rv); } @@ -1276,9 +1305,6 @@ ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res) ws->fragsize = 1 << 20; // we won't send a frame larger than this ws->maxframe = (1 << 20) * 10; // default limit on incoming frame size - ws->http = http; - ws->req = req; - ws->res = res; *wsp = ws; return (0); } @@ -1444,11 +1470,15 @@ ws_handler(nni_aio *aio) // We are good to go, provided we can get the websocket struct, // and send the reply. - if ((rv = ws_init(&ws, http, req, res)) != 0) { + if ((rv = ws_init(&ws)) != 0) { + nni_http_req_fini(req); nni_http_res_fini(res); status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; goto err; } + ws->http = http; + ws->req = req; + ws->res = res; ws->mode = NNI_EP_MODE_LISTEN; // XXX: Inherit fragmentation and message size limits! @@ -1475,8 +1505,6 @@ nni_ws_listener_init(nni_ws_listener **wslp, const char *addr) { nni_ws_listener *l; int rv; - nni_aio * aio; - nni_sockaddr sa; char * host; char * serv; @@ -1511,20 +1539,7 @@ nni_ws_listener_init(nni_ws_listener **wslp, const char *addr) l->handler.h_host = host; // ignore the port l->handler.h_cb = ws_handler; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - nni_ws_listener_fini(l); - return (rv); - } - aio->a_addr = &sa; - nni_plat_tcp_resolv(host, serv, NNG_AF_UNSPEC, true, aio); - nni_aio_wait(aio); - rv = nni_aio_result(aio); - nni_aio_fini(aio); - if (rv != 0) { - nni_ws_listener_fini(l); - return (rv); - } - if ((rv = nni_http_server_init(&l->server, &sa)) != 0) { + if ((rv = nni_http_server_init(&l->server, addr)) != 0) { nni_ws_listener_fini(l); return (rv); } @@ -1666,7 +1681,6 @@ nni_ws_listener_hook( nni_mtx_unlock(&l->mtx); } -#ifdef NNG_SUPP_TLS int nni_ws_listener_set_tls(nni_ws_listener *l, nng_tls_config *tls) { @@ -1676,47 +1690,61 @@ nni_ws_listener_set_tls(nni_ws_listener *l, nng_tls_config *tls) nni_mtx_unlock(&l->mtx); return (rv); } -#endif + +int +nni_ws_listener_get_tls(nni_ws_listener *l, nng_tls_config **tlsp) +{ + int rv; + nni_mtx_lock(&l->mtx); + rv = nni_http_server_get_tls(l->server, tlsp); + nni_mtx_unlock(&l->mtx); + return (rv); +} void ws_conn_cb(void *arg) { - nni_ws_dialer *d = arg; - nni_aio * aio = d->conaio; + nni_ws_dialer *d; + nni_ws * ws; nni_aio * uaio; nni_http * http; nni_http_req * req = NULL; int rv; uint8_t raw[16]; char wskey[25]; - nni_ws * ws; ws_header * hdr; - nni_mtx_lock(&d->mtx); - uaio = nni_list_first(&d->conaios); - rv = nni_aio_result(aio); - http = rv == 0 ? nni_aio_get_output(aio, 0) : NULL; + ws = arg; - if (uaio == NULL) { - if (http) { - // Nobody listening anymore - hard abort. - nni_http_fini(http); + d = ws->dialer; + if ((rv = nni_aio_result(ws->connaio)) != 0) { + nni_mtx_lock(&ws->mtx); + if ((uaio = ws->useraio) != NULL) { + ws->useraio = NULL; + nni_aio_finish_error(uaio, rv); + } + nni_mtx_unlock(&ws->mtx); + nni_mtx_lock(&d->mtx); + if (nni_list_node_active(&ws->node)) { + nni_list_remove(&d->wspend, ws); + nni_mtx_unlock(&d->mtx); + nni_ws_fini(ws); + } else { + nni_mtx_unlock(&d->mtx); } - nni_mtx_unlock(&d->mtx); return; } - nni_aio_list_remove(uaio); - nni_aio_set_output(aio, 0, NULL); - - // We are done with this aio, start another connection request while - // we finish up, if we have more clients waiting. - if (!nni_list_empty(&d->conaios)) { - nni_http_client_connect(d->client, aio); - } - - if (rv != 0) { - goto err; + nni_mtx_lock(&ws->mtx); + uaio = ws->useraio; + http = nni_aio_get_output(ws->connaio, 0); + nni_aio_set_output(ws->connaio, 0, NULL); + if (uaio == NULL) { + // This request was canceled for some reason. + nni_http_fini(http); + nni_mtx_unlock(&ws->mtx); + nni_ws_fini(ws); + return; } for (int i = 0; i < 16; i++) { @@ -1738,7 +1766,6 @@ ws_conn_cb(void *arg) goto err; } - // If consumer asked for protocol, pass it on. if ((d->proto != NULL) && ((rv = SETH("Sec-WebSocket-Protocol", d->proto)) != 0)) { goto err; @@ -1749,33 +1776,25 @@ ws_conn_cb(void *arg) goto err; } } - #undef SETH - if ((rv = ws_init(&ws, http, req, NULL)) != 0) { - goto err; - } - ws->mode = NNI_EP_MODE_DIAL; + ws->http = http; + ws->req = req; - // Move this uaio to the http wait list. Note that it is not - // required that the uaio will be completed by this connection. - // If another connection attempt completes first, then the first - // aio queued will get the result. - nni_list_append(&d->httpaios, uaio); - nni_aio_set_data(ws->httpaio, 0, d); nni_http_write_req(http, req, ws->httpaio); - nni_mtx_unlock(&d->mtx); + nni_mtx_unlock(&ws->mtx); return; err: nni_aio_finish_error(uaio, rv); + nni_mtx_unlock(&ws->mtx); if (http != NULL) { nni_http_fini(http); } if (req != NULL) { nni_http_req_fini(req); } - nni_mtx_unlock(&d->mtx); + nni_ws_fini(ws); } void @@ -1783,7 +1802,6 @@ nni_ws_dialer_fini(nni_ws_dialer *d) { ws_header *hdr; - nni_aio_fini(d->conaio); nni_strfree(d->proto); while ((hdr = nni_list_first(&d->headers)) != NULL) { nni_list_remove(&d->headers, hdr); @@ -1806,62 +1824,20 @@ nni_ws_dialer_init(nni_ws_dialer **dp, const char *addr) { nni_ws_dialer *d; int rv; - nni_aio * aio; - char * serv; if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { return (NNG_ENOMEM); } NNI_LIST_INIT(&d->headers, ws_header, node); + NNI_LIST_INIT(&d->wspend, nni_ws, node); nni_mtx_init(&d->mtx); - nni_aio_list_init(&d->conaios); - nni_aio_list_init(&d->httpaios); if ((rv = nni_url_parse(&d->url, addr)) != 0) { nni_ws_dialer_fini(d); return (rv); } - // Dialer requires a valid host. - if ((strlen(d->url->u_hostname) == 0) || - (strcmp(d->url->u_hostname, "*") == 0)) { - nni_ws_dialer_fini(d); - return (NNG_EADDRINVAL); - } - - // Default port is 80 for ws, and 443 for wss. - if ((d->url->u_port == NULL) || (strlen(d->url->u_port) == 0)) { - if (strcmp(d->url->u_scheme, "wss") == 0) { - serv = "443"; - } else { - serv = "80"; - } - } else { - serv = d->url->u_port; - } - - if ((rv = nni_aio_init(&d->conaio, ws_conn_cb, d)) != 0) { - nni_ws_dialer_fini(d); - return (rv); - } - - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - nni_ws_dialer_fini(d); - return (rv); - } - // XXX: this is synchronous. We should fix this in the HTTP layer. - aio->a_addr = &d->sa; - nni_plat_tcp_resolv( - d->url->u_hostname, serv, NNG_AF_UNSPEC, false, aio); - nni_aio_wait(aio); - rv = nni_aio_result(aio); - nni_aio_fini(aio); - if (rv != 0) { - nni_ws_dialer_fini(d); - return (rv); - } - - if ((rv = nni_http_client_init(&d->client, &d->sa)) != 0) { + if ((rv = nni_http_client_init(&d->client, addr)) != 0) { nni_ws_dialer_fini(d); return (rv); } @@ -1870,7 +1846,6 @@ nni_ws_dialer_init(nni_ws_dialer **dp, const char *addr) return (0); } -#ifdef NNG_SUPP_TLS int nni_ws_dialer_set_tls(nni_ws_dialer *d, nng_tls_config *tls) { @@ -1880,19 +1855,32 @@ nni_ws_dialer_set_tls(nni_ws_dialer *d, nng_tls_config *tls) nni_mtx_unlock(&d->mtx); return (rv); } -#endif + +int +nni_ws_dialer_get_tls(nni_ws_dialer *d, nng_tls_config **tlsp) +{ + int rv; + nni_mtx_lock(&d->mtx); + rv = nni_http_client_get_tls(d->client, tlsp); + nni_mtx_unlock(&d->mtx); + return (rv); +} void nni_ws_dialer_close(nni_ws_dialer *d) { + nni_ws *ws; nni_mtx_lock(&d->mtx); if (d->closed) { nni_mtx_unlock(&d->mtx); return; } d->closed = true; + while ((ws = nni_list_first(&d->wspend)) != 0) { + nni_list_remove(&d->wspend, ws); + nni_ws_close(ws); + } nni_mtx_unlock(&d->mtx); - nni_aio_cancel(d->conaio, NNG_ECLOSED); } int @@ -1916,38 +1904,45 @@ nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto) static void ws_dial_cancel(nni_aio *aio, int rv) { - nni_ws_dialer *d = aio->a_prov_data; - nni_mtx_lock(&d->mtx); - // If we are waiting, then we can cancel. Otherwise we need - // to abort. - if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); + nni_ws *ws = aio->a_prov_data; + + nni_mtx_lock(&ws->mtx); + if (aio == ws->useraio) { + nni_aio_cancel(ws->connaio, rv); + nni_aio_cancel(ws->httpaio, rv); + ws->useraio = NULL; nni_aio_finish_error(aio, rv); } - // This does not cancel in-flight client negotiations with HTTP. - nni_mtx_unlock(&d->mtx); + nni_mtx_unlock(&ws->mtx); } void nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) { - nni_mtx_lock(&d->mtx); - // First look up the host. - if (nni_aio_start(aio, ws_dial_cancel, d) != 0) { - nni_mtx_unlock(&d->mtx); + nni_ws *ws; + int rv; + + if ((rv = ws_init(&ws)) != 0) { + nni_aio_finish_error(aio, rv); return; } + nni_mtx_lock(&d->mtx); if (d->closed) { nni_aio_finish_error(aio, NNG_ECLOSED); nni_mtx_unlock(&d->mtx); + ws_fini(ws); return; } - nni_list_append(&d->conaios, aio); - - if (!d->started) { - d->started = true; - nni_http_client_connect(d->client, d->conaio); + if (nni_aio_start(aio, ws_dial_cancel, ws) != 0) { + nni_mtx_unlock(&d->mtx); + ws_fini(ws); + return; } + ws->dialer = d; + ws->useraio = aio; + ws->mode = NNI_EP_MODE_DIAL; + nni_list_append(&d->wspend, ws); + nni_http_client_connect(d->client, ws->connaio); nni_mtx_unlock(&d->mtx); } diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h index ccf549df..9a52f78c 100644 --- a/src/supplemental/websocket/websocket.h +++ b/src/supplemental/websocket/websocket.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -36,6 +36,7 @@ extern void nni_ws_listener_accept(nni_ws_listener *, nni_aio *); extern void nni_ws_listener_hook( nni_ws_listener *, nni_ws_listen_hook, void *); extern int nni_ws_listener_set_tls(nni_ws_listener *, nng_tls_config *); +extern int nni_ws_listener_get_tls(nni_ws_listener *, nng_tls_config **s); extern int nni_ws_dialer_init(nni_ws_dialer **, const char *); extern void nni_ws_dialer_fini(nni_ws_dialer *); @@ -44,6 +45,7 @@ extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *); extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *); extern void nni_ws_dialer_dial(nni_ws_dialer *, nni_aio *); extern int nni_ws_dialer_set_tls(nni_ws_dialer *, nng_tls_config *); +extern int nni_ws_dialer_get_tls(nni_ws_dialer *, nng_tls_config **); // Dialer does not get a hook chance, as it can examine the request and reply // after dial is done; this is not a 3-way handshake, so the dialer does |
