aboutsummaryrefslogtreecommitdiff
path: root/src/sp/transport/ws
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-15 21:07:36 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-15 21:08:10 -0800
commita43eb958e47f4c70725299845a3b7c69bbba9467 (patch)
treeb1206cabba4b6845705b046b35f407a75cba97a0 /src/sp/transport/ws
parent58df588d6737269e5b93996cfe62acfc911269e5 (diff)
downloadnng-a43eb958e47f4c70725299845a3b7c69bbba9467.tar.gz
nng-a43eb958e47f4c70725299845a3b7c69bbba9467.tar.bz2
nng-a43eb958e47f4c70725299845a3b7c69bbba9467.zip
websocket transport: use inline SP objects
This also fixes a possible race in the listener that may cause connections to be dropped incorrectly, if the connection arrives before the common layer has posted an accept request. Instead we save the connection and potentially match later, like we do for the other protocols that need to negotiate.
Diffstat (limited to 'src/sp/transport/ws')
-rw-r--r--src/sp/transport/ws/websocket.c160
1 files changed, 87 insertions, 73 deletions
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c
index 16929ec9..276a8b16 100644
--- a/src/sp/transport/ws/websocket.c
+++ b/src/sp/transport/ws/websocket.c
@@ -28,6 +28,7 @@ struct ws_dialer {
nni_mtx mtx;
nni_aio connaio;
nng_stream_dialer *dialer;
+ nni_dialer *ndialer;
bool started;
};
@@ -37,20 +38,26 @@ struct ws_listener {
nni_mtx mtx;
nni_aio accaio;
nng_stream_listener *listener;
+ nni_listener *nlistener;
+ nni_list wait_pipes;
bool started;
};
struct ws_pipe {
- nni_mtx mtx;
- bool closed;
- uint16_t peer;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
- nni_aio txaio;
- nni_aio rxaio;
- nng_stream *ws;
+ nni_mtx mtx;
+ bool closed;
+ uint16_t peer;
+ nni_aio *user_txaio;
+ nni_aio *user_rxaio;
+ nni_aio txaio;
+ nni_aio rxaio;
+ nng_stream *ws;
+ nni_pipe *npipe;
+ nni_list_node node;
};
+static void wstran_listener_match(ws_listener *l);
+
static void
wstran_pipe_send_cb(void *arg)
{
@@ -189,8 +196,14 @@ wstran_pipe_stop(void *arg)
static int
wstran_pipe_init(void *arg, nni_pipe *pipe)
{
- NNI_ARG_UNUSED(arg);
- NNI_ARG_UNUSED(pipe);
+ ws_pipe *p = arg;
+
+ p->npipe = pipe;
+ nni_mtx_init(&p->mtx);
+
+ // Initialize AIOs.
+ nni_aio_init(&p->txaio, wstran_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p);
return (0);
}
@@ -204,7 +217,6 @@ wstran_pipe_fini(void *arg)
nni_aio_fini(&p->txaio);
nni_mtx_fini(&p->mtx);
- NNI_FREE_STRUCT(p);
}
static void
@@ -218,25 +230,6 @@ wstran_pipe_close(void *arg)
nng_stream_close(p->ws);
}
-static int
-wstran_pipe_alloc(ws_pipe **pipep, void *ws)
-{
- ws_pipe *p;
-
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
- p->ws = ws;
- nni_mtx_init(&p->mtx);
-
- // Initialize AIOs.
- nni_aio_init(&p->txaio, wstran_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p);
-
- *pipep = p;
- return (0);
-}
-
static uint16_t
wstran_pipe_peer(void *arg)
{
@@ -253,7 +246,6 @@ wstran_listener_bind(void *arg, nng_url *url)
if ((rv = nng_stream_listener_listen(l->listener)) == 0) {
int port;
- l->started = true;
nng_stream_listener_get_int(
l->listener, NNG_OPT_TCP_BOUND_PORT, &port);
url->u_port = (uint32_t) port;
@@ -293,9 +285,11 @@ wstran_listener_accept(void *arg, nni_aio *aio)
return;
}
nni_list_append(&l->aios, aio);
- if (aio == nni_list_first(&l->aios)) {
+ if (!l->started) {
+ l->started = true;
nng_stream_listener_accept(l->listener, &l->accaio);
}
+ wstran_listener_match(l);
nni_mtx_unlock(&l->mtx);
}
@@ -356,6 +350,7 @@ wstran_pipe_getopt(
}
static nni_sp_pipe_ops ws_pipe_ops = {
+ .p_size = sizeof(ws_pipe),
.p_init = wstran_pipe_init,
.p_fini = wstran_pipe_fini,
.p_stop = wstran_pipe_stop,
@@ -383,7 +378,6 @@ wstran_dialer_fini(void *arg)
nng_stream_dialer_free(d->dialer);
nni_aio_fini(&d->connaio);
nni_mtx_fini(&d->mtx);
- NNI_FREE_STRUCT(d);
}
static void
@@ -403,7 +397,6 @@ wstran_listener_fini(void *arg)
nng_stream_listener_free(l->listener);
nni_aio_fini(&l->accaio);
nni_mtx_fini(&l->mtx);
- NNI_FREE_STRUCT(l);
}
static void
@@ -422,7 +415,6 @@ wstran_connect_cb(void *arg)
}
if ((uaio = nni_list_first(&d->aios)) == NULL) {
// The client stopped caring about this!
- nng_stream_stop(ws);
nng_stream_free(ws);
nni_mtx_unlock(&d->mtx);
return;
@@ -431,14 +423,15 @@ wstran_connect_cb(void *arg)
NNI_ASSERT(nni_list_empty(&d->aios));
if ((rv = nni_aio_result(caio)) != 0) {
nni_aio_finish_error(uaio, rv);
- } else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) {
- nng_stream_stop(ws);
+ } else if ((rv = nni_pipe_alloc_dialer((void **) &p, d->ndialer)) !=
+ 0) {
nng_stream_free(ws);
nni_aio_finish_error(uaio, rv);
} else {
p->peer = d->peer;
+ p->ws = ws;
- nni_aio_set_output(uaio, 0, p);
+ nni_aio_set_output(uaio, 0, p->npipe);
nni_aio_finish(uaio, 0, 0);
}
nni_mtx_unlock(&d->mtx);
@@ -457,66 +450,90 @@ static void
wstran_listener_close(void *arg)
{
ws_listener *l = arg;
+ ws_pipe *p;
nni_aio_close(&l->accaio);
+ NNI_LIST_FOREACH (&l->wait_pipes, p) {
+ nni_pipe_close(p->npipe);
+ }
nng_stream_listener_close(l->listener);
}
static void
+wstran_listener_match(ws_listener *l)
+{
+ nni_aio *uaio;
+ ws_pipe *p;
+ if (((uaio = nni_list_first(&l->aios)) == NULL) ||
+ ((p = nni_list_first(&l->wait_pipes)) == NULL)) {
+ return;
+ }
+
+ nni_list_remove(&l->wait_pipes, p);
+ nni_aio_list_remove(uaio);
+
+ nni_aio_set_output(uaio, 0, p->npipe);
+ nni_aio_finish(uaio, 0, 0);
+}
+
+static void
wstran_accept_cb(void *arg)
{
ws_listener *l = arg;
nni_aio *aaio = &l->accaio;
nni_aio *uaio;
int rv;
+ ws_pipe *p;
+ nng_stream *ws;
nni_mtx_lock(&l->mtx);
+
+ ws = nni_aio_get_output(aaio, 0);
uaio = nni_list_first(&l->aios);
if ((rv = nni_aio_result(aaio)) != 0) {
- if (uaio != NULL) {
- nni_aio_list_remove(uaio);
- nni_aio_finish_error(uaio, rv);
- }
- } else {
- nng_stream *ws = nni_aio_get_output(aaio, 0);
- if (uaio != NULL) {
- ws_pipe *p;
- // Make a pipe
- nni_aio_list_remove(uaio);
- if ((rv = wstran_pipe_alloc(&p, ws)) != 0) {
- nng_stream_close(ws);
- nni_aio_finish_error(uaio, rv);
- } else {
- p->peer = l->peer;
-
- nni_aio_set_output(uaio, 0, p);
- nni_aio_finish(uaio, 0, 0);
- }
- }
+ goto error;
}
- if (!nni_list_empty(&l->aios)) {
- nng_stream_listener_accept(l->listener, aaio);
+
+ rv = nni_pipe_alloc_listener((void **) &p, l->nlistener);
+ if (rv != 0) {
+ nng_stream_free(ws);
+ goto error;
}
+ p->peer = l->peer;
+ p->ws = ws;
+
+ nni_list_append(&l->wait_pipes, p);
+ wstran_listener_match(l);
+ nng_stream_listener_accept(l->listener, aaio);
+ nni_mtx_unlock(&l->mtx);
+ return;
+
+error:
+
+ // possibly report this upstream
+ if ((uaio = nni_list_first(&l->aios)) != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ nng_stream_listener_accept(l->listener, aaio);
nni_mtx_unlock(&l->mtx);
}
static int
wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
{
- ws_dialer *d;
+ ws_dialer *d = (void *) dp;
nni_sock *s = nni_dialer_sock(ndialer);
int rv;
char name[64];
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&d->mtx);
nni_aio_list_init(&d->aios);
nni_aio_init(&d->connaio, wstran_connect_cb, d);
- d->peer = nni_sock_peer_id(s);
+ d->peer = nni_sock_peer_id(s);
+ d->ndialer = ndialer;
snprintf(
name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_peer_name(s));
@@ -526,29 +543,26 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_dialer_set_string(
d->dialer, NNG_OPT_WS_PROTOCOL, name)) != 0)) {
- wstran_dialer_fini(d);
return (rv);
}
- *dp = d;
return (0);
}
static int
wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
{
- ws_listener *l;
+ ws_listener *l = (void *) lp;
int rv;
nni_sock *s = nni_listener_sock(listener);
char name[64];
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
- return (NNG_ENOMEM);
- }
+ l->nlistener = listener;
nni_mtx_init(&l->mtx);
nni_aio_list_init(&l->aios);
nni_aio_init(&l->accaio, wstran_accept_cb, l);
+ NNI_LIST_INIT(&l->wait_pipes, ws_pipe, node);
l->peer = nni_sock_peer_id(s);
@@ -560,10 +574,8 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_listener_set_string(
l->listener, NNG_OPT_WS_PROTOCOL, name)) != 0)) {
- wstran_listener_fini(l);
return (rv);
}
- *lp = l;
return (0);
}
@@ -669,6 +681,7 @@ wstran_listener_set_tls(void *arg, nng_tls_config *tls)
}
static nni_sp_dialer_ops ws_dialer_ops = {
+ .d_size = sizeof(ws_dialer),
.d_init = wstran_dialer_init,
.d_fini = wstran_dialer_fini,
.d_connect = wstran_dialer_connect,
@@ -681,6 +694,7 @@ static nni_sp_dialer_ops ws_dialer_ops = {
};
static nni_sp_listener_ops ws_listener_ops = {
+ .l_size = sizeof(ws_listener),
.l_init = wstran_listener_init,
.l_fini = wstran_listener_fini,
.l_bind = wstran_listener_bind,