diff options
| -rw-r--r-- | src/sp/transport/ws/websocket.c | 160 |
1 files changed, 87 insertions, 73 deletions
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c index 16929ec9..276a8b16 100644 --- a/src/sp/transport/ws/websocket.c +++ b/src/sp/transport/ws/websocket.c @@ -28,6 +28,7 @@ struct ws_dialer { nni_mtx mtx; nni_aio connaio; nng_stream_dialer *dialer; + nni_dialer *ndialer; bool started; }; @@ -37,20 +38,26 @@ struct ws_listener { nni_mtx mtx; nni_aio accaio; nng_stream_listener *listener; + nni_listener *nlistener; + nni_list wait_pipes; bool started; }; struct ws_pipe { - nni_mtx mtx; - bool closed; - uint16_t peer; - nni_aio *user_txaio; - nni_aio *user_rxaio; - nni_aio txaio; - nni_aio rxaio; - nng_stream *ws; + nni_mtx mtx; + bool closed; + uint16_t peer; + nni_aio *user_txaio; + nni_aio *user_rxaio; + nni_aio txaio; + nni_aio rxaio; + nng_stream *ws; + nni_pipe *npipe; + nni_list_node node; }; +static void wstran_listener_match(ws_listener *l); + static void wstran_pipe_send_cb(void *arg) { @@ -189,8 +196,14 @@ wstran_pipe_stop(void *arg) static int wstran_pipe_init(void *arg, nni_pipe *pipe) { - NNI_ARG_UNUSED(arg); - NNI_ARG_UNUSED(pipe); + ws_pipe *p = arg; + + p->npipe = pipe; + nni_mtx_init(&p->mtx); + + // Initialize AIOs. + nni_aio_init(&p->txaio, wstran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p); return (0); } @@ -204,7 +217,6 @@ wstran_pipe_fini(void *arg) nni_aio_fini(&p->txaio); nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); } static void @@ -218,25 +230,6 @@ wstran_pipe_close(void *arg) nng_stream_close(p->ws); } -static int -wstran_pipe_alloc(ws_pipe **pipep, void *ws) -{ - ws_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - p->ws = ws; - nni_mtx_init(&p->mtx); - - // Initialize AIOs. - nni_aio_init(&p->txaio, wstran_pipe_send_cb, p); - nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p); - - *pipep = p; - return (0); -} - static uint16_t wstran_pipe_peer(void *arg) { @@ -253,7 +246,6 @@ wstran_listener_bind(void *arg, nng_url *url) if ((rv = nng_stream_listener_listen(l->listener)) == 0) { int port; - l->started = true; nng_stream_listener_get_int( l->listener, NNG_OPT_TCP_BOUND_PORT, &port); url->u_port = (uint32_t) port; @@ -293,9 +285,11 @@ wstran_listener_accept(void *arg, nni_aio *aio) return; } nni_list_append(&l->aios, aio); - if (aio == nni_list_first(&l->aios)) { + if (!l->started) { + l->started = true; nng_stream_listener_accept(l->listener, &l->accaio); } + wstran_listener_match(l); nni_mtx_unlock(&l->mtx); } @@ -356,6 +350,7 @@ wstran_pipe_getopt( } static nni_sp_pipe_ops ws_pipe_ops = { + .p_size = sizeof(ws_pipe), .p_init = wstran_pipe_init, .p_fini = wstran_pipe_fini, .p_stop = wstran_pipe_stop, @@ -383,7 +378,6 @@ wstran_dialer_fini(void *arg) nng_stream_dialer_free(d->dialer); nni_aio_fini(&d->connaio); nni_mtx_fini(&d->mtx); - NNI_FREE_STRUCT(d); } static void @@ -403,7 +397,6 @@ wstran_listener_fini(void *arg) nng_stream_listener_free(l->listener); nni_aio_fini(&l->accaio); nni_mtx_fini(&l->mtx); - NNI_FREE_STRUCT(l); } static void @@ -422,7 +415,6 @@ wstran_connect_cb(void *arg) } if ((uaio = nni_list_first(&d->aios)) == NULL) { // The client stopped caring about this! - nng_stream_stop(ws); nng_stream_free(ws); nni_mtx_unlock(&d->mtx); return; @@ -431,14 +423,15 @@ wstran_connect_cb(void *arg) NNI_ASSERT(nni_list_empty(&d->aios)); if ((rv = nni_aio_result(caio)) != 0) { nni_aio_finish_error(uaio, rv); - } else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) { - nng_stream_stop(ws); + } else if ((rv = nni_pipe_alloc_dialer((void **) &p, d->ndialer)) != + 0) { nng_stream_free(ws); nni_aio_finish_error(uaio, rv); } else { p->peer = d->peer; + p->ws = ws; - nni_aio_set_output(uaio, 0, p); + nni_aio_set_output(uaio, 0, p->npipe); nni_aio_finish(uaio, 0, 0); } nni_mtx_unlock(&d->mtx); @@ -457,66 +450,90 @@ static void wstran_listener_close(void *arg) { ws_listener *l = arg; + ws_pipe *p; nni_aio_close(&l->accaio); + NNI_LIST_FOREACH (&l->wait_pipes, p) { + nni_pipe_close(p->npipe); + } nng_stream_listener_close(l->listener); } static void +wstran_listener_match(ws_listener *l) +{ + nni_aio *uaio; + ws_pipe *p; + if (((uaio = nni_list_first(&l->aios)) == NULL) || + ((p = nni_list_first(&l->wait_pipes)) == NULL)) { + return; + } + + nni_list_remove(&l->wait_pipes, p); + nni_aio_list_remove(uaio); + + nni_aio_set_output(uaio, 0, p->npipe); + nni_aio_finish(uaio, 0, 0); +} + +static void wstran_accept_cb(void *arg) { ws_listener *l = arg; nni_aio *aaio = &l->accaio; nni_aio *uaio; int rv; + ws_pipe *p; + nng_stream *ws; nni_mtx_lock(&l->mtx); + + ws = nni_aio_get_output(aaio, 0); uaio = nni_list_first(&l->aios); if ((rv = nni_aio_result(aaio)) != 0) { - if (uaio != NULL) { - nni_aio_list_remove(uaio); - nni_aio_finish_error(uaio, rv); - } - } else { - nng_stream *ws = nni_aio_get_output(aaio, 0); - if (uaio != NULL) { - ws_pipe *p; - // Make a pipe - nni_aio_list_remove(uaio); - if ((rv = wstran_pipe_alloc(&p, ws)) != 0) { - nng_stream_close(ws); - nni_aio_finish_error(uaio, rv); - } else { - p->peer = l->peer; - - nni_aio_set_output(uaio, 0, p); - nni_aio_finish(uaio, 0, 0); - } - } + goto error; } - if (!nni_list_empty(&l->aios)) { - nng_stream_listener_accept(l->listener, aaio); + + rv = nni_pipe_alloc_listener((void **) &p, l->nlistener); + if (rv != 0) { + nng_stream_free(ws); + goto error; } + p->peer = l->peer; + p->ws = ws; + + nni_list_append(&l->wait_pipes, p); + wstran_listener_match(l); + nng_stream_listener_accept(l->listener, aaio); + nni_mtx_unlock(&l->mtx); + return; + +error: + + // possibly report this upstream + if ((uaio = nni_list_first(&l->aios)) != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + } + nng_stream_listener_accept(l->listener, aaio); nni_mtx_unlock(&l->mtx); } static int wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) { - ws_dialer *d; + ws_dialer *d = (void *) dp; nni_sock *s = nni_dialer_sock(ndialer); int rv; char name[64]; - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { - return (NNG_ENOMEM); - } nni_mtx_init(&d->mtx); nni_aio_list_init(&d->aios); nni_aio_init(&d->connaio, wstran_connect_cb, d); - d->peer = nni_sock_peer_id(s); + d->peer = nni_sock_peer_id(s); + d->ndialer = ndialer; snprintf( name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_peer_name(s)); @@ -526,29 +543,26 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) || ((rv = nng_stream_dialer_set_string( d->dialer, NNG_OPT_WS_PROTOCOL, name)) != 0)) { - wstran_dialer_fini(d); return (rv); } - *dp = d; return (0); } static int wstran_listener_init(void **lp, nng_url *url, nni_listener *listener) { - ws_listener *l; + ws_listener *l = (void *) lp; int rv; nni_sock *s = nni_listener_sock(listener); char name[64]; - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { - return (NNG_ENOMEM); - } + l->nlistener = listener; nni_mtx_init(&l->mtx); nni_aio_list_init(&l->aios); nni_aio_init(&l->accaio, wstran_accept_cb, l); + NNI_LIST_INIT(&l->wait_pipes, ws_pipe, node); l->peer = nni_sock_peer_id(s); @@ -560,10 +574,8 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *listener) l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) || ((rv = nng_stream_listener_set_string( l->listener, NNG_OPT_WS_PROTOCOL, name)) != 0)) { - wstran_listener_fini(l); return (rv); } - *lp = l; return (0); } @@ -669,6 +681,7 @@ wstran_listener_set_tls(void *arg, nng_tls_config *tls) } static nni_sp_dialer_ops ws_dialer_ops = { + .d_size = sizeof(ws_dialer), .d_init = wstran_dialer_init, .d_fini = wstran_dialer_fini, .d_connect = wstran_dialer_connect, @@ -681,6 +694,7 @@ static nni_sp_dialer_ops ws_dialer_ops = { }; static nni_sp_listener_ops ws_listener_ops = { + .l_size = sizeof(ws_listener), .l_init = wstran_listener_init, .l_fini = wstran_listener_fini, .l_bind = wstran_listener_bind, |
