diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-15 21:07:36 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-15 21:08:10 -0800 |
| commit | a43eb958e47f4c70725299845a3b7c69bbba9467 (patch) | |
| tree | b1206cabba4b6845705b046b35f407a75cba97a0 /src/sp/transport/ws | |
| parent | 58df588d6737269e5b93996cfe62acfc911269e5 (diff) | |
| download | nng-a43eb958e47f4c70725299845a3b7c69bbba9467.tar.gz nng-a43eb958e47f4c70725299845a3b7c69bbba9467.tar.bz2 nng-a43eb958e47f4c70725299845a3b7c69bbba9467.zip | |
websocket transport: use inline SP objects
This also fixes a possible race in the listener that may cause
connections to be dropped incorrectly, if the connection arrives
before the common layer has posted an accept request.
Instead we save the connection and potentially match later, like
we do for the other protocols that need to negotiate.
Diffstat (limited to 'src/sp/transport/ws')
| -rw-r--r-- | src/sp/transport/ws/websocket.c | 160 |
1 files changed, 87 insertions, 73 deletions
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c index 16929ec9..276a8b16 100644 --- a/src/sp/transport/ws/websocket.c +++ b/src/sp/transport/ws/websocket.c @@ -28,6 +28,7 @@ struct ws_dialer { nni_mtx mtx; nni_aio connaio; nng_stream_dialer *dialer; + nni_dialer *ndialer; bool started; }; @@ -37,20 +38,26 @@ struct ws_listener { nni_mtx mtx; nni_aio accaio; nng_stream_listener *listener; + nni_listener *nlistener; + nni_list wait_pipes; bool started; }; struct ws_pipe { - nni_mtx mtx; - bool closed; - uint16_t peer; - nni_aio *user_txaio; - nni_aio *user_rxaio; - nni_aio txaio; - nni_aio rxaio; - nng_stream *ws; + nni_mtx mtx; + bool closed; + uint16_t peer; + nni_aio *user_txaio; + nni_aio *user_rxaio; + nni_aio txaio; + nni_aio rxaio; + nng_stream *ws; + nni_pipe *npipe; + nni_list_node node; }; +static void wstran_listener_match(ws_listener *l); + static void wstran_pipe_send_cb(void *arg) { @@ -189,8 +196,14 @@ wstran_pipe_stop(void *arg) static int wstran_pipe_init(void *arg, nni_pipe *pipe) { - NNI_ARG_UNUSED(arg); - NNI_ARG_UNUSED(pipe); + ws_pipe *p = arg; + + p->npipe = pipe; + nni_mtx_init(&p->mtx); + + // Initialize AIOs. + nni_aio_init(&p->txaio, wstran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p); return (0); } @@ -204,7 +217,6 @@ wstran_pipe_fini(void *arg) nni_aio_fini(&p->txaio); nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); } static void @@ -218,25 +230,6 @@ wstran_pipe_close(void *arg) nng_stream_close(p->ws); } -static int -wstran_pipe_alloc(ws_pipe **pipep, void *ws) -{ - ws_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - p->ws = ws; - nni_mtx_init(&p->mtx); - - // Initialize AIOs. - nni_aio_init(&p->txaio, wstran_pipe_send_cb, p); - nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p); - - *pipep = p; - return (0); -} - static uint16_t wstran_pipe_peer(void *arg) { @@ -253,7 +246,6 @@ wstran_listener_bind(void *arg, nng_url *url) if ((rv = nng_stream_listener_listen(l->listener)) == 0) { int port; - l->started = true; nng_stream_listener_get_int( l->listener, NNG_OPT_TCP_BOUND_PORT, &port); url->u_port = (uint32_t) port; @@ -293,9 +285,11 @@ wstran_listener_accept(void *arg, nni_aio *aio) return; } nni_list_append(&l->aios, aio); - if (aio == nni_list_first(&l->aios)) { + if (!l->started) { + l->started = true; nng_stream_listener_accept(l->listener, &l->accaio); } + wstran_listener_match(l); nni_mtx_unlock(&l->mtx); } @@ -356,6 +350,7 @@ wstran_pipe_getopt( } static nni_sp_pipe_ops ws_pipe_ops = { + .p_size = sizeof(ws_pipe), .p_init = wstran_pipe_init, .p_fini = wstran_pipe_fini, .p_stop = wstran_pipe_stop, @@ -383,7 +378,6 @@ wstran_dialer_fini(void *arg) nng_stream_dialer_free(d->dialer); nni_aio_fini(&d->connaio); nni_mtx_fini(&d->mtx); - NNI_FREE_STRUCT(d); } static void @@ -403,7 +397,6 @@ wstran_listener_fini(void *arg) nng_stream_listener_free(l->listener); nni_aio_fini(&l->accaio); nni_mtx_fini(&l->mtx); - NNI_FREE_STRUCT(l); } static void @@ -422,7 +415,6 @@ wstran_connect_cb(void *arg) } if ((uaio = nni_list_first(&d->aios)) == NULL) { // The client stopped caring about this! - nng_stream_stop(ws); nng_stream_free(ws); nni_mtx_unlock(&d->mtx); return; @@ -431,14 +423,15 @@ wstran_connect_cb(void *arg) NNI_ASSERT(nni_list_empty(&d->aios)); if ((rv = nni_aio_result(caio)) != 0) { nni_aio_finish_error(uaio, rv); - } else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) { - nng_stream_stop(ws); + } else if ((rv = nni_pipe_alloc_dialer((void **) &p, d->ndialer)) != + 0) { nng_stream_free(ws); nni_aio_finish_error(uaio, rv); } else { p->peer = d->peer; + p->ws = ws; - nni_aio_set_output(uaio, 0, p); + nni_aio_set_output(uaio, 0, p->npipe); nni_aio_finish(uaio, 0, 0); } nni_mtx_unlock(&d->mtx); @@ -457,66 +450,90 @@ static void wstran_listener_close(void *arg) { ws_listener *l = arg; + ws_pipe *p; nni_aio_close(&l->accaio); + NNI_LIST_FOREACH (&l->wait_pipes, p) { + nni_pipe_close(p->npipe); + } nng_stream_listener_close(l->listener); } static void +wstran_listener_match(ws_listener *l) +{ + nni_aio *uaio; + ws_pipe *p; + if (((uaio = nni_list_first(&l->aios)) == NULL) || + ((p = nni_list_first(&l->wait_pipes)) == NULL)) { + return; + } + + nni_list_remove(&l->wait_pipes, p); + nni_aio_list_remove(uaio); + + nni_aio_set_output(uaio, 0, p->npipe); + nni_aio_finish(uaio, 0, 0); +} + +static void wstran_accept_cb(void *arg) { ws_listener *l = arg; nni_aio *aaio = &l->accaio; nni_aio *uaio; int rv; + ws_pipe *p; + nng_stream *ws; nni_mtx_lock(&l->mtx); + + ws = nni_aio_get_output(aaio, 0); uaio = nni_list_first(&l->aios); if ((rv = nni_aio_result(aaio)) != 0) { - if (uaio != NULL) { - nni_aio_list_remove(uaio); - nni_aio_finish_error(uaio, rv); - } - } else { - nng_stream *ws = nni_aio_get_output(aaio, 0); - if (uaio != NULL) { - ws_pipe *p; - // Make a pipe - nni_aio_list_remove(uaio); - if ((rv = wstran_pipe_alloc(&p, ws)) != 0) { - nng_stream_close(ws); - nni_aio_finish_error(uaio, rv); - } else { - p->peer = l->peer; - - nni_aio_set_output(uaio, 0, p); - nni_aio_finish(uaio, 0, 0); - } - } + goto error; } - if (!nni_list_empty(&l->aios)) { - nng_stream_listener_accept(l->listener, aaio); + + rv = nni_pipe_alloc_listener((void **) &p, l->nlistener); + if (rv != 0) { + nng_stream_free(ws); + goto error; } + p->peer = l->peer; + p->ws = ws; + + nni_list_append(&l->wait_pipes, p); + wstran_listener_match(l); + nng_stream_listener_accept(l->listener, aaio); + nni_mtx_unlock(&l->mtx); + return; + +error: + + // possibly report this upstream + if ((uaio = nni_list_first(&l->aios)) != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + } + nng_stream_listener_accept(l->listener, aaio); nni_mtx_unlock(&l->mtx); } static int wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) { - ws_dialer *d; + ws_dialer *d = (void *) dp; nni_sock *s = nni_dialer_sock(ndialer); int rv; char name[64]; - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { - return (NNG_ENOMEM); - } nni_mtx_init(&d->mtx); nni_aio_list_init(&d->aios); nni_aio_init(&d->connaio, wstran_connect_cb, d); - d->peer = nni_sock_peer_id(s); + d->peer = nni_sock_peer_id(s); + d->ndialer = ndialer; snprintf( name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_peer_name(s)); @@ -526,29 +543,26 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) || ((rv = nng_stream_dialer_set_string( d->dialer, NNG_OPT_WS_PROTOCOL, name)) != 0)) { - wstran_dialer_fini(d); return (rv); } - *dp = d; return (0); } static int wstran_listener_init(void **lp, nng_url *url, nni_listener *listener) { - ws_listener *l; + ws_listener *l = (void *) lp; int rv; nni_sock *s = nni_listener_sock(listener); char name[64]; - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { - return (NNG_ENOMEM); - } + l->nlistener = listener; nni_mtx_init(&l->mtx); nni_aio_list_init(&l->aios); nni_aio_init(&l->accaio, wstran_accept_cb, l); + NNI_LIST_INIT(&l->wait_pipes, ws_pipe, node); l->peer = nni_sock_peer_id(s); @@ -560,10 +574,8 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *listener) l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) || ((rv = nng_stream_listener_set_string( l->listener, NNG_OPT_WS_PROTOCOL, name)) != 0)) { - wstran_listener_fini(l); return (rv); } - *lp = l; return (0); } @@ -669,6 +681,7 @@ wstran_listener_set_tls(void *arg, nng_tls_config *tls) } static nni_sp_dialer_ops ws_dialer_ops = { + .d_size = sizeof(ws_dialer), .d_init = wstran_dialer_init, .d_fini = wstran_dialer_fini, .d_connect = wstran_dialer_connect, @@ -681,6 +694,7 @@ static nni_sp_dialer_ops ws_dialer_ops = { }; static nni_sp_listener_ops ws_listener_ops = { + .l_size = sizeof(ws_listener), .l_init = wstran_listener_init, .l_fini = wstran_listener_fini, .l_bind = wstran_listener_bind, |
