aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental/websocket/websocket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/supplemental/websocket/websocket.c')
-rw-r--r--src/supplemental/websocket/websocket.c105
1 files changed, 49 insertions, 56 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 10b308d5..9f3f6d0b 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -62,11 +62,11 @@ struct nni_ws {
nni_list rxq;
ws_frame *txframe;
ws_frame *rxframe;
- nni_aio *txaio; // physical aios
- nni_aio *rxaio;
- nni_aio *closeaio; // used for lingering/draining close
- nni_aio *httpaio;
- nni_aio *connaio; // connect aio
+ nni_aio txaio; // physical aios
+ nni_aio rxaio;
+ nni_aio closeaio; // used for lingering/draining close
+ nni_aio httpaio;
+ nni_aio connaio; // connect aio
nni_aio *useraio; // user aio, during HTTP negotiation
nni_http_conn *http;
nni_http_req *req;
@@ -519,9 +519,9 @@ ws_close_cb(void *arg)
nni_ws *ws = arg;
ws_frame *frame;
- nni_aio_close(ws->txaio);
- nni_aio_close(ws->rxaio);
- nni_aio_close(ws->httpaio);
+ nni_aio_close(&ws->txaio);
+ nni_aio_close(&ws->rxaio);
+ nni_aio_close(&ws->httpaio);
// Either we sent a close frame, or we didn't. Either way,
// we are done, and its time to abort everything else.
@@ -560,8 +560,8 @@ ws_close(nni_ws *ws, uint16_t code)
// pending connect request.
if (!ws->closed) {
// ABORT connection negotiation.
- nni_aio_close(ws->connaio);
- nni_aio_close(ws->httpaio);
+ nni_aio_close(&ws->connaio);
+ nni_aio_close(&ws->httpaio);
ws_send_close(ws, code);
}
}
@@ -593,8 +593,8 @@ ws_start_write(nni_ws *ws)
iov[1].iov_len = frame->len;
iov[1].iov_buf = frame->buf;
}
- nni_aio_set_iov(ws->txaio, niov, iov);
- nni_http_write_full(ws->http, ws->txaio);
+ nni_aio_set_iov(&ws->txaio, niov, iov);
+ nni_http_write_full(ws->http, &ws->txaio);
}
static void
@@ -642,7 +642,7 @@ ws_write_cb(void *arg)
if (ws->peer_closed) {
if (ws->wclose) { // could assert this?
ws->wclose = false;
- nni_aio_finish(ws->closeaio, 0, 0);
+ nni_aio_finish(&ws->closeaio, 0, 0);
}
}
nni_mtx_unlock(&ws->mtx);
@@ -650,7 +650,7 @@ ws_write_cb(void *arg)
}
aio = frame->aio;
- if ((rv = nni_aio_result(ws->txaio)) != 0) {
+ if ((rv = nni_aio_result(&ws->txaio)) != 0) {
// if tx fails, we can't send a close frame either
// we expect the caller to just close this connection
frame->aio = NULL;
@@ -718,7 +718,7 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv)
}
frame = nni_aio_get_prov_data(aio);
if (frame == ws->txframe) {
- nni_aio_abort(ws->txaio, rv);
+ nni_aio_abort(&ws->txaio, rv);
// We will wait for callback on the txaio to finish aio.
} else {
// If scheduled, just need to remove node and complete it.
@@ -737,7 +737,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
ws_frame *frame;
uint8_t buf[sizeof(uint16_t)];
int rv;
- nni_aio *aio;
+ nni_aio *aio = &ws->closeaio;
NNI_PUT16(buf, code);
@@ -745,7 +745,6 @@ ws_send_close(nni_ws *ws, uint16_t code)
return;
}
ws->closed = true;
- aio = ws->closeaio;
if (nni_aio_begin(aio) != 0) {
return;
@@ -818,7 +817,7 @@ ws_start_read(nni_ws *ws)
frame->len = 0;
ws->rxframe = frame;
- aio = ws->rxaio;
+ aio = &ws->rxaio;
iov.iov_len = 2; // We want the first two bytes.
iov.iov_buf = frame->head;
nni_aio_set_iov(aio, 1, &iov);
@@ -999,7 +998,7 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
ws_close(ws, WS_CLOSE_NORMAL_CLOSE);
} else {
ws->wclose = false;
- nni_aio_finish(ws->closeaio, 0, 0);
+ nni_aio_finish(&ws->closeaio, 0, 0);
}
return;
default:
@@ -1014,7 +1013,7 @@ static void
ws_read_cb(void *arg)
{
nni_ws *ws = arg;
- nni_aio *aio = ws->rxaio;
+ nni_aio *aio = &ws->rxaio;
ws_frame *frame;
nni_mtx_lock(&ws->mtx);
@@ -1193,15 +1192,13 @@ ws_fini(void *arg)
ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
// Give a chance for the close frame to drain.
- if (ws->closeaio) {
- nni_aio_wait(ws->closeaio);
- }
+ nni_aio_wait(&ws->closeaio);
- nni_aio_stop(ws->rxaio);
- nni_aio_stop(ws->txaio);
- nni_aio_stop(ws->closeaio);
- nni_aio_stop(ws->httpaio);
- nni_aio_stop(ws->connaio);
+ nni_aio_stop(&ws->rxaio);
+ nni_aio_stop(&ws->txaio);
+ nni_aio_stop(&ws->closeaio);
+ nni_aio_stop(&ws->httpaio);
+ nni_aio_stop(&ws->connaio);
if (nni_list_node_active(&ws->node)) {
nni_ws_dialer *d;
@@ -1252,11 +1249,11 @@ ws_fini(void *arg)
nni_strfree(ws->reqhdrs);
nni_strfree(ws->reshdrs);
- nni_aio_free(ws->rxaio);
- nni_aio_free(ws->txaio);
- nni_aio_free(ws->closeaio);
- nni_aio_free(ws->httpaio);
- nni_aio_free(ws->connaio);
+ nni_aio_fini(&ws->rxaio);
+ nni_aio_fini(&ws->txaio);
+ nni_aio_fini(&ws->closeaio);
+ nni_aio_fini(&ws->httpaio);
+ nni_aio_fini(&ws->connaio);
nni_mtx_fini(&ws->mtx);
NNI_FREE_STRUCT(ws);
}
@@ -1335,7 +1332,7 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
if ((rv = nni_http_res_alloc(&ws->res)) != 0) {
goto err;
}
- nni_http_read_res(ws->http, ws->res, ws->httpaio);
+ nni_http_read_res(ws->http, ws->res, &ws->httpaio);
nni_mtx_unlock(&d->mtx);
return;
}
@@ -1419,7 +1416,7 @@ static void
ws_http_cb(void *arg)
{
nni_ws *ws = arg;
- nni_aio *aio = ws->httpaio;
+ nni_aio *aio = &ws->httpaio;
if (ws->server) {
ws_http_cb_listener(ws, aio);
@@ -1432,7 +1429,6 @@ static int
ws_init(nni_ws **wsp)
{
nni_ws *ws;
- int rv;
if ((ws = NNI_ALLOC_STRUCT(ws)) == NULL) {
return (NNG_ENOMEM);
@@ -1443,17 +1439,14 @@ ws_init(nni_ws **wsp)
nni_aio_list_init(&ws->sendq);
nni_aio_list_init(&ws->recvq);
- if (((rv = nni_aio_alloc(&ws->closeaio, ws_close_cb, ws)) != 0) ||
- ((rv = nni_aio_alloc(&ws->txaio, ws_write_cb, ws)) != 0) ||
- ((rv = nni_aio_alloc(&ws->rxaio, ws_read_cb, ws)) != 0) ||
- ((rv = nni_aio_alloc(&ws->httpaio, ws_http_cb, ws)) != 0) ||
- ((rv = nni_aio_alloc(&ws->connaio, ws_conn_cb, ws)) != 0)) {
- ws_fini(ws);
- return (rv);
- }
+ nni_aio_init(&ws->closeaio, ws_close_cb, ws);
+ nni_aio_init(&ws->txaio, ws_write_cb, ws);
+ nni_aio_init(&ws->rxaio, ws_read_cb, ws);
+ nni_aio_init(&ws->httpaio, ws_http_cb, ws);
+ nni_aio_init(&ws->connaio, ws_conn_cb, ws);
- nni_aio_set_timeout(ws->closeaio, 100);
- nni_aio_set_timeout(ws->httpaio, 2000);
+ nni_aio_set_timeout(&ws->closeaio, 100);
+ nni_aio_set_timeout(&ws->httpaio, 2000);
ws->ops.s_close = ws_str_close;
ws->ops.s_free = ws_str_free;
@@ -1676,7 +1669,7 @@ ws_handler(nni_aio *aio)
ws->listener = l;
nni_list_append(&l->reply, ws);
- nni_http_write_res(conn, res, ws->httpaio);
+ nni_http_write_res(conn, res, &ws->httpaio);
(void) nni_http_hijack(conn);
nni_aio_set_output(aio, 0, NULL);
nni_aio_finish(aio, 0, 0);
@@ -2181,7 +2174,7 @@ ws_conn_cb(void *arg)
ws = arg;
d = ws->dialer;
- if ((rv = nni_aio_result(ws->connaio)) != 0) {
+ if ((rv = nni_aio_result(&ws->connaio)) != 0) {
nni_mtx_lock(&ws->mtx);
if ((uaio = ws->useraio) != NULL) {
ws->useraio = NULL;
@@ -2205,8 +2198,8 @@ ws_conn_cb(void *arg)
nni_mtx_lock(&ws->mtx);
uaio = ws->useraio;
- http = nni_aio_get_output(ws->connaio, 0);
- nni_aio_set_output(ws->connaio, 0, NULL);
+ http = nni_aio_get_output(&ws->connaio, 0);
+ nni_aio_set_output(&ws->connaio, 0, NULL);
if (uaio == NULL) {
// This request was canceled for some reason.
nni_http_conn_fini(http);
@@ -2245,7 +2238,7 @@ ws_conn_cb(void *arg)
ws->http = http;
ws->req = req;
- nni_http_write_req(http, req, ws->httpaio);
+ nni_http_write_req(http, req, &ws->httpaio);
nni_mtx_unlock(&ws->mtx);
return;
@@ -2303,8 +2296,8 @@ ws_dialer_close(void *arg)
}
d->closed = true;
NNI_LIST_FOREACH (&d->wspend, ws) {
- nni_aio_close(ws->connaio);
- nni_aio_close(ws->httpaio);
+ nni_aio_close(&ws->connaio);
+ nni_aio_close(&ws->httpaio);
}
nni_mtx_unlock(&d->mtx);
}
@@ -2316,8 +2309,8 @@ ws_dial_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_lock(&ws->mtx);
if (aio == ws->useraio) {
- nni_aio_abort(ws->connaio, rv);
- nni_aio_abort(ws->httpaio, rv);
+ nni_aio_abort(&ws->connaio, rv);
+ nni_aio_abort(&ws->httpaio, rv);
ws->useraio = NULL;
nni_aio_finish_error(aio, rv);
}
@@ -2359,7 +2352,7 @@ ws_dialer_dial(void *arg, nni_aio *aio)
ws->recv_text = d->recv_text;
ws->send_text = d->send_text;
nni_list_append(&d->wspend, ws);
- nni_http_client_connect(d->client, ws->connaio);
+ nni_http_client_connect(d->client, &ws->connaio);
nni_mtx_unlock(&d->mtx);
}