aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental/websocket
diff options
context:
space:
mode:
Diffstat (limited to 'src/supplemental/websocket')
-rw-r--r--src/supplemental/websocket/CMakeLists.txt14
-rw-r--r--src/supplemental/websocket/websocket.c1935
-rw-r--r--src/supplemental/websocket/websocket.h63
3 files changed, 2012 insertions, 0 deletions
diff --git a/src/supplemental/websocket/CMakeLists.txt b/src/supplemental/websocket/CMakeLists.txt
new file mode 100644
index 00000000..f3283257
--- /dev/null
+++ b/src/supplemental/websocket/CMakeLists.txt
@@ -0,0 +1,14 @@
+#
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+set(WEBSOCKET_SOURCES
+ supplemental/websocket/websocket.c
+ supplemental/websocket/websocket.h)
+set(NNG_SOURCES ${NNG_SOURCES} ${WEBSOCKET_SOURCES} PARENT_SCOPE)
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
new file mode 100644
index 00000000..fe0a9bd9
--- /dev/null
+++ b/src/supplemental/websocket/websocket.c
@@ -0,0 +1,1935 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "supplemental/base64/base64.h"
+#include "supplemental/http/http.h"
+#include "supplemental/sha1/sha1.h"
+
+#include "websocket.h"
+
+// Pre-defined types for some prototypes. These are from other subsystems.
+typedef struct ws_frame ws_frame;
+typedef struct ws_msg ws_msg;
+
+struct nni_ws {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ nni_list_node node;
+ nni_reap_item reap;
+ bool closed;
+ bool ready;
+ bool wclose;
+ nni_mtx mtx;
+ nni_list txmsgs;
+ nni_list rxmsgs;
+ ws_frame * txframe;
+ ws_frame * rxframe;
+ nni_aio * txaio; // physical aios
+ nni_aio * rxaio;
+ nni_aio * closeaio;
+ nni_aio * httpaio; // server only, HTTP reply pending
+ nni_http * http;
+ nni_http_req *req;
+ nni_http_res *res;
+ size_t maxframe;
+ size_t fragsize;
+};
+
+struct nni_ws_listener {
+ nni_tls_config * tls;
+ nni_http_server * server;
+ char * proto;
+ char * url;
+ char * host;
+ char * serv;
+ char * path;
+ nni_mtx mtx;
+ nni_list pend;
+ nni_list reply;
+ nni_list aios;
+ bool started;
+ bool closed;
+ void * hp; // handler pointer
+ nni_http_handler handler;
+ nni_ws_listen_hook hookfn;
+ void * hookarg;
+};
+
+// The dialer tracks user aios in two lists. The first list is for aios
+// waiting for the http connection to be established, while the second
+// are waiting for the HTTP negotiation to complete. We keep two lists
+// so we know whether to initiate another outgoing connection after the
+// completion of an earlier connection. (We don't want to establish
+// requests when we already have connects negotiating.)
+struct nni_ws_dialer {
+ nni_tls_config * tls;
+ nni_http_req * req;
+ nni_http_res * res;
+ nni_http_client *client;
+ nni_mtx mtx;
+ nni_aio * conaio;
+ char * proto;
+ char * host;
+ char * serv;
+ char * path;
+ char * qinfo;
+ char * addr; // full address (a URL really)
+ char * uri; // path + query
+ nni_list conaios; // user aios waiting for connect.
+ nni_list httpaios; // user aios waiting for HTTP nego.
+ bool started;
+ bool closed;
+ nng_sockaddr sa;
+};
+
+typedef enum ws_type {
+ WS_CONT = 0x0,
+ WS_TEXT = 0x1,
+ WS_BINARY = 0x2,
+ WS_CLOSE = 0x8,
+ WS_PING = 0x9,
+ WS_PONG = 0xA,
+} ws_type;
+
+typedef enum ws_reason {
+ WS_CLOSE_NORMAL_CLOSE = 1000,
+ WS_CLOSE_GOING_AWAY = 1001,
+ WS_CLOSE_PROTOCOL_ERR = 1002,
+ WS_CLOSE_UNSUPP_FORMAT = 1003,
+ WS_CLOSE_INVALID_DATA = 1007,
+ WS_CLOSE_POLICY = 1008,
+ WS_CLOSE_TOO_BIG = 1009,
+ WS_CLOSE_NO_EXTENSION = 1010,
+ WS_CLOSE_INTERNAL = 1011,
+} ws_reason;
+
+struct ws_frame {
+ nni_list_node node;
+ uint8_t head[14]; // maximum header size
+ uint8_t mask[4]; // read by server, sent by client
+ uint8_t sdata[125]; // short data (for short frames only)
+ size_t hlen; // header length
+ size_t len; // payload length
+ enum ws_type op;
+ bool final;
+ bool masked;
+ size_t bufsz; // allocated size
+ uint8_t * buf;
+ ws_msg * wmsg;
+};
+
+struct ws_msg {
+ nni_list frames;
+ nni_list_node node;
+ nni_ws * ws;
+ nni_msg * msg;
+ nni_aio * aio;
+};
+
+static void ws_send_close(nni_ws *ws, uint16_t code);
+
+// This looks, case independently for a word in a list, which is either
+// space or comma separated.
+static bool
+ws_contains_word(const char *phrase, const char *word)
+{
+ size_t len = strlen(word);
+
+ while ((phrase != NULL) && (*phrase != '\0')) {
+ if ((nni_strncasecmp(phrase, word, len) == 0) &&
+ ((phrase[len] == 0) || (phrase[len] == ' ') ||
+ (phrase[len] == ','))) {
+ return (true);
+ }
+ // Skip to next word.
+ if ((phrase = strchr(phrase, ' ')) != NULL) {
+ while ((*phrase == ' ') || (*phrase == ',')) {
+ phrase++;
+ }
+ }
+ }
+ return (false);
+}
+
+// input is base64 challenge, output is the accepted. input should be
+// 24 character base 64, output is 28 character base64 reply. (output
+// must be large enough to hold 29 bytes to allow for termination.)
+// Returns 0 on success, NNG_EINVAL if the input is malformed somehow.
+static int
+ws_make_accept(const char *key, char *accept)
+{
+ uint8_t rawkey[16];
+ uint8_t digest[20];
+ nni_sha1_ctx ctx;
+
+#define WS_KEY_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+#define WS_KEY_GUIDLEN 36
+
+ if ((strlen(key) != 24) ||
+ (nni_base64_decode(key, 24, rawkey, 16) != 16)) {
+ return (NNG_EINVAL);
+ }
+
+ nni_sha1_init(&ctx);
+ nni_sha1_update(&ctx, rawkey, 16);
+ nni_sha1_update(&ctx, (uint8_t *) WS_KEY_GUID, WS_KEY_GUIDLEN);
+ nni_sha1_final(&ctx, digest);
+
+ nni_base64_encode(digest, 20, accept, 28);
+ accept[28] = '\0';
+ return (0);
+}
+
+static void
+ws_frame_fini(ws_frame *frame)
+{
+ if (frame->bufsz) {
+ nni_free(frame->buf, frame->bufsz);
+ }
+ NNI_FREE_STRUCT(frame);
+}
+
+static void
+ws_msg_fini(ws_msg *wm)
+{
+ ws_frame *frame;
+
+ NNI_ASSERT(!nni_list_node_active(&wm->node));
+ while ((frame = nni_list_first(&wm->frames)) != NULL) {
+ nni_list_remove(&wm->frames, frame);
+ ws_frame_fini(frame);
+ }
+
+ if (wm->msg != NULL) {
+ nni_msg_free(wm->msg);
+ }
+ NNI_FREE_STRUCT(wm);
+}
+
+static void
+ws_mask_frame(ws_frame *frame)
+{
+ uint32_t r;
+ // frames sent by client need mask.
+ if (frame->masked) {
+ return;
+ }
+ r = nni_random();
+ NNI_PUT32(frame->mask, r);
+ for (int i = 0; i < frame->len; i++) {
+ frame->buf[i] ^= frame->mask[i % 4];
+ }
+ memcpy(frame->head + frame->hlen, frame->mask, 4);
+ frame->hlen += 4;
+ frame->head[1] |= 0x80; // set masked bit
+ frame->masked = true;
+}
+
+static void
+ws_unmask_frame(ws_frame *frame)
+{
+ // frames sent by client need mask.
+ if (!frame->masked) {
+ return;
+ }
+ for (int i = 0; i < frame->len; i++) {
+ frame->buf[i] ^= frame->mask[i % 4];
+ }
+ frame->hlen -= 4;
+ frame->head[1] &= 0x7f; // clear masked bit
+ frame->masked = false;
+}
+
+static int
+ws_msg_init_control(
+ ws_msg **wmp, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len)
+{
+ ws_msg * wm;
+ ws_frame *frame;
+
+ if (len > 125) {
+ return (NNG_EINVAL);
+ }
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ ws_msg_fini(wm);
+ return (NNG_ENOMEM);
+ }
+
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+ memcpy(frame->sdata, buf, len);
+
+ nni_list_append(&wm->frames, frame);
+ frame->wmsg = wm;
+ frame->len = len;
+ frame->final = true;
+ frame->op = op;
+ frame->head[0] = op | 0x80; // final frame (control)
+ frame->head[1] = len & 0x7F;
+ frame->hlen = 2;
+ frame->buf = frame->sdata;
+ frame->bufsz = 0;
+
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_mask_frame(frame);
+ } else {
+ frame->masked = false;
+ }
+
+ wm->aio = NULL;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static int
+ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio)
+{
+ ws_msg * wm;
+ size_t len;
+ size_t maxfrag = ws->fragsize; // make this tunable. (1MB default)
+ uint8_t *buf;
+ uint8_t op;
+
+ // If the message has a header, move it to front of body. Most of
+ // the time this will not cause a reallocation (there should be
+ // headroom). Doing this simplifies our framing, and avoids sending
+ // tiny frames for headers.
+ if ((len = nni_msg_header_len(msg)) != 0) {
+ int rv;
+ buf = nni_msg_header(msg);
+ if ((rv = nni_msg_insert(msg, buf, len)) != 0) {
+ return (rv);
+ }
+ nni_msg_header_clear(msg);
+ }
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+
+ len = nni_msg_len(msg);
+ buf = nni_msg_body(msg);
+ op = WS_BINARY; // to start -- no support for sending TEXT frames
+
+ // do ... while because we want at least one frame (even for empty
+ // messages.) Headers get their own frame, if present. Best bet
+ // is to try not to have a header when coming here.
+ do {
+ ws_frame *frame;
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ ws_msg_fini(wm);
+ return (NNG_ENOMEM);
+ }
+ nni_list_append(&wm->frames, frame);
+ frame->wmsg = wm;
+ frame->len = len > maxfrag ? maxfrag : len;
+ frame->buf = buf;
+ frame->op = op;
+
+ buf += frame->len;
+ len -= frame->len;
+ op = WS_CONT;
+
+ if (len == 0) {
+ frame->final = true;
+ }
+ frame->head[0] = frame->op;
+ frame->hlen = 2;
+ if (frame->final) {
+ frame->head[0] |= 0x80; // final frame bit
+ }
+ if (frame->len < 126) {
+ frame->head[1] = frame->len & 0x7f;
+ } else if (frame->len < 65536) {
+ frame->head[1] = 126;
+ NNI_PUT16(frame->head + 2, (frame->len & 0xffff));
+ frame->hlen += 2;
+ } else {
+ frame->head[1] = 127;
+ NNI_PUT64(frame->head + 2, (uint64_t) frame->len);
+ frame->hlen += 8;
+ }
+
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_mask_frame(frame);
+ } else {
+ frame->masked = false;
+ }
+
+ } while (len);
+
+ wm->msg = msg;
+ wm->aio = aio;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static int
+ws_msg_init_rx(ws_msg **wmp, nni_ws *ws, nni_aio *aio)
+{
+ ws_msg *wm;
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+ wm->aio = aio;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static void
+ws_close_cb(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_msg *wm;
+
+ // Either we sent a close frame, or we didn't. Either way,
+ // we are done, and its time to abort everything else.
+ nni_mtx_lock(&ws->mtx);
+
+ nni_http_close(ws->http);
+ nni_aio_cancel(ws->txaio, NNG_ECLOSED);
+ nni_aio_cancel(ws->rxaio, NNG_ECLOSED);
+
+ // This list (receive) should be empty.
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ if (ws->rxframe != NULL) {
+ ws_frame_fini(ws->rxframe);
+ ws->rxframe = NULL;
+ }
+
+ // Any txframe should have been killed with its wmsg.
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_close(nni_ws *ws, uint16_t code)
+{
+ ws_msg *wm;
+
+ // Receive stuff gets aborted always. No further receives
+ // once we get a close.
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ // If were closing "gracefully", then don't abort in-flight
+ // stuff yet. Note that reads should have stopped already.
+ if (!ws->closed) {
+ ws_send_close(ws, code);
+ return;
+ }
+}
+
+static void
+ws_start_write(nni_ws *ws)
+{
+ ws_frame *frame;
+ ws_msg * wm;
+
+ if ((ws->txframe != NULL) || (!ws->ready)) {
+ return; // busy
+ }
+
+ if ((wm = nni_list_first(&ws->txmsgs)) == NULL) {
+ // Nothing to send.
+ return;
+ }
+
+ frame = nni_list_first(&wm->frames);
+ NNI_ASSERT(frame != NULL);
+
+ // Push it out.
+ ws->txframe = frame;
+ ws->txaio->a_niov = frame->len > 0 ? 2 : 1;
+ ws->txaio->a_iov[0].iov_len = frame->hlen;
+ ws->txaio->a_iov[0].iov_buf = frame->head;
+ if (frame->len > 0) {
+ ws->txaio->a_iov[1].iov_len = frame->len;
+ ws->txaio->a_iov[1].iov_buf = frame->buf;
+ }
+ nni_http_write_full(ws->http, ws->txaio);
+}
+
+static void
+ws_cancel_close(nni_aio *aio, int rv)
+{
+ nni_ws *ws = aio->a_prov_data;
+ nni_mtx_lock(&ws->mtx);
+ if (ws->wclose) {
+ ws->wclose = false;
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_write_cb(void *arg)
+{
+ nni_ws * ws = arg;
+ ws_frame *frame;
+ ws_msg * wm;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&ws->mtx);
+
+ if (ws->txframe->op == WS_CLOSE) {
+ // If this was a close frame, we are done.
+ // No other messages may succeed..
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio != NULL) {
+ nni_aio_set_msg(wm->aio, NULL);
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+ if (ws->wclose) {
+ ws->wclose = false;
+ nni_aio_finish(ws->closeaio, 0, 0);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ frame = ws->txframe;
+ wm = frame->wmsg;
+ aio = wm->aio;
+
+ if ((rv = nni_aio_result(ws->txaio)) != 0) {
+
+ ws_msg_fini(wm);
+ if (aio != NULL) {
+ nni_aio_finish_error(aio, rv);
+ }
+
+ ws->closed = true;
+ nni_http_close(ws->http);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // good frame, was it the last
+ nni_list_remove(&wm->frames, frame);
+ ws_frame_fini(frame);
+ if (nni_list_empty(&wm->frames)) {
+ nni_list_remove(&ws->txmsgs, wm);
+ ws_msg_fini(wm);
+ if (aio != NULL) {
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
+
+ // Write the next frame.
+ ws_start_write(ws);
+
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_write_cancel(nni_aio *aio, int rv)
+{
+ nni_ws * ws;
+ ws_msg * wm;
+ ws_frame *frame;
+
+ // Is this aio active? We can tell by looking at the
+ // active tx frame.
+ wm = aio->a_prov_data;
+ ws = wm->ws;
+ nni_mtx_lock(&ws->mtx);
+ if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) {
+ nni_aio_cancel(ws->txaio, rv);
+ // We will wait for callback on the txaio to finish aio.
+ } else if (nni_list_active(&ws->txmsgs, wm)) {
+ // If scheduled, just need to remove node and complete it.
+ nni_list_remove(&ws->txmsgs, wm);
+ wm->aio = NULL;
+ nni_aio_finish_error(aio, rv);
+ ws_msg_fini(wm);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_send_close(nni_ws *ws, uint16_t code)
+{
+ ws_msg * wm;
+ uint8_t buf[sizeof(uint16_t)];
+ int rv;
+ nni_aio *aio;
+
+ NNI_PUT16(buf, code);
+
+ if (ws->closed) {
+ return;
+ }
+ ws->closed = true;
+ aio = ws->closeaio;
+
+ // We don't care about cancellation here. If this times out,
+ // we will still shut all the physical I/O down in the callback.
+ if (nni_aio_start(aio, ws_cancel_close, ws) != 0) {
+ return;
+ }
+ ws->wclose = true;
+ rv = ws_msg_init_control(&wm, ws, WS_CLOSE, buf, sizeof(buf));
+ if (rv != 0) {
+ ws->wclose = false;
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ // Close frames get priority!
+ nni_list_prepend(&ws->txmsgs, wm);
+ ws_start_write(ws);
+}
+
+static void
+ws_send_control(nni_ws *ws, uint8_t op, uint8_t *buf, size_t len)
+{
+ ws_msg *wm;
+
+ // Note that we do not care if this works or not. So no AIO needed.
+
+ nni_mtx_lock(&ws->mtx);
+ if ((ws->closed) ||
+ (ws_msg_init_control(&wm, ws, op, buf, sizeof(buf)) != 0)) {
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // Control frames at head of list. (Note that this may preempt
+ // the close frame or other ping/pong requests. Oh well.)
+ nni_list_prepend(&ws->txmsgs, wm);
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
+{
+ ws_msg * wm;
+ nni_msg *msg;
+ int rv;
+
+ msg = nni_aio_get_msg(aio);
+
+ if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) {
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
+ return;
+ }
+
+ nni_mtx_lock(&ws->mtx);
+ nni_aio_set_msg(aio, NULL);
+
+ if (ws->closed) {
+ ws_msg_fini(wm);
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ if (nni_aio_start(aio, ws_write_cancel, wm) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ ws_msg_fini(wm);
+ return;
+ }
+ nni_list_append(&ws->txmsgs, wm);
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_start_read(nni_ws *ws)
+{
+ ws_frame *frame;
+ ws_msg * wm;
+ nni_aio * aio;
+
+ if ((ws->rxframe != NULL) || ws->closed) {
+ return; // already reading or closed
+ }
+
+ if ((wm = nni_list_first(&ws->rxmsgs)) == NULL) {
+ return; // no body expecting a message.
+ }
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio != NULL) {
+ nni_aio_finish_error(wm->aio, NNG_ENOMEM);
+ }
+ ws_msg_fini(wm);
+ // XXX: NOW WHAT?
+ return;
+ }
+
+ // Note that the frame is *not* associated with the message
+ // as yet, because we don't know if that's right until we receive it.
+ frame->hlen = 0;
+ frame->len = 0;
+ ws->rxframe = frame;
+
+ aio = ws->rxaio;
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = 2; // We want the first two bytes.
+ aio->a_iov[0].iov_buf = frame->head;
+ nni_http_read_full(ws->http, aio);
+}
+
+static void
+ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
+{
+ ws_msg *wm = nni_list_first(&ws->rxmsgs);
+
+ switch (frame->op) {
+ case WS_CONT:
+ if (wm == NULL) {
+ ws_close(ws, WS_CLOSE_GOING_AWAY);
+ return;
+ }
+ if (nni_list_empty(&wm->frames)) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ nni_list_append(&wm->frames, frame);
+ break;
+ case WS_BINARY:
+ if (wm == NULL) {
+ ws_close(ws, WS_CLOSE_GOING_AWAY);
+ return;
+ }
+ if (!nni_list_empty(&wm->frames)) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ nni_list_append(&wm->frames, frame);
+ break;
+ case WS_TEXT:
+ // No support for text mode at present.
+ ws_close(ws, WS_CLOSE_UNSUPP_FORMAT);
+ return;
+
+ case WS_PING:
+ if (frame->len > 125) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws_send_control(ws, WS_PONG, frame->buf, frame->len);
+ ws->rxframe = NULL;
+ ws_frame_fini(frame);
+ break;
+ case WS_PONG:
+ if (frame->len > 125) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ ws_frame_fini(frame);
+ break;
+ case WS_CLOSE:
+ ws->closed = true; // no need to send close reply
+ ws_close(ws, 0);
+ return;
+ default:
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+
+ // If this was the last (final) frame, then complete it. But
+ // we have to look at the msg, since we might have got a
+ // control frame.
+ if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) {
+ size_t len = 0;
+ nni_msg *msg;
+ uint8_t *body;
+ int rv;
+
+ nni_list_remove(&ws->rxmsgs, wm);
+ NNI_LIST_FOREACH (&wm->frames, frame) {
+ len += frame->len;
+ }
+ if ((rv = nni_msg_alloc(&msg, len)) != 0) {
+ nni_aio_finish_error(wm->aio, rv);
+ ws_msg_fini(wm);
+ ws_close(ws, WS_CLOSE_INTERNAL);
+ return;
+ }
+ body = nni_msg_body(msg);
+ NNI_LIST_FOREACH (&wm->frames, frame) {
+ memcpy(body, frame->buf, frame->len);
+ body += frame->len;
+ }
+ nni_aio_finish_msg(wm->aio, msg);
+ wm->aio = NULL;
+ ws_msg_fini(wm);
+ }
+}
+
+static void
+ws_read_cb(void *arg)
+{
+ nni_ws * ws = arg;
+ nni_aio * aio = ws->rxaio;
+ ws_frame *frame;
+ int rv;
+
+ nni_mtx_lock(&ws->mtx);
+ if ((frame = ws->rxframe) == NULL) {
+ nni_mtx_unlock(&ws->mtx); // canceled during close
+ return;
+ }
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ ws->closed = true; // do not send a close frame
+ ws_close(ws, 0);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ if (frame->hlen == 0) {
+ frame->hlen = 2;
+ frame->op = frame->head[0] & 0x7f;
+ frame->final = (frame->head[0] & 0x80) ? 1 : 0;
+ frame->masked = (frame->head[1] & 0x80) ? 1 : 0;
+ if (frame->masked) {
+ frame->hlen += 4;
+ }
+ if ((frame->head[1] & 0x7F) == 127) {
+ frame->hlen += 8;
+ } else if ((frame->head[1] & 0x7F) == 126) {
+ frame->hlen += 2;
+ }
+
+ // If we didn't read the full header yet, then read
+ // the rest of it.
+ if (frame->hlen != 2) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_buf = frame->head + 2;
+ aio->a_iov[0].iov_len = frame->hlen - 2;
+ nni_http_read_full(ws->http, aio);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ }
+
+ // If we are returning from a read of additional data, then
+ // the buf will be set. Otherwise we need to determine
+ // how much data to read. As our headers are complete, we take
+ // this time to do some protocol checks -- no point in waiting
+ // to read data. (Frame size check needs to be done first
+ // anyway to prevent DoS.)
+
+ if (frame->buf == NULL) {
+
+ // Determine expected frame size.
+ switch ((frame->len = (frame->head[1] & 0x7F))) {
+ case 127:
+ NNI_GET64(frame->head + 2, frame->len);
+ if (frame->len < 65536) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ break;
+ case 126:
+ NNI_GET16(frame->head + 2, frame->len);
+ if (frame->len < 126) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ break;
+ }
+
+ if (frame->len > ws->maxframe) {
+ ws_close(ws, WS_CLOSE_TOO_BIG);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // Check for masking. (We don't actually do the unmask
+ // here, because we don't have data yet.)
+ if (frame->masked) {
+ memcpy(frame->mask, frame->head + frame->hlen - 4, 4);
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ } else if (ws->mode == NNI_EP_MODE_LISTEN) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // If we expected data, then ask for it.
+ if (frame->len != 0) {
+
+ // Short frames can avoid an alloc
+ if (frame->len < 126) {
+ frame->buf = frame->sdata;
+ frame->bufsz = 0;
+ } else {
+ frame->buf = nni_alloc(frame->len);
+ if (frame->buf == NULL) {
+ ws_close(ws, WS_CLOSE_INTERNAL);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ frame->bufsz = frame->len;
+ }
+
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_buf = frame->buf;
+ aio->a_iov[0].iov_len = frame->len;
+ nni_http_read_full(ws->http, aio);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ }
+
+ // At this point, we have a complete frame.
+ ws_unmask_frame(frame); // idempotent
+
+ ws_read_frame_cb(ws, frame);
+ ws_start_read(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_read_cancel(nni_aio *aio, int rv)
+{
+ ws_msg *wm = aio->a_prov_data;
+ nni_ws *ws = wm->ws;
+
+ nni_mtx_lock(&ws->mtx);
+ if (wm == nni_list_first(&ws->rxmsgs)) {
+ // Cancellation will percolate back up.
+ nni_aio_cancel(ws->rxaio, rv);
+ } else if (nni_list_active(&ws->rxmsgs, wm)) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ ws_msg_fini(wm);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
+{
+ ws_msg *wm;
+ int rv;
+ nni_mtx_lock(&ws->mtx);
+ if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
+ if (nni_aio_start(aio, NULL, NULL)) {
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ if (nni_aio_start(aio, ws_read_cancel, wm) == 0) {
+ nni_list_append(&ws->rxmsgs, wm);
+ ws_start_read(ws);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_close_error(nni_ws *ws, uint16_t code)
+{
+ nni_mtx_lock(&ws->mtx);
+ ws_close(ws, code);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_close(nni_ws *ws)
+{
+ nni_ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
+}
+
+nni_http_res *
+nni_ws_response(nni_ws *ws)
+{
+ return (ws->res);
+}
+
+nni_http_req *
+nni_ws_request(nni_ws *ws)
+{
+ return (ws->req);
+}
+
+static void
+ws_fini(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_msg *wm;
+
+ nni_ws_close(ws);
+
+ // Give a chance for the close frame to drain.
+ if (ws->closeaio) {
+ nni_aio_wait(ws->closeaio);
+ }
+
+ nni_aio_stop(ws->rxaio);
+ nni_aio_stop(ws->txaio);
+ nni_aio_stop(ws->closeaio);
+ nni_aio_stop(ws->httpaio);
+
+ nni_mtx_lock(&ws->mtx);
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ if (ws->rxframe) {
+ ws_frame_fini(ws->rxframe);
+ }
+ nni_mtx_unlock(&ws->mtx);
+
+ if (ws->req) {
+ nni_http_req_fini(ws->req);
+ }
+ if (ws->res) {
+ nni_http_res_fini(ws->res);
+ }
+
+ nni_http_fini(ws->http);
+ nni_aio_fini(ws->rxaio);
+ nni_aio_fini(ws->txaio);
+ nni_aio_fini(ws->closeaio);
+ nni_aio_fini(ws->httpaio);
+ nni_mtx_fini(&ws->mtx);
+ NNI_FREE_STRUCT(ws);
+}
+
+void
+nni_ws_fini(nni_ws *ws)
+{
+ nni_reap(&ws->reap, ws_fini, ws);
+}
+
+static void
+ws_http_cb_listener(nni_ws *ws, nni_aio *aio)
+{
+ // This is only
+ nni_ws_listener *l;
+ l = nni_aio_get_data(aio, 0);
+
+ nni_mtx_lock(&l->mtx);
+ nni_list_remove(&l->reply, ws);
+ if (nni_aio_result(aio) != 0) {
+ nni_ws_fini(ws);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ ws->ready = true;
+ if ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_list_remove(&l->aios, aio);
+ nni_aio_finish_pipe(aio, ws);
+ } else {
+ nni_list_append(&l->pend, ws);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
+{
+ nni_ws_dialer *d;
+ nni_aio * uaio;
+ int rv;
+ uint16_t status;
+ char wskey[29];
+ const char * ptr;
+
+ d = nni_aio_get_data(aio, 0);
+
+ nni_mtx_lock(&d->mtx);
+ uaio = nni_list_first(&d->httpaios);
+ NNI_ASSERT(uaio != NULL);
+ // We have two steps. In step 1, we just sent the request,
+ // and need to retrieve the reply. In step two we have
+ // received the reply, and need to validate it.
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto err;
+ }
+
+ // If we have no response structure, then this was completion of the
+ // send of the request. Prepare an empty response, and read it.
+ if (ws->res == NULL) {
+ if ((rv = nni_http_res_init(&ws->res)) != 0) {
+ goto err;
+ }
+ nni_http_read_res(ws->http, ws->res, ws->httpaio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ status = nni_http_res_get_status(ws->res);
+ switch (status) {
+ case NNI_HTTP_STATUS_SWITCHING:
+ break;
+ case NNI_HTTP_STATUS_FORBIDDEN:
+ case NNI_HTTP_STATUS_UNAUTHORIZED:
+ rv = NNG_EPERM;
+ goto err;
+ case NNI_HTTP_STATUS_NOT_FOUND:
+ case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED:
+ rv = NNG_ECONNREFUSED; // Treat these as refusals.
+ goto err;
+ case NNI_HTTP_STATUS_BAD_REQUEST:
+ default:
+ // Perhaps we should use NNG_ETRANERR...
+ rv = NNG_EPROTO;
+ goto err;
+ }
+
+ // Check that the server gave us back the right key.
+ rv = ws_make_accept(
+ nni_http_req_get_header(ws->req, "Sec-WebSocket-Key"), wskey);
+ if (rv != 0) {
+ goto err;
+ }
+
+#define GETH(h) nni_http_res_get_header(ws->res, h)
+
+ if (((ptr = GETH("Sec-WebSocket-Accept")) == NULL) ||
+ (strcmp(ptr, wskey) != 0) ||
+ ((ptr = GETH("Connection")) == NULL) ||
+ (!ws_contains_word(ptr, "upgrade")) ||
+ ((ptr = GETH("Upgrade")) == NULL) ||
+ (strcmp(ptr, "websocket") != 0)) {
+ nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ rv = NNG_EPROTO;
+ goto err;
+ }
+ if (d->proto != NULL) {
+ if (((ptr = GETH("Sec-WebSocket-Protocol")) == NULL) ||
+ (!ws_contains_word(d->proto, ptr))) {
+ nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ rv = NNG_EPROTO;
+ goto err;
+ }
+ }
+#undef GETH
+
+ // At this point, we are in business!
+ ws->ready = true;
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_pipe(uaio, ws);
+ nni_mtx_unlock(&d->mtx);
+ return;
+err:
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ nni_ws_fini(ws);
+ nni_mtx_unlock(&d->mtx);
+}
+
+static void
+ws_http_cb(void *arg)
+{
+ // This is only done on the server/listener side.
+ nni_ws * ws = arg;
+ nni_aio *aio = ws->httpaio;
+
+ switch (ws->mode) {
+ case NNI_EP_MODE_LISTEN:
+ ws_http_cb_listener(ws, aio);
+ break;
+ case NNI_EP_MODE_DIAL:
+ ws_http_cb_dialer(ws, aio);
+ break;
+ }
+}
+
+static int
+ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res)
+{
+ nni_ws *ws;
+ int rv;
+
+ if ((ws = NNI_ALLOC_STRUCT(ws)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&ws->mtx);
+ NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node);
+ NNI_LIST_INIT(&ws->txmsgs, ws_msg, node);
+
+ if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0)) {
+ nni_ws_fini(ws);
+ return (rv);
+ }
+
+ nni_aio_set_timeout(ws->closeaio, 100);
+ nni_aio_set_timeout(ws->httpaio, 1000);
+
+ ws->fragsize = 1 << 20; // we won't send a frame larger than this
+ ws->maxframe = (1 << 20) * 10; // default limit on incoming frame size
+ ws->http = http;
+ ws->req = req;
+ ws->res = res;
+ *wsp = ws;
+ return (0);
+}
+
+void
+nni_ws_listener_fini(nni_ws_listener *l)
+{
+ nni_mtx_fini(&l->mtx);
+ nni_strfree(l->url);
+ nni_strfree(l->proto);
+ nni_strfree(l->host);
+ nni_strfree(l->serv);
+ nni_strfree(l->path);
+ NNI_FREE_STRUCT(l);
+}
+
+static void
+ws_handler(nni_aio *aio)
+{
+ nni_ws_listener *l;
+ nni_ws * ws;
+ nni_http * http;
+ nni_http_req * req;
+ nni_http_res * res;
+ const char * ptr;
+ const char * proto;
+ uint16_t status;
+ int rv;
+ char key[29];
+
+ http = nni_aio_get_input(aio, 0);
+ req = nni_aio_get_input(aio, 1);
+ l = nni_aio_get_input(aio, 2);
+
+ // Now check the headers, etc.
+ if (strcmp(nni_http_req_get_version(req), "HTTP/1.1") != 0) {
+ status = NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP;
+ goto err;
+ }
+
+ if (strcmp(nni_http_req_get_method(req), "GET") != 0) {
+ // HEAD request. We can't really deal with it.
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+#define GETH(h) nni_http_req_get_header(req, h)
+#define SETH(h, v) nni_http_res_set_header(res, h, v)
+
+ if ((((ptr = GETH("Content-Length")) != NULL) && (atoi(ptr) > 0)) ||
+ (((ptr = GETH("Transfer-Encoding")) != NULL) &&
+ (nni_strcasestr(ptr, "chunked") != NULL))) {
+ status = NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE;
+ goto err;
+ }
+
+ // These headers have to be present.
+ if (((ptr = GETH("Upgrade")) == NULL) ||
+ (!ws_contains_word(ptr, "websocket")) ||
+ ((ptr = GETH("Connection")) == NULL) ||
+ (!ws_contains_word(ptr, "upgrade")) ||
+ ((ptr = GETH("Sec-WebSocket-Version")) == NULL) ||
+ (strcmp(ptr, "13") != 0)) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ if (((ptr = GETH("Sec-WebSocket-Key")) == NULL) ||
+ (ws_make_accept(ptr, key) != 0)) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ // If the client has requested a specific subprotocol, then
+ // we need to try to match it to what the handler says we support.
+ // (If no suitable option is found in the handler, we fail the
+ // request.)
+ proto = GETH("Sec-WebSocket-Protocol");
+ if (proto == NULL) {
+ if (l->proto != NULL) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+ } else if ((l->proto == NULL) ||
+ (!ws_contains_word(l->proto, proto))) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ if ((rv = nni_http_res_init(&res)) != 0) {
+ // Give a chance to reply to client.
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+
+ if (nni_http_res_set_status(
+ res, NNI_HTTP_STATUS_SWITCHING, "Switching Protocols") != 0) {
+ nni_http_res_fini(res);
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+
+ if ((SETH("Connection", "Upgrade") != 0) ||
+ (SETH("Upgrade", "websocket") != 0) ||
+ (SETH("Sec-WebSocket-Accept", key) != 0)) {
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_fini(res);
+ goto err;
+ }
+ if ((proto != NULL) && (SETH("Sec-WebSocket-Protocol", proto) != 0)) {
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_fini(res);
+ goto err;
+ }
+
+ if (l->hookfn != NULL) {
+ rv = l->hookfn(l->hookarg, req, res);
+ if (rv != 0) {
+ nni_http_res_fini(res);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (nni_http_res_get_status(res) !=
+ NNI_HTTP_STATUS_SWITCHING) {
+ // The hook has decided to give back a different
+ // reply and we are not upgrading anymore. For
+ // example the Origin might not be permitted, or
+ // another level of authentication may be required.
+ // (Note that the hook can also give back various
+ // other headers, but it would be bad for it to
+ // alter the websocket mandated headers.)
+ nni_http_req_fini(req);
+ nni_aio_set_output(aio, 0, res);
+ nni_aio_finish(aio, 0, 0);
+ return;
+ }
+ }
+
+#undef GETH
+#undef SETH
+
+ // We are good to go, provided we can get the websocket struct,
+ // and send the reply.
+ if ((rv = ws_init(&ws, http, req, res)) != 0) {
+ nni_http_res_fini(res);
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+ ws->mode = NNI_EP_MODE_LISTEN;
+
+ // XXX: Inherit fragmentation and message size limits!
+
+ nni_list_append(&l->reply, ws);
+ nni_aio_set_data(ws->httpaio, 0, l);
+ nni_http_write_res(http, res, ws->httpaio);
+ nni_aio_set_output(aio, 0, NULL);
+ nni_aio_set_input(aio, 1, NULL);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+err:
+ nni_http_req_fini(req);
+ if ((rv = nni_http_res_init_error(&res, status)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nni_aio_set_output(aio, 0, res);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+static int
+ws_parse_url(const char *url, char **schemep, char **hostp, char **servp,
+ char **pathp, char **queryp)
+{
+ size_t scrlen;
+ char * scr;
+ char * pair;
+ char * scheme = NULL;
+ char * path = NULL;
+ char * query = NULL;
+ char * host = NULL;
+ char * serv = NULL;
+ int rv;
+
+ // We need a scratch copy of the url to parse.
+ scrlen = strlen(url) + 1;
+ if ((scr = nni_alloc(scrlen)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_strlcpy(scr, url, scrlen);
+ scheme = scr;
+ pair = strchr(scr, ':');
+ if ((pair == NULL) || (pair[1] != '/') || (pair[2] != '/')) {
+ nni_free(scr, scrlen);
+ return (NNG_EADDRINVAL);
+ }
+
+ *pair = '\0';
+ pair += 3;
+
+ path = strchr(pair, '/');
+ if (path != NULL) {
+ *path = '\0'; // We will restore it shortly.
+ }
+ if ((rv = nni_tran_parse_host_port(pair, hostp, servp)) != 0) {
+ nni_free(scr, scrlen);
+ return (rv);
+ }
+
+ // If service was missing, assume normal defaults.
+ if (*servp == NULL) {
+ if (strcmp(scheme, "wss")) {
+ *servp = nni_strdup("443");
+ } else {
+ *servp = nni_strdup("80");
+ }
+ }
+
+ if (path) {
+ // Restore the path, and trim off the query parameter.
+ *path = '/';
+ if ((query = strchr(path, '?')) != NULL) {
+ *query = '\0';
+ query++;
+ } else {
+ query = "";
+ }
+ } else {
+ path = "/";
+ query = "";
+ }
+
+ if (schemep) {
+ *schemep = nni_strdup(scheme);
+ }
+ if (pathp) {
+ *pathp = nni_strdup(path);
+ }
+ if (queryp) {
+ *queryp = nni_strdup(query);
+ }
+ nni_free(scr, scrlen);
+
+ if ((schemep && (*schemep == NULL)) || (*pathp == NULL) ||
+ (*servp == NULL) || (queryp && (*queryp == NULL))) {
+ nni_strfree(*hostp);
+ nni_strfree(*servp);
+ nni_strfree(*pathp);
+ if (schemep) {
+ nni_strfree(*schemep);
+ }
+ if (queryp) {
+ nni_strfree(*queryp);
+ }
+ return (NNG_ENOMEM);
+ }
+
+ return (0);
+}
+
+int
+nni_ws_listener_init(nni_ws_listener **wslp, const char *url)
+{
+ nni_ws_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&l->mtx);
+ nni_aio_list_init(&l->aios);
+
+ NNI_LIST_INIT(&l->pend, nni_ws, node);
+ NNI_LIST_INIT(&l->reply, nni_ws, node);
+
+ rv = ws_parse_url(url, NULL, &l->host, &l->serv, &l->path, NULL);
+ if (rv != 0) {
+ nni_ws_listener_fini(l);
+ return (rv);
+ }
+ l->handler.h_is_dir = false;
+ l->handler.h_is_upgrader = true;
+ l->handler.h_method = "GET";
+ l->handler.h_path = l->path;
+ l->handler.h_host = l->host;
+ l->handler.h_cb = ws_handler;
+
+ *wslp = l;
+ return (0);
+}
+
+int
+nni_ws_listener_proto(nni_ws_listener *l, const char *proto)
+{
+ int rv = 0;
+ char *ns;
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ rv = NNG_EBUSY;
+ } else if ((ns = nni_strdup(proto)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ if (l->proto != NULL) {
+ nni_strfree(l->proto);
+ }
+ l->proto = ns;
+ }
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static void
+ws_accept_cancel(nni_aio *aio, int rv)
+{
+ nni_ws_listener *l = aio->a_prov_data;
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
+{
+ nni_ws *ws;
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_start(aio, ws_accept_cancel, l) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (l->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (!l->started) {
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if ((ws = nni_list_first(&l->pend)) != NULL) {
+ nni_list_remove(&l->pend, ws);
+ nni_aio_finish_pipe(aio, ws);
+ } else {
+ nni_list_append(&l->aios, aio);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_close(nni_ws_listener *l)
+{
+ nni_aio *aio;
+ nni_ws * ws;
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ l->closed = true;
+ if (l->server != NULL) {
+ nni_http_server_del_handler(l->server, l->hp);
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ }
+ NNI_LIST_FOREACH (&l->pend, ws) {
+ nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ }
+ NNI_LIST_FOREACH (&l->reply, ws) {
+ nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_ws_listener_listen(nni_ws_listener *l)
+{
+ nng_sockaddr sa;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv(l->host, l->serv, NNG_AF_UNSPEC, true, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ if ((rv = nni_http_server_init(&l->server, &sa)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ rv = nni_http_server_add_handler(&l->hp, l->server, &l->handler, l);
+ if (rv != 0) {
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ // XXX: DEAL WITH HTTPS here.
+
+ if ((rv = nni_http_server_start(l->server)) != 0) {
+ nni_http_server_del_handler(l->server, l->hp);
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ }
+
+ l->started = true;
+
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+void
+nni_ws_listener_hook(
+ nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg)
+{
+ nni_mtx_lock(&l->mtx);
+ l->hookfn = hookfn;
+ l->hookarg = hookarg;
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_tls(nni_ws_listener *l, nni_tls_config *tls)
+{
+ // We need to add this later.
+}
+
+void
+ws_conn_cb(void *arg)
+{
+ nni_ws_dialer *d = arg;
+ nni_aio * aio = d->conaio;
+ nni_aio * uaio;
+ nni_http * http;
+ nni_http_req * req = NULL;
+ int rv;
+ uint8_t raw[16];
+ char wskey[25];
+ nni_ws * ws;
+
+ nni_mtx_lock(&d->mtx);
+ uaio = nni_list_first(&d->conaios);
+ rv = nni_aio_result(aio);
+ http = rv == 0 ? nni_aio_get_output(aio, 0) : NULL;
+
+ if (uaio == NULL) {
+ if (http) {
+ // Nobody listening anymore - hard abort.
+ nni_http_fini(http);
+ }
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ nni_aio_list_remove(uaio);
+ nni_aio_set_output(aio, 0, NULL);
+
+ // We are done with this aio, start another connection request while
+ // we finish up, if we have more clients waiting.
+ if (!nni_list_empty(&d->conaios)) {
+ nni_http_client_connect(d->client, aio);
+ }
+
+ if (rv != 0) {
+ goto err;
+ }
+
+ for (int i = 0; i < 16; i++) {
+ raw[i] = nni_random();
+ }
+ nni_base64_encode(raw, 16, wskey, 24);
+ wskey[24] = '\0';
+
+ if (d->qinfo && d->qinfo[0] != '\0') {
+ rv = nni_asprintf(&d->uri, "%s?%s", d->path, d->qinfo);
+ } else if ((d->uri = nni_strdup(d->path)) == NULL) {
+ rv = NNG_ENOMEM;
+ }
+
+#define SETH(h, v) nni_http_req_set_header(req, h, v)
+ if ((rv != 0) || ((rv = nni_http_req_init(&req)) != 0) ||
+ ((rv = nni_http_req_set_uri(req, d->uri)) != 0) ||
+ ((rv = nni_http_req_set_version(req, "HTTP/1.1")) != 0) ||
+ ((rv = nni_http_req_set_method(req, "GET")) != 0) ||
+ ((rv = SETH("Host", d->host)) != 0) ||
+ ((rv = SETH("Upgrade", "websocket")) != 0) ||
+ ((rv = SETH("Connection", "Upgrade")) != 0) ||
+ ((rv = SETH("Sec-WebSocket-Key", wskey)) != 0) ||
+ ((rv = SETH("Sec-WebSocket-Version", "13")) != 0)) {
+ goto err;
+ }
+
+ // If consumer asked for protocol, pass it on.
+ if ((d->proto != NULL) &&
+ ((rv = SETH("Sec-WebSocket-Protocol", d->proto)) != 0)) {
+ goto err;
+ }
+#undef SETH
+
+ if ((rv = ws_init(&ws, http, req, NULL)) != 0) {
+ goto err;
+ }
+ ws->mode = NNI_EP_MODE_DIAL;
+
+ // Move this uaio to the http wait list. Note that it is not
+ // required that the uaio will be completed by this connection.
+ // If another connection attempt completes first, then the first
+ // aio queued will get the result.
+ nni_list_append(&d->httpaios, uaio);
+ nni_aio_set_data(ws->httpaio, 0, d);
+ nni_http_write_req(http, req, ws->httpaio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+
+err:
+ nni_aio_finish_error(uaio, rv);
+ if (http != NULL) {
+ nni_http_fini(http);
+ }
+ if (req != NULL) {
+ nni_http_req_fini(req);
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ws_dialer_fini(nni_ws_dialer *d)
+{
+ nni_aio_fini(d->conaio);
+ nni_strfree(d->proto);
+ nni_strfree(d->addr);
+ nni_strfree(d->uri);
+ nni_strfree(d->host);
+ nni_strfree(d->serv);
+ nni_strfree(d->path);
+ nni_strfree(d->qinfo);
+ if (d->client) {
+ nni_http_client_fini(d->client);
+ }
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+int
+nni_ws_dialer_init(nni_ws_dialer **dp, const char *url)
+{
+ nni_ws_dialer *d;
+ int rv;
+ nni_aio * aio;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ nni_aio_list_init(&d->conaios);
+ nni_aio_list_init(&d->httpaios);
+
+ if ((d->addr = nni_strdup(url)) == NULL) {
+ nni_ws_dialer_fini(d);
+ return (NNG_ENOMEM);
+ }
+ if ((rv = ws_parse_url(
+ url, NULL, &d->host, &d->serv, &d->path, &d->qinfo)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+ if ((rv = nni_aio_init(&d->conaio, ws_conn_cb, d)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+ // XXX: this is synchronous. We should fix this in the HTTP layer.
+ aio->a_addr = &d->sa;
+ nni_plat_tcp_resolv(d->host, d->serv, NNG_AF_UNSPEC, false, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
+ if (rv != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ if ((rv = nni_http_client_init(&d->client, &d->sa)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ *dp = d;
+ return (0);
+}
+
+void
+nni_ws_dialer_close(nni_ws_dialer *d)
+{
+ // XXX: what to do here?
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ d->closed = true;
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_cancel(d->conaio, NNG_ECLOSED);
+}
+
+int
+nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto)
+{
+ int rv = 0;
+ char *ns;
+ nni_mtx_lock(&d->mtx);
+ if ((ns = nni_strdup(proto)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ if (d->proto != NULL) {
+ nni_strfree(d->proto);
+ }
+ d->proto = ns;
+ }
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
+static void
+ws_dial_cancel(nni_aio *aio, int rv)
+{
+ nni_ws_dialer *d = aio->a_prov_data;
+ nni_mtx_lock(&d->mtx);
+ // If we are waiting, then we can cancel. Otherwise we need
+ // to abort.
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ // This does not cancel in-flight client negotiations with HTTP.
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
+{
+ nni_mtx_lock(&d->mtx);
+ // First look up the host.
+ if (nni_aio_start(aio, ws_dial_cancel, d) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ if (d->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_list_append(&d->conaios, aio);
+
+ if (!d->started) {
+ d->started = true;
+ nni_http_client_connect(d->client, d->conaio);
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *);
+
+// Dialer does not get a hook chance, as it can examine the request
+// and reply after dial is done; this is not a 3-way handshake, so
+// the dialer does not confirm the server's response at the HTTP
+// level. (It can still issue a websocket close).
+
+// The implementation will send periodic PINGs, and respond with
+// PONGs.
diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h
new file mode 100644
index 00000000..25add55e
--- /dev/null
+++ b/src/supplemental/websocket/websocket.h
@@ -0,0 +1,63 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H
+#define NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H
+
+// Pre-defined types for some prototypes. These are from other subsystems.
+typedef struct nni_tls_config nni_tls_config;
+typedef struct nni_http_req nni_http_req;
+typedef struct nni_http_res nni_http_res;
+
+typedef struct nni_ws nni_ws;
+typedef struct nni_ws_listener nni_ws_listener;
+typedef struct nni_ws_dialer nni_ws_dialer;
+
+typedef int (*nni_ws_listen_hook)(void *, nni_http_req *, nni_http_res *);
+
+// Specify URL as ws://[<host>][:port][/path]
+// If host is missing, INADDR_ANY is assumed. If port is missing,
+// then either 80 or 443 are assumed. Note that ws:// means listen
+// on INADDR_ANY port 80, with path "/". For connect side, INADDR_ANY
+// makes no sense. (TBD: return NNG_EADDRINVAL, or try loopback?)
+
+extern int nni_ws_listener_init(nni_ws_listener **, const char *);
+extern void nni_ws_listener_fini(nni_ws_listener *);
+extern void nni_ws_listener_close(nni_ws_listener *);
+extern int nni_ws_listener_proto(nni_ws_listener *, const char *);
+extern int nni_ws_listener_listen(nni_ws_listener *);
+extern void nni_ws_listener_accept(nni_ws_listener *, nni_aio *);
+extern void nni_ws_listener_hook(
+ nni_ws_listener *, nni_ws_listen_hook, void *);
+extern void nni_ws_listener_tls(nni_ws_listener *, nni_tls_config *);
+
+extern int nni_ws_dialer_init(nni_ws_dialer **, const char *);
+extern void nni_ws_dialer_fini(nni_ws_dialer *);
+extern void nni_ws_dialer_close(nni_ws_dialer *);
+extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *);
+extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *);
+extern void nni_ws_dialer_dial(nni_ws_dialer *, nni_aio *);
+
+// Dialer does not get a hook chance, as it can examine the request and reply
+// after dial is done; this is not a 3-way handshake, so the dialer does
+// not confirm the server's response at the HTTP level. (It can still issue
+// a websocket close).
+
+extern void nni_ws_send_msg(nni_ws *, nni_aio *);
+extern void nni_ws_recv_msg(nni_ws *, nni_aio *);
+extern nni_http_res *nni_ws_response(nni_ws *);
+extern nni_http_req *nni_ws_request(nni_ws *);
+extern void nni_ws_close(nni_ws *);
+extern void nni_ws_close_error(nni_ws *, uint16_t);
+extern void nni_ws_fini(nni_ws *);
+
+// The implementation will send periodic PINGs, and respond with PONGs.
+
+#endif // NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H \ No newline at end of file