aboutsummaryrefslogtreecommitdiff
path: root/src/transport/ws/websocket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-11-27 14:21:20 -0800
committerGarrett D'Amore <garrett@damore.org>2017-12-26 15:31:53 -0800
commit93db6fe3aaff421d61a15993ba6827b742ab00d1 (patch)
treed4d6372cb5d606ba9bcdb60b88b6271086940895 /src/transport/ws/websocket.c
parentc9bf5a76b0d6aead6ae91af71ada51a17881ac0a (diff)
downloadnng-93db6fe3aaff421d61a15993ba6827b742ab00d1.tar.gz
nng-93db6fe3aaff421d61a15993ba6827b742ab00d1.tar.bz2
nng-93db6fe3aaff421d61a15993ba6827b742ab00d1.zip
fixes #2 Websocket transport
This is a rather large changeset -- it fundamentally adds websocket transport, but as part of this changeset we added a generic framework for both HTTP and websocket. We also made some supporting changes to the core, such as changing the way timeouts work for AIOs and adding additional state keeping for AIOs, and adding a common framework for deferred finalization (to avoid certain kinds of circular deadlocks during resource cleanup). We also invented a new initialization framework so that we can avoid wiring in knowledge about them into the master initialization framework. The HTTP framework is not yet complete, but it is good enough for simple static serving and building additional services on top of -- including websocket. We expect both websocket and HTTP support to evolve considerably, and so these are not part of the public API yet. Property support for the websocket transport (in particular address properties) is still missing, as is support for TLS. The websocket transport here is a bit more robust than the original nanomsg implementation, as it supports multiple sockets listening at the same port sharing the same HTTP server instance, discriminating between them based on URI (and possibly the virtual host). Websocket is enabled by default at present, and work to conditionalize HTTP and websocket further (to minimize bloat) is still pending.
Diffstat (limited to 'src/transport/ws/websocket.c')
-rw-r--r--src/transport/ws/websocket.c533
1 files changed, 533 insertions, 0 deletions
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
new file mode 100644
index 00000000..8a73bcfb
--- /dev/null
+++ b/src/transport/ws/websocket.c
@@ -0,0 +1,533 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "supplemental/websocket/websocket.h"
+
+typedef struct ws_ep ws_ep;
+typedef struct ws_pipe ws_pipe;
+
+struct ws_ep {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ char * addr;
+ uint16_t lproto; // local protocol
+ uint16_t rproto; // remote protocol
+ size_t rcvmax;
+ char * protoname;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_aio * connaio;
+ nni_aio * accaio;
+ nni_ws_listener *listener;
+ nni_ws_dialer * dialer;
+};
+
+struct ws_pipe {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ nni_mtx mtx;
+ size_t rcvmax; // inherited from EP
+ bool closed;
+ uint16_t rproto;
+ uint16_t lproto;
+ nni_aio *user_txaio;
+ nni_aio *user_rxaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_ws * ws;
+};
+
+static void
+ws_pipe_send_cb(void *arg)
+{
+ ws_pipe *p = arg;
+ nni_aio *taio;
+ nni_aio *uaio;
+
+ nni_mtx_lock(&p->mtx);
+ taio = p->txaio;
+ uaio = p->user_txaio;
+ p->user_txaio = NULL;
+
+ if (uaio != NULL) {
+ int rv;
+ if ((rv = nni_aio_result(taio)) != 0) {
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish(uaio, 0, 0);
+ }
+ }
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv_cb(void *arg)
+{
+ ws_pipe *p = arg;
+ nni_aio *raio = p->rxaio;
+ nni_aio *uaio;
+ int rv;
+
+ nni_mtx_lock(&p->mtx);
+ uaio = p->user_rxaio;
+ p->user_rxaio = NULL;
+ if ((rv = nni_aio_result(raio)) != 0) {
+ if (uaio != NULL) {
+ nni_aio_finish_error(uaio, rv);
+ }
+ } else {
+ nni_msg *msg = nni_aio_get_msg(raio);
+ if (uaio != NULL) {
+ nni_aio_finish_msg(uaio, msg);
+ } else {
+ nni_msg_free(msg);
+ }
+ }
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv_cancel(nni_aio *aio, int rv)
+{
+ ws_pipe *p = aio->a_prov_data;
+ nni_mtx_lock(&p->mtx);
+ if (p->user_rxaio != aio) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_cancel(p->rxaio, rv);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv(void *arg, nni_aio *aio)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ p->user_rxaio = aio;
+
+ nni_ws_recv_msg(p->ws, p->rxaio);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_send_cancel(nni_aio *aio, int rv)
+{
+ ws_pipe *p = aio->a_prov_data;
+ nni_mtx_lock(&p->mtx);
+ if (p->user_txaio != aio) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ // This aborts the upper send, which will call back with an error
+ // when it is done.
+ nni_aio_cancel(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_send(void *arg, nni_aio *aio)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ p->user_txaio = aio;
+ nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
+
+ nni_ws_send_msg(p->ws, p->txaio);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_fini(void *arg)
+{
+ ws_pipe *p = arg;
+
+ nni_aio_stop(p->rxaio);
+ nni_aio_stop(p->txaio);
+
+ nni_aio_fini(p->rxaio);
+ nni_aio_fini(p->txaio);
+
+ if (p->ws) {
+ nni_ws_fini(p->ws);
+ }
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
+}
+
+static void
+ws_pipe_close(void *arg)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ nni_ws_close(p->ws);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static int
+ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws)
+{
+ ws_pipe *p;
+ int rv;
+ nni_aio *aio;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&p->mtx);
+
+ // Initialize AIOs.
+ if (((rv = nni_aio_init(&p->txaio, ws_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, ws_pipe_recv_cb, p)) != 0)) {
+ ws_pipe_fini(p);
+ return (rv);
+ }
+
+ p->mode = ep->mode;
+ p->rcvmax = ep->rcvmax;
+ // p->addr = ep->addr;
+ p->rproto = ep->rproto;
+ p->lproto = ep->lproto;
+ p->ws = ws;
+
+ if ((aio = nni_list_first(&ep->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_pipe(aio, p);
+ }
+
+ *pipep = p;
+ return (0);
+}
+
+static uint16_t
+ws_pipe_peer(void *arg)
+{
+ ws_pipe *p = arg;
+
+ return (p->rproto);
+}
+
+static void
+ws_pipe_start(void *arg, nni_aio *aio)
+{
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+// We have very different approaches for server and client.
+// Servers use the HTTP server framework, and a request methodology.
+
+static int
+ws_ep_bind(void *arg)
+{
+ ws_ep *ep = arg;
+ return (nni_ws_listener_listen(ep->listener));
+}
+
+static void
+ws_ep_cancel(nni_aio *aio, int rv)
+{
+ ws_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_accept(void *arg, nni_aio *aio)
+{
+ ws_ep *ep = arg;
+
+ // We already bound, so we just need to look for an available
+ // pipe (created by the handler), and match it.
+ // Otherwise we stick the AIO in the accept list.
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_list_append(&ep->aios, aio);
+ if (aio == nni_list_first(&ep->aios)) {
+ nni_ws_listener_accept(ep->listener, ep->accaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_connect(void *arg, nni_aio *aio)
+{
+ ws_ep *ep = arg;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(nni_list_empty(&ep->aios));
+
+ // If we can't start, then its dying and we can't report
+ // either.
+ if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_list_append(&ep->aios, aio);
+ nni_ws_dialer_dial(ep->dialer, ep->connaio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+ws_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz)
+{
+ ws_ep *ep = arg;
+ if (ep == NULL) {
+ return (nni_chkopt_size(v, sz, 0, NNI_MAXSZ));
+ }
+ return (nni_setopt_size(&ep->rcvmax, v, sz, 0, NNI_MAXSZ));
+}
+
+static int
+ws_ep_getopt_recvmaxsz(void *arg, void *v, size_t *szp)
+{
+ ws_ep *ep = arg;
+ return (nni_getopt_size(ep->rcvmax, v, szp));
+}
+
+static nni_tran_pipe_option ws_pipe_options[] = {
+#if 0
+ // clang-format off
+ { NNG_OPT_LOCADDR, ws_pipe_getopt_locaddr },
+ { NNG_OPT_REMADDR, ws_pipe_getopt_remaddr },
+ // clang-format on
+#endif
+ // terminate list
+ { NULL, NULL }
+};
+
+static nni_tran_pipe ws_pipe_ops = {
+ .p_fini = ws_pipe_fini,
+ .p_start = ws_pipe_start,
+ .p_send = ws_pipe_send,
+ .p_recv = ws_pipe_recv,
+ .p_close = ws_pipe_close,
+ .p_peer = ws_pipe_peer,
+ .p_options = ws_pipe_options,
+};
+
+static nni_tran_ep_option ws_ep_options[] = {
+ {
+ .eo_name = NNG_OPT_RECVMAXSZ,
+ .eo_getopt = ws_ep_getopt_recvmaxsz,
+ .eo_setopt = ws_ep_setopt_recvmaxsz,
+ },
+ // terminate list
+ { NULL, NULL, NULL },
+};
+
+static void
+ws_ep_fini(void *arg)
+{
+ ws_ep *ep = arg;
+
+ nni_aio_stop(ep->accaio);
+ nni_aio_stop(ep->connaio);
+ nni_aio_fini(ep->accaio);
+ nni_aio_fini(ep->connaio);
+ if (ep->listener != NULL) {
+ nni_ws_listener_fini(ep->listener);
+ }
+ if (ep->dialer != NULL) {
+ nni_ws_dialer_fini(ep->dialer);
+ }
+ nni_strfree(ep->addr);
+ nni_strfree(ep->protoname);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
+}
+
+static void
+ws_ep_conn_cb(void *arg)
+{
+ ws_ep * ep = arg;
+ ws_pipe *p;
+ nni_aio *caio = ep->connaio;
+ nni_aio *uaio;
+ int rv;
+ nni_ws * ws = NULL;
+
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_result(caio) == 0) {
+ ws = nni_aio_get_pipe(caio);
+ }
+ if ((uaio = nni_list_first(&ep->aios)) == NULL) {
+ // The client stopped caring about this!
+ if (ws != NULL) {
+ nni_ws_fini(ws);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_aio_list_remove(uaio);
+ NNI_ASSERT(nni_list_empty(&ep->aios));
+ if ((rv = nni_aio_result(caio)) != 0) {
+ nni_aio_finish_error(uaio, rv);
+ } else if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ nni_ws_fini(ws);
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish_pipe(uaio, p);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_close(void *arg)
+{
+ ws_ep *ep = arg;
+
+ if (ep->mode == NNI_EP_MODE_LISTEN) {
+ nni_ws_listener_close(ep->listener);
+ } else {
+ nni_ws_dialer_close(ep->dialer);
+ }
+}
+
+static void
+ws_ep_acc_cb(void *arg)
+{
+ ws_ep * ep = arg;
+ nni_aio *aaio = ep->accaio;
+ nni_aio *uaio;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ uaio = nni_list_first(&ep->aios);
+ if ((rv = nni_aio_result(aaio)) != 0) {
+ if (uaio != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ } else {
+ nni_ws *ws = nni_aio_get_pipe(aaio);
+ if (uaio != NULL) {
+ ws_pipe *p;
+ // Make a pipe
+ nni_aio_list_remove(uaio);
+ if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ nni_ws_close(ws);
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish_pipe(uaio, p);
+ }
+ }
+ }
+ if (!nni_list_empty(&ep->aios)) {
+ nni_ws_listener_accept(ep->listener, aaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+ws_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
+{
+ ws_ep * ep;
+ const char *pname;
+ int rv;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&ep->mtx);
+
+ // List of pipes (server only).
+ nni_aio_list_init(&ep->aios);
+
+ ep->mode = mode;
+ ep->lproto = nni_sock_proto(sock);
+ ep->rproto = nni_sock_peer(sock);
+
+ if (mode == NNI_EP_MODE_DIAL) {
+ pname = nni_sock_peer_name(sock);
+ rv = nni_ws_dialer_init(&ep->dialer, url);
+ } else {
+ pname = nni_sock_proto_name(sock);
+ rv = nni_ws_listener_init(&ep->listener, url);
+ }
+
+ if ((rv == 0) && ((ep->addr = nni_strdup(url)) == NULL)) {
+ rv = NNG_ENOMEM;
+ }
+ if ((rv != 0) ||
+ ((rv = nni_aio_init(&ep->connaio, ws_ep_conn_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->accaio, ws_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_asprintf(&ep->protoname, "%s.sp.nanomsg.org", pname)) !=
+ 0)) {
+ ws_ep_fini(ep);
+ return (rv);
+ }
+
+ *epp = ep;
+ return (0);
+}
+static int
+ws_tran_init(void)
+{
+ return (0);
+}
+
+static void
+ws_tran_fini(void)
+{
+}
+
+static nni_tran_ep ws_ep_ops = {
+ .ep_init = ws_ep_init,
+ .ep_fini = ws_ep_fini,
+ .ep_connect = ws_ep_connect,
+ .ep_bind = ws_ep_bind,
+ .ep_accept = ws_ep_accept,
+ .ep_close = ws_ep_close,
+ .ep_options = ws_ep_options,
+};
+
+static nni_tran ws_tran = {
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "ws",
+ .tran_ep = &ws_ep_ops,
+ .tran_pipe = &ws_pipe_ops,
+ .tran_init = ws_tran_init,
+ .tran_fini = ws_tran_fini,
+};
+
+int
+nng_ws_register(void)
+{
+ return (nni_tran_register(&ws_tran));
+}