aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental/websocket
diff options
context:
space:
mode:
Diffstat (limited to 'src/supplemental/websocket')
-rw-r--r--src/supplemental/websocket/websocket.c60
1 files changed, 28 insertions, 32 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 7bd698bd..74b56437 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -655,9 +655,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
ws->closed = true;
aio = ws->closeaio;
- // We don't care about cancellation here. If this times out,
- // we will still shut all the physical I/O down in the callback.
- if (nni_aio_start(aio, ws_cancel_close, ws) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
ws->wclose = true;
@@ -669,6 +667,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
}
// Close frames get priority!
nni_list_prepend(&ws->txmsgs, wm);
+ nni_aio_schedule(aio, ws_cancel_close, ws);
ws_start_write(ws);
}
@@ -722,26 +721,22 @@ nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
msg = nni_aio_get_msg(aio);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) {
- if (nni_aio_start(aio, NULL, NULL) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&ws->mtx);
-
- if (nni_aio_start(aio, ws_write_cancel, ws) != 0) {
- nni_mtx_unlock(&ws->mtx);
- ws_msg_fini(wm);
- return;
- }
if (ws->closed) {
nni_mtx_unlock(&ws->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
ws_msg_fini(wm);
return;
}
+ nni_aio_schedule(aio, ws_write_cancel, ws);
nni_aio_set_prov_extra(aio, 0, wm);
nni_list_append(&ws->sendq, aio);
nni_list_append(&ws->txmsgs, wm);
@@ -1056,19 +1051,20 @@ nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
ws_msg *wm;
int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
- if (nni_aio_start(aio, NULL, NULL)) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&ws->mtx);
- if (nni_aio_start(aio, ws_read_cancel, ws) == 0) {
- nni_aio_set_prov_extra(aio, 0, wm);
- nni_list_append(&ws->recvq, aio);
- nni_list_append(&ws->rxmsgs, wm);
- ws_start_read(ws);
- }
+ nni_aio_schedule(aio, ws_read_cancel, ws);
+ nni_aio_set_prov_extra(aio, 0, wm);
+ nni_list_append(&ws->recvq, aio);
+ nni_list_append(&ws->rxmsgs, wm);
+ ws_start_read(ws);
+
nni_mtx_unlock(&ws->mtx);
}
@@ -1651,11 +1647,10 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
{
nni_ws *ws;
- nni_mtx_lock(&l->mtx);
- if (nni_aio_start(aio, ws_accept_cancel, l) != 0) {
- nni_mtx_unlock(&l->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&l->mtx);
if (l->closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&l->mtx);
@@ -1668,11 +1663,13 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
}
if ((ws = nni_list_first(&l->pend)) != NULL) {
nni_list_remove(&l->pend, ws);
+ nni_mtx_unlock(&l->mtx);
nni_aio_set_output(aio, 0, ws);
nni_aio_finish(aio, 0, 0);
- } else {
- nni_list_append(&l->aios, aio);
+ return;
}
+ nni_aio_schedule(aio, ws_accept_cancel, l);
+ nni_list_append(&l->aios, aio);
nni_mtx_unlock(&l->mtx);
}
@@ -1983,19 +1980,17 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
nni_ws *ws;
int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_init(&ws)) != 0) {
nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&d->mtx);
if (d->closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&d->mtx);
- ws_fini(ws);
- return;
- }
- if (nni_aio_start(aio, ws_dial_cancel, ws) != 0) {
nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
ws_fini(ws);
return;
}
@@ -2003,6 +1998,7 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
ws->useraio = aio;
ws->mode = NNI_EP_MODE_DIAL;
nni_list_append(&d->wspend, ws);
+ nni_aio_schedule(aio, ws_dial_cancel, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
}