aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental/websocket
diff options
context:
space:
mode:
Diffstat (limited to 'src/supplemental/websocket')
-rw-r--r--src/supplemental/websocket/websocket.c329
-rw-r--r--src/supplemental/websocket/websocket.h6
2 files changed, 166 insertions, 169 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index aca09749..fe4f002f 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -30,28 +30,32 @@ typedef struct ws_header {
} ws_header;
struct nni_ws {
- nni_list_node node;
- nni_reap_item reap;
- int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
- bool closed;
- bool ready;
- bool wclose;
- nni_mtx mtx;
- nni_list txmsgs;
- nni_list rxmsgs;
- ws_frame * txframe;
- ws_frame * rxframe;
- nni_aio * txaio; // physical aios
- nni_aio * rxaio;
- nni_aio * closeaio;
- nni_aio * httpaio; // server only, HTTP reply pending
- nni_http * http;
- nni_http_req *req;
- nni_http_res *res;
- char * reqhdrs;
- char * reshdrs;
- size_t maxframe;
- size_t fragsize;
+ nni_list_node node;
+ nni_reap_item reap;
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ bool closed;
+ bool ready;
+ bool wclose;
+ nni_mtx mtx;
+ nni_list txmsgs;
+ nni_list rxmsgs;
+ ws_frame * txframe;
+ ws_frame * rxframe;
+ nni_aio * txaio; // physical aios
+ nni_aio * rxaio;
+ nni_aio * closeaio;
+ nni_aio * httpaio;
+ nni_aio * connaio; // connect aio
+ nni_aio * useraio; // user aio, during HTTP negotiation
+ nni_http * http;
+ nni_http_req * req;
+ nni_http_res * res;
+ char * reqhdrs;
+ char * reshdrs;
+ size_t maxframe;
+ size_t fragsize;
+ nni_ws_listener *listener;
+ nni_ws_dialer * dialer;
};
struct nni_ws_listener {
@@ -83,12 +87,9 @@ struct nni_ws_dialer {
nni_http_res * res;
nni_http_client *client;
nni_mtx mtx;
- nni_aio * conaio;
char * proto;
nni_url * url;
- nni_list conaios; // user aios waiting for connect.
- nni_list httpaios; // user aios waiting for HTTP nego.
- bool started;
+ nni_list wspend; // ws structures still negotiating
bool closed;
nng_sockaddr sa;
nni_list headers; // request headers
@@ -139,6 +140,10 @@ struct ws_msg {
};
static void ws_send_close(nni_ws *ws, uint16_t code);
+static void ws_conn_cb(void *);
+static void ws_close_cb(void *);
+static void ws_read_cb(void *);
+static void ws_write_cb(void *);
// This looks, case independently for a word in a list, which is either
// space or comma separated.
@@ -455,9 +460,23 @@ ws_close(nni_ws *ws, uint16_t code)
// If were closing "gracefully", then don't abort in-flight
// stuff yet. Note that reads should have stopped already.
+ // However, we *do* abort any inflight HTTP negotiation, or
+ // pending connect request.
if (!ws->closed) {
+ // ABORT connection negotiation.
+ nni_aio_cancel(ws->connaio, NNG_ECLOSED);
+ nni_aio_cancel(ws->httpaio, NNG_ECLOSED);
ws_send_close(ws, code);
- return;
+ }
+
+ if (nni_list_node_active(&ws->node)) {
+ nni_ws_dialer *d;
+
+ if ((d = ws->dialer) != NULL) {
+ nni_mtx_lock(&d->mtx);
+ nni_list_node_remove(&ws->node);
+ nni_mtx_unlock(&d->mtx);
+ }
}
}
@@ -514,7 +533,12 @@ ws_write_cb(void *arg)
nni_mtx_lock(&ws->mtx);
- if (ws->txframe->op == WS_CLOSE) {
+ if ((frame = ws->txframe) == NULL) {
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ ws->txframe = NULL;
+ if (frame->op == WS_CLOSE) {
// If this was a close frame, we are done.
// No other messages may succeed..
while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
@@ -533,10 +557,8 @@ ws_write_cb(void *arg)
return;
}
- frame = ws->txframe;
- wm = frame->wmsg;
- aio = wm->aio;
- ws->txframe = NULL;
+ wm = frame->wmsg;
+ aio = wm->aio;
if ((rv = nni_aio_result(ws->txaio)) != 0) {
@@ -603,7 +625,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
NNI_PUT16(buf, code);
- if (ws->closed) {
+ if (ws->closed || !ws->ready) {
return;
}
ws->closed = true;
@@ -1067,6 +1089,7 @@ ws_fini(void *arg)
nni_aio_stop(ws->txaio);
nni_aio_stop(ws->closeaio);
nni_aio_stop(ws->httpaio);
+ nni_aio_stop(ws->connaio);
nni_mtx_lock(&ws->mtx);
while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
@@ -1090,6 +1113,9 @@ ws_fini(void *arg)
}
nni_mtx_unlock(&ws->mtx);
+ if (ws->http) {
+ nni_http_fini(ws->http);
+ }
if (ws->req) {
nni_http_req_fini(ws->req);
}
@@ -1099,11 +1125,11 @@ ws_fini(void *arg)
nni_strfree(ws->reqhdrs);
nni_strfree(ws->reshdrs);
- nni_http_fini(ws->http);
nni_aio_fini(ws->rxaio);
nni_aio_fini(ws->txaio);
nni_aio_fini(ws->closeaio);
nni_aio_fini(ws->httpaio);
+ nni_aio_fini(ws->connaio);
nni_mtx_fini(&ws->mtx);
NNI_FREE_STRUCT(ws);
}
@@ -1150,10 +1176,10 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
char wskey[29];
const char * ptr;
- d = nni_aio_get_data(aio, 0);
+ d = ws->dialer;
+ uaio = ws->useraio;
nni_mtx_lock(&d->mtx);
- uaio = nni_list_first(&d->httpaios);
NNI_ASSERT(uaio != NULL);
// We have two steps. In step 1, we just sent the request,
// and need to retrieve the reply. In step two we have
@@ -1222,16 +1248,18 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
#undef GETH
// At this point, we are in business!
- ws->ready = true;
- nni_aio_list_remove(uaio);
+ nni_list_remove(&d->wspend, ws);
+ ws->ready = true;
+ ws->useraio = NULL;
nni_aio_finish_pipe(uaio, ws);
nni_mtx_unlock(&d->mtx);
return;
err:
- nni_aio_list_remove(uaio);
+ nni_list_remove(&d->wspend, ws);
+ ws->useraio = NULL;
nni_aio_finish_error(uaio, rv);
- nni_ws_fini(ws);
nni_mtx_unlock(&d->mtx);
+ nni_ws_fini(ws);
}
static void
@@ -1251,7 +1279,7 @@ ws_http_cb(void *arg)
}
static int
-ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res)
+ws_init(nni_ws **wsp)
{
nni_ws *ws;
int rv;
@@ -1266,8 +1294,9 @@ ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res)
if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) ||
((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) ||
((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) ||
- ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0)) {
- nni_ws_fini(ws);
+ ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->connaio, ws_conn_cb, ws)) != 0)) {
+ ws_fini(ws);
return (rv);
}
@@ -1276,9 +1305,6 @@ ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res)
ws->fragsize = 1 << 20; // we won't send a frame larger than this
ws->maxframe = (1 << 20) * 10; // default limit on incoming frame size
- ws->http = http;
- ws->req = req;
- ws->res = res;
*wsp = ws;
return (0);
}
@@ -1444,11 +1470,15 @@ ws_handler(nni_aio *aio)
// We are good to go, provided we can get the websocket struct,
// and send the reply.
- if ((rv = ws_init(&ws, http, req, res)) != 0) {
+ if ((rv = ws_init(&ws)) != 0) {
+ nni_http_req_fini(req);
nni_http_res_fini(res);
status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
goto err;
}
+ ws->http = http;
+ ws->req = req;
+ ws->res = res;
ws->mode = NNI_EP_MODE_LISTEN;
// XXX: Inherit fragmentation and message size limits!
@@ -1475,8 +1505,6 @@ nni_ws_listener_init(nni_ws_listener **wslp, const char *addr)
{
nni_ws_listener *l;
int rv;
- nni_aio * aio;
- nni_sockaddr sa;
char * host;
char * serv;
@@ -1511,20 +1539,7 @@ nni_ws_listener_init(nni_ws_listener **wslp, const char *addr)
l->handler.h_host = host; // ignore the port
l->handler.h_cb = ws_handler;
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- nni_ws_listener_fini(l);
- return (rv);
- }
- aio->a_addr = &sa;
- nni_plat_tcp_resolv(host, serv, NNG_AF_UNSPEC, true, aio);
- nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- nni_aio_fini(aio);
- if (rv != 0) {
- nni_ws_listener_fini(l);
- return (rv);
- }
- if ((rv = nni_http_server_init(&l->server, &sa)) != 0) {
+ if ((rv = nni_http_server_init(&l->server, addr)) != 0) {
nni_ws_listener_fini(l);
return (rv);
}
@@ -1666,7 +1681,6 @@ nni_ws_listener_hook(
nni_mtx_unlock(&l->mtx);
}
-#ifdef NNG_SUPP_TLS
int
nni_ws_listener_set_tls(nni_ws_listener *l, nng_tls_config *tls)
{
@@ -1676,47 +1690,61 @@ nni_ws_listener_set_tls(nni_ws_listener *l, nng_tls_config *tls)
nni_mtx_unlock(&l->mtx);
return (rv);
}
-#endif
+
+int
+nni_ws_listener_get_tls(nni_ws_listener *l, nng_tls_config **tlsp)
+{
+ int rv;
+ nni_mtx_lock(&l->mtx);
+ rv = nni_http_server_get_tls(l->server, tlsp);
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
void
ws_conn_cb(void *arg)
{
- nni_ws_dialer *d = arg;
- nni_aio * aio = d->conaio;
+ nni_ws_dialer *d;
+ nni_ws * ws;
nni_aio * uaio;
nni_http * http;
nni_http_req * req = NULL;
int rv;
uint8_t raw[16];
char wskey[25];
- nni_ws * ws;
ws_header * hdr;
- nni_mtx_lock(&d->mtx);
- uaio = nni_list_first(&d->conaios);
- rv = nni_aio_result(aio);
- http = rv == 0 ? nni_aio_get_output(aio, 0) : NULL;
+ ws = arg;
- if (uaio == NULL) {
- if (http) {
- // Nobody listening anymore - hard abort.
- nni_http_fini(http);
+ d = ws->dialer;
+ if ((rv = nni_aio_result(ws->connaio)) != 0) {
+ nni_mtx_lock(&ws->mtx);
+ if ((uaio = ws->useraio) != NULL) {
+ ws->useraio = NULL;
+ nni_aio_finish_error(uaio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ nni_mtx_lock(&d->mtx);
+ if (nni_list_node_active(&ws->node)) {
+ nni_list_remove(&d->wspend, ws);
+ nni_mtx_unlock(&d->mtx);
+ nni_ws_fini(ws);
+ } else {
+ nni_mtx_unlock(&d->mtx);
}
- nni_mtx_unlock(&d->mtx);
return;
}
- nni_aio_list_remove(uaio);
- nni_aio_set_output(aio, 0, NULL);
-
- // We are done with this aio, start another connection request while
- // we finish up, if we have more clients waiting.
- if (!nni_list_empty(&d->conaios)) {
- nni_http_client_connect(d->client, aio);
- }
-
- if (rv != 0) {
- goto err;
+ nni_mtx_lock(&ws->mtx);
+ uaio = ws->useraio;
+ http = nni_aio_get_output(ws->connaio, 0);
+ nni_aio_set_output(ws->connaio, 0, NULL);
+ if (uaio == NULL) {
+ // This request was canceled for some reason.
+ nni_http_fini(http);
+ nni_mtx_unlock(&ws->mtx);
+ nni_ws_fini(ws);
+ return;
}
for (int i = 0; i < 16; i++) {
@@ -1738,7 +1766,6 @@ ws_conn_cb(void *arg)
goto err;
}
- // If consumer asked for protocol, pass it on.
if ((d->proto != NULL) &&
((rv = SETH("Sec-WebSocket-Protocol", d->proto)) != 0)) {
goto err;
@@ -1749,33 +1776,25 @@ ws_conn_cb(void *arg)
goto err;
}
}
-
#undef SETH
- if ((rv = ws_init(&ws, http, req, NULL)) != 0) {
- goto err;
- }
- ws->mode = NNI_EP_MODE_DIAL;
+ ws->http = http;
+ ws->req = req;
- // Move this uaio to the http wait list. Note that it is not
- // required that the uaio will be completed by this connection.
- // If another connection attempt completes first, then the first
- // aio queued will get the result.
- nni_list_append(&d->httpaios, uaio);
- nni_aio_set_data(ws->httpaio, 0, d);
nni_http_write_req(http, req, ws->httpaio);
- nni_mtx_unlock(&d->mtx);
+ nni_mtx_unlock(&ws->mtx);
return;
err:
nni_aio_finish_error(uaio, rv);
+ nni_mtx_unlock(&ws->mtx);
if (http != NULL) {
nni_http_fini(http);
}
if (req != NULL) {
nni_http_req_fini(req);
}
- nni_mtx_unlock(&d->mtx);
+ nni_ws_fini(ws);
}
void
@@ -1783,7 +1802,6 @@ nni_ws_dialer_fini(nni_ws_dialer *d)
{
ws_header *hdr;
- nni_aio_fini(d->conaio);
nni_strfree(d->proto);
while ((hdr = nni_list_first(&d->headers)) != NULL) {
nni_list_remove(&d->headers, hdr);
@@ -1806,62 +1824,20 @@ nni_ws_dialer_init(nni_ws_dialer **dp, const char *addr)
{
nni_ws_dialer *d;
int rv;
- nni_aio * aio;
- char * serv;
if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
return (NNG_ENOMEM);
}
NNI_LIST_INIT(&d->headers, ws_header, node);
+ NNI_LIST_INIT(&d->wspend, nni_ws, node);
nni_mtx_init(&d->mtx);
- nni_aio_list_init(&d->conaios);
- nni_aio_list_init(&d->httpaios);
if ((rv = nni_url_parse(&d->url, addr)) != 0) {
nni_ws_dialer_fini(d);
return (rv);
}
- // Dialer requires a valid host.
- if ((strlen(d->url->u_hostname) == 0) ||
- (strcmp(d->url->u_hostname, "*") == 0)) {
- nni_ws_dialer_fini(d);
- return (NNG_EADDRINVAL);
- }
-
- // Default port is 80 for ws, and 443 for wss.
- if ((d->url->u_port == NULL) || (strlen(d->url->u_port) == 0)) {
- if (strcmp(d->url->u_scheme, "wss") == 0) {
- serv = "443";
- } else {
- serv = "80";
- }
- } else {
- serv = d->url->u_port;
- }
-
- if ((rv = nni_aio_init(&d->conaio, ws_conn_cb, d)) != 0) {
- nni_ws_dialer_fini(d);
- return (rv);
- }
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- nni_ws_dialer_fini(d);
- return (rv);
- }
- // XXX: this is synchronous. We should fix this in the HTTP layer.
- aio->a_addr = &d->sa;
- nni_plat_tcp_resolv(
- d->url->u_hostname, serv, NNG_AF_UNSPEC, false, aio);
- nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- nni_aio_fini(aio);
- if (rv != 0) {
- nni_ws_dialer_fini(d);
- return (rv);
- }
-
- if ((rv = nni_http_client_init(&d->client, &d->sa)) != 0) {
+ if ((rv = nni_http_client_init(&d->client, addr)) != 0) {
nni_ws_dialer_fini(d);
return (rv);
}
@@ -1870,7 +1846,6 @@ nni_ws_dialer_init(nni_ws_dialer **dp, const char *addr)
return (0);
}
-#ifdef NNG_SUPP_TLS
int
nni_ws_dialer_set_tls(nni_ws_dialer *d, nng_tls_config *tls)
{
@@ -1880,19 +1855,32 @@ nni_ws_dialer_set_tls(nni_ws_dialer *d, nng_tls_config *tls)
nni_mtx_unlock(&d->mtx);
return (rv);
}
-#endif
+
+int
+nni_ws_dialer_get_tls(nni_ws_dialer *d, nng_tls_config **tlsp)
+{
+ int rv;
+ nni_mtx_lock(&d->mtx);
+ rv = nni_http_client_get_tls(d->client, tlsp);
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
void
nni_ws_dialer_close(nni_ws_dialer *d)
{
+ nni_ws *ws;
nni_mtx_lock(&d->mtx);
if (d->closed) {
nni_mtx_unlock(&d->mtx);
return;
}
d->closed = true;
+ while ((ws = nni_list_first(&d->wspend)) != 0) {
+ nni_list_remove(&d->wspend, ws);
+ nni_ws_close(ws);
+ }
nni_mtx_unlock(&d->mtx);
- nni_aio_cancel(d->conaio, NNG_ECLOSED);
}
int
@@ -1916,38 +1904,45 @@ nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto)
static void
ws_dial_cancel(nni_aio *aio, int rv)
{
- nni_ws_dialer *d = aio->a_prov_data;
- nni_mtx_lock(&d->mtx);
- // If we are waiting, then we can cancel. Otherwise we need
- // to abort.
- if (nni_aio_list_active(aio)) {
- nni_aio_list_remove(aio);
+ nni_ws *ws = aio->a_prov_data;
+
+ nni_mtx_lock(&ws->mtx);
+ if (aio == ws->useraio) {
+ nni_aio_cancel(ws->connaio, rv);
+ nni_aio_cancel(ws->httpaio, rv);
+ ws->useraio = NULL;
nni_aio_finish_error(aio, rv);
}
- // This does not cancel in-flight client negotiations with HTTP.
- nni_mtx_unlock(&d->mtx);
+ nni_mtx_unlock(&ws->mtx);
}
void
nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
{
- nni_mtx_lock(&d->mtx);
- // First look up the host.
- if (nni_aio_start(aio, ws_dial_cancel, d) != 0) {
- nni_mtx_unlock(&d->mtx);
+ nni_ws *ws;
+ int rv;
+
+ if ((rv = ws_init(&ws)) != 0) {
+ nni_aio_finish_error(aio, rv);
return;
}
+ nni_mtx_lock(&d->mtx);
if (d->closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&d->mtx);
+ ws_fini(ws);
return;
}
- nni_list_append(&d->conaios, aio);
-
- if (!d->started) {
- d->started = true;
- nni_http_client_connect(d->client, d->conaio);
+ if (nni_aio_start(aio, ws_dial_cancel, ws) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ ws_fini(ws);
+ return;
}
+ ws->dialer = d;
+ ws->useraio = aio;
+ ws->mode = NNI_EP_MODE_DIAL;
+ nni_list_append(&d->wspend, ws);
+ nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
}
diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h
index ccf549df..9a52f78c 100644
--- a/src/supplemental/websocket/websocket.h
+++ b/src/supplemental/websocket/websocket.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -36,6 +36,7 @@ extern void nni_ws_listener_accept(nni_ws_listener *, nni_aio *);
extern void nni_ws_listener_hook(
nni_ws_listener *, nni_ws_listen_hook, void *);
extern int nni_ws_listener_set_tls(nni_ws_listener *, nng_tls_config *);
+extern int nni_ws_listener_get_tls(nni_ws_listener *, nng_tls_config **s);
extern int nni_ws_dialer_init(nni_ws_dialer **, const char *);
extern void nni_ws_dialer_fini(nni_ws_dialer *);
@@ -44,6 +45,7 @@ extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *);
extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *);
extern void nni_ws_dialer_dial(nni_ws_dialer *, nni_aio *);
extern int nni_ws_dialer_set_tls(nni_ws_dialer *, nng_tls_config *);
+extern int nni_ws_dialer_get_tls(nni_ws_dialer *, nng_tls_config **);
// Dialer does not get a hook chance, as it can examine the request and reply
// after dial is done; this is not a 3-way handshake, so the dialer does