aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/supplemental/websocket/websocket.c51
-rw-r--r--src/transport/ws/websocket.c4
2 files changed, 38 insertions, 17 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 55befdb2..bdb1a5b5 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -89,6 +89,7 @@ struct nni_ws_dialer {
nni_http_res * res;
nni_http_client *client;
nni_mtx mtx;
+ nni_cv cv;
char * proto;
nni_url * url;
nni_list wspend; // ws structures still negotiating
@@ -471,16 +472,6 @@ ws_close(nni_ws *ws, uint16_t code)
nni_aio_close(ws->httpaio);
ws_send_close(ws, code);
}
-
- if (nni_list_node_active(&ws->node)) {
- nni_ws_dialer *d;
-
- if ((d = ws->dialer) != NULL) {
- nni_mtx_lock(&d->mtx);
- nni_list_node_remove(&ws->node);
- nni_mtx_unlock(&d->mtx);
- }
- }
}
static void
@@ -1164,6 +1155,17 @@ ws_fini(void *arg)
nni_aio_stop(ws->httpaio);
nni_aio_stop(ws->connaio);
+ if (nni_list_node_active(&ws->node)) {
+ nni_ws_dialer *d;
+
+ if ((d = ws->dialer) != NULL) {
+ nni_mtx_lock(&d->mtx);
+ nni_list_node_remove(&ws->node);
+ ws->dialer = NULL;
+ nni_mtx_unlock(&d->mtx);
+ }
+ }
+
nni_mtx_lock(&ws->mtx);
while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
nni_list_remove(&ws->rxmsgs, wm);
@@ -1326,15 +1328,24 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
nni_list_remove(&d->wspend, ws);
ws->ready = true;
ws->useraio = NULL;
+ ws->dialer = NULL;
nni_aio_set_output(uaio, 0, ws);
nni_aio_finish(uaio, 0, 0);
+ if (nni_list_empty(&d->wspend)) {
+ nni_cv_wake(&d->cv);
+ }
nni_mtx_unlock(&d->mtx);
return;
err:
nni_list_remove(&d->wspend, ws);
ws->useraio = NULL;
+ ws->dialer = NULL;
+ if (nni_list_empty(&d->wspend)) {
+ nni_cv_wake(&d->cv);
+ }
nni_aio_finish_error(uaio, rv);
nni_mtx_unlock(&d->mtx);
+
nni_ws_fini(ws);
}
@@ -1821,6 +1832,10 @@ ws_conn_cb(void *arg)
nni_mtx_lock(&d->mtx);
if (nni_list_node_active(&ws->node)) {
nni_list_remove(&d->wspend, ws);
+ ws->dialer = NULL;
+ if (nni_list_empty(&d->wspend)) {
+ nni_cv_wake(&d->cv);
+ }
nni_mtx_unlock(&d->mtx);
nni_ws_fini(ws);
} else {
@@ -1892,6 +1907,12 @@ nni_ws_dialer_fini(nni_ws_dialer *d)
{
ws_header *hdr;
+ nni_mtx_lock(&d->mtx);
+ while (!nni_list_empty(&d->wspend)) {
+ nni_cv_wait(&d->cv);
+ }
+ nni_mtx_unlock(&d->mtx);
+
nni_strfree(d->proto);
while ((hdr = nni_list_first(&d->headers)) != NULL) {
nni_list_remove(&d->headers, hdr);
@@ -1905,6 +1926,7 @@ nni_ws_dialer_fini(nni_ws_dialer *d)
if (d->url) {
nni_url_free(d->url);
}
+ nni_cv_fini(&d->cv);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}
@@ -1921,6 +1943,7 @@ nni_ws_dialer_init(nni_ws_dialer **dp, nni_url *url)
NNI_LIST_INIT(&d->headers, ws_header, node);
NNI_LIST_INIT(&d->wspend, nni_ws, node);
nni_mtx_init(&d->mtx);
+ nni_cv_init(&d->cv, &d->mtx);
if ((rv = nni_url_clone(&d->url, url)) != 0) {
nni_ws_dialer_fini(d);
@@ -1966,11 +1989,9 @@ nni_ws_dialer_close(nni_ws_dialer *d)
return;
}
d->closed = true;
- while ((ws = nni_list_first(&d->wspend)) != 0) {
- nni_list_remove(&d->wspend, ws);
- nni_mtx_unlock(&d->mtx);
- nni_ws_fini(ws);
- nni_mtx_lock(&d->mtx);
+ NNI_LIST_FOREACH (&d->wspend, ws) {
+ nni_aio_close(ws->connaio);
+ nni_aio_close(ws->httpaio);
}
nni_mtx_unlock(&d->mtx);
}
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 7dbf6903..19cc347c 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -662,14 +662,14 @@ ws_ep_fini(void *arg)
nni_aio_stop(ep->accaio);
nni_aio_stop(ep->connaio);
- nni_aio_fini(ep->accaio);
- nni_aio_fini(ep->connaio);
if (ep->listener != NULL) {
nni_ws_listener_fini(ep->listener);
}
if (ep->dialer != NULL) {
nni_ws_dialer_fini(ep->dialer);
}
+ nni_aio_fini(ep->accaio);
+ nni_aio_fini(ep->connaio);
while ((hdr = nni_list_first(&ep->headers)) != NULL) {
nni_list_remove(&ep->headers, hdr);
nni_strfree(hdr->name);