aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-01 08:07:07 -0500
committerGarrett D'Amore <garrett@damore.org>2024-12-07 15:08:18 -0800
commit32c78f7b7b4fcfacea6b45e8601c82771f1e40b3 (patch)
tree559c4a33ae585f102b0c832c1e29afb93d40f71c
parentf8a314ea075745c244172173391e44c146837b87 (diff)
downloadnng-32c78f7b7b4fcfacea6b45e8601c82771f1e40b3.tar.gz
nng-32c78f7b7b4fcfacea6b45e8601c82771f1e40b3.tar.bz2
nng-32c78f7b7b4fcfacea6b45e8601c82771f1e40b3.zip
websocket: inline the aios
This covers both the ttransport and the supplemental layers.
-rw-r--r--src/sp/transport/ws/websocket.c71
-rw-r--r--src/supplemental/websocket/websocket.c105
2 files changed, 82 insertions, 94 deletions
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c
index dec646ce..805f19cf 100644
--- a/src/sp/transport/ws/websocket.c
+++ b/src/sp/transport/ws/websocket.c
@@ -26,7 +26,7 @@ struct ws_dialer {
uint16_t peer; // remote protocol
nni_list aios;
nni_mtx mtx;
- nni_aio *connaio;
+ nni_aio connaio;
nng_stream_dialer *dialer;
bool started;
};
@@ -35,7 +35,7 @@ struct ws_listener {
uint16_t peer; // remote protocol
nni_list aios;
nni_mtx mtx;
- nni_aio *accaio;
+ nni_aio accaio;
nng_stream_listener *listener;
bool started;
};
@@ -46,20 +46,19 @@ struct ws_pipe {
uint16_t peer;
nni_aio *user_txaio;
nni_aio *user_rxaio;
- nni_aio *txaio;
- nni_aio *rxaio;
+ nni_aio txaio;
+ nni_aio rxaio;
nng_stream *ws;
};
static void
wstran_pipe_send_cb(void *arg)
{
- ws_pipe *p = arg;
- nni_aio *taio;
+ ws_pipe *p = arg;
+ nni_aio *taio = &p->txaio;
nni_aio *uaio;
nni_mtx_lock(&p->mtx);
- taio = p->txaio;
uaio = p->user_txaio;
p->user_txaio = NULL;
@@ -78,7 +77,7 @@ static void
wstran_pipe_recv_cb(void *arg)
{
ws_pipe *p = arg;
- nni_aio *raio = p->rxaio;
+ nni_aio *raio = &p->rxaio;
nni_aio *uaio;
int rv;
@@ -110,7 +109,7 @@ wstran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv)
return;
}
p->user_rxaio = NULL;
- nni_aio_abort(p->rxaio, rv);
+ nni_aio_abort(&p->rxaio, rv);
nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&p->mtx);
}
@@ -131,7 +130,7 @@ wstran_pipe_recv(void *arg, nni_aio *aio)
return;
}
p->user_rxaio = aio;
- nng_stream_recv(p->ws, p->rxaio);
+ nng_stream_recv(p->ws, &p->rxaio);
nni_mtx_unlock(&p->mtx);
}
@@ -145,7 +144,7 @@ wstran_pipe_send_cancel(nni_aio *aio, void *arg, int rv)
return;
}
p->user_txaio = NULL;
- nni_aio_abort(p->txaio, rv);
+ nni_aio_abort(&p->txaio, rv);
nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&p->mtx);
}
@@ -170,10 +169,10 @@ wstran_pipe_send(void *arg, nni_aio *aio)
return;
}
p->user_txaio = aio;
- nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
+ nni_aio_set_msg(&p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
- nng_stream_send(p->ws, p->txaio);
+ nng_stream_send(p->ws, &p->txaio);
nni_mtx_unlock(&p->mtx);
}
@@ -182,8 +181,8 @@ wstran_pipe_stop(void *arg)
{
ws_pipe *p = arg;
- nni_aio_stop(p->rxaio);
- nni_aio_stop(p->txaio);
+ nni_aio_stop(&p->rxaio);
+ nni_aio_stop(&p->txaio);
}
static int
@@ -200,8 +199,8 @@ wstran_pipe_fini(void *arg)
ws_pipe *p = arg;
nng_stream_free(p->ws);
- nni_aio_free(p->rxaio);
- nni_aio_free(p->txaio);
+ nni_aio_fini(&p->rxaio);
+ nni_aio_fini(&p->txaio);
nni_mtx_fini(&p->mtx);
NNI_FREE_STRUCT(p);
@@ -212,8 +211,8 @@ wstran_pipe_close(void *arg)
{
ws_pipe *p = arg;
- nni_aio_close(p->rxaio);
- nni_aio_close(p->txaio);
+ nni_aio_close(&p->rxaio);
+ nni_aio_close(&p->txaio);
nni_mtx_lock(&p->mtx);
nng_stream_close(p->ws);
@@ -224,20 +223,16 @@ static int
wstran_pipe_alloc(ws_pipe **pipep, void *ws)
{
ws_pipe *p;
- int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+ p->ws = ws;
nni_mtx_init(&p->mtx);
// Initialize AIOs.
- if (((rv = nni_aio_alloc(&p->txaio, wstran_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) {
- wstran_pipe_fini(p);
- return (rv);
- }
- p->ws = ws;
+ nni_aio_init(&p->txaio, wstran_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p);
*pipep = p;
return (0);
@@ -300,7 +295,7 @@ wstran_listener_accept(void *arg, nni_aio *aio)
}
nni_list_append(&l->aios, aio);
if (aio == nni_list_first(&l->aios)) {
- nng_stream_listener_accept(l->listener, l->accaio);
+ nng_stream_listener_accept(l->listener, &l->accaio);
}
nni_mtx_unlock(&l->mtx);
}
@@ -337,7 +332,7 @@ wstran_dialer_connect(void *arg, nni_aio *aio)
NNI_ASSERT(nni_list_empty(&d->aios));
d->started = true;
nni_list_append(&d->aios, aio);
- nng_stream_dialer_dial(d->dialer, d->connaio);
+ nng_stream_dialer_dial(d->dialer, &d->connaio);
nni_mtx_unlock(&d->mtx);
}
@@ -377,9 +372,9 @@ wstran_dialer_fini(void *arg)
{
ws_dialer *d = arg;
- nni_aio_stop(d->connaio);
+ nni_aio_stop(&d->connaio);
nng_stream_dialer_free(d->dialer);
- nni_aio_free(d->connaio);
+ nni_aio_fini(&d->connaio);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}
@@ -389,9 +384,9 @@ wstran_listener_fini(void *arg)
{
ws_listener *l = arg;
- nni_aio_stop(l->accaio);
+ nni_aio_stop(&l->accaio);
nng_stream_listener_free(l->listener);
- nni_aio_free(l->accaio);
+ nni_aio_fini(&l->accaio);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -401,7 +396,7 @@ wstran_connect_cb(void *arg)
{
ws_dialer *d = arg;
ws_pipe *p;
- nni_aio *caio = d->connaio;
+ nni_aio *caio = &d->connaio;
nni_aio *uaio;
int rv;
nng_stream *ws = NULL;
@@ -437,7 +432,7 @@ wstran_dialer_close(void *arg)
{
ws_dialer *d = arg;
- nni_aio_close(d->connaio);
+ nni_aio_close(&d->connaio);
nng_stream_dialer_close(d->dialer);
}
@@ -446,7 +441,7 @@ wstran_listener_close(void *arg)
{
ws_listener *l = arg;
- nni_aio_close(l->accaio);
+ nni_aio_close(&l->accaio);
nng_stream_listener_close(l->listener);
}
@@ -454,7 +449,7 @@ static void
wstran_accept_cb(void *arg)
{
ws_listener *l = arg;
- nni_aio *aaio = l->accaio;
+ nni_aio *aaio = &l->accaio;
nni_aio *uaio;
int rv;
@@ -502,6 +497,7 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
nni_mtx_init(&d->mtx);
nni_aio_list_init(&d->aios);
+ nni_aio_init(&d->connaio, wstran_connect_cb, d);
d->peer = nni_sock_peer_id(s);
@@ -509,7 +505,6 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_peer_name(s));
if (((rv = nni_ws_dialer_alloc(&d->dialer, url)) != 0) ||
- ((rv = nni_aio_alloc(&d->connaio, wstran_connect_cb, d)) != 0) ||
((rv = nng_stream_dialer_set_bool(
d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_dialer_set_string(
@@ -536,6 +531,7 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
nni_mtx_init(&l->mtx);
nni_aio_list_init(&l->aios);
+ nni_aio_init(&l->accaio, wstran_accept_cb, l);
l->peer = nni_sock_peer_id(s);
@@ -543,7 +539,6 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_proto_name(s));
if (((rv = nni_ws_listener_alloc(&l->listener, url)) != 0) ||
- ((rv = nni_aio_alloc(&l->accaio, wstran_accept_cb, l)) != 0) ||
((rv = nng_stream_listener_set_bool(
l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_listener_set_string(
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 10b308d5..9f3f6d0b 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -62,11 +62,11 @@ struct nni_ws {
nni_list rxq;
ws_frame *txframe;
ws_frame *rxframe;
- nni_aio *txaio; // physical aios
- nni_aio *rxaio;
- nni_aio *closeaio; // used for lingering/draining close
- nni_aio *httpaio;
- nni_aio *connaio; // connect aio
+ nni_aio txaio; // physical aios
+ nni_aio rxaio;
+ nni_aio closeaio; // used for lingering/draining close
+ nni_aio httpaio;
+ nni_aio connaio; // connect aio
nni_aio *useraio; // user aio, during HTTP negotiation
nni_http_conn *http;
nni_http_req *req;
@@ -519,9 +519,9 @@ ws_close_cb(void *arg)
nni_ws *ws = arg;
ws_frame *frame;
- nni_aio_close(ws->txaio);
- nni_aio_close(ws->rxaio);
- nni_aio_close(ws->httpaio);
+ nni_aio_close(&ws->txaio);
+ nni_aio_close(&ws->rxaio);
+ nni_aio_close(&ws->httpaio);
// Either we sent a close frame, or we didn't. Either way,
// we are done, and its time to abort everything else.
@@ -560,8 +560,8 @@ ws_close(nni_ws *ws, uint16_t code)
// pending connect request.
if (!ws->closed) {
// ABORT connection negotiation.
- nni_aio_close(ws->connaio);
- nni_aio_close(ws->httpaio);
+ nni_aio_close(&ws->connaio);
+ nni_aio_close(&ws->httpaio);
ws_send_close(ws, code);
}
}
@@ -593,8 +593,8 @@ ws_start_write(nni_ws *ws)
iov[1].iov_len = frame->len;
iov[1].iov_buf = frame->buf;
}
- nni_aio_set_iov(ws->txaio, niov, iov);
- nni_http_write_full(ws->http, ws->txaio);
+ nni_aio_set_iov(&ws->txaio, niov, iov);
+ nni_http_write_full(ws->http, &ws->txaio);
}
static void
@@ -642,7 +642,7 @@ ws_write_cb(void *arg)
if (ws->peer_closed) {
if (ws->wclose) { // could assert this?
ws->wclose = false;
- nni_aio_finish(ws->closeaio, 0, 0);
+ nni_aio_finish(&ws->closeaio, 0, 0);
}
}
nni_mtx_unlock(&ws->mtx);
@@ -650,7 +650,7 @@ ws_write_cb(void *arg)
}
aio = frame->aio;
- if ((rv = nni_aio_result(ws->txaio)) != 0) {
+ if ((rv = nni_aio_result(&ws->txaio)) != 0) {
// if tx fails, we can't send a close frame either
// we expect the caller to just close this connection
frame->aio = NULL;
@@ -718,7 +718,7 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv)
}
frame = nni_aio_get_prov_data(aio);
if (frame == ws->txframe) {
- nni_aio_abort(ws->txaio, rv);
+ nni_aio_abort(&ws->txaio, rv);
// We will wait for callback on the txaio to finish aio.
} else {
// If scheduled, just need to remove node and complete it.
@@ -737,7 +737,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
ws_frame *frame;
uint8_t buf[sizeof(uint16_t)];
int rv;
- nni_aio *aio;
+ nni_aio *aio = &ws->closeaio;
NNI_PUT16(buf, code);
@@ -745,7 +745,6 @@ ws_send_close(nni_ws *ws, uint16_t code)
return;
}
ws->closed = true;
- aio = ws->closeaio;
if (nni_aio_begin(aio) != 0) {
return;
@@ -818,7 +817,7 @@ ws_start_read(nni_ws *ws)
frame->len = 0;
ws->rxframe = frame;
- aio = ws->rxaio;
+ aio = &ws->rxaio;
iov.iov_len = 2; // We want the first two bytes.
iov.iov_buf = frame->head;
nni_aio_set_iov(aio, 1, &iov);
@@ -999,7 +998,7 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
ws_close(ws, WS_CLOSE_NORMAL_CLOSE);
} else {
ws->wclose = false;
- nni_aio_finish(ws->closeaio, 0, 0);
+ nni_aio_finish(&ws->closeaio, 0, 0);
}
return;
default:
@@ -1014,7 +1013,7 @@ static void
ws_read_cb(void *arg)
{
nni_ws *ws = arg;
- nni_aio *aio = ws->rxaio;
+ nni_aio *aio = &ws->rxaio;
ws_frame *frame;
nni_mtx_lock(&ws->mtx);
@@ -1193,15 +1192,13 @@ ws_fini(void *arg)
ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
// Give a chance for the close frame to drain.
- if (ws->closeaio) {
- nni_aio_wait(ws->closeaio);
- }
+ nni_aio_wait(&ws->closeaio);
- nni_aio_stop(ws->rxaio);
- nni_aio_stop(ws->txaio);
- nni_aio_stop(ws->closeaio);
- nni_aio_stop(ws->httpaio);
- nni_aio_stop(ws->connaio);
+ nni_aio_stop(&ws->rxaio);
+ nni_aio_stop(&ws->txaio);
+ nni_aio_stop(&ws->closeaio);
+ nni_aio_stop(&ws->httpaio);
+ nni_aio_stop(&ws->connaio);
if (nni_list_node_active(&ws->node)) {
nni_ws_dialer *d;
@@ -1252,11 +1249,11 @@ ws_fini(void *arg)
nni_strfree(ws->reqhdrs);
nni_strfree(ws->reshdrs);
- nni_aio_free(ws->rxaio);
- nni_aio_free(ws->txaio);
- nni_aio_free(ws->closeaio);
- nni_aio_free(ws->httpaio);
- nni_aio_free(ws->connaio);
+ nni_aio_fini(&ws->rxaio);
+ nni_aio_fini(&ws->txaio);
+ nni_aio_fini(&ws->closeaio);
+ nni_aio_fini(&ws->httpaio);
+ nni_aio_fini(&ws->connaio);
nni_mtx_fini(&ws->mtx);
NNI_FREE_STRUCT(ws);
}
@@ -1335,7 +1332,7 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
if ((rv = nni_http_res_alloc(&ws->res)) != 0) {
goto err;
}
- nni_http_read_res(ws->http, ws->res, ws->httpaio);
+ nni_http_read_res(ws->http, ws->res, &ws->httpaio);
nni_mtx_unlock(&d->mtx);
return;
}
@@ -1419,7 +1416,7 @@ static void
ws_http_cb(void *arg)
{
nni_ws *ws = arg;
- nni_aio *aio = ws->httpaio;
+ nni_aio *aio = &ws->httpaio;
if (ws->server) {
ws_http_cb_listener(ws, aio);
@@ -1432,7 +1429,6 @@ static int
ws_init(nni_ws **wsp)
{
nni_ws *ws;
- int rv;
if ((ws = NNI_ALLOC_STRUCT(ws)) == NULL) {
return (NNG_ENOMEM);
@@ -1443,17 +1439,14 @@ ws_init(nni_ws **wsp)
nni_aio_list_init(&ws->sendq);
nni_aio_list_init(&ws->recvq);
- if (((rv = nni_aio_alloc(&ws->closeaio, ws_close_cb, ws)) != 0) ||
- ((rv = nni_aio_alloc(&ws->txaio, ws_write_cb, ws)) != 0) ||
- ((rv = nni_aio_alloc(&ws->rxaio, ws_read_cb, ws)) != 0) ||
- ((rv = nni_aio_alloc(&ws->httpaio, ws_http_cb, ws)) != 0) ||
- ((rv = nni_aio_alloc(&ws->connaio, ws_conn_cb, ws)) != 0)) {
- ws_fini(ws);
- return (rv);
- }
+ nni_aio_init(&ws->closeaio, ws_close_cb, ws);
+ nni_aio_init(&ws->txaio, ws_write_cb, ws);
+ nni_aio_init(&ws->rxaio, ws_read_cb, ws);
+ nni_aio_init(&ws->httpaio, ws_http_cb, ws);
+ nni_aio_init(&ws->connaio, ws_conn_cb, ws);
- nni_aio_set_timeout(ws->closeaio, 100);
- nni_aio_set_timeout(ws->httpaio, 2000);
+ nni_aio_set_timeout(&ws->closeaio, 100);
+ nni_aio_set_timeout(&ws->httpaio, 2000);
ws->ops.s_close = ws_str_close;
ws->ops.s_free = ws_str_free;
@@ -1676,7 +1669,7 @@ ws_handler(nni_aio *aio)
ws->listener = l;
nni_list_append(&l->reply, ws);
- nni_http_write_res(conn, res, ws->httpaio);
+ nni_http_write_res(conn, res, &ws->httpaio);
(void) nni_http_hijack(conn);
nni_aio_set_output(aio, 0, NULL);
nni_aio_finish(aio, 0, 0);
@@ -2181,7 +2174,7 @@ ws_conn_cb(void *arg)
ws = arg;
d = ws->dialer;
- if ((rv = nni_aio_result(ws->connaio)) != 0) {
+ if ((rv = nni_aio_result(&ws->connaio)) != 0) {
nni_mtx_lock(&ws->mtx);
if ((uaio = ws->useraio) != NULL) {
ws->useraio = NULL;
@@ -2205,8 +2198,8 @@ ws_conn_cb(void *arg)
nni_mtx_lock(&ws->mtx);
uaio = ws->useraio;
- http = nni_aio_get_output(ws->connaio, 0);
- nni_aio_set_output(ws->connaio, 0, NULL);
+ http = nni_aio_get_output(&ws->connaio, 0);
+ nni_aio_set_output(&ws->connaio, 0, NULL);
if (uaio == NULL) {
// This request was canceled for some reason.
nni_http_conn_fini(http);
@@ -2245,7 +2238,7 @@ ws_conn_cb(void *arg)
ws->http = http;
ws->req = req;
- nni_http_write_req(http, req, ws->httpaio);
+ nni_http_write_req(http, req, &ws->httpaio);
nni_mtx_unlock(&ws->mtx);
return;
@@ -2303,8 +2296,8 @@ ws_dialer_close(void *arg)
}
d->closed = true;
NNI_LIST_FOREACH (&d->wspend, ws) {
- nni_aio_close(ws->connaio);
- nni_aio_close(ws->httpaio);
+ nni_aio_close(&ws->connaio);
+ nni_aio_close(&ws->httpaio);
}
nni_mtx_unlock(&d->mtx);
}
@@ -2316,8 +2309,8 @@ ws_dial_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_lock(&ws->mtx);
if (aio == ws->useraio) {
- nni_aio_abort(ws->connaio, rv);
- nni_aio_abort(ws->httpaio, rv);
+ nni_aio_abort(&ws->connaio, rv);
+ nni_aio_abort(&ws->httpaio, rv);
ws->useraio = NULL;
nni_aio_finish_error(aio, rv);
}
@@ -2359,7 +2352,7 @@ ws_dialer_dial(void *arg, nni_aio *aio)
ws->recv_text = d->recv_text;
ws->send_text = d->send_text;
nni_list_append(&d->wspend, ws);
- nni_http_client_connect(d->client, ws->connaio);
+ nni_http_client_connect(d->client, &ws->connaio);
nni_mtx_unlock(&d->mtx);
}