aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental/websocket/websocket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-01-21 22:40:10 -0800
committerGarrett D'Amore <garrett@damore.org>2019-02-16 19:22:27 -0800
commit5cf750697624d4fd63cfe26921209d7c30e1a2d2 (patch)
treebf11695e5f1ec5e400c87da0cc6ff23935a2eeff /src/supplemental/websocket/websocket.c
parentca655b9db689ee0e655248b1a9f166b8db6cc984 (diff)
downloadnng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.gz
nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.bz2
nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.zip
fixes #872 create unified nng_stream API
This is a major change, and includes changes to use a polymorphic stream API for all transports. There have been related bugs fixed along the way. Additionally the man pages have changed. The old non-polymorphic APIs are removed now. This is a breaking change, but the old APIs were never part of any released public API.
Diffstat (limited to 'src/supplemental/websocket/websocket.c')
-rw-r--r--src/supplemental/websocket/websocket.c2066
1 files changed, 1365 insertions, 701 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 3d3a68cb..4cecf430 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -18,11 +18,25 @@
#include "supplemental/http/http_api.h"
#include "supplemental/sha1/sha1.h"
+#include <nng/transport/ws/websocket.h>
+
#include "websocket.h"
+// This should be removed or handled differently in the future.
+typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *);
+
+// We have chosen to be a bit more stringent in the size of the frames that
+// we send, while we more generously allow larger incoming frames. These
+// may be tuned by options.
+#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only)
+#define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv)
+#define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send)
+
+// Alias for checking the prefix of a string.
+#define startswith(s, t) (strncmp(s, t, strlen(t)) == 0)
+
// Pre-defined types for some prototypes. These are from other subsystems.
typedef struct ws_frame ws_frame;
-typedef struct ws_msg ws_msg;
typedef struct ws_header {
nni_list_node node;
@@ -31,17 +45,20 @@ typedef struct ws_header {
} ws_header;
struct nni_ws {
+ nng_stream ops;
nni_list_node node;
nni_reap_item reap;
bool server;
bool closed;
bool ready;
bool wclose;
+ bool isstream;
+ bool inmsg;
nni_mtx mtx;
- nni_list txmsgs;
- nni_list rxmsgs;
nni_list sendq;
nni_list recvq;
+ nni_list txq;
+ nni_list rxq;
ws_frame * txframe;
ws_frame * rxframe;
nni_aio * txaio; // physical aios
@@ -57,26 +74,31 @@ struct nni_ws {
char * reshdrs;
size_t maxframe;
size_t fragsize;
+ size_t recvmax; // largest message size
nni_ws_listener *listener;
nni_ws_dialer * dialer;
};
struct nni_ws_listener {
- nni_http_server * server;
- char * proto;
- nni_mtx mtx;
- nni_cv cv;
- nni_list pend;
- nni_list reply;
- nni_list aios;
- nni_url * url;
- bool started;
- bool closed;
- nni_http_handler * handler;
- nni_ws_listen_hook hookfn;
- void * hookarg;
- nni_list headers; // response headers
- size_t maxframe;
+ nng_stream_listener ops;
+ nni_http_server * server;
+ char * proto;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_list pend;
+ nni_list reply;
+ nni_list aios;
+ nng_url * url;
+ bool started;
+ bool closed;
+ bool isstream;
+ nni_http_handler * handler;
+ nni_ws_listen_hook hookfn;
+ void * hookarg;
+ nni_list headers; // response headers
+ size_t maxframe;
+ size_t fragsize;
+ size_t recvmax; // largest message size
};
// The dialer tracks user aios in two lists. The first list is for aios
@@ -86,18 +108,21 @@ struct nni_ws_listener {
// completion of an earlier connection. (We don't want to establish
// requests when we already have connects negotiating.)
struct nni_ws_dialer {
- nni_http_req * req;
- nni_http_res * res;
- nni_http_client *client;
- nni_mtx mtx;
- nni_cv cv;
- char * proto;
- nni_url * url;
- nni_list wspend; // ws structures still negotiating
- bool closed;
- nng_sockaddr sa;
- nni_list headers; // request headers
- size_t maxframe;
+ nng_stream_dialer ops;
+ nni_http_req * req;
+ nni_http_res * res;
+ nni_http_client * client;
+ nni_mtx mtx;
+ nni_cv cv;
+ char * proto;
+ nng_url * url;
+ nni_list wspend; // ws structures still negotiating
+ bool closed;
+ bool isstream;
+ nni_list headers; // request headers
+ size_t maxframe;
+ size_t fragsize;
+ size_t recvmax;
};
typedef enum ws_type {
@@ -133,16 +158,7 @@ struct ws_frame {
bool masked;
size_t bufsz; // allocated size
uint8_t * buf;
- ws_msg * wmsg;
-};
-
-struct ws_msg {
- nni_list frames;
- nni_list_node node;
- nni_ws * ws;
- nni_aio * aio;
- uint8_t * buf;
- size_t bufsz;
+ nng_aio * aio;
};
static void ws_send_close(nni_ws *ws, uint16_t code);
@@ -150,6 +166,127 @@ static void ws_conn_cb(void *);
static void ws_close_cb(void *);
static void ws_read_cb(void *);
static void ws_write_cb(void *);
+static void ws_close_error(nni_ws *ws, uint16_t code);
+
+static void ws_str_free(void *);
+static void ws_str_close(void *);
+static void ws_str_send(void *, nng_aio *);
+static void ws_str_recv(void *, nng_aio *);
+static int ws_str_getx(void *, const char *, void *, size_t *, nni_type);
+static int ws_str_setx(void *, const char *, const void *, size_t, nni_type);
+
+static void ws_listener_close(void *);
+static void ws_listener_free(void *);
+
+static int
+ws_check_string(const void *v, size_t sz, nni_opt_type t)
+{
+ if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) {
+ return (NNG_EBADTYPE);
+ }
+ if (nni_strnlen(v, sz) >= sz) {
+ return (NNG_EINVAL);
+ }
+ return (0);
+}
+
+static int
+ws_set_header_ext(nni_list *l, const char *n, const char *v, bool strip_dups)
+{
+ ws_header *hdr;
+ char * nv;
+
+ if ((nv = nni_strdup(v)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if (strip_dups) {
+ NNI_LIST_FOREACH (l, hdr) {
+ if (nni_strcasecmp(hdr->name, n) == 0) {
+ nni_strfree(hdr->value);
+ hdr->value = nv;
+ return (0);
+ }
+ }
+ }
+
+ if ((hdr = NNI_ALLOC_STRUCT(hdr)) == NULL) {
+ nni_strfree(nv);
+ return (NNG_ENOMEM);
+ }
+ if ((hdr->name = nni_strdup(n)) == NULL) {
+ nni_strfree(nv);
+ NNI_FREE_STRUCT(hdr);
+ return (NNG_ENOMEM);
+ }
+ hdr->value = nv;
+ nni_list_append(l, hdr);
+ return (0);
+}
+
+static int
+ws_set_header(nni_list *l, const char *n, const char *v)
+{
+ return (ws_set_header_ext(l, n, v, true));
+}
+
+static int
+ws_set_headers(nni_list *l, const char *str)
+{
+ char * dupstr;
+ size_t duplen;
+ char * n;
+ char * v;
+ char * nl;
+ int rv;
+
+ if ((dupstr = nni_strdup(str)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ duplen = strlen(dupstr) + 1; // so we can free it later
+
+ n = dupstr;
+ for (;;) {
+ if ((v = strchr(n, ':')) == NULL) {
+ // Note that this also means that if
+ // a bare word is present, we ignore it.
+ break;
+ }
+ *v = '\0';
+ v++;
+ while (*v == ' ') {
+ // Skip leading whitespace. Not strictly
+ // necessary, but still a good idea.
+ v++;
+ }
+ nl = v;
+ // Find the end of the line -- should be CRLF, but can
+ // also be unterminated or just LF if user
+ while ((*nl != '\0') && (*nl != '\r') && (*nl != '\n')) {
+ nl++;
+ }
+ while ((*nl == '\r') || (*nl == '\n')) {
+ *nl = '\0';
+ nl++;
+ }
+
+ // Note that this can lead to a partial failure. As this
+ // is most likely ENOMEM, don't worry too much about it.
+ // This method does *not* eliminate duplicates.
+ if ((rv = ws_set_header_ext(l, n, v, false)) != 0) {
+ goto done;
+ }
+
+ // Advance to the next name.
+ n = nl;
+ }
+
+ rv = 0;
+
+done:
+ nni_free(dupstr, duplen);
+ return (rv);
+}
// This looks, case independently for a word in a list, which is either
// space or comma separated.
@@ -204,30 +341,13 @@ ws_make_accept(const char *key, char *accept)
static void
ws_frame_fini(ws_frame *frame)
{
- if (frame->bufsz) {
+ if (frame->bufsz != 0) {
nni_free(frame->buf, frame->bufsz);
}
NNI_FREE_STRUCT(frame);
}
static void
-ws_msg_fini(ws_msg *wm)
-{
- ws_frame *frame;
-
- NNI_ASSERT(!nni_list_node_active(&wm->node));
- while ((frame = nni_list_first(&wm->frames)) != NULL) {
- nni_list_remove(&wm->frames, frame);
- ws_frame_fini(frame);
- }
- if (wm->bufsz != 0) {
- nni_free(wm->buf, wm->bufsz);
- }
-
- NNI_FREE_STRUCT(wm);
-}
-
-static void
ws_mask_frame(ws_frame *frame)
{
uint32_t r;
@@ -263,31 +383,19 @@ ws_unmask_frame(ws_frame *frame)
static int
ws_msg_init_control(
- ws_msg **wmp, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len)
+ ws_frame **framep, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len)
{
- ws_msg * wm;
ws_frame *frame;
if (len > 125) {
return (NNG_EINVAL);
}
- if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
- return (NNG_ENOMEM);
- }
- wm->buf = NULL;
- wm->bufsz = 0;
-
if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
- ws_msg_fini(wm);
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&wm->frames, ws_frame, node);
memcpy(frame->sdata, buf, len);
-
- nni_list_append(&wm->frames, frame);
- frame->wmsg = wm;
frame->len = len;
frame->final = true;
frame->op = op;
@@ -303,114 +411,100 @@ ws_msg_init_control(
ws_mask_frame(frame);
}
- wm->aio = NULL;
- wm->ws = ws;
- *wmp = wm;
+ *framep = frame;
return (0);
}
static int
-ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio)
+ws_frame_prep_tx(nni_ws *ws, ws_frame *frame)
{
- ws_msg * wm;
+ nng_aio *aio = frame->aio;
+ nni_iov *iov;
+ unsigned niov;
size_t len;
- size_t maxfrag = ws->fragsize; // make this tunable. (1MB default)
uint8_t *buf;
- uint8_t op;
-
- if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
- return (NNG_ENOMEM);
- }
- NNI_LIST_INIT(&wm->frames, ws_frame, node);
-
- len = nni_msg_len(msg) + nni_msg_header_len(msg);
- wm->bufsz = len;
- if ((wm->buf = nni_alloc(len)) == NULL) {
- NNI_FREE_STRUCT(wm);
- return (NNG_ENOMEM);
- }
- buf = wm->buf;
- memcpy(buf, nni_msg_header(msg), nni_msg_header_len(msg));
- memcpy(buf + nni_msg_header_len(msg), nni_msg_body(msg),
- nni_msg_len(msg));
-
- op = WS_BINARY; // to start -- no support for sending TEXT frames
-
- // do ... while because we want at least one frame (even for empty
- // messages.) Headers get their own frame, if present. Best bet
- // is to try not to have a header when coming here.
- do {
- ws_frame *frame;
- if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
- ws_msg_fini(wm);
+ // Figure out how much we need for the entire aio.
+ frame->len = 0;
+ nni_aio_get_iov(aio, &niov, &iov);
+ for (unsigned i = 0; i < niov; i++) {
+ frame->len += iov[i].iov_len;
+ }
+
+ if ((frame->len > ws->fragsize) && (ws->fragsize > 0)) {
+ // Limit it to a single frame per policy (fragsize), as needed.
+ frame->len = ws->fragsize;
+ // For stream mode, we constrain ourselves to one frame
+ // per message. Submitter may see a partial transmit, and
+ // should resubmit as needed. For message mode, we will
+ // continue to resubmit.
+ frame->final = ws->isstream ? true : false;
+ } else {
+ // It all fits in this frame (which might not be the first),
+ // so we're done.
+ frame->final = true;
+ }
+ // Potentially allocate space for the data if we need to.
+ // Note that an empty message is legal.
+ if ((frame->bufsz < frame->len) && (frame->len > 0)) {
+ frame->buf = nni_alloc(frame->len);
+ if (frame->buf == NULL) {
return (NNG_ENOMEM);
}
- nni_list_append(&wm->frames, frame);
- frame->wmsg = wm;
- frame->len = len > maxfrag ? maxfrag : len;
- frame->buf = buf;
- frame->op = op;
-
- buf += frame->len;
- len -= frame->len;
- op = WS_CONT;
-
- if (len == 0) {
- frame->final = true;
- }
- frame->head[0] = frame->op;
- frame->hlen = 2;
- if (frame->final) {
- frame->head[0] |= 0x80; // final frame bit
- }
- if (frame->len < 126) {
- frame->head[1] = frame->len & 0x7f;
- } else if (frame->len < 65536) {
- frame->head[1] = 126;
- NNI_PUT16(frame->head + 2, (frame->len & 0xffff));
- frame->hlen += 2;
- } else {
- frame->head[1] = 127;
- NNI_PUT64(frame->head + 2, (uint64_t) frame->len);
- frame->hlen += 8;
- }
+ }
+ buf = frame->buf;
- if (ws->server) {
- frame->masked = false;
- } else {
- ws_mask_frame(frame);
+ // Now copy the data into the frame.
+ len = frame->len;
+ while (len != 0) {
+ size_t n = len;
+ if (n > iov->iov_len) {
+ n = iov->iov_len;
}
+ memcpy(buf, iov->iov_buf, n);
+ iov++;
+ len -= n;
+ buf += n;
+ }
- } while (len);
-
- wm->aio = aio;
- wm->ws = ws;
- *wmp = wm;
- return (0);
-}
+ if (nni_aio_count(aio) == 0) {
+ // This is the first frame.
+ frame->op = WS_BINARY;
+ } else {
+ frame->op = WS_CONT;
+ }
-static int
-ws_msg_init_rx(ws_msg **wmp, nni_ws *ws, nni_aio *aio)
-{
- ws_msg *wm;
+ // Populate the frame header.
+ frame->head[0] = frame->op;
+ frame->hlen = 2;
+ if (frame->final) {
+ frame->head[0] |= 0x80; // final frame bit
+ }
+ if (frame->len < 126) {
+ frame->head[1] = frame->len & 0x7f;
+ } else if (frame->len < 65536) {
+ frame->head[1] = 126;
+ NNI_PUT16(frame->head + 2, (frame->len & 0xffff));
+ frame->hlen += 2;
+ } else {
+ frame->head[1] = 127;
+ NNI_PUT64(frame->head + 2, (uint64_t) frame->len);
+ frame->hlen += 8;
+ }
- if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
- return (NNG_ENOMEM);
+ // If we are on the client, then we need to mask the frame.
+ frame->masked = false;
+ if (!ws->server) {
+ ws_mask_frame(frame);
}
- NNI_LIST_INIT(&wm->frames, ws_frame, node);
- wm->aio = aio;
- wm->ws = ws;
- *wmp = wm;
return (0);
}
static void
ws_close_cb(void *arg)
{
- nni_ws * ws = arg;
- ws_msg * wm;
- nni_aio *aio;
+ nni_ws * ws = arg;
+ ws_frame *frame;
nni_aio_close(ws->txaio);
nni_aio_close(ws->rxaio);
@@ -422,31 +516,13 @@ ws_close_cb(void *arg)
nni_http_conn_close(ws->http);
- // This list (receive) should be empty.
- while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
- nni_list_remove(&ws->rxmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- ws_msg_fini(wm);
- }
-
- while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
- nni_list_remove(&ws->txmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ while ((frame = nni_list_first(&ws->txq)) != NULL) {
+ nni_list_remove(&ws->txq, frame);
+ if (frame->aio != NULL) {
+ nni_aio_list_remove(frame->aio);
+ nni_aio_finish_error(frame->aio, NNG_ECLOSED);
}
- ws_msg_fini(wm);
- }
- ws->txframe = NULL;
-
- if (ws->rxframe != NULL) {
- ws_frame_fini(ws->rxframe);
- ws->rxframe = NULL;
+ ws_frame_fini(frame);
}
// Any txframe should have been killed with its wmsg.
@@ -456,19 +532,13 @@ ws_close_cb(void *arg)
static void
ws_close(nni_ws *ws, uint16_t code)
{
- ws_msg *wm;
+ nng_aio *aio;
// Receive stuff gets aborted always. No further receives
// once we get a close.
- while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
- nni_aio *aio;
- nni_list_remove(&ws->rxmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
- ws_msg_fini(wm);
+ while ((aio = nni_list_first(&ws->recvq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
// If were closing "gracefully", then don't abort in-flight
@@ -487,7 +557,6 @@ static void
ws_start_write(nni_ws *ws)
{
ws_frame *frame;
- ws_msg * wm;
nni_iov iov[2];
int niov;
@@ -495,13 +564,11 @@ ws_start_write(nni_ws *ws)
return; // busy
}
- if ((wm = nni_list_first(&ws->txmsgs)) == NULL) {
- // Nothing to send.
- return;
+ if ((frame = nni_list_first(&ws->txq)) == NULL) {
+ return; // nothing to send
}
-
- frame = nni_list_first(&wm->frames);
NNI_ASSERT(frame != NULL);
+ nni_list_remove(&ws->txq, frame);
// Push it out.
ws->txframe = frame;
@@ -534,7 +601,6 @@ ws_write_cb(void *arg)
{
nni_ws * ws = arg;
ws_frame *frame;
- ws_msg * wm;
nni_aio * aio;
int rv;
@@ -545,17 +611,20 @@ ws_write_cb(void *arg)
return;
}
ws->txframe = NULL;
+
if (frame->op == WS_CLOSE) {
// If this was a close frame, we are done.
// No other messages may succeed..
- while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
- nni_list_remove(&ws->txmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
+ ws->txframe = NULL;
+ ws_frame_fini(frame);
+ while ((frame = nni_list_first(&ws->txq)) != NULL) {
+ nni_list_remove(&ws->txq, frame);
+ if ((aio = frame->aio) != NULL) {
+ frame->aio = NULL;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
+ ws_frame_fini(frame);
}
- ws_msg_fini(wm);
}
if (ws->wclose) {
ws->wclose = false;
@@ -565,54 +634,55 @@ ws_write_cb(void *arg)
return;
}
- wm = frame->wmsg;
- aio = wm->aio;
-
+ aio = frame->aio;
if ((rv = nni_aio_result(ws->txaio)) != 0) {
-
- nni_list_remove(&ws->txmsgs, wm);
- ws_msg_fini(wm);
+ frame->aio = NULL;
if (aio != NULL) {
- wm->aio = NULL;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
-
+ ws_frame_fini(frame);
ws->closed = true;
nni_http_conn_close(ws->http);
nni_mtx_unlock(&ws->mtx);
return;
}
- // good frame, was it the last
- nni_list_remove(&wm->frames, frame);
- ws_frame_fini(frame);
-
- // if we still have more frames to transmit, then schedule it.
- if (!nni_list_empty(&wm->frames)) {
- ws_start_write(ws);
- nni_mtx_unlock(&ws->mtx);
- return;
+ if (aio != NULL) {
+ nni_aio_iov_advance(aio, frame->len);
+ nni_aio_bump_count(aio, frame->len);
+ if (frame->final) {
+ frame->aio = NULL;
+ nni_aio_list_remove(aio);
+ } else {
+ // Clear the aio so that we won't attempt to finish
+ // it outside the lock
+ aio = NULL;
+ }
}
- if (aio != NULL) {
- wm->aio = NULL;
- nni_aio_list_remove(aio);
+ if (frame->final) {
+ ws_frame_fini(frame);
+ } else {
+ // This one cannot fail here, since we only do allocation
+ // at initial scheduling.
+ ws_frame_prep_tx(ws, frame);
+ // Schedule at end. This permits other frames to interleave.
+ nni_list_append(&ws->txq, frame);
}
- nni_list_remove(&ws->txmsgs, wm);
- // Write the next frame.
ws_start_write(ws);
nni_mtx_unlock(&ws->mtx);
- // discard while not holding lock (just deallocations)
- ws_msg_fini(wm);
-
+ // We attempt to finish the operation synchronously, outside the lock.
if (aio != NULL) {
- nng_msg *msg = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
- nni_msg_free(msg);
+ nni_msg *msg;
+ // Successful send, don't leak the message!
+ if ((msg = nni_aio_get_msg(aio)) != NULL) {
+ nni_aio_set_msg(aio, NULL);
+ nni_msg_free(msg);
+ }
+ nni_aio_finish_synch(aio, 0, nni_aio_count(aio));
}
}
@@ -620,7 +690,6 @@ static void
ws_write_cancel(nni_aio *aio, void *arg, int rv)
{
nni_ws * ws = arg;
- ws_msg * wm;
ws_frame *frame;
// Is this aio active? We can tell by looking at the active tx frame.
@@ -630,17 +699,17 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&ws->mtx);
return;
}
- wm = nni_aio_get_prov_extra(aio, 0);
- if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) {
+ frame = nni_aio_get_prov_extra(aio, 0);
+ if (frame == ws->txframe) {
nni_aio_abort(ws->txaio, rv);
// We will wait for callback on the txaio to finish aio.
- } else if (nni_list_active(&ws->txmsgs, wm)) {
+ } else {
// If scheduled, just need to remove node and complete it.
- nni_list_remove(&ws->txmsgs, wm);
- wm->aio = NULL;
+ nni_list_remove(&ws->txq, frame);
+ frame->aio = NULL;
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
- ws_msg_fini(wm);
+ ws_frame_fini(frame);
}
nni_mtx_unlock(&ws->mtx);
}
@@ -648,10 +717,10 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv)
static void
ws_send_close(nni_ws *ws, uint16_t code)
{
- ws_msg * wm;
- uint8_t buf[sizeof(uint16_t)];
- int rv;
- nni_aio *aio;
+ ws_frame *frame;
+ uint8_t buf[sizeof(uint16_t)];
+ int rv;
+ nni_aio * aio;
NNI_PUT16(buf, code);
@@ -665,125 +734,45 @@ ws_send_close(nni_ws *ws, uint16_t code)
return;
}
ws->wclose = true;
- rv = ws_msg_init_control(&wm, ws, WS_CLOSE, buf, sizeof(buf));
+ rv = ws_msg_init_control(&frame, ws, WS_CLOSE, buf, sizeof(buf));
if (rv != 0) {
ws->wclose = false;
nni_aio_finish_error(aio, rv);
return;
}
- // Close frames get priority!
if ((rv = nni_aio_schedule(aio, ws_cancel_close, ws)) != 0) {
ws->wclose = false;
nni_aio_finish_error(aio, rv);
+ ws_frame_fini(frame);
return;
}
- nni_list_prepend(&ws->txmsgs, wm);
+ // This gets inserted at the head.
+ nni_list_prepend(&ws->txq, frame);
ws_start_write(ws);
}
static void
ws_send_control(nni_ws *ws, uint8_t op, uint8_t *buf, size_t len)
{
- ws_msg *wm;
+ ws_frame *frame;
// Note that we do not care if this works or not. So no AIO needed.
if ((ws->closed) ||
- (ws_msg_init_control(&wm, ws, op, buf, len) != 0)) {
+ (ws_msg_init_control(&frame, ws, op, buf, len) != 0)) {
return;
}
// Control frames at head of list. (Note that this may preempt
// the close frame or other ping/pong requests. Oh well.)
- nni_list_prepend(&ws->txmsgs, wm);
+ nni_list_prepend(&ws->txq, frame);
ws_start_write(ws);
}
-static const nni_option ws_options[] = {
- {
- .o_name = NULL,
- },
-};
-
-int
-nni_ws_getopt(nni_ws *ws, const char *name, void *buf, size_t *szp, nni_type t)
-{
- int rv;
-
- nni_mtx_lock(&ws->mtx);
- if (ws->closed) {
- nni_mtx_unlock(&ws->mtx);
- return (NNG_ECLOSED);
- }
- rv = nni_http_conn_getopt(ws->http, name, buf, szp, t);
- if (rv == NNG_ENOTSUP) {
- rv = nni_getopt(ws_options, name, ws, buf, szp, t);
- }
- nni_mtx_unlock(&ws->mtx);
- return (rv);
-}
-
-int
-nni_ws_setopt(
- nni_ws *ws, const char *name, const void *buf, size_t sz, nni_type t)
-{
- int rv;
-
- nni_mtx_lock(&ws->mtx);
- if (ws->closed) {
- nni_mtx_unlock(&ws->mtx);
- return (NNG_ECLOSED);
- }
- rv = nni_http_conn_setopt(ws->http, name, buf, sz, t);
- if (rv == NNG_ENOTSUP) {
- rv = nni_setopt(ws_options, name, ws, buf, sz, t);
- }
- nni_mtx_unlock(&ws->mtx);
- return (rv);
-}
-
-void
-nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
-{
- ws_msg * wm;
- nni_msg *msg;
- int rv;
-
- msg = nni_aio_get_msg(aio);
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- nni_mtx_lock(&ws->mtx);
- if (ws->closed) {
- nni_mtx_unlock(&ws->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- ws_msg_fini(wm);
- return;
- }
- if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) {
- nni_mtx_unlock(&ws->mtx);
- nni_aio_finish_error(aio, rv);
- ws_msg_fini(wm);
- return;
- }
- nni_aio_set_prov_extra(aio, 0, wm);
- nni_list_append(&ws->sendq, aio);
- nni_list_append(&ws->txmsgs, wm);
- ws_start_write(ws);
- nni_mtx_unlock(&ws->mtx);
-}
-
static void
ws_start_read(nni_ws *ws)
{
ws_frame *frame;
- ws_msg * wm;
nni_aio * aio;
nni_iov iov;
@@ -791,18 +780,18 @@ ws_start_read(nni_ws *ws)
return; // already reading or closed
}
- if ((wm = nni_list_first(&ws->rxmsgs)) == NULL) {
- return; // no body expecting a message.
+ // If nobody is waiting for recv, and we already have a data
+ // frame, stop reading. This keeps us from buffering infinitely.
+ if (nni_list_empty(&ws->recvq) && !nni_list_empty(&ws->rxq)) {
+ return;
}
if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
- nni_list_remove(&ws->rxmsgs, wm);
- if ((aio = wm->aio) != NULL) {
- wm->aio = NULL;
+ if ((aio = nni_list_first(&ws->recvq)) != NULL) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ENOMEM);
}
- ws_msg_fini(wm);
+ ws_close(ws, WS_CLOSE_INTERNAL);
return;
}
@@ -820,34 +809,143 @@ ws_start_read(nni_ws *ws)
}
static void
-ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp, nni_aio **aiop)
+ws_read_finish_str(nni_ws *ws)
{
- ws_msg *wm = nni_list_first(&ws->rxmsgs);
+ for (;;) {
+ nni_aio * aio;
+ nni_iov * iov;
+ unsigned niov;
+ ws_frame *frame;
- switch (frame->op) {
- case WS_CONT:
- if (wm == NULL) {
- ws_close(ws, WS_CLOSE_GOING_AWAY);
+ if ((aio = nni_list_first(&ws->recvq)) == NULL) {
return;
}
- if (nni_list_empty(&wm->frames)) {
+
+ if ((frame = nni_list_first(&ws->rxq)) == NULL) {
+ return;
+ }
+
+ // Discard 0 length frames -- in stream mode they are not used.
+ if (frame->len == 0) {
+ nni_list_remove(&ws->rxq, frame);
+ ws_frame_fini(frame);
+ continue;
+ }
+
+ // We are completing this aio one way or the other.
+ nni_aio_list_remove(aio);
+ nni_aio_get_iov(aio, &niov, &iov);
+
+ while ((frame != NULL) && (niov != 0)) {
+ size_t n;
+
+ if ((n = frame->len) > iov->iov_len) {
+ // This eats the entire iov.
+ n = iov->iov_len;
+ }
+ iov->iov_buf = ((uint8_t *) iov->iov_buf) + n;
+ iov->iov_len -= n;
+ if (iov->iov_len == 0) {
+ iov++;
+ niov--;
+ }
+
+ if (frame->len == n) {
+ nni_list_remove(&ws->rxq, frame);
+ ws_frame_fini(frame);
+ frame = nni_list_first(&ws->rxq);
+ } else {
+ frame->len -= n;
+ frame->buf += n;
+ }
+
+ nni_aio_bump_count(aio, n);
+ }
+
+ nni_aio_finish(aio, 0, nni_aio_count(aio));
+ }
+}
+
+static void
+ws_read_finish_msg(nni_ws *ws)
+{
+ nni_aio * aio;
+ size_t len;
+ ws_frame *frame;
+ nni_msg * msg;
+ int rv;
+ uint8_t * body;
+
+ // If we have no data, no waiter, or have not received the complete
+ // message yet, then there is nothing to do.
+ if (ws->inmsg || nni_list_empty(&ws->rxq) ||
+ ((aio = nni_list_first(&ws->recvq)) == NULL)) {
+ return;
+ }
+
+ // At this point, we have both a complete message in the queue (and
+ // there should not be any frames other than the for the message),
+ // and a waiting reader.
+ len = 0;
+ NNI_LIST_FOREACH (&ws->rxq, frame) {
+ len += frame->len;
+ }
+
+ nni_aio_list_remove(aio);
+
+ if ((rv = nni_msg_alloc(&msg, len)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ ws_close_error(ws, WS_CLOSE_INTERNAL);
+ return;
+ }
+ body = nni_msg_body(msg);
+ while ((frame = nni_list_first(&ws->rxq)) != NULL) {
+ nni_list_remove(&ws->rxq, frame);
+ memcpy(body, frame->buf, frame->len);
+ body += frame->len;
+ ws_frame_fini(frame);
+ }
+
+ nni_aio_set_msg(aio, msg);
+ nni_aio_bump_count(aio, nni_msg_len(msg));
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+}
+
+static void
+ws_read_finish(nni_ws *ws)
+{
+ if (ws->isstream) {
+ ws_read_finish_str(ws);
+ } else {
+ ws_read_finish_msg(ws);
+ }
+}
+
+static void
+ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
+{
+ switch (frame->op) {
+ case WS_CONT:
+ if (!ws->inmsg) {
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
return;
}
+ if (frame->final) {
+ ws->inmsg = false;
+ }
ws->rxframe = NULL;
- nni_list_append(&wm->frames, frame);
+ nni_list_append(&ws->rxq, frame);
break;
case WS_BINARY:
- if (wm == NULL) {
- ws_close(ws, WS_CLOSE_GOING_AWAY);
- return;
- }
- if (!nni_list_empty(&wm->frames)) {
+ if (ws->inmsg) {
ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
return;
}
+ if (!frame->final) {
+ ws->inmsg = true;
+ }
ws->rxframe = NULL;
- nni_list_append(&wm->frames, frame);
+ nni_list_append(&ws->rxq, frame);
break;
case WS_TEXT:
// No support for text mode at present.
@@ -880,23 +978,13 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp, nni_aio **aiop)
return;
}
- // If this was the last (final) frame, then complete it. But
- // we have to look at the msg, since we might have got a
- // control frame.
- if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) {
- nni_list_remove(&ws->rxmsgs, wm);
- *wmp = wm;
- *aiop = wm->aio;
- nni_aio_list_remove(wm->aio);
- wm->aio = NULL;
- }
+ ws_read_finish(ws);
}
static void
ws_read_cb(void *arg)
{
- nni_ws * ws = arg;
- ws_msg * wm;
+ nni_ws * ws = arg;
nni_aio * aio = ws->rxaio;
ws_frame *frame;
int rv;
@@ -976,6 +1064,21 @@ ws_read_cb(void *arg)
nni_mtx_unlock(&ws->mtx);
return;
}
+ // For message mode, also check to make sure that the overall
+ // length of the message has not exceeded our recvmax.
+ // (Protect against an infinite stream of small messages!)
+ if ((!ws->isstream) && (ws->recvmax > 0)) {
+ size_t totlen = frame->len;
+ ws_frame *fr2;
+ NNI_LIST_FOREACH (&ws->rxq, fr2) {
+ totlen += fr2->len;
+ }
+ if (totlen > ws->recvmax) {
+ ws_close(ws, WS_CLOSE_TOO_BIG);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ }
// Check for masking. (We don't actually do the unmask
// here, because we don't have data yet.)
@@ -1023,134 +1126,40 @@ ws_read_cb(void *arg)
// At this point, we have a complete frame.
ws_unmask_frame(frame); // idempotent
- wm = NULL;
- aio = NULL;
- ws_read_frame_cb(ws, frame, &wm, &aio);
+ ws_read_frame_cb(ws, frame);
ws_start_read(ws);
nni_mtx_unlock(&ws->mtx);
-
- // Got a good message, so we have to do the work to send it up.
- if (wm != NULL) {
- size_t len = 0;
- nni_msg *msg;
- uint8_t *body;
- int rv;
-
- NNI_LIST_FOREACH (&wm->frames, frame) {
- len += frame->len;
- }
- if ((rv = nni_msg_alloc(&msg, len)) != 0) {
- nni_aio_finish_error(aio, rv);
- ws_msg_fini(wm);
- nni_ws_close_error(ws, WS_CLOSE_INTERNAL);
- return;
- }
- body = nni_msg_body(msg);
- NNI_LIST_FOREACH (&wm->frames, frame) {
- memcpy(body, frame->buf, frame->len);
- body += frame->len;
- }
- nni_aio_set_msg(aio, msg);
- nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
- ws_msg_fini(wm);
- }
}
static void
ws_read_cancel(nni_aio *aio, void *arg, int rv)
{
nni_ws *ws = arg;
- ws_msg *wm;
nni_mtx_lock(&ws->mtx);
- if (!nni_aio_list_active(aio)) {
- nni_mtx_unlock(&ws->mtx);
- return;
- }
- wm = nni_aio_get_prov_extra(aio, 0);
- if (wm == nni_list_first(&ws->rxmsgs)) {
- // Cancellation will percolate back up.
- nni_aio_abort(ws->rxaio, rv);
- } else if (nni_list_active(&ws->rxmsgs, wm)) {
- nni_list_remove(&ws->rxmsgs, wm);
- ws_msg_fini(wm);
+ if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&ws->mtx);
}
-void
-nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
-{
- ws_msg *wm;
- int rv;
-
- if (nni_aio_begin(aio) != 0) {
- return;
- }
- if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_mtx_lock(&ws->mtx);
- if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) {
- nni_mtx_unlock(&ws->mtx);
- ws_msg_fini(wm);
- nni_aio_finish_error(aio, rv);
- return;
- }
- nni_aio_set_prov_extra(aio, 0, wm);
- nni_list_append(&ws->recvq, aio);
- nni_list_append(&ws->rxmsgs, wm);
- ws_start_read(ws);
-
- nni_mtx_unlock(&ws->mtx);
-}
-
-void
-nni_ws_close_error(nni_ws *ws, uint16_t code)
+static void
+ws_close_error(nni_ws *ws, uint16_t code)
{
nni_mtx_lock(&ws->mtx);
ws_close(ws, code);
nni_mtx_unlock(&ws->mtx);
}
-void
-nni_ws_close(nni_ws *ws)
-{
- nni_ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
-}
-
-const char *
-nni_ws_request_headers(nni_ws *ws)
-{
- nni_mtx_lock(&ws->mtx);
- if (ws->reqhdrs == NULL) {
- ws->reqhdrs = nni_http_req_headers(ws->req);
- }
- nni_mtx_unlock(&ws->mtx);
- return (ws->reqhdrs);
-}
-
-const char *
-nni_ws_response_headers(nni_ws *ws)
-{
- nni_mtx_lock(&ws->mtx);
- if (ws->reshdrs == NULL) {
- ws->reshdrs = nni_http_res_headers(ws->res);
- }
- nni_mtx_unlock(&ws->mtx);
- return (ws->reshdrs);
-}
-
static void
ws_fini(void *arg)
{
- nni_ws *ws = arg;
- ws_msg *wm;
+ nni_ws * ws = arg;
+ ws_frame *frame;
+ nng_aio * aio;
- nni_ws_close(ws);
+ ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
// Give a chance for the close frame to drain.
if (ws->closeaio) {
@@ -1175,25 +1184,29 @@ ws_fini(void *arg)
}
nni_mtx_lock(&ws->mtx);
- while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
- nni_list_remove(&ws->rxmsgs, wm);
- if (wm->aio) {
- nni_aio_finish_error(wm->aio, NNG_ECLOSED);
- }
- ws_msg_fini(wm);
+ while ((frame = nni_list_first(&ws->rxq)) != NULL) {
+ nni_list_remove(&ws->rxq, frame);
+ ws_frame_fini(frame);
}
- while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
- nni_list_remove(&ws->txmsgs, wm);
- if (wm->aio) {
- nni_aio_finish_error(wm->aio, NNG_ECLOSED);
- }
- ws_msg_fini(wm);
+ while ((frame = nni_list_first(&ws->txq)) != NULL) {
+ nni_list_remove(&ws->txq, frame);
+ ws_frame_fini(frame);
}
- if (ws->rxframe) {
+ if (ws->rxframe != NULL) {
ws_frame_fini(ws->rxframe);
}
+ if (ws->txframe != NULL) {
+ ws_frame_fini(ws->txframe);
+ }
+
+ while (((aio = nni_list_first(&ws->recvq)) != NULL) ||
+ ((aio = nni_list_first(&ws->sendq)) != NULL)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
nni_mtx_unlock(&ws->mtx);
if (ws->http) {
@@ -1217,8 +1230,8 @@ ws_fini(void *arg)
NNI_FREE_STRUCT(ws);
}
-void
-nni_ws_fini(nni_ws *ws)
+static void
+ws_reap(nni_ws *ws)
{
nni_reap(&ws->reap, ws_fini, ws);
}
@@ -1232,8 +1245,8 @@ ws_http_cb_listener(nni_ws *ws, nni_aio *aio)
nni_mtx_lock(&l->mtx);
nni_list_remove(&l->reply, ws);
if (nni_aio_result(aio) != 0) {
- nni_ws_fini(ws);
nni_mtx_unlock(&l->mtx);
+ ws_reap(ws);
return;
}
ws->ready = true;
@@ -1320,14 +1333,14 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
(!ws_contains_word(ptr, "upgrade")) ||
((ptr = GETH("Upgrade")) == NULL) ||
(strcmp(ptr, "websocket") != 0)) {
- nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
rv = NNG_EPROTO;
goto err;
}
if (d->proto != NULL) {
if (((ptr = GETH("Sec-WebSocket-Protocol")) == NULL) ||
(!ws_contains_word(d->proto, ptr))) {
- nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
rv = NNG_EPROTO;
goto err;
}
@@ -1358,7 +1371,7 @@ err:
}
nni_mtx_unlock(&d->mtx);
- nni_ws_fini(ws);
+ ws_reap(ws);
}
static void
@@ -1384,8 +1397,8 @@ ws_init(nni_ws **wsp)
return (NNG_ENOMEM);
}
nni_mtx_init(&ws->mtx);
- NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node);
- NNI_LIST_INIT(&ws->txmsgs, ws_msg, node);
+ NNI_LIST_INIT(&ws->rxq, ws_frame, node);
+ NNI_LIST_INIT(&ws->txq, ws_frame, node);
nni_aio_list_init(&ws->sendq);
nni_aio_list_init(&ws->recvq);
@@ -1401,17 +1414,25 @@ ws_init(nni_ws **wsp)
nni_aio_set_timeout(ws->closeaio, 100);
nni_aio_set_timeout(ws->httpaio, 2000);
+ ws->ops.s_close = ws_str_close;
+ ws->ops.s_free = ws_str_free;
+ ws->ops.s_send = ws_str_send;
+ ws->ops.s_recv = ws_str_recv;
+ ws->ops.s_getx = ws_str_getx;
+ ws->ops.s_setx = ws_str_setx;
+
ws->fragsize = 1 << 20; // we won't send a frame larger than this
*wsp = ws;
return (0);
}
-void
-nni_ws_listener_fini(nni_ws_listener *l)
+static void
+ws_listener_free(void *arg)
{
- ws_header *hdr;
+ nni_ws_listener *l = arg;
+ ws_header * hdr;
- nni_ws_listener_close(l);
+ ws_listener_close(l);
nni_mtx_lock(&l->mtx);
while (!nni_list_empty(&l->reply)) {
@@ -1437,7 +1458,7 @@ nni_ws_listener_fini(nni_ws_listener *l)
NNI_FREE_STRUCT(hdr);
}
if (l->url) {
- nni_url_free(l->url);
+ nng_url_free(l->url);
}
NNI_FREE_STRUCT(l);
}
@@ -1456,6 +1477,7 @@ ws_handler(nni_aio *aio)
uint16_t status;
int rv;
char key[29];
+ ws_header * hdr;
req = nni_aio_get_input(aio, 0);
h = nni_aio_get_input(aio, 1);
@@ -1542,6 +1564,20 @@ ws_handler(nni_aio *aio)
goto err;
}
+ // Set any user supplied headers. This is better than using a hook
+ // for most things, because it is loads easier.
+ NNI_LIST_FOREACH (&l->headers, hdr) {
+ if (SETH(hdr->name, hdr->value) != 0) {
+ status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_free(res);
+ goto err;
+ }
+ }
+
+ // The hook function gives us the ability to intercept the HTTP
+ // response altogether. Its best not to do this unless you really
+ // need to, because it's much more complex. But if you want to set
+ // up an HTTP Authorization handler this might be the only choice.
if (l->hookfn != NULL) {
rv = l->hookfn(l->hookarg, req, res);
if (rv != 0) {
@@ -1583,8 +1619,9 @@ ws_handler(nni_aio *aio)
ws->res = res;
ws->server = true;
ws->maxframe = l->maxframe;
-
- // XXX: Inherit fragmentation? (Frag is limited for now).
+ ws->fragsize = l->fragsize;
+ ws->recvmax = l->recvmax;
+ ws->isstream = l->isstream;
nni_list_append(&l->reply, ws);
nni_aio_set_data(ws->httpaio, 0, l);
@@ -1603,71 +1640,6 @@ err:
}
}
-int
-nni_ws_listener_init(nni_ws_listener **wslp, nni_url *url)
-{
- nni_ws_listener *l;
- int rv;
- char * host;
-
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&l->mtx);
- nni_cv_init(&l->cv, &l->mtx);
- nni_aio_list_init(&l->aios);
-
- NNI_LIST_INIT(&l->pend, nni_ws, node);
- NNI_LIST_INIT(&l->reply, nni_ws, node);
-
- // make a private copy of the url structure.
- if ((rv = nni_url_clone(&l->url, url)) != 0) {
- nni_ws_listener_fini(l);
- return (rv);
- }
-
- host = l->url->u_hostname;
- if (strlen(host) == 0) {
- host = NULL;
- }
- rv = nni_http_handler_init(&l->handler, url->u_path, ws_handler);
- if (rv != 0) {
- nni_ws_listener_fini(l);
- return (rv);
- }
-
- if (((rv = nni_http_handler_set_host(l->handler, host)) != 0) ||
- ((rv = nni_http_handler_set_data(l->handler, l, 0)) != 0) ||
- ((rv = nni_http_server_init(&l->server, url)) != 0)) {
- nni_ws_listener_fini(l);
- return (rv);
- }
-
- l->maxframe = 0;
- *wslp = l;
- return (0);
-}
-
-int
-nni_ws_listener_proto(nni_ws_listener *l, const char *proto)
-{
- int rv = 0;
- char *ns;
- nni_mtx_lock(&l->mtx);
- if (l->started) {
- rv = NNG_EBUSY;
- } else if ((ns = nni_strdup(proto)) == NULL) {
- rv = NNG_ENOMEM;
- } else {
- if (l->proto != NULL) {
- nni_strfree(l->proto);
- }
- l->proto = ns;
- }
- nni_mtx_unlock(&l->mtx);
- return (rv);
-}
-
static void
ws_accept_cancel(nni_aio *aio, void *arg, int rv)
{
@@ -1681,11 +1653,12 @@ ws_accept_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&l->mtx);
}
-void
-nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
+static void
+ws_listener_accept(void *arg, nni_aio *aio)
{
- nni_ws *ws;
- int rv;
+ nni_ws_listener *l = arg;
+ nni_ws * ws;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -1717,10 +1690,11 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
nni_mtx_unlock(&l->mtx);
}
-void
-nni_ws_listener_close(nni_ws_listener *l)
+static void
+ws_listener_close(void *arg)
{
- nni_ws *ws;
+ nni_ws_listener *l = arg;
+ nni_ws * ws;
nni_mtx_lock(&l->mtx);
if (l->closed) {
nni_mtx_unlock(&l->mtx);
@@ -1733,18 +1707,30 @@ nni_ws_listener_close(nni_ws_listener *l)
l->started = false;
}
NNI_LIST_FOREACH (&l->pend, ws) {
- nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ ws_close_error(ws, WS_CLOSE_GOING_AWAY);
}
NNI_LIST_FOREACH (&l->reply, ws) {
- nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ ws_close_error(ws, WS_CLOSE_GOING_AWAY);
}
nni_mtx_unlock(&l->mtx);
}
-int
-nni_ws_listener_listen(nni_ws_listener *l)
+// XXX: Consider replacing this with an option.
+void
+nni_ws_listener_hook(
+ nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg)
{
- int rv;
+ nni_mtx_lock(&l->mtx);
+ l->hookfn = hookfn;
+ l->hookarg = hookarg;
+ nni_mtx_unlock(&l->mtx);
+}
+
+static int
+ws_listener_listen(void *arg)
+{
+ nni_ws_listener *l = arg;
+ int rv;
nni_mtx_lock(&l->mtx);
if (l->closed) {
@@ -1777,42 +1763,290 @@ nni_ws_listener_listen(nni_ws_listener *l)
return (0);
}
-void
-nni_ws_listener_hook(
- nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg)
+static int
+ws_listener_set_size(
+ nni_ws_listener *l, size_t *valp, const void *buf, size_t sz, nni_type t)
{
- nni_mtx_lock(&l->mtx);
- l->hookfn = hookfn;
- l->hookarg = hookarg;
- nni_mtx_unlock(&l->mtx);
+ size_t val;
+ int rv;
+
+ // Max size is limited to 4 GB, but you really never want to have
+ // to have a larger value. If you think you need that, you're doing it
+ // wrong. You *can* set the size to 0 for unlimited.
+ if ((rv = nni_copyin_size(&val, buf, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ *valp = val;
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
}
-int
-nni_ws_listener_set_tls(nni_ws_listener *l, nng_tls_config *tls)
+static int
+ws_listener_get_size(
+ nni_ws_listener *l, size_t *valp, void *buf, size_t *szp, nni_type t)
{
- int rv;
+ size_t val;
nni_mtx_lock(&l->mtx);
- rv = nni_http_server_set_tls(l->server, tls);
+ val = *valp;
nni_mtx_unlock(&l->mtx);
+ return (nni_copyout_size(val, buf, szp, t));
+}
+
+static int
+ws_listener_set_maxframe(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_set_size(l, &l->maxframe, buf, sz, t));
+}
+
+static int
+ws_listener_get_maxframe(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_get_size(l, &l->maxframe, buf, szp, t));
+}
+
+static int
+ws_listener_set_fragsize(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_set_size(l, &l->fragsize, buf, sz, t));
+}
+
+static int
+ws_listener_get_fragsize(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_get_size(l, &l->fragsize, buf, szp, t));
+}
+
+static int
+ws_listener_set_recvmax(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_set_size(l, &l->recvmax, buf, sz, t));
+}
+
+static int
+ws_listener_get_recvmax(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ return (ws_listener_get_size(l, &l->recvmax, buf, szp, t));
+}
+
+static int
+ws_listener_set_res_headers(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ rv = ws_set_headers(&l->headers, buf);
+ nni_mtx_unlock(&l->mtx);
+ }
return (rv);
}
-int
-nni_ws_listener_get_tls(nni_ws_listener *l, nng_tls_config **tlsp)
+static int
+ws_listener_set_proto(void *arg, const void *buf, size_t sz, nni_type t)
{
- int rv;
+ nni_ws_listener *l = arg;
+ int rv;
+
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ char *ns;
+ if ((ns = nni_strdup(buf)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ nni_mtx_lock(&l->mtx);
+ if (l->proto != NULL) {
+ nni_strfree(l->proto);
+ }
+ l->proto = ns;
+ nni_mtx_unlock(&l->mtx);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_listener_get_proto(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
nni_mtx_lock(&l->mtx);
- rv = nni_http_server_get_tls(l->server, tlsp);
+ rv = nni_copyout_str(l->proto != NULL ? l->proto : "", buf, szp, t);
nni_mtx_unlock(&l->mtx);
return (rv);
}
-void
-nni_ws_listener_set_maxframe(nni_ws_listener *l, size_t maxframe)
+static int
+ws_listener_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t)
{
+ nni_ws_listener *l = arg;
+ int rv;
+ bool b;
+
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ l->isstream = !b;
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_get_url(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+
nni_mtx_lock(&l->mtx);
- l->maxframe = maxframe;
+ rv = nni_copyout_str(l->url->u_rawurl, buf, szp, t);
nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static const nni_option ws_listener_options[] = {
+ {
+ .o_name = NNI_OPT_WS_MSGMODE,
+ .o_set = ws_listener_set_msgmode,
+ },
+ {
+ .o_name = NNG_OPT_WS_RECVMAXFRAME,
+ .o_set = ws_listener_set_maxframe,
+ .o_get = ws_listener_get_maxframe,
+ },
+ {
+ .o_name = NNG_OPT_WS_SENDMAXFRAME,
+ .o_set = ws_listener_set_fragsize,
+ .o_get = ws_listener_get_fragsize,
+ },
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_set = ws_listener_set_recvmax,
+ .o_get = ws_listener_get_recvmax,
+ },
+ {
+ .o_name = NNG_OPT_WS_RESPONSE_HEADERS,
+ .o_set = ws_listener_set_res_headers,
+ // XXX: Get not implemented yet; likely of marginal value.
+ },
+ {
+ .o_name = NNG_OPT_WS_PROTOCOL,
+ .o_set = ws_listener_set_proto,
+ .o_get = ws_listener_get_proto,
+ },
+ {
+ .o_name = NNG_OPT_URL,
+ .o_get = ws_listener_get_url,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+ws_listener_set_header(nni_ws_listener *l, const char *name, const void *buf,
+ size_t sz, nni_type t)
+{
+ int rv;
+ name += strlen(NNG_OPT_WS_RESPONSE_HEADER);
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&l->mtx);
+ rv = ws_set_header(&l->headers, name, buf);
+ nni_mtx_unlock(&l->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_listener_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+
+ rv = nni_setopt(ws_listener_options, name, l, buf, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_http_server_setx(l->server, name, buf, sz, t);
+ }
+
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(name, NNG_OPT_WS_RESPONSE_HEADER)) {
+ rv = ws_listener_set_header(l, name, buf, sz, t);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_listener_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_listener *l = arg;
+ int rv;
+
+ rv = nni_getopt(ws_listener_options, name, l, buf, szp, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_http_server_getx(l->server, name, buf, szp, t);
+ }
+ return (rv);
+}
+
+int
+nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url)
+{
+ nni_ws_listener *l;
+ int rv;
+ char * host;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&l->mtx);
+ nni_cv_init(&l->cv, &l->mtx);
+ nni_aio_list_init(&l->aios);
+
+ NNI_LIST_INIT(&l->pend, nni_ws, node);
+ NNI_LIST_INIT(&l->reply, nni_ws, node);
+
+ // make a private copy of the url structure.
+ if ((rv = nng_url_clone(&l->url, url)) != 0) {
+ ws_listener_free(l);
+ return (rv);
+ }
+
+ host = l->url->u_hostname;
+ if (strlen(host) == 0) {
+ host = NULL;
+ }
+ rv = nni_http_handler_init(&l->handler, url->u_path, ws_handler);
+ if (rv != 0) {
+ ws_listener_free(l);
+ return (rv);
+ }
+
+ if (((rv = nni_http_handler_set_host(l->handler, host)) != 0) ||
+ ((rv = nni_http_handler_set_data(l->handler, l, 0)) != 0) ||
+ ((rv = nni_http_server_init(&l->server, url)) != 0)) {
+ ws_listener_free(l);
+ return (rv);
+ }
+
+ l->fragsize = WS_DEF_MAXTXFRAME;
+ l->maxframe = WS_DEF_MAXRXFRAME;
+ l->recvmax = WS_DEF_RECVMAX;
+ l->isstream = false;
+ l->ops.sl_free = ws_listener_free;
+ l->ops.sl_close = ws_listener_close;
+ l->ops.sl_accept = ws_listener_accept;
+ l->ops.sl_listen = ws_listener_listen;
+ l->ops.sl_setx = ws_listener_setx;
+ l->ops.sl_getx = ws_listener_getx;
+ *wslp = (void *) l;
+ return (0);
}
void
@@ -1846,7 +2080,7 @@ ws_conn_cb(void *arg)
nni_cv_wake(&d->cv);
}
nni_mtx_unlock(&d->mtx);
- nni_ws_fini(ws);
+ ws_reap(ws);
} else {
nni_mtx_unlock(&d->mtx);
}
@@ -1861,7 +2095,7 @@ ws_conn_cb(void *arg)
// This request was canceled for some reason.
nni_http_conn_fini(http);
nni_mtx_unlock(&ws->mtx);
- nni_ws_fini(ws);
+ ws_reap(ws);
return;
}
@@ -1908,13 +2142,14 @@ err:
if (req != NULL) {
nni_http_req_free(req);
}
- nni_ws_fini(ws);
+ ws_reap(ws);
}
-void
-nni_ws_dialer_fini(nni_ws_dialer *d)
+static void
+ws_dialer_free(void *arg)
{
- ws_header *hdr;
+ nni_ws_dialer *d = arg;
+ ws_header * hdr;
nni_mtx_lock(&d->mtx);
while (!nni_list_empty(&d->wspend)) {
@@ -1933,65 +2168,18 @@ nni_ws_dialer_fini(nni_ws_dialer *d)
nni_http_client_fini(d->client);
}
if (d->url) {
- nni_url_free(d->url);
+ nng_url_free(d->url);
}
nni_cv_fini(&d->cv);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}
-int
-nni_ws_dialer_init(nni_ws_dialer **dp, nni_url *url)
-{
- nni_ws_dialer *d;
- int rv;
-
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
- return (NNG_ENOMEM);
- }
- NNI_LIST_INIT(&d->headers, ws_header, node);
- NNI_LIST_INIT(&d->wspend, nni_ws, node);
- nni_mtx_init(&d->mtx);
- nni_cv_init(&d->cv, &d->mtx);
-
- if ((rv = nni_url_clone(&d->url, url)) != 0) {
- nni_ws_dialer_fini(d);
- return (rv);
- }
-
- if ((rv = nni_http_client_init(&d->client, url)) != 0) {
- nni_ws_dialer_fini(d);
- return (rv);
- }
- d->maxframe = 0;
- *dp = d;
- return (0);
-}
-
-int
-nni_ws_dialer_set_tls(nni_ws_dialer *d, nng_tls_config *tls)
-{
- int rv;
- nni_mtx_lock(&d->mtx);
- rv = nni_http_client_set_tls(d->client, tls);
- nni_mtx_unlock(&d->mtx);
- return (rv);
-}
-
-int
-nni_ws_dialer_get_tls(nni_ws_dialer *d, nng_tls_config **tlsp)
-{
- int rv;
- nni_mtx_lock(&d->mtx);
- rv = nni_http_client_get_tls(d->client, tlsp);
- nni_mtx_unlock(&d->mtx);
- return (rv);
-}
-
-void
-nni_ws_dialer_close(nni_ws_dialer *d)
+static void
+ws_dialer_close(void *arg)
{
- nni_ws *ws;
+ nni_ws_dialer *d = arg;
+ nni_ws * ws;
nni_mtx_lock(&d->mtx);
if (d->closed) {
nni_mtx_unlock(&d->mtx);
@@ -2005,24 +2193,6 @@ nni_ws_dialer_close(nni_ws_dialer *d)
nni_mtx_unlock(&d->mtx);
}
-int
-nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto)
-{
- int rv = 0;
- char *ns;
- nni_mtx_lock(&d->mtx);
- if ((ns = nni_strdup(proto)) == NULL) {
- rv = NNG_ENOMEM;
- } else {
- if (d->proto != NULL) {
- nni_strfree(d->proto);
- }
- d->proto = ns;
- }
- nni_mtx_unlock(&d->mtx);
- return (rv);
-}
-
static void
ws_dial_cancel(nni_aio *aio, void *arg, int rv)
{
@@ -2038,11 +2208,12 @@ ws_dial_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&ws->mtx);
}
-void
-nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
+static void
+ws_dialer_dial(void *arg, nni_aio *aio)
{
- nni_ws *ws;
- int rv;
+ nni_ws_dialer *d = arg;
+ nni_ws * ws;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -2055,72 +2226,275 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
if (d->closed) {
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
- ws_fini(ws);
+ ws_reap(ws);
return;
}
if ((rv = nni_aio_schedule(aio, ws_dial_cancel, ws)) != 0) {
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
- ws_fini(ws);
+ ws_reap(ws);
return;
}
ws->dialer = d;
ws->useraio = aio;
ws->server = false;
ws->maxframe = d->maxframe;
+ ws->isstream = d->isstream;
nni_list_append(&d->wspend, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
}
static int
-ws_set_header(nni_list *l, const char *n, const char *v)
+ws_dialer_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t)
{
- ws_header *hdr;
- char * nv;
+ nni_ws_dialer *d = arg;
+ int rv;
+ bool b;
- if ((nv = nni_strdup(v)) == NULL) {
- return (NNG_ENOMEM);
+ if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ d->isstream = !b;
+ nni_mtx_unlock(&d->mtx);
}
+ return (rv);
+}
- NNI_LIST_FOREACH (l, hdr) {
- if (nni_strcasecmp(hdr->name, n) == 0) {
- nni_strfree(hdr->value);
- hdr->value = nv;
- return (0);
- }
- }
+static int
+ws_dialer_set_size(
+ nni_ws_dialer *d, size_t *valp, const void *buf, size_t sz, nni_type t)
+{
+ size_t val;
+ int rv;
- if ((hdr = NNI_ALLOC_STRUCT(hdr)) == NULL) {
- nni_strfree(nv);
- return (NNG_ENOMEM);
- }
- if ((hdr->name = nni_strdup(n)) == NULL) {
- nni_strfree(nv);
- NNI_FREE_STRUCT(hdr);
- return (NNG_ENOMEM);
+ // Max size is limited to 4 GB, but you really never want to have
+ // to have a larger value. If you think you need that, you're doing it
+ // wrong. You *can* set the size to 0 for unlimited.
+ if ((rv = nni_copyin_size(&val, buf, sz, 0, NNI_MAXSZ, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ *valp = val;
+ nni_mtx_unlock(&d->mtx);
}
- hdr->value = nv;
- nni_list_append(l, hdr);
- return (0);
+ return (rv);
}
-int
-nni_ws_dialer_header(nni_ws_dialer *d, const char *n, const char *v)
+static int
+ws_dialer_get_size(
+ nni_ws_dialer *d, size_t *valp, void *buf, size_t *szp, nni_type t)
{
- int rv;
+ size_t val;
nni_mtx_lock(&d->mtx);
- rv = ws_set_header(&d->headers, n, v);
+ val = *valp;
nni_mtx_unlock(&d->mtx);
+ return (nni_copyout_size(val, buf, szp, t));
+}
+
+static int
+ws_dialer_set_maxframe(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_set_size(d, &d->maxframe, buf, sz, t));
+}
+
+static int
+ws_dialer_get_maxframe(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_get_size(d, &d->maxframe, buf, szp, t));
+}
+
+static int
+ws_dialer_set_fragsize(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_set_size(d, &d->fragsize, buf, sz, t));
+}
+
+static int
+ws_dialer_get_fragsize(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_get_size(d, &d->fragsize, buf, szp, t));
+}
+
+static int
+ws_dialer_set_recvmax(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_set_size(d, &d->recvmax, buf, sz, t));
+}
+
+static int
+ws_dialer_get_recvmax(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ return (ws_dialer_get_size(d, &d->recvmax, buf, szp, t));
+}
+
+static int
+ws_dialer_set_req_headers(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ rv = ws_set_headers(&d->headers, buf);
+ nni_mtx_unlock(&d->mtx);
+ }
return (rv);
}
-void
-nni_ws_dialer_set_maxframe(nni_ws_dialer *d, size_t maxframe)
+static int
+ws_dialer_set_proto(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ char *ns;
+ if ((ns = nni_strdup(buf)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ nni_mtx_lock(&d->mtx);
+ if (d->proto != NULL) {
+ nni_strfree(d->proto);
+ }
+ d->proto = ns;
+ nni_mtx_unlock(&d->mtx);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_dialer_get_proto(void *arg, void *buf, size_t *szp, nni_type t)
{
+ nni_ws_dialer *d = arg;
+ int rv;
nni_mtx_lock(&d->mtx);
- d->maxframe = maxframe;
+ rv = nni_copyout_str(d->proto != NULL ? d->proto : "", buf, szp, t);
nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
+static const nni_option ws_dialer_options[] = {
+ {
+ .o_name = NNI_OPT_WS_MSGMODE,
+ .o_set = ws_dialer_set_msgmode,
+ },
+ {
+ .o_name = NNG_OPT_WS_RECVMAXFRAME,
+ .o_set = ws_dialer_set_maxframe,
+ .o_get = ws_dialer_get_maxframe,
+ },
+ {
+ .o_name = NNG_OPT_WS_SENDMAXFRAME,
+ .o_set = ws_dialer_set_fragsize,
+ .o_get = ws_dialer_get_fragsize,
+ },
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_set = ws_dialer_set_recvmax,
+ .o_get = ws_dialer_get_recvmax,
+ },
+ {
+ .o_name = NNG_OPT_WS_REQUEST_HEADERS,
+ .o_set = ws_dialer_set_req_headers,
+ // XXX: Get not implemented yet; likely of marginal value.
+ },
+ {
+ .o_name = NNG_OPT_WS_PROTOCOL,
+ .o_set = ws_dialer_set_proto,
+ .o_get = ws_dialer_get_proto,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+ws_dialer_set_header(
+ nni_ws_dialer *d, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ int rv;
+ name += strlen(NNG_OPT_WS_REQUEST_HEADER);
+ if ((rv = ws_check_string(buf, sz, t)) == 0) {
+ nni_mtx_lock(&d->mtx);
+ rv = ws_set_header(&d->headers, name, buf);
+ nni_mtx_unlock(&d->mtx);
+ }
+ return (rv);
+}
+
+static int
+ws_dialer_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+
+ rv = nni_setopt(ws_dialer_options, name, d, buf, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_http_client_setx(d->client, name, buf, sz, t);
+ }
+
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(name, NNG_OPT_WS_REQUEST_HEADER)) {
+ rv = ws_dialer_set_header(d, name, buf, sz, t);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_dialer_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws_dialer *d = arg;
+ int rv;
+
+ rv = nni_getopt(ws_dialer_options, name, d, buf, szp, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_http_client_getx(d->client, name, buf, szp, t);
+ }
+ return (rv);
+}
+
+int
+nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
+{
+ nni_ws_dialer *d;
+ int rv;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&d->headers, ws_header, node);
+ NNI_LIST_INIT(&d->wspend, nni_ws, node);
+ nni_mtx_init(&d->mtx);
+ nni_cv_init(&d->cv, &d->mtx);
+
+ if ((rv = nng_url_clone(&d->url, url)) != 0) {
+ ws_dialer_free(d);
+ return (rv);
+ }
+
+ if ((rv = nni_http_client_init(&d->client, url)) != 0) {
+ ws_dialer_free(d);
+ return (rv);
+ }
+ d->isstream = true;
+ d->recvmax = WS_DEF_RECVMAX;
+ d->maxframe = WS_DEF_MAXRXFRAME;
+ d->fragsize = WS_DEF_MAXTXFRAME;
+
+ d->ops.sd_free = ws_dialer_free;
+ d->ops.sd_close = ws_dialer_close;
+ d->ops.sd_dial = ws_dialer_dial;
+ d->ops.sd_setx = ws_dialer_setx;
+ d->ops.sd_getx = ws_dialer_getx;
+ *dp = (void *) d;
+ return (0);
}
// Dialer does not get a hook chance, as it can examine the request
@@ -2130,3 +2504,293 @@ nni_ws_dialer_set_maxframe(nni_ws_dialer *d, size_t maxframe)
// The implementation will send periodic PINGs, and respond with
// PONGs.
+
+static void
+ws_str_free(void *arg)
+{
+ nni_ws *ws = arg;
+ nni_reap(&ws->reap, ws_fini, ws);
+}
+
+static void
+ws_str_close(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
+}
+
+static void
+ws_str_send(void *arg, nni_aio *aio)
+{
+ nni_ws * ws = arg;
+ int rv;
+ ws_frame *frame;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ if (!ws->isstream) {
+ nni_msg *msg;
+ unsigned niov;
+ nni_iov iov[2];
+ if ((msg = nni_aio_get_msg(aio)) == NULL) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
+ return;
+ }
+ niov = 0;
+ if (nng_msg_header_len(msg) > 0) {
+ iov[niov].iov_len = nni_msg_header_len(msg);
+ iov[niov].iov_buf = nni_msg_header(msg);
+ niov++;
+ }
+ iov[niov].iov_len = nni_msg_len(msg);
+ iov[niov].iov_buf = nni_msg_body(msg);
+ niov++;
+
+ // Scribble into the iov for now.
+ nni_aio_set_iov(aio, niov, iov);
+ }
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ nni_aio_finish_error(aio, NNG_ENOMEM);
+ return;
+ }
+ frame->aio = aio;
+ if ((rv = ws_frame_prep_tx(ws, frame)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ ws_frame_fini(frame);
+ return;
+ }
+
+ nni_mtx_lock(&ws->mtx);
+
+ if (ws->closed) {
+ nni_mtx_unlock(&ws->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ ws_frame_fini(frame);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ nni_aio_finish_error(aio, rv);
+ ws_frame_fini(frame);
+ return;
+ }
+ nni_aio_set_prov_extra(aio, 0, frame);
+ nni_list_append(&ws->sendq, aio);
+ nni_list_append(&ws->txq, frame);
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_str_recv(void *arg, nng_aio *aio)
+{
+ nni_ws *ws = arg;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&ws->mtx);
+ if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&ws->recvq, aio);
+ if (nni_list_first(&ws->recvq) == aio) {
+ ws_read_finish_msg(ws);
+ }
+ ws_start_read(ws);
+
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static int
+ws_get_request_headers(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ nni_mtx_lock(&ws->mtx);
+ if (ws->reqhdrs == NULL) {
+ ws->reqhdrs = nni_http_req_headers(ws->req);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return (nni_copyout_str(ws->reqhdrs, buf, szp, t));
+}
+
+static int
+ws_get_response_headers(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ nni_mtx_lock(&ws->mtx);
+ if (ws->reshdrs == NULL) {
+ ws->reshdrs = nni_http_res_headers(ws->res);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return (nni_copyout_str(ws->reshdrs, buf, szp, t));
+}
+
+static int
+ws_get_request_uri(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ return (nni_copyout_str(nni_http_req_get_uri(ws->req), buf, szp, t));
+}
+
+static const nni_option ws_options[] = {
+ {
+ .o_name = NNG_OPT_WS_REQUEST_HEADERS,
+ .o_get = ws_get_request_headers,
+ },
+ {
+ .o_name = NNG_OPT_WS_RESPONSE_HEADERS,
+ .o_get = ws_get_response_headers,
+ },
+ {
+ .o_name = NNG_OPT_WS_REQUEST_URI,
+ .o_get = ws_get_request_uri,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+ws_str_setx(void *arg, const char *nm, const void *buf, size_t sz, nni_type t)
+{
+ nni_ws *ws = arg;
+ int rv;
+
+ // Headers can only be set.
+ nni_mtx_lock(&ws->mtx);
+ if (ws->closed) {
+ nni_mtx_unlock(&ws->mtx);
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ rv = nni_http_conn_setopt(ws->http, nm, buf, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_setopt(ws_options, nm, ws, buf, sz, t);
+ }
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(nm, NNG_OPT_WS_REQUEST_HEADER) ||
+ startswith(nm, NNG_OPT_WS_RESPONSE_HEADER)) {
+ return (NNG_EREADONLY);
+ }
+ }
+
+ return (rv);
+}
+
+static int
+ws_get_req_header(
+ nni_ws *ws, const char *nm, void *buf, size_t *szp, nni_type t)
+{
+ const char *s;
+ nm += strlen(NNG_OPT_WS_REQUEST_HEADER);
+ s = nni_http_req_get_header(ws->req, nm);
+ if (s == NULL) {
+ return (NNG_ENOENT);
+ }
+ return (nni_copyout_str(s, buf, szp, t));
+}
+
+static int
+ws_get_res_header(
+ nni_ws *ws, const char *nm, void *buf, size_t *szp, nni_type t)
+{
+ const char *s;
+ nm += strlen(NNG_OPT_WS_RESPONSE_HEADER);
+ s = nni_http_res_get_header(ws->res, nm);
+ if (s == NULL) {
+ return (NNG_ENOENT);
+ }
+ return (nni_copyout_str(s, buf, szp, t));
+}
+
+static int
+ws_str_getx(void *arg, const char *nm, void *buf, size_t *szp, nni_type t)
+{
+ nni_ws *ws = arg;
+ int rv;
+
+ nni_mtx_lock(&ws->mtx);
+ if (ws->closed) {
+ nni_mtx_unlock(&ws->mtx);
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ rv = nni_http_conn_getopt(ws->http, nm, buf, szp, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_getopt(ws_options, nm, ws, buf, szp, t);
+ }
+ // Check for generic headers...
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(nm, NNG_OPT_WS_REQUEST_HEADER)) {
+ rv = ws_get_req_header(ws, nm, buf, szp, t);
+ } else if (startswith(nm, NNG_OPT_WS_RESPONSE_HEADER)) {
+ rv = ws_get_res_header(ws, nm, buf, szp, t);
+ }
+ }
+ return (rv);
+}
+
+static int
+ws_check_size(const void *buf, size_t sz, nni_type t)
+{
+ return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t));
+}
+
+static const nni_chkoption ws_chkopts[] = {
+ {
+ .o_name = NNG_OPT_WS_SENDMAXFRAME,
+ .o_check = ws_check_size,
+ },
+ {
+ .o_name = NNG_OPT_WS_RECVMAXFRAME,
+ .o_check = ws_check_size,
+ },
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_check = ws_check_size,
+ },
+ {
+ .o_name = NNG_OPT_WS_PROTOCOL,
+ .o_check = ws_check_string,
+ },
+ {
+ .o_name = NNG_OPT_WS_REQUEST_HEADERS,
+ .o_check = ws_check_string,
+ },
+ {
+ .o_name = NNG_OPT_WS_RESPONSE_HEADERS,
+ .o_check = ws_check_string,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+int
+nni_ws_checkopt(const char *name, const void *data, size_t sz, nni_type t)
+{
+ int rv;
+
+ rv = nni_chkopt(ws_chkopts, name, data, sz, t);
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_stream_checkopt("tcp", name, data, sz, t);
+ }
+ if (rv == NNG_ENOTSUP) {
+ rv = nni_stream_checkopt("tls+tcp", name, data, sz, t);
+ }
+ if (rv == NNG_ENOTSUP) {
+ if (startswith(name, NNG_OPT_WS_REQUEST_HEADER) ||
+ startswith(name, NNG_OPT_WS_RESPONSE_HEADER)) {
+ rv = ws_check_string(data, sz, t);
+ }
+ }
+ // Potentially, add checks for header options.
+ return (rv);
+}