diff options
Diffstat (limited to 'src/sp/transport/ws/websocket.c')
| -rw-r--r-- | src/sp/transport/ws/websocket.c | 740 |
1 files changed, 740 insertions, 0 deletions
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c new file mode 100644 index 00000000..3f73f47f --- /dev/null +++ b/src/sp/transport/ws/websocket.c @@ -0,0 +1,740 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Devolutions <info@devolutions.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdbool.h> +#include <stdio.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "supplemental/websocket/websocket.h" + +#include <nng/supplemental/tls/tls.h> +#include <nng/transport/ws/websocket.h> + +typedef struct ws_dialer ws_dialer; +typedef struct ws_listener ws_listener; +typedef struct ws_pipe ws_pipe; + +struct ws_dialer { + uint16_t peer; // remote protocol + nni_list aios; + nni_mtx mtx; + nni_aio * connaio; + nng_stream_dialer *dialer; + bool started; +}; + +struct ws_listener { + uint16_t peer; // remote protocol + nni_list aios; + nni_mtx mtx; + nni_aio * accaio; + nng_stream_listener *listener; + bool started; +}; + +struct ws_pipe { + nni_mtx mtx; + bool closed; + uint16_t peer; + nni_aio * user_txaio; + nni_aio * user_rxaio; + nni_aio * txaio; + nni_aio * rxaio; + nng_stream *ws; +}; + +static void +wstran_pipe_send_cb(void *arg) +{ + ws_pipe *p = arg; + nni_aio *taio; + nni_aio *uaio; + + nni_mtx_lock(&p->mtx); + taio = p->txaio; + uaio = p->user_txaio; + p->user_txaio = NULL; + + if (uaio != NULL) { + int rv; + if ((rv = nni_aio_result(taio)) != 0) { + nni_aio_finish_error(uaio, rv); + } else { + nni_aio_finish(uaio, 0, 0); + } + } + nni_mtx_unlock(&p->mtx); +} + +static void +wstran_pipe_recv_cb(void *arg) +{ + ws_pipe *p = arg; + nni_aio *raio = p->rxaio; + nni_aio *uaio; + int rv; + + nni_mtx_lock(&p->mtx); + uaio = p->user_rxaio; + p->user_rxaio = NULL; + if ((rv = nni_aio_result(raio)) != 0) { + if (uaio != NULL) { + nni_aio_finish_error(uaio, rv); + } + } else { + nni_msg *msg = nni_aio_get_msg(raio); + if (uaio != NULL) { + nni_aio_finish_msg(uaio, msg); + } else { + nni_msg_free(msg); + } + } + nni_mtx_unlock(&p->mtx); +} + +static void +wstran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) +{ + ws_pipe *p = arg; + nni_mtx_lock(&p->mtx); + if (p->user_rxaio != aio) { + nni_mtx_unlock(&p->mtx); + return; + } + p->user_rxaio = NULL; + nni_aio_abort(p->rxaio, rv); + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&p->mtx); +} + +static void +wstran_pipe_recv(void *arg, nni_aio *aio) +{ + ws_pipe *p = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, wstran_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + p->user_rxaio = aio; + nng_stream_recv(p->ws, p->rxaio); + nni_mtx_unlock(&p->mtx); +} + +static void +wstran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) +{ + ws_pipe *p = arg; + nni_mtx_lock(&p->mtx); + if (p->user_txaio != aio) { + nni_mtx_unlock(&p->mtx); + return; + } + p->user_txaio = NULL; + nni_aio_abort(p->txaio, rv); + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&p->mtx); +} + +static void +wstran_pipe_send(void *arg, nni_aio *aio) +{ + ws_pipe *p = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, wstran_pipe_send_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + p->user_txaio = aio; + nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + + nng_stream_send(p->ws, p->txaio); + nni_mtx_unlock(&p->mtx); +} + +static void +wstran_pipe_stop(void *arg) +{ + ws_pipe *p = arg; + + nni_aio_stop(p->rxaio); + nni_aio_stop(p->txaio); +} + +static int +wstran_pipe_init(void *arg, nni_pipe *pipe) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(pipe); + return (0); +} + +static void +wstran_pipe_fini(void *arg) +{ + ws_pipe *p = arg; + + nni_aio_free(p->rxaio); + nni_aio_free(p->txaio); + + nng_stream_free(p->ws); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); +} + +static void +wstran_pipe_close(void *arg) +{ + ws_pipe *p = arg; + + nni_aio_close(p->rxaio); + nni_aio_close(p->txaio); + + nni_mtx_lock(&p->mtx); + nng_stream_close(p->ws); + nni_mtx_unlock(&p->mtx); +} + +static int +wstran_pipe_alloc(ws_pipe **pipep, void *ws) +{ + ws_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&p->mtx); + + // Initialize AIOs. + if (((rv = nni_aio_alloc(&p->txaio, wstran_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_alloc(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) { + wstran_pipe_fini(p); + return (rv); + } + p->ws = ws; + + *pipep = p; + return (0); +} + +static uint16_t +wstran_pipe_peer(void *arg) +{ + ws_pipe *p = arg; + + return (p->peer); +} + +static int +ws_listener_bind(void *arg) +{ + ws_listener *l = arg; + int rv; + + if ((rv = nng_stream_listener_listen(l->listener)) == 0) { + l->started = true; + } + return (rv); +} + +static void +ws_listener_cancel(nni_aio *aio, void *arg, int rv) +{ + ws_listener *l = arg; + + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +static void +wstran_listener_accept(void *arg, nni_aio *aio) +{ + ws_listener *l = arg; + int rv; + + // We already bound, so we just need to look for an available + // pipe (created by the handler), and match it. + // Otherwise we stick the AIO in the accept list. + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&l->mtx); + if ((rv = nni_aio_schedule(aio, ws_listener_cancel, l)) != 0) { + nni_mtx_unlock(&l->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&l->aios, aio); + if (aio == nni_list_first(&l->aios)) { + nng_stream_listener_accept(l->listener, l->accaio); + } + nni_mtx_unlock(&l->mtx); +} + +static void +wstran_dialer_cancel(nni_aio *aio, void *arg, int rv) +{ + ws_dialer *d = arg; + + nni_mtx_lock(&d->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&d->mtx); +} + +static void +wstran_dialer_connect(void *arg, nni_aio *aio) +{ + ws_dialer *d = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&d->mtx); + if ((rv = nni_aio_schedule(aio, wstran_dialer_cancel, d)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + return; + } + NNI_ASSERT(nni_list_empty(&d->aios)); + d->started = true; + nni_list_append(&d->aios, aio); + nng_stream_dialer_dial(d->dialer, d->connaio); + nni_mtx_unlock(&d->mtx); +} + +static const nni_option ws_pipe_options[] = { + // terminate list + { + .o_name = NULL, + } +}; + +static int +wstran_pipe_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + ws_pipe *p = arg; + int rv; + + if ((rv = nni_stream_get(p->ws, name, buf, szp, t)) == NNG_ENOTSUP) { + rv = nni_getopt(ws_pipe_options, name, p, buf, szp, t); + } + return (rv); +} + +static nni_tran_pipe_ops ws_pipe_ops = { + .p_init = wstran_pipe_init, + .p_fini = wstran_pipe_fini, + .p_stop = wstran_pipe_stop, + .p_send = wstran_pipe_send, + .p_recv = wstran_pipe_recv, + .p_close = wstran_pipe_close, + .p_peer = wstran_pipe_peer, + .p_getopt = wstran_pipe_getopt, +}; + +static void +wstran_dialer_fini(void *arg) +{ + ws_dialer *d = arg; + + nni_aio_stop(d->connaio); + nng_stream_dialer_free(d->dialer); + nni_aio_free(d->connaio); + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +static void +wstran_listener_fini(void *arg) +{ + ws_listener *l = arg; + + nni_aio_stop(l->accaio); + nng_stream_listener_free(l->listener); + nni_aio_free(l->accaio); + nni_mtx_fini(&l->mtx); + NNI_FREE_STRUCT(l); +} + +static void +wstran_connect_cb(void *arg) +{ + ws_dialer * d = arg; + ws_pipe * p; + nni_aio * caio = d->connaio; + nni_aio * uaio; + int rv; + nng_stream *ws = NULL; + + nni_mtx_lock(&d->mtx); + if (nni_aio_result(caio) == 0) { + ws = nni_aio_get_output(caio, 0); + } + if ((uaio = nni_list_first(&d->aios)) == NULL) { + // The client stopped caring about this! + nng_stream_free(ws); + nni_mtx_unlock(&d->mtx); + return; + } + nni_aio_list_remove(uaio); + NNI_ASSERT(nni_list_empty(&d->aios)); + if ((rv = nni_aio_result(caio)) != 0) { + nni_aio_finish_error(uaio, rv); + } else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) { + nng_stream_free(ws); + nni_aio_finish_error(uaio, rv); + } else { + p->peer = d->peer; + + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); + } + nni_mtx_unlock(&d->mtx); +} + +static void +wstran_dialer_close(void *arg) +{ + ws_dialer *d = arg; + + nni_aio_close(d->connaio); + nng_stream_dialer_close(d->dialer); +} + +static void +wstran_listener_close(void *arg) +{ + ws_listener *l = arg; + + nni_aio_close(l->accaio); + nng_stream_listener_close(l->listener); +} + +static void +wstran_accept_cb(void *arg) +{ + ws_listener *l = arg; + nni_aio * aaio = l->accaio; + nni_aio * uaio; + int rv; + + nni_mtx_lock(&l->mtx); + uaio = nni_list_first(&l->aios); + if ((rv = nni_aio_result(aaio)) != 0) { + if (uaio != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + } + } else { + nng_stream *ws = nni_aio_get_output(aaio, 0); + if (uaio != NULL) { + ws_pipe *p; + // Make a pipe + nni_aio_list_remove(uaio); + if ((rv = wstran_pipe_alloc(&p, ws)) != 0) { + nng_stream_close(ws); + nni_aio_finish_error(uaio, rv); + } else { + p->peer = l->peer; + + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); + } + } + } + if (!nni_list_empty(&l->aios)) { + nng_stream_listener_accept(l->listener, aaio); + } + nni_mtx_unlock(&l->mtx); +} + +static int +wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) +{ + ws_dialer *d; + nni_sock * s = nni_dialer_sock(ndialer); + int rv; + char name[64]; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + + nni_aio_list_init(&d->aios); + + d->peer = nni_sock_peer_id(s); + + snprintf( + name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_peer_name(s)); + + if (((rv = nni_ws_dialer_alloc(&d->dialer, url)) != 0) || + ((rv = nni_aio_alloc(&d->connaio, wstran_connect_cb, d)) != 0) || + ((rv = nng_stream_dialer_set_bool( + d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) || + ((rv = nng_stream_dialer_set_string( + d->dialer, NNG_OPT_WS_PROTOCOL, name)) != 0)) { + wstran_dialer_fini(d); + return (rv); + } + + *dp = d; + return (0); +} + +static int +wstran_listener_init(void **lp, nng_url *url, nni_listener *listener) +{ + ws_listener *l; + int rv; + nni_sock * s = nni_listener_sock(listener); + char name[64]; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->mtx); + + nni_aio_list_init(&l->aios); + + l->peer = nni_sock_peer_id(s); + + snprintf( + name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_proto_name(s)); + + if (((rv = nni_ws_listener_alloc(&l->listener, url)) != 0) || + ((rv = nni_aio_alloc(&l->accaio, wstran_accept_cb, l)) != 0) || + ((rv = nng_stream_listener_set_bool( + l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) || + ((rv = nng_stream_listener_set_string( + l->listener, NNG_OPT_WS_PROTOCOL, name)) != 0)) { + wstran_listener_fini(l); + return (rv); + } + *lp = l; + return (0); +} + +static int +wstran_init(void) +{ + return (0); +} + +static void +wstran_fini(void) +{ +} + +static const nni_option wstran_ep_opts[] = { + // terminate list + { + .o_name = NULL, + }, +}; + +static int +wstran_dialer_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + ws_dialer *d = arg; + int rv; + + rv = nni_stream_dialer_get(d->dialer, name, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_getopt(wstran_ep_opts, name, d, buf, szp, t); + } + return (rv); +} + +static int +wstran_dialer_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + ws_dialer *d = arg; + int rv; + + rv = nni_stream_dialer_set(d->dialer, name, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_setopt(wstran_ep_opts, name, d, buf, sz, t); + } + return (rv); +} + +static int +wstran_listener_get( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + ws_listener *l = arg; + int rv; + + rv = nni_stream_listener_get(l->listener, name, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_getopt(wstran_ep_opts, name, l, buf, szp, t); + } + return (rv); +} + +static int +wstran_listener_set( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + ws_listener *l = arg; + int rv; + + rv = nni_stream_listener_set(l->listener, name, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_setopt(wstran_ep_opts, name, l, buf, sz, t); + } + return (rv); +} + +static nni_tran_dialer_ops ws_dialer_ops = { + .d_init = wstran_dialer_init, + .d_fini = wstran_dialer_fini, + .d_connect = wstran_dialer_connect, + .d_close = wstran_dialer_close, + .d_setopt = wstran_dialer_setopt, + .d_getopt = wstran_dialer_getopt, +}; + +static nni_tran_listener_ops ws_listener_ops = { + .l_init = wstran_listener_init, + .l_fini = wstran_listener_fini, + .l_bind = ws_listener_bind, + .l_accept = wstran_listener_accept, + .l_close = wstran_listener_close, + .l_setopt = wstran_listener_set, + .l_getopt = wstran_listener_get, +}; + +static nni_tran ws_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "ws", + .tran_dialer = &ws_dialer_ops, + .tran_listener = &ws_listener_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = wstran_init, + .tran_fini = wstran_fini, +}; + +static nni_tran ws4_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "ws4", + .tran_dialer = &ws_dialer_ops, + .tran_listener = &ws_listener_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = wstran_init, + .tran_fini = wstran_fini, +}; + +static nni_tran ws6_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "ws6", + .tran_dialer = &ws_dialer_ops, + .tran_listener = &ws_listener_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = wstran_init, + .tran_fini = wstran_fini, +}; + +int +nng_ws_register(void) +{ + int rv; + if (((rv = nni_tran_register(&ws_tran)) != 0) || + ((rv = nni_tran_register(&ws4_tran)) != 0) || + ((rv = nni_tran_register(&ws6_tran)) != 0)) { + return (rv); + } + + return (0); +} + +#ifdef NNG_TRANSPORT_WSS + +static nni_tran wss_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "wss", + .tran_dialer = &ws_dialer_ops, + .tran_listener = &ws_listener_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = wstran_init, + .tran_fini = wstran_fini, +}; + +static nni_tran wss4_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "wss4", + .tran_dialer = &ws_dialer_ops, + .tran_listener = &ws_listener_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = wstran_init, + .tran_fini = wstran_fini, +}; + +static nni_tran wss6_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "wss6", + .tran_dialer = &ws_dialer_ops, + .tran_listener = &ws_listener_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = wstran_init, + .tran_fini = wstran_fini, +}; + +int +nng_wss_register(void) +{ + int rv; + if (((rv = nni_tran_register(&wss_tran)) != 0) || + ((rv = nni_tran_register(&wss4_tran)) != 0) || + ((rv = nni_tran_register(&wss6_tran)) != 0)) { + return (rv); + } + + return (0); +} + +#else + +int +nng_wss_register(void) +{ + return (0); +} + +#endif // NNG_TRANSPORT_WSS |
