diff options
Diffstat (limited to 'src/supplemental/websocket/websocket.c')
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 2066 |
1 files changed, 1365 insertions, 701 deletions
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 3d3a68cb..4cecf430 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -18,11 +18,25 @@ #include "supplemental/http/http_api.h" #include "supplemental/sha1/sha1.h" +#include <nng/transport/ws/websocket.h> + #include "websocket.h" +// This should be removed or handled differently in the future. +typedef int (*nni_ws_listen_hook)(void *, nng_http_req *, nng_http_res *); + +// We have chosen to be a bit more stringent in the size of the frames that +// we send, while we more generously allow larger incoming frames. These +// may be tuned by options. +#define WS_DEF_RECVMAX (1U << 20) // 1MB Message limit (message mode only) +#define WS_DEF_MAXRXFRAME (1U << 20) // 1MB Frame size (recv) +#define WS_DEF_MAXTXFRAME (1U << 16) // 64KB Frame size (send) + +// Alias for checking the prefix of a string. +#define startswith(s, t) (strncmp(s, t, strlen(t)) == 0) + // Pre-defined types for some prototypes. These are from other subsystems. typedef struct ws_frame ws_frame; -typedef struct ws_msg ws_msg; typedef struct ws_header { nni_list_node node; @@ -31,17 +45,20 @@ typedef struct ws_header { } ws_header; struct nni_ws { + nng_stream ops; nni_list_node node; nni_reap_item reap; bool server; bool closed; bool ready; bool wclose; + bool isstream; + bool inmsg; nni_mtx mtx; - nni_list txmsgs; - nni_list rxmsgs; nni_list sendq; nni_list recvq; + nni_list txq; + nni_list rxq; ws_frame * txframe; ws_frame * rxframe; nni_aio * txaio; // physical aios @@ -57,26 +74,31 @@ struct nni_ws { char * reshdrs; size_t maxframe; size_t fragsize; + size_t recvmax; // largest message size nni_ws_listener *listener; nni_ws_dialer * dialer; }; struct nni_ws_listener { - nni_http_server * server; - char * proto; - nni_mtx mtx; - nni_cv cv; - nni_list pend; - nni_list reply; - nni_list aios; - nni_url * url; - bool started; - bool closed; - nni_http_handler * handler; - nni_ws_listen_hook hookfn; - void * hookarg; - nni_list headers; // response headers - size_t maxframe; + nng_stream_listener ops; + nni_http_server * server; + char * proto; + nni_mtx mtx; + nni_cv cv; + nni_list pend; + nni_list reply; + nni_list aios; + nng_url * url; + bool started; + bool closed; + bool isstream; + nni_http_handler * handler; + nni_ws_listen_hook hookfn; + void * hookarg; + nni_list headers; // response headers + size_t maxframe; + size_t fragsize; + size_t recvmax; // largest message size }; // The dialer tracks user aios in two lists. The first list is for aios @@ -86,18 +108,21 @@ struct nni_ws_listener { // completion of an earlier connection. (We don't want to establish // requests when we already have connects negotiating.) struct nni_ws_dialer { - nni_http_req * req; - nni_http_res * res; - nni_http_client *client; - nni_mtx mtx; - nni_cv cv; - char * proto; - nni_url * url; - nni_list wspend; // ws structures still negotiating - bool closed; - nng_sockaddr sa; - nni_list headers; // request headers - size_t maxframe; + nng_stream_dialer ops; + nni_http_req * req; + nni_http_res * res; + nni_http_client * client; + nni_mtx mtx; + nni_cv cv; + char * proto; + nng_url * url; + nni_list wspend; // ws structures still negotiating + bool closed; + bool isstream; + nni_list headers; // request headers + size_t maxframe; + size_t fragsize; + size_t recvmax; }; typedef enum ws_type { @@ -133,16 +158,7 @@ struct ws_frame { bool masked; size_t bufsz; // allocated size uint8_t * buf; - ws_msg * wmsg; -}; - -struct ws_msg { - nni_list frames; - nni_list_node node; - nni_ws * ws; - nni_aio * aio; - uint8_t * buf; - size_t bufsz; + nng_aio * aio; }; static void ws_send_close(nni_ws *ws, uint16_t code); @@ -150,6 +166,127 @@ static void ws_conn_cb(void *); static void ws_close_cb(void *); static void ws_read_cb(void *); static void ws_write_cb(void *); +static void ws_close_error(nni_ws *ws, uint16_t code); + +static void ws_str_free(void *); +static void ws_str_close(void *); +static void ws_str_send(void *, nng_aio *); +static void ws_str_recv(void *, nng_aio *); +static int ws_str_getx(void *, const char *, void *, size_t *, nni_type); +static int ws_str_setx(void *, const char *, const void *, size_t, nni_type); + +static void ws_listener_close(void *); +static void ws_listener_free(void *); + +static int +ws_check_string(const void *v, size_t sz, nni_opt_type t) +{ + if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) { + return (NNG_EBADTYPE); + } + if (nni_strnlen(v, sz) >= sz) { + return (NNG_EINVAL); + } + return (0); +} + +static int +ws_set_header_ext(nni_list *l, const char *n, const char *v, bool strip_dups) +{ + ws_header *hdr; + char * nv; + + if ((nv = nni_strdup(v)) == NULL) { + return (NNG_ENOMEM); + } + + if (strip_dups) { + NNI_LIST_FOREACH (l, hdr) { + if (nni_strcasecmp(hdr->name, n) == 0) { + nni_strfree(hdr->value); + hdr->value = nv; + return (0); + } + } + } + + if ((hdr = NNI_ALLOC_STRUCT(hdr)) == NULL) { + nni_strfree(nv); + return (NNG_ENOMEM); + } + if ((hdr->name = nni_strdup(n)) == NULL) { + nni_strfree(nv); + NNI_FREE_STRUCT(hdr); + return (NNG_ENOMEM); + } + hdr->value = nv; + nni_list_append(l, hdr); + return (0); +} + +static int +ws_set_header(nni_list *l, const char *n, const char *v) +{ + return (ws_set_header_ext(l, n, v, true)); +} + +static int +ws_set_headers(nni_list *l, const char *str) +{ + char * dupstr; + size_t duplen; + char * n; + char * v; + char * nl; + int rv; + + if ((dupstr = nni_strdup(str)) == NULL) { + return (NNG_ENOMEM); + } + duplen = strlen(dupstr) + 1; // so we can free it later + + n = dupstr; + for (;;) { + if ((v = strchr(n, ':')) == NULL) { + // Note that this also means that if + // a bare word is present, we ignore it. + break; + } + *v = '\0'; + v++; + while (*v == ' ') { + // Skip leading whitespace. Not strictly + // necessary, but still a good idea. + v++; + } + nl = v; + // Find the end of the line -- should be CRLF, but can + // also be unterminated or just LF if user + while ((*nl != '\0') && (*nl != '\r') && (*nl != '\n')) { + nl++; + } + while ((*nl == '\r') || (*nl == '\n')) { + *nl = '\0'; + nl++; + } + + // Note that this can lead to a partial failure. As this + // is most likely ENOMEM, don't worry too much about it. + // This method does *not* eliminate duplicates. + if ((rv = ws_set_header_ext(l, n, v, false)) != 0) { + goto done; + } + + // Advance to the next name. + n = nl; + } + + rv = 0; + +done: + nni_free(dupstr, duplen); + return (rv); +} // This looks, case independently for a word in a list, which is either // space or comma separated. @@ -204,30 +341,13 @@ ws_make_accept(const char *key, char *accept) static void ws_frame_fini(ws_frame *frame) { - if (frame->bufsz) { + if (frame->bufsz != 0) { nni_free(frame->buf, frame->bufsz); } NNI_FREE_STRUCT(frame); } static void -ws_msg_fini(ws_msg *wm) -{ - ws_frame *frame; - - NNI_ASSERT(!nni_list_node_active(&wm->node)); - while ((frame = nni_list_first(&wm->frames)) != NULL) { - nni_list_remove(&wm->frames, frame); - ws_frame_fini(frame); - } - if (wm->bufsz != 0) { - nni_free(wm->buf, wm->bufsz); - } - - NNI_FREE_STRUCT(wm); -} - -static void ws_mask_frame(ws_frame *frame) { uint32_t r; @@ -263,31 +383,19 @@ ws_unmask_frame(ws_frame *frame) static int ws_msg_init_control( - ws_msg **wmp, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len) + ws_frame **framep, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len) { - ws_msg * wm; ws_frame *frame; if (len > 125) { return (NNG_EINVAL); } - if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { - return (NNG_ENOMEM); - } - wm->buf = NULL; - wm->bufsz = 0; - if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { - ws_msg_fini(wm); return (NNG_ENOMEM); } - NNI_LIST_INIT(&wm->frames, ws_frame, node); memcpy(frame->sdata, buf, len); - - nni_list_append(&wm->frames, frame); - frame->wmsg = wm; frame->len = len; frame->final = true; frame->op = op; @@ -303,114 +411,100 @@ ws_msg_init_control( ws_mask_frame(frame); } - wm->aio = NULL; - wm->ws = ws; - *wmp = wm; + *framep = frame; return (0); } static int -ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio) +ws_frame_prep_tx(nni_ws *ws, ws_frame *frame) { - ws_msg * wm; + nng_aio *aio = frame->aio; + nni_iov *iov; + unsigned niov; size_t len; - size_t maxfrag = ws->fragsize; // make this tunable. (1MB default) uint8_t *buf; - uint8_t op; - - if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { - return (NNG_ENOMEM); - } - NNI_LIST_INIT(&wm->frames, ws_frame, node); - - len = nni_msg_len(msg) + nni_msg_header_len(msg); - wm->bufsz = len; - if ((wm->buf = nni_alloc(len)) == NULL) { - NNI_FREE_STRUCT(wm); - return (NNG_ENOMEM); - } - buf = wm->buf; - memcpy(buf, nni_msg_header(msg), nni_msg_header_len(msg)); - memcpy(buf + nni_msg_header_len(msg), nni_msg_body(msg), - nni_msg_len(msg)); - - op = WS_BINARY; // to start -- no support for sending TEXT frames - - // do ... while because we want at least one frame (even for empty - // messages.) Headers get their own frame, if present. Best bet - // is to try not to have a header when coming here. - do { - ws_frame *frame; - if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { - ws_msg_fini(wm); + // Figure out how much we need for the entire aio. + frame->len = 0; + nni_aio_get_iov(aio, &niov, &iov); + for (unsigned i = 0; i < niov; i++) { + frame->len += iov[i].iov_len; + } + + if ((frame->len > ws->fragsize) && (ws->fragsize > 0)) { + // Limit it to a single frame per policy (fragsize), as needed. + frame->len = ws->fragsize; + // For stream mode, we constrain ourselves to one frame + // per message. Submitter may see a partial transmit, and + // should resubmit as needed. For message mode, we will + // continue to resubmit. + frame->final = ws->isstream ? true : false; + } else { + // It all fits in this frame (which might not be the first), + // so we're done. + frame->final = true; + } + // Potentially allocate space for the data if we need to. + // Note that an empty message is legal. + if ((frame->bufsz < frame->len) && (frame->len > 0)) { + frame->buf = nni_alloc(frame->len); + if (frame->buf == NULL) { return (NNG_ENOMEM); } - nni_list_append(&wm->frames, frame); - frame->wmsg = wm; - frame->len = len > maxfrag ? maxfrag : len; - frame->buf = buf; - frame->op = op; - - buf += frame->len; - len -= frame->len; - op = WS_CONT; - - if (len == 0) { - frame->final = true; - } - frame->head[0] = frame->op; - frame->hlen = 2; - if (frame->final) { - frame->head[0] |= 0x80; // final frame bit - } - if (frame->len < 126) { - frame->head[1] = frame->len & 0x7f; - } else if (frame->len < 65536) { - frame->head[1] = 126; - NNI_PUT16(frame->head + 2, (frame->len & 0xffff)); - frame->hlen += 2; - } else { - frame->head[1] = 127; - NNI_PUT64(frame->head + 2, (uint64_t) frame->len); - frame->hlen += 8; - } + } + buf = frame->buf; - if (ws->server) { - frame->masked = false; - } else { - ws_mask_frame(frame); + // Now copy the data into the frame. + len = frame->len; + while (len != 0) { + size_t n = len; + if (n > iov->iov_len) { + n = iov->iov_len; } + memcpy(buf, iov->iov_buf, n); + iov++; + len -= n; + buf += n; + } - } while (len); - - wm->aio = aio; - wm->ws = ws; - *wmp = wm; - return (0); -} + if (nni_aio_count(aio) == 0) { + // This is the first frame. + frame->op = WS_BINARY; + } else { + frame->op = WS_CONT; + } -static int -ws_msg_init_rx(ws_msg **wmp, nni_ws *ws, nni_aio *aio) -{ - ws_msg *wm; + // Populate the frame header. + frame->head[0] = frame->op; + frame->hlen = 2; + if (frame->final) { + frame->head[0] |= 0x80; // final frame bit + } + if (frame->len < 126) { + frame->head[1] = frame->len & 0x7f; + } else if (frame->len < 65536) { + frame->head[1] = 126; + NNI_PUT16(frame->head + 2, (frame->len & 0xffff)); + frame->hlen += 2; + } else { + frame->head[1] = 127; + NNI_PUT64(frame->head + 2, (uint64_t) frame->len); + frame->hlen += 8; + } - if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { - return (NNG_ENOMEM); + // If we are on the client, then we need to mask the frame. + frame->masked = false; + if (!ws->server) { + ws_mask_frame(frame); } - NNI_LIST_INIT(&wm->frames, ws_frame, node); - wm->aio = aio; - wm->ws = ws; - *wmp = wm; return (0); } static void ws_close_cb(void *arg) { - nni_ws * ws = arg; - ws_msg * wm; - nni_aio *aio; + nni_ws * ws = arg; + ws_frame *frame; nni_aio_close(ws->txaio); nni_aio_close(ws->rxaio); @@ -422,31 +516,13 @@ ws_close_cb(void *arg) nni_http_conn_close(ws->http); - // This list (receive) should be empty. - while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { - nni_list_remove(&ws->rxmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - ws_msg_fini(wm); - } - - while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { - nni_list_remove(&ws->txmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); + while ((frame = nni_list_first(&ws->txq)) != NULL) { + nni_list_remove(&ws->txq, frame); + if (frame->aio != NULL) { + nni_aio_list_remove(frame->aio); + nni_aio_finish_error(frame->aio, NNG_ECLOSED); } - ws_msg_fini(wm); - } - ws->txframe = NULL; - - if (ws->rxframe != NULL) { - ws_frame_fini(ws->rxframe); - ws->rxframe = NULL; + ws_frame_fini(frame); } // Any txframe should have been killed with its wmsg. @@ -456,19 +532,13 @@ ws_close_cb(void *arg) static void ws_close(nni_ws *ws, uint16_t code) { - ws_msg *wm; + nng_aio *aio; // Receive stuff gets aborted always. No further receives // once we get a close. - while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { - nni_aio *aio; - nni_list_remove(&ws->rxmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - ws_msg_fini(wm); + while ((aio = nni_list_first(&ws->recvq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); } // If were closing "gracefully", then don't abort in-flight @@ -487,7 +557,6 @@ static void ws_start_write(nni_ws *ws) { ws_frame *frame; - ws_msg * wm; nni_iov iov[2]; int niov; @@ -495,13 +564,11 @@ ws_start_write(nni_ws *ws) return; // busy } - if ((wm = nni_list_first(&ws->txmsgs)) == NULL) { - // Nothing to send. - return; + if ((frame = nni_list_first(&ws->txq)) == NULL) { + return; // nothing to send } - - frame = nni_list_first(&wm->frames); NNI_ASSERT(frame != NULL); + nni_list_remove(&ws->txq, frame); // Push it out. ws->txframe = frame; @@ -534,7 +601,6 @@ ws_write_cb(void *arg) { nni_ws * ws = arg; ws_frame *frame; - ws_msg * wm; nni_aio * aio; int rv; @@ -545,17 +611,20 @@ ws_write_cb(void *arg) return; } ws->txframe = NULL; + if (frame->op == WS_CLOSE) { // If this was a close frame, we are done. // No other messages may succeed.. - while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { - nni_list_remove(&ws->txmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; + ws->txframe = NULL; + ws_frame_fini(frame); + while ((frame = nni_list_first(&ws->txq)) != NULL) { + nni_list_remove(&ws->txq, frame); + if ((aio = frame->aio) != NULL) { + frame->aio = NULL; nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); + ws_frame_fini(frame); } - ws_msg_fini(wm); } if (ws->wclose) { ws->wclose = false; @@ -565,54 +634,55 @@ ws_write_cb(void *arg) return; } - wm = frame->wmsg; - aio = wm->aio; - + aio = frame->aio; if ((rv = nni_aio_result(ws->txaio)) != 0) { - - nni_list_remove(&ws->txmsgs, wm); - ws_msg_fini(wm); + frame->aio = NULL; if (aio != NULL) { - wm->aio = NULL; nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } - + ws_frame_fini(frame); ws->closed = true; nni_http_conn_close(ws->http); nni_mtx_unlock(&ws->mtx); return; } - // good frame, was it the last - nni_list_remove(&wm->frames, frame); - ws_frame_fini(frame); - - // if we still have more frames to transmit, then schedule it. - if (!nni_list_empty(&wm->frames)) { - ws_start_write(ws); - nni_mtx_unlock(&ws->mtx); - return; + if (aio != NULL) { + nni_aio_iov_advance(aio, frame->len); + nni_aio_bump_count(aio, frame->len); + if (frame->final) { + frame->aio = NULL; + nni_aio_list_remove(aio); + } else { + // Clear the aio so that we won't attempt to finish + // it outside the lock + aio = NULL; + } } - if (aio != NULL) { - wm->aio = NULL; - nni_aio_list_remove(aio); + if (frame->final) { + ws_frame_fini(frame); + } else { + // This one cannot fail here, since we only do allocation + // at initial scheduling. + ws_frame_prep_tx(ws, frame); + // Schedule at end. This permits other frames to interleave. + nni_list_append(&ws->txq, frame); } - nni_list_remove(&ws->txmsgs, wm); - // Write the next frame. ws_start_write(ws); nni_mtx_unlock(&ws->mtx); - // discard while not holding lock (just deallocations) - ws_msg_fini(wm); - + // We attempt to finish the operation synchronously, outside the lock. if (aio != NULL) { - nng_msg *msg = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); - nni_msg_free(msg); + nni_msg *msg; + // Successful send, don't leak the message! + if ((msg = nni_aio_get_msg(aio)) != NULL) { + nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + } + nni_aio_finish_synch(aio, 0, nni_aio_count(aio)); } } @@ -620,7 +690,6 @@ static void ws_write_cancel(nni_aio *aio, void *arg, int rv) { nni_ws * ws = arg; - ws_msg * wm; ws_frame *frame; // Is this aio active? We can tell by looking at the active tx frame. @@ -630,17 +699,17 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&ws->mtx); return; } - wm = nni_aio_get_prov_extra(aio, 0); - if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) { + frame = nni_aio_get_prov_extra(aio, 0); + if (frame == ws->txframe) { nni_aio_abort(ws->txaio, rv); // We will wait for callback on the txaio to finish aio. - } else if (nni_list_active(&ws->txmsgs, wm)) { + } else { // If scheduled, just need to remove node and complete it. - nni_list_remove(&ws->txmsgs, wm); - wm->aio = NULL; + nni_list_remove(&ws->txq, frame); + frame->aio = NULL; nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); - ws_msg_fini(wm); + ws_frame_fini(frame); } nni_mtx_unlock(&ws->mtx); } @@ -648,10 +717,10 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv) static void ws_send_close(nni_ws *ws, uint16_t code) { - ws_msg * wm; - uint8_t buf[sizeof(uint16_t)]; - int rv; - nni_aio *aio; + ws_frame *frame; + uint8_t buf[sizeof(uint16_t)]; + int rv; + nni_aio * aio; NNI_PUT16(buf, code); @@ -665,125 +734,45 @@ ws_send_close(nni_ws *ws, uint16_t code) return; } ws->wclose = true; - rv = ws_msg_init_control(&wm, ws, WS_CLOSE, buf, sizeof(buf)); + rv = ws_msg_init_control(&frame, ws, WS_CLOSE, buf, sizeof(buf)); if (rv != 0) { ws->wclose = false; nni_aio_finish_error(aio, rv); return; } - // Close frames get priority! if ((rv = nni_aio_schedule(aio, ws_cancel_close, ws)) != 0) { ws->wclose = false; nni_aio_finish_error(aio, rv); + ws_frame_fini(frame); return; } - nni_list_prepend(&ws->txmsgs, wm); + // This gets inserted at the head. + nni_list_prepend(&ws->txq, frame); ws_start_write(ws); } static void ws_send_control(nni_ws *ws, uint8_t op, uint8_t *buf, size_t len) { - ws_msg *wm; + ws_frame *frame; // Note that we do not care if this works or not. So no AIO needed. if ((ws->closed) || - (ws_msg_init_control(&wm, ws, op, buf, len) != 0)) { + (ws_msg_init_control(&frame, ws, op, buf, len) != 0)) { return; } // Control frames at head of list. (Note that this may preempt // the close frame or other ping/pong requests. Oh well.) - nni_list_prepend(&ws->txmsgs, wm); + nni_list_prepend(&ws->txq, frame); ws_start_write(ws); } -static const nni_option ws_options[] = { - { - .o_name = NULL, - }, -}; - -int -nni_ws_getopt(nni_ws *ws, const char *name, void *buf, size_t *szp, nni_type t) -{ - int rv; - - nni_mtx_lock(&ws->mtx); - if (ws->closed) { - nni_mtx_unlock(&ws->mtx); - return (NNG_ECLOSED); - } - rv = nni_http_conn_getopt(ws->http, name, buf, szp, t); - if (rv == NNG_ENOTSUP) { - rv = nni_getopt(ws_options, name, ws, buf, szp, t); - } - nni_mtx_unlock(&ws->mtx); - return (rv); -} - -int -nni_ws_setopt( - nni_ws *ws, const char *name, const void *buf, size_t sz, nni_type t) -{ - int rv; - - nni_mtx_lock(&ws->mtx); - if (ws->closed) { - nni_mtx_unlock(&ws->mtx); - return (NNG_ECLOSED); - } - rv = nni_http_conn_setopt(ws->http, name, buf, sz, t); - if (rv == NNG_ENOTSUP) { - rv = nni_setopt(ws_options, name, ws, buf, sz, t); - } - nni_mtx_unlock(&ws->mtx); - return (rv); -} - -void -nni_ws_send_msg(nni_ws *ws, nni_aio *aio) -{ - ws_msg * wm; - nni_msg *msg; - int rv; - - msg = nni_aio_get_msg(aio); - - if (nni_aio_begin(aio) != 0) { - return; - } - if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) { - nni_aio_finish_error(aio, rv); - return; - } - - nni_mtx_lock(&ws->mtx); - if (ws->closed) { - nni_mtx_unlock(&ws->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - ws_msg_fini(wm); - return; - } - if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) { - nni_mtx_unlock(&ws->mtx); - nni_aio_finish_error(aio, rv); - ws_msg_fini(wm); - return; - } - nni_aio_set_prov_extra(aio, 0, wm); - nni_list_append(&ws->sendq, aio); - nni_list_append(&ws->txmsgs, wm); - ws_start_write(ws); - nni_mtx_unlock(&ws->mtx); -} - static void ws_start_read(nni_ws *ws) { ws_frame *frame; - ws_msg * wm; nni_aio * aio; nni_iov iov; @@ -791,18 +780,18 @@ ws_start_read(nni_ws *ws) return; // already reading or closed } - if ((wm = nni_list_first(&ws->rxmsgs)) == NULL) { - return; // no body expecting a message. + // If nobody is waiting for recv, and we already have a data + // frame, stop reading. This keeps us from buffering infinitely. + if (nni_list_empty(&ws->recvq) && !nni_list_empty(&ws->rxq)) { + return; } if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { - nni_list_remove(&ws->rxmsgs, wm); - if ((aio = wm->aio) != NULL) { - wm->aio = NULL; + if ((aio = nni_list_first(&ws->recvq)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ENOMEM); } - ws_msg_fini(wm); + ws_close(ws, WS_CLOSE_INTERNAL); return; } @@ -820,34 +809,143 @@ ws_start_read(nni_ws *ws) } static void -ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp, nni_aio **aiop) +ws_read_finish_str(nni_ws *ws) { - ws_msg *wm = nni_list_first(&ws->rxmsgs); + for (;;) { + nni_aio * aio; + nni_iov * iov; + unsigned niov; + ws_frame *frame; - switch (frame->op) { - case WS_CONT: - if (wm == NULL) { - ws_close(ws, WS_CLOSE_GOING_AWAY); + if ((aio = nni_list_first(&ws->recvq)) == NULL) { return; } - if (nni_list_empty(&wm->frames)) { + + if ((frame = nni_list_first(&ws->rxq)) == NULL) { + return; + } + + // Discard 0 length frames -- in stream mode they are not used. + if (frame->len == 0) { + nni_list_remove(&ws->rxq, frame); + ws_frame_fini(frame); + continue; + } + + // We are completing this aio one way or the other. + nni_aio_list_remove(aio); + nni_aio_get_iov(aio, &niov, &iov); + + while ((frame != NULL) && (niov != 0)) { + size_t n; + + if ((n = frame->len) > iov->iov_len) { + // This eats the entire iov. + n = iov->iov_len; + } + iov->iov_buf = ((uint8_t *) iov->iov_buf) + n; + iov->iov_len -= n; + if (iov->iov_len == 0) { + iov++; + niov--; + } + + if (frame->len == n) { + nni_list_remove(&ws->rxq, frame); + ws_frame_fini(frame); + frame = nni_list_first(&ws->rxq); + } else { + frame->len -= n; + frame->buf += n; + } + + nni_aio_bump_count(aio, n); + } + + nni_aio_finish(aio, 0, nni_aio_count(aio)); + } +} + +static void +ws_read_finish_msg(nni_ws *ws) +{ + nni_aio * aio; + size_t len; + ws_frame *frame; + nni_msg * msg; + int rv; + uint8_t * body; + + // If we have no data, no waiter, or have not received the complete + // message yet, then there is nothing to do. + if (ws->inmsg || nni_list_empty(&ws->rxq) || + ((aio = nni_list_first(&ws->recvq)) == NULL)) { + return; + } + + // At this point, we have both a complete message in the queue (and + // there should not be any frames other than the for the message), + // and a waiting reader. + len = 0; + NNI_LIST_FOREACH (&ws->rxq, frame) { + len += frame->len; + } + + nni_aio_list_remove(aio); + + if ((rv = nni_msg_alloc(&msg, len)) != 0) { + nni_aio_finish_error(aio, rv); + ws_close_error(ws, WS_CLOSE_INTERNAL); + return; + } + body = nni_msg_body(msg); + while ((frame = nni_list_first(&ws->rxq)) != NULL) { + nni_list_remove(&ws->rxq, frame); + memcpy(body, frame->buf, frame->len); + body += frame->len; + ws_frame_fini(frame); + } + + nni_aio_set_msg(aio, msg); + nni_aio_bump_count(aio, nni_msg_len(msg)); + nni_aio_finish(aio, 0, nni_msg_len(msg)); +} + +static void +ws_read_finish(nni_ws *ws) +{ + if (ws->isstream) { + ws_read_finish_str(ws); + } else { + ws_read_finish_msg(ws); + } +} + +static void +ws_read_frame_cb(nni_ws *ws, ws_frame *frame) +{ + switch (frame->op) { + case WS_CONT: + if (!ws->inmsg) { ws_close(ws, WS_CLOSE_PROTOCOL_ERR); return; } + if (frame->final) { + ws->inmsg = false; + } ws->rxframe = NULL; - nni_list_append(&wm->frames, frame); + nni_list_append(&ws->rxq, frame); break; case WS_BINARY: - if (wm == NULL) { - ws_close(ws, WS_CLOSE_GOING_AWAY); - return; - } - if (!nni_list_empty(&wm->frames)) { + if (ws->inmsg) { ws_close(ws, WS_CLOSE_PROTOCOL_ERR); return; } + if (!frame->final) { + ws->inmsg = true; + } ws->rxframe = NULL; - nni_list_append(&wm->frames, frame); + nni_list_append(&ws->rxq, frame); break; case WS_TEXT: // No support for text mode at present. @@ -880,23 +978,13 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame, ws_msg **wmp, nni_aio **aiop) return; } - // If this was the last (final) frame, then complete it. But - // we have to look at the msg, since we might have got a - // control frame. - if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) { - nni_list_remove(&ws->rxmsgs, wm); - *wmp = wm; - *aiop = wm->aio; - nni_aio_list_remove(wm->aio); - wm->aio = NULL; - } + ws_read_finish(ws); } static void ws_read_cb(void *arg) { - nni_ws * ws = arg; - ws_msg * wm; + nni_ws * ws = arg; nni_aio * aio = ws->rxaio; ws_frame *frame; int rv; @@ -976,6 +1064,21 @@ ws_read_cb(void *arg) nni_mtx_unlock(&ws->mtx); return; } + // For message mode, also check to make sure that the overall + // length of the message has not exceeded our recvmax. + // (Protect against an infinite stream of small messages!) + if ((!ws->isstream) && (ws->recvmax > 0)) { + size_t totlen = frame->len; + ws_frame *fr2; + NNI_LIST_FOREACH (&ws->rxq, fr2) { + totlen += fr2->len; + } + if (totlen > ws->recvmax) { + ws_close(ws, WS_CLOSE_TOO_BIG); + nni_mtx_unlock(&ws->mtx); + return; + } + } // Check for masking. (We don't actually do the unmask // here, because we don't have data yet.) @@ -1023,134 +1126,40 @@ ws_read_cb(void *arg) // At this point, we have a complete frame. ws_unmask_frame(frame); // idempotent - wm = NULL; - aio = NULL; - ws_read_frame_cb(ws, frame, &wm, &aio); + ws_read_frame_cb(ws, frame); ws_start_read(ws); nni_mtx_unlock(&ws->mtx); - - // Got a good message, so we have to do the work to send it up. - if (wm != NULL) { - size_t len = 0; - nni_msg *msg; - uint8_t *body; - int rv; - - NNI_LIST_FOREACH (&wm->frames, frame) { - len += frame->len; - } - if ((rv = nni_msg_alloc(&msg, len)) != 0) { - nni_aio_finish_error(aio, rv); - ws_msg_fini(wm); - nni_ws_close_error(ws, WS_CLOSE_INTERNAL); - return; - } - body = nni_msg_body(msg); - NNI_LIST_FOREACH (&wm->frames, frame) { - memcpy(body, frame->buf, frame->len); - body += frame->len; - } - nni_aio_set_msg(aio, msg); - nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); - ws_msg_fini(wm); - } } static void ws_read_cancel(nni_aio *aio, void *arg, int rv) { nni_ws *ws = arg; - ws_msg *wm; nni_mtx_lock(&ws->mtx); - if (!nni_aio_list_active(aio)) { - nni_mtx_unlock(&ws->mtx); - return; - } - wm = nni_aio_get_prov_extra(aio, 0); - if (wm == nni_list_first(&ws->rxmsgs)) { - // Cancellation will percolate back up. - nni_aio_abort(ws->rxaio, rv); - } else if (nni_list_active(&ws->rxmsgs, wm)) { - nni_list_remove(&ws->rxmsgs, wm); - ws_msg_fini(wm); + if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&ws->mtx); } -void -nni_ws_recv_msg(nni_ws *ws, nni_aio *aio) -{ - ws_msg *wm; - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) { - nni_aio_finish_error(aio, rv); - return; - } - nni_mtx_lock(&ws->mtx); - if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) { - nni_mtx_unlock(&ws->mtx); - ws_msg_fini(wm); - nni_aio_finish_error(aio, rv); - return; - } - nni_aio_set_prov_extra(aio, 0, wm); - nni_list_append(&ws->recvq, aio); - nni_list_append(&ws->rxmsgs, wm); - ws_start_read(ws); - - nni_mtx_unlock(&ws->mtx); -} - -void -nni_ws_close_error(nni_ws *ws, uint16_t code) +static void +ws_close_error(nni_ws *ws, uint16_t code) { nni_mtx_lock(&ws->mtx); ws_close(ws, code); nni_mtx_unlock(&ws->mtx); } -void -nni_ws_close(nni_ws *ws) -{ - nni_ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); -} - -const char * -nni_ws_request_headers(nni_ws *ws) -{ - nni_mtx_lock(&ws->mtx); - if (ws->reqhdrs == NULL) { - ws->reqhdrs = nni_http_req_headers(ws->req); - } - nni_mtx_unlock(&ws->mtx); - return (ws->reqhdrs); -} - -const char * -nni_ws_response_headers(nni_ws *ws) -{ - nni_mtx_lock(&ws->mtx); - if (ws->reshdrs == NULL) { - ws->reshdrs = nni_http_res_headers(ws->res); - } - nni_mtx_unlock(&ws->mtx); - return (ws->reshdrs); -} - static void ws_fini(void *arg) { - nni_ws *ws = arg; - ws_msg *wm; + nni_ws * ws = arg; + ws_frame *frame; + nng_aio * aio; - nni_ws_close(ws); + ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); // Give a chance for the close frame to drain. if (ws->closeaio) { @@ -1175,25 +1184,29 @@ ws_fini(void *arg) } nni_mtx_lock(&ws->mtx); - while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { - nni_list_remove(&ws->rxmsgs, wm); - if (wm->aio) { - nni_aio_finish_error(wm->aio, NNG_ECLOSED); - } - ws_msg_fini(wm); + while ((frame = nni_list_first(&ws->rxq)) != NULL) { + nni_list_remove(&ws->rxq, frame); + ws_frame_fini(frame); } - while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { - nni_list_remove(&ws->txmsgs, wm); - if (wm->aio) { - nni_aio_finish_error(wm->aio, NNG_ECLOSED); - } - ws_msg_fini(wm); + while ((frame = nni_list_first(&ws->txq)) != NULL) { + nni_list_remove(&ws->txq, frame); + ws_frame_fini(frame); } - if (ws->rxframe) { + if (ws->rxframe != NULL) { ws_frame_fini(ws->rxframe); } + if (ws->txframe != NULL) { + ws_frame_fini(ws->txframe); + } + + while (((aio = nni_list_first(&ws->recvq)) != NULL) || + ((aio = nni_list_first(&ws->sendq)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&ws->mtx); if (ws->http) { @@ -1217,8 +1230,8 @@ ws_fini(void *arg) NNI_FREE_STRUCT(ws); } -void -nni_ws_fini(nni_ws *ws) +static void +ws_reap(nni_ws *ws) { nni_reap(&ws->reap, ws_fini, ws); } @@ -1232,8 +1245,8 @@ ws_http_cb_listener(nni_ws *ws, nni_aio *aio) nni_mtx_lock(&l->mtx); nni_list_remove(&l->reply, ws); if (nni_aio_result(aio) != 0) { - nni_ws_fini(ws); nni_mtx_unlock(&l->mtx); + ws_reap(ws); return; } ws->ready = true; @@ -1320,14 +1333,14 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) (!ws_contains_word(ptr, "upgrade")) || ((ptr = GETH("Upgrade")) == NULL) || (strcmp(ptr, "websocket") != 0)) { - nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); + ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); rv = NNG_EPROTO; goto err; } if (d->proto != NULL) { if (((ptr = GETH("Sec-WebSocket-Protocol")) == NULL) || (!ws_contains_word(d->proto, ptr))) { - nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); + ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); rv = NNG_EPROTO; goto err; } @@ -1358,7 +1371,7 @@ err: } nni_mtx_unlock(&d->mtx); - nni_ws_fini(ws); + ws_reap(ws); } static void @@ -1384,8 +1397,8 @@ ws_init(nni_ws **wsp) return (NNG_ENOMEM); } nni_mtx_init(&ws->mtx); - NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node); - NNI_LIST_INIT(&ws->txmsgs, ws_msg, node); + NNI_LIST_INIT(&ws->rxq, ws_frame, node); + NNI_LIST_INIT(&ws->txq, ws_frame, node); nni_aio_list_init(&ws->sendq); nni_aio_list_init(&ws->recvq); @@ -1401,17 +1414,25 @@ ws_init(nni_ws **wsp) nni_aio_set_timeout(ws->closeaio, 100); nni_aio_set_timeout(ws->httpaio, 2000); + ws->ops.s_close = ws_str_close; + ws->ops.s_free = ws_str_free; + ws->ops.s_send = ws_str_send; + ws->ops.s_recv = ws_str_recv; + ws->ops.s_getx = ws_str_getx; + ws->ops.s_setx = ws_str_setx; + ws->fragsize = 1 << 20; // we won't send a frame larger than this *wsp = ws; return (0); } -void -nni_ws_listener_fini(nni_ws_listener *l) +static void +ws_listener_free(void *arg) { - ws_header *hdr; + nni_ws_listener *l = arg; + ws_header * hdr; - nni_ws_listener_close(l); + ws_listener_close(l); nni_mtx_lock(&l->mtx); while (!nni_list_empty(&l->reply)) { @@ -1437,7 +1458,7 @@ nni_ws_listener_fini(nni_ws_listener *l) NNI_FREE_STRUCT(hdr); } if (l->url) { - nni_url_free(l->url); + nng_url_free(l->url); } NNI_FREE_STRUCT(l); } @@ -1456,6 +1477,7 @@ ws_handler(nni_aio *aio) uint16_t status; int rv; char key[29]; + ws_header * hdr; req = nni_aio_get_input(aio, 0); h = nni_aio_get_input(aio, 1); @@ -1542,6 +1564,20 @@ ws_handler(nni_aio *aio) goto err; } + // Set any user supplied headers. This is better than using a hook + // for most things, because it is loads easier. + NNI_LIST_FOREACH (&l->headers, hdr) { + if (SETH(hdr->name, hdr->value) != 0) { + status = NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; + nni_http_res_free(res); + goto err; + } + } + + // The hook function gives us the ability to intercept the HTTP + // response altogether. Its best not to do this unless you really + // need to, because it's much more complex. But if you want to set + // up an HTTP Authorization handler this might be the only choice. if (l->hookfn != NULL) { rv = l->hookfn(l->hookarg, req, res); if (rv != 0) { @@ -1583,8 +1619,9 @@ ws_handler(nni_aio *aio) ws->res = res; ws->server = true; ws->maxframe = l->maxframe; - - // XXX: Inherit fragmentation? (Frag is limited for now). + ws->fragsize = l->fragsize; + ws->recvmax = l->recvmax; + ws->isstream = l->isstream; nni_list_append(&l->reply, ws); nni_aio_set_data(ws->httpaio, 0, l); @@ -1603,71 +1640,6 @@ err: } } -int -nni_ws_listener_init(nni_ws_listener **wslp, nni_url *url) -{ - nni_ws_listener *l; - int rv; - char * host; - - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&l->mtx); - nni_cv_init(&l->cv, &l->mtx); - nni_aio_list_init(&l->aios); - - NNI_LIST_INIT(&l->pend, nni_ws, node); - NNI_LIST_INIT(&l->reply, nni_ws, node); - - // make a private copy of the url structure. - if ((rv = nni_url_clone(&l->url, url)) != 0) { - nni_ws_listener_fini(l); - return (rv); - } - - host = l->url->u_hostname; - if (strlen(host) == 0) { - host = NULL; - } - rv = nni_http_handler_init(&l->handler, url->u_path, ws_handler); - if (rv != 0) { - nni_ws_listener_fini(l); - return (rv); - } - - if (((rv = nni_http_handler_set_host(l->handler, host)) != 0) || - ((rv = nni_http_handler_set_data(l->handler, l, 0)) != 0) || - ((rv = nni_http_server_init(&l->server, url)) != 0)) { - nni_ws_listener_fini(l); - return (rv); - } - - l->maxframe = 0; - *wslp = l; - return (0); -} - -int -nni_ws_listener_proto(nni_ws_listener *l, const char *proto) -{ - int rv = 0; - char *ns; - nni_mtx_lock(&l->mtx); - if (l->started) { - rv = NNG_EBUSY; - } else if ((ns = nni_strdup(proto)) == NULL) { - rv = NNG_ENOMEM; - } else { - if (l->proto != NULL) { - nni_strfree(l->proto); - } - l->proto = ns; - } - nni_mtx_unlock(&l->mtx); - return (rv); -} - static void ws_accept_cancel(nni_aio *aio, void *arg, int rv) { @@ -1681,11 +1653,12 @@ ws_accept_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&l->mtx); } -void -nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) +static void +ws_listener_accept(void *arg, nni_aio *aio) { - nni_ws *ws; - int rv; + nni_ws_listener *l = arg; + nni_ws * ws; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -1717,10 +1690,11 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) nni_mtx_unlock(&l->mtx); } -void -nni_ws_listener_close(nni_ws_listener *l) +static void +ws_listener_close(void *arg) { - nni_ws *ws; + nni_ws_listener *l = arg; + nni_ws * ws; nni_mtx_lock(&l->mtx); if (l->closed) { nni_mtx_unlock(&l->mtx); @@ -1733,18 +1707,30 @@ nni_ws_listener_close(nni_ws_listener *l) l->started = false; } NNI_LIST_FOREACH (&l->pend, ws) { - nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY); + ws_close_error(ws, WS_CLOSE_GOING_AWAY); } NNI_LIST_FOREACH (&l->reply, ws) { - nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY); + ws_close_error(ws, WS_CLOSE_GOING_AWAY); } nni_mtx_unlock(&l->mtx); } -int -nni_ws_listener_listen(nni_ws_listener *l) +// XXX: Consider replacing this with an option. +void +nni_ws_listener_hook( + nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg) { - int rv; + nni_mtx_lock(&l->mtx); + l->hookfn = hookfn; + l->hookarg = hookarg; + nni_mtx_unlock(&l->mtx); +} + +static int +ws_listener_listen(void *arg) +{ + nni_ws_listener *l = arg; + int rv; nni_mtx_lock(&l->mtx); if (l->closed) { @@ -1777,42 +1763,290 @@ nni_ws_listener_listen(nni_ws_listener *l) return (0); } -void -nni_ws_listener_hook( - nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg) +static int +ws_listener_set_size( + nni_ws_listener *l, size_t *valp, const void *buf, size_t sz, nni_type t) { - nni_mtx_lock(&l->mtx); - l->hookfn = hookfn; - l->hookarg = hookarg; - nni_mtx_unlock(&l->mtx); + size_t val; + int rv; + + // Max size is limited to 4 GB, but you really never want to have + // to have a larger value. If you think you need that, you're doing it + // wrong. You *can* set the size to 0 for unlimited. + if ((rv = nni_copyin_size(&val, buf, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&l->mtx); + *valp = val; + nni_mtx_unlock(&l->mtx); + } + return (rv); } -int -nni_ws_listener_set_tls(nni_ws_listener *l, nng_tls_config *tls) +static int +ws_listener_get_size( + nni_ws_listener *l, size_t *valp, void *buf, size_t *szp, nni_type t) { - int rv; + size_t val; nni_mtx_lock(&l->mtx); - rv = nni_http_server_set_tls(l->server, tls); + val = *valp; nni_mtx_unlock(&l->mtx); + return (nni_copyout_size(val, buf, szp, t)); +} + +static int +ws_listener_set_maxframe(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_set_size(l, &l->maxframe, buf, sz, t)); +} + +static int +ws_listener_get_maxframe(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_get_size(l, &l->maxframe, buf, szp, t)); +} + +static int +ws_listener_set_fragsize(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_set_size(l, &l->fragsize, buf, sz, t)); +} + +static int +ws_listener_get_fragsize(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_get_size(l, &l->fragsize, buf, szp, t)); +} + +static int +ws_listener_set_recvmax(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_set_size(l, &l->recvmax, buf, sz, t)); +} + +static int +ws_listener_get_recvmax(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + return (ws_listener_get_size(l, &l->recvmax, buf, szp, t)); +} + +static int +ws_listener_set_res_headers(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + + if ((rv = ws_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); + rv = ws_set_headers(&l->headers, buf); + nni_mtx_unlock(&l->mtx); + } return (rv); } -int -nni_ws_listener_get_tls(nni_ws_listener *l, nng_tls_config **tlsp) +static int +ws_listener_set_proto(void *arg, const void *buf, size_t sz, nni_type t) { - int rv; + nni_ws_listener *l = arg; + int rv; + + if ((rv = ws_check_string(buf, sz, t)) == 0) { + char *ns; + if ((ns = nni_strdup(buf)) == NULL) { + rv = NNG_ENOMEM; + } else { + nni_mtx_lock(&l->mtx); + if (l->proto != NULL) { + nni_strfree(l->proto); + } + l->proto = ns; + nni_mtx_unlock(&l->mtx); + } + } + return (rv); +} + +static int +ws_listener_get_proto(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; nni_mtx_lock(&l->mtx); - rv = nni_http_server_get_tls(l->server, tlsp); + rv = nni_copyout_str(l->proto != NULL ? l->proto : "", buf, szp, t); nni_mtx_unlock(&l->mtx); return (rv); } -void -nni_ws_listener_set_maxframe(nni_ws_listener *l, size_t maxframe) +static int +ws_listener_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t) { + nni_ws_listener *l = arg; + int rv; + bool b; + + if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); + l->isstream = !b; + nni_mtx_unlock(&l->mtx); + } + return (rv); +} + +static int +ws_listener_get_url(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + nni_mtx_lock(&l->mtx); - l->maxframe = maxframe; + rv = nni_copyout_str(l->url->u_rawurl, buf, szp, t); nni_mtx_unlock(&l->mtx); + return (rv); +} + +static const nni_option ws_listener_options[] = { + { + .o_name = NNI_OPT_WS_MSGMODE, + .o_set = ws_listener_set_msgmode, + }, + { + .o_name = NNG_OPT_WS_RECVMAXFRAME, + .o_set = ws_listener_set_maxframe, + .o_get = ws_listener_get_maxframe, + }, + { + .o_name = NNG_OPT_WS_SENDMAXFRAME, + .o_set = ws_listener_set_fragsize, + .o_get = ws_listener_get_fragsize, + }, + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_set = ws_listener_set_recvmax, + .o_get = ws_listener_get_recvmax, + }, + { + .o_name = NNG_OPT_WS_RESPONSE_HEADERS, + .o_set = ws_listener_set_res_headers, + // XXX: Get not implemented yet; likely of marginal value. + }, + { + .o_name = NNG_OPT_WS_PROTOCOL, + .o_set = ws_listener_set_proto, + .o_get = ws_listener_get_proto, + }, + { + .o_name = NNG_OPT_URL, + .o_get = ws_listener_get_url, + }, + { + .o_name = NULL, + }, +}; + +static int +ws_listener_set_header(nni_ws_listener *l, const char *name, const void *buf, + size_t sz, nni_type t) +{ + int rv; + name += strlen(NNG_OPT_WS_RESPONSE_HEADER); + if ((rv = ws_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&l->mtx); + rv = ws_set_header(&l->headers, name, buf); + nni_mtx_unlock(&l->mtx); + } + return (rv); +} + +static int +ws_listener_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + + rv = nni_setopt(ws_listener_options, name, l, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_http_server_setx(l->server, name, buf, sz, t); + } + + if (rv == NNG_ENOTSUP) { + if (startswith(name, NNG_OPT_WS_RESPONSE_HEADER)) { + rv = ws_listener_set_header(l, name, buf, sz, t); + } + } + return (rv); +} + +static int +ws_listener_getx( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + nni_ws_listener *l = arg; + int rv; + + rv = nni_getopt(ws_listener_options, name, l, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_http_server_getx(l->server, name, buf, szp, t); + } + return (rv); +} + +int +nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url) +{ + nni_ws_listener *l; + int rv; + char * host; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->mtx); + nni_cv_init(&l->cv, &l->mtx); + nni_aio_list_init(&l->aios); + + NNI_LIST_INIT(&l->pend, nni_ws, node); + NNI_LIST_INIT(&l->reply, nni_ws, node); + + // make a private copy of the url structure. + if ((rv = nng_url_clone(&l->url, url)) != 0) { + ws_listener_free(l); + return (rv); + } + + host = l->url->u_hostname; + if (strlen(host) == 0) { + host = NULL; + } + rv = nni_http_handler_init(&l->handler, url->u_path, ws_handler); + if (rv != 0) { + ws_listener_free(l); + return (rv); + } + + if (((rv = nni_http_handler_set_host(l->handler, host)) != 0) || + ((rv = nni_http_handler_set_data(l->handler, l, 0)) != 0) || + ((rv = nni_http_server_init(&l->server, url)) != 0)) { + ws_listener_free(l); + return (rv); + } + + l->fragsize = WS_DEF_MAXTXFRAME; + l->maxframe = WS_DEF_MAXRXFRAME; + l->recvmax = WS_DEF_RECVMAX; + l->isstream = false; + l->ops.sl_free = ws_listener_free; + l->ops.sl_close = ws_listener_close; + l->ops.sl_accept = ws_listener_accept; + l->ops.sl_listen = ws_listener_listen; + l->ops.sl_setx = ws_listener_setx; + l->ops.sl_getx = ws_listener_getx; + *wslp = (void *) l; + return (0); } void @@ -1846,7 +2080,7 @@ ws_conn_cb(void *arg) nni_cv_wake(&d->cv); } nni_mtx_unlock(&d->mtx); - nni_ws_fini(ws); + ws_reap(ws); } else { nni_mtx_unlock(&d->mtx); } @@ -1861,7 +2095,7 @@ ws_conn_cb(void *arg) // This request was canceled for some reason. nni_http_conn_fini(http); nni_mtx_unlock(&ws->mtx); - nni_ws_fini(ws); + ws_reap(ws); return; } @@ -1908,13 +2142,14 @@ err: if (req != NULL) { nni_http_req_free(req); } - nni_ws_fini(ws); + ws_reap(ws); } -void -nni_ws_dialer_fini(nni_ws_dialer *d) +static void +ws_dialer_free(void *arg) { - ws_header *hdr; + nni_ws_dialer *d = arg; + ws_header * hdr; nni_mtx_lock(&d->mtx); while (!nni_list_empty(&d->wspend)) { @@ -1933,65 +2168,18 @@ nni_ws_dialer_fini(nni_ws_dialer *d) nni_http_client_fini(d->client); } if (d->url) { - nni_url_free(d->url); + nng_url_free(d->url); } nni_cv_fini(&d->cv); nni_mtx_fini(&d->mtx); NNI_FREE_STRUCT(d); } -int -nni_ws_dialer_init(nni_ws_dialer **dp, nni_url *url) -{ - nni_ws_dialer *d; - int rv; - - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { - return (NNG_ENOMEM); - } - NNI_LIST_INIT(&d->headers, ws_header, node); - NNI_LIST_INIT(&d->wspend, nni_ws, node); - nni_mtx_init(&d->mtx); - nni_cv_init(&d->cv, &d->mtx); - - if ((rv = nni_url_clone(&d->url, url)) != 0) { - nni_ws_dialer_fini(d); - return (rv); - } - - if ((rv = nni_http_client_init(&d->client, url)) != 0) { - nni_ws_dialer_fini(d); - return (rv); - } - d->maxframe = 0; - *dp = d; - return (0); -} - -int -nni_ws_dialer_set_tls(nni_ws_dialer *d, nng_tls_config *tls) -{ - int rv; - nni_mtx_lock(&d->mtx); - rv = nni_http_client_set_tls(d->client, tls); - nni_mtx_unlock(&d->mtx); - return (rv); -} - -int -nni_ws_dialer_get_tls(nni_ws_dialer *d, nng_tls_config **tlsp) -{ - int rv; - nni_mtx_lock(&d->mtx); - rv = nni_http_client_get_tls(d->client, tlsp); - nni_mtx_unlock(&d->mtx); - return (rv); -} - -void -nni_ws_dialer_close(nni_ws_dialer *d) +static void +ws_dialer_close(void *arg) { - nni_ws *ws; + nni_ws_dialer *d = arg; + nni_ws * ws; nni_mtx_lock(&d->mtx); if (d->closed) { nni_mtx_unlock(&d->mtx); @@ -2005,24 +2193,6 @@ nni_ws_dialer_close(nni_ws_dialer *d) nni_mtx_unlock(&d->mtx); } -int -nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto) -{ - int rv = 0; - char *ns; - nni_mtx_lock(&d->mtx); - if ((ns = nni_strdup(proto)) == NULL) { - rv = NNG_ENOMEM; - } else { - if (d->proto != NULL) { - nni_strfree(d->proto); - } - d->proto = ns; - } - nni_mtx_unlock(&d->mtx); - return (rv); -} - static void ws_dial_cancel(nni_aio *aio, void *arg, int rv) { @@ -2038,11 +2208,12 @@ ws_dial_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&ws->mtx); } -void -nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) +static void +ws_dialer_dial(void *arg, nni_aio *aio) { - nni_ws *ws; - int rv; + nni_ws_dialer *d = arg; + nni_ws * ws; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -2055,72 +2226,275 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) if (d->closed) { nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); - ws_fini(ws); + ws_reap(ws); return; } if ((rv = nni_aio_schedule(aio, ws_dial_cancel, ws)) != 0) { nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); - ws_fini(ws); + ws_reap(ws); return; } ws->dialer = d; ws->useraio = aio; ws->server = false; ws->maxframe = d->maxframe; + ws->isstream = d->isstream; nni_list_append(&d->wspend, ws); nni_http_client_connect(d->client, ws->connaio); nni_mtx_unlock(&d->mtx); } static int -ws_set_header(nni_list *l, const char *n, const char *v) +ws_dialer_set_msgmode(void *arg, const void *buf, size_t sz, nni_type t) { - ws_header *hdr; - char * nv; + nni_ws_dialer *d = arg; + int rv; + bool b; - if ((nv = nni_strdup(v)) == NULL) { - return (NNG_ENOMEM); + if ((rv = nni_copyin_bool(&b, buf, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + d->isstream = !b; + nni_mtx_unlock(&d->mtx); } + return (rv); +} - NNI_LIST_FOREACH (l, hdr) { - if (nni_strcasecmp(hdr->name, n) == 0) { - nni_strfree(hdr->value); - hdr->value = nv; - return (0); - } - } +static int +ws_dialer_set_size( + nni_ws_dialer *d, size_t *valp, const void *buf, size_t sz, nni_type t) +{ + size_t val; + int rv; - if ((hdr = NNI_ALLOC_STRUCT(hdr)) == NULL) { - nni_strfree(nv); - return (NNG_ENOMEM); - } - if ((hdr->name = nni_strdup(n)) == NULL) { - nni_strfree(nv); - NNI_FREE_STRUCT(hdr); - return (NNG_ENOMEM); + // Max size is limited to 4 GB, but you really never want to have + // to have a larger value. If you think you need that, you're doing it + // wrong. You *can* set the size to 0 for unlimited. + if ((rv = nni_copyin_size(&val, buf, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&d->mtx); + *valp = val; + nni_mtx_unlock(&d->mtx); } - hdr->value = nv; - nni_list_append(l, hdr); - return (0); + return (rv); } -int -nni_ws_dialer_header(nni_ws_dialer *d, const char *n, const char *v) +static int +ws_dialer_get_size( + nni_ws_dialer *d, size_t *valp, void *buf, size_t *szp, nni_type t) { - int rv; + size_t val; nni_mtx_lock(&d->mtx); - rv = ws_set_header(&d->headers, n, v); + val = *valp; nni_mtx_unlock(&d->mtx); + return (nni_copyout_size(val, buf, szp, t)); +} + +static int +ws_dialer_set_maxframe(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_set_size(d, &d->maxframe, buf, sz, t)); +} + +static int +ws_dialer_get_maxframe(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_get_size(d, &d->maxframe, buf, szp, t)); +} + +static int +ws_dialer_set_fragsize(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_set_size(d, &d->fragsize, buf, sz, t)); +} + +static int +ws_dialer_get_fragsize(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_get_size(d, &d->fragsize, buf, szp, t)); +} + +static int +ws_dialer_set_recvmax(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_set_size(d, &d->recvmax, buf, sz, t)); +} + +static int +ws_dialer_get_recvmax(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + return (ws_dialer_get_size(d, &d->recvmax, buf, szp, t)); +} + +static int +ws_dialer_set_req_headers(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + + if ((rv = ws_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + rv = ws_set_headers(&d->headers, buf); + nni_mtx_unlock(&d->mtx); + } return (rv); } -void -nni_ws_dialer_set_maxframe(nni_ws_dialer *d, size_t maxframe) +static int +ws_dialer_set_proto(void *arg, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + + if ((rv = ws_check_string(buf, sz, t)) == 0) { + char *ns; + if ((ns = nni_strdup(buf)) == NULL) { + rv = NNG_ENOMEM; + } else { + nni_mtx_lock(&d->mtx); + if (d->proto != NULL) { + nni_strfree(d->proto); + } + d->proto = ns; + nni_mtx_unlock(&d->mtx); + } + } + return (rv); +} + +static int +ws_dialer_get_proto(void *arg, void *buf, size_t *szp, nni_type t) { + nni_ws_dialer *d = arg; + int rv; nni_mtx_lock(&d->mtx); - d->maxframe = maxframe; + rv = nni_copyout_str(d->proto != NULL ? d->proto : "", buf, szp, t); nni_mtx_unlock(&d->mtx); + return (rv); +} + +static const nni_option ws_dialer_options[] = { + { + .o_name = NNI_OPT_WS_MSGMODE, + .o_set = ws_dialer_set_msgmode, + }, + { + .o_name = NNG_OPT_WS_RECVMAXFRAME, + .o_set = ws_dialer_set_maxframe, + .o_get = ws_dialer_get_maxframe, + }, + { + .o_name = NNG_OPT_WS_SENDMAXFRAME, + .o_set = ws_dialer_set_fragsize, + .o_get = ws_dialer_get_fragsize, + }, + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_set = ws_dialer_set_recvmax, + .o_get = ws_dialer_get_recvmax, + }, + { + .o_name = NNG_OPT_WS_REQUEST_HEADERS, + .o_set = ws_dialer_set_req_headers, + // XXX: Get not implemented yet; likely of marginal value. + }, + { + .o_name = NNG_OPT_WS_PROTOCOL, + .o_set = ws_dialer_set_proto, + .o_get = ws_dialer_get_proto, + }, + { + .o_name = NULL, + }, +}; + +static int +ws_dialer_set_header( + nni_ws_dialer *d, const char *name, const void *buf, size_t sz, nni_type t) +{ + int rv; + name += strlen(NNG_OPT_WS_REQUEST_HEADER); + if ((rv = ws_check_string(buf, sz, t)) == 0) { + nni_mtx_lock(&d->mtx); + rv = ws_set_header(&d->headers, name, buf); + nni_mtx_unlock(&d->mtx); + } + return (rv); +} + +static int +ws_dialer_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + + rv = nni_setopt(ws_dialer_options, name, d, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_http_client_setx(d->client, name, buf, sz, t); + } + + if (rv == NNG_ENOTSUP) { + if (startswith(name, NNG_OPT_WS_REQUEST_HEADER)) { + rv = ws_dialer_set_header(d, name, buf, sz, t); + } + } + return (rv); +} + +static int +ws_dialer_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + nni_ws_dialer *d = arg; + int rv; + + rv = nni_getopt(ws_dialer_options, name, d, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_http_client_getx(d->client, name, buf, szp, t); + } + return (rv); +} + +int +nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) +{ + nni_ws_dialer *d; + int rv; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&d->headers, ws_header, node); + NNI_LIST_INIT(&d->wspend, nni_ws, node); + nni_mtx_init(&d->mtx); + nni_cv_init(&d->cv, &d->mtx); + + if ((rv = nng_url_clone(&d->url, url)) != 0) { + ws_dialer_free(d); + return (rv); + } + + if ((rv = nni_http_client_init(&d->client, url)) != 0) { + ws_dialer_free(d); + return (rv); + } + d->isstream = true; + d->recvmax = WS_DEF_RECVMAX; + d->maxframe = WS_DEF_MAXRXFRAME; + d->fragsize = WS_DEF_MAXTXFRAME; + + d->ops.sd_free = ws_dialer_free; + d->ops.sd_close = ws_dialer_close; + d->ops.sd_dial = ws_dialer_dial; + d->ops.sd_setx = ws_dialer_setx; + d->ops.sd_getx = ws_dialer_getx; + *dp = (void *) d; + return (0); } // Dialer does not get a hook chance, as it can examine the request @@ -2130,3 +2504,293 @@ nni_ws_dialer_set_maxframe(nni_ws_dialer *d, size_t maxframe) // The implementation will send periodic PINGs, and respond with // PONGs. + +static void +ws_str_free(void *arg) +{ + nni_ws *ws = arg; + nni_reap(&ws->reap, ws_fini, ws); +} + +static void +ws_str_close(void *arg) +{ + nni_ws *ws = arg; + ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); +} + +static void +ws_str_send(void *arg, nni_aio *aio) +{ + nni_ws * ws = arg; + int rv; + ws_frame *frame; + + if (nni_aio_begin(aio) != 0) { + return; + } + + if (!ws->isstream) { + nni_msg *msg; + unsigned niov; + nni_iov iov[2]; + if ((msg = nni_aio_get_msg(aio)) == NULL) { + nni_aio_finish_error(aio, NNG_EINVAL); + return; + } + niov = 0; + if (nng_msg_header_len(msg) > 0) { + iov[niov].iov_len = nni_msg_header_len(msg); + iov[niov].iov_buf = nni_msg_header(msg); + niov++; + } + iov[niov].iov_len = nni_msg_len(msg); + iov[niov].iov_buf = nni_msg_body(msg); + niov++; + + // Scribble into the iov for now. + nni_aio_set_iov(aio, niov, iov); + } + + if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { + nni_aio_finish_error(aio, NNG_ENOMEM); + return; + } + frame->aio = aio; + if ((rv = ws_frame_prep_tx(ws, frame)) != 0) { + nni_aio_finish_error(aio, rv); + ws_frame_fini(frame); + return; + } + + nni_mtx_lock(&ws->mtx); + + if (ws->closed) { + nni_mtx_unlock(&ws->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + ws_frame_fini(frame); + return; + } + if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) { + nni_mtx_unlock(&ws->mtx); + nni_aio_finish_error(aio, rv); + ws_frame_fini(frame); + return; + } + nni_aio_set_prov_extra(aio, 0, frame); + nni_list_append(&ws->sendq, aio); + nni_list_append(&ws->txq, frame); + ws_start_write(ws); + nni_mtx_unlock(&ws->mtx); +} + +static void +ws_str_recv(void *arg, nng_aio *aio) +{ + nni_ws *ws = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ws->mtx); + if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) { + nni_mtx_unlock(&ws->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&ws->recvq, aio); + if (nni_list_first(&ws->recvq) == aio) { + ws_read_finish_msg(ws); + } + ws_start_read(ws); + + nni_mtx_unlock(&ws->mtx); +} + +static int +ws_get_request_headers(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + nni_mtx_lock(&ws->mtx); + if (ws->reqhdrs == NULL) { + ws->reqhdrs = nni_http_req_headers(ws->req); + } + nni_mtx_unlock(&ws->mtx); + return (nni_copyout_str(ws->reqhdrs, buf, szp, t)); +} + +static int +ws_get_response_headers(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + nni_mtx_lock(&ws->mtx); + if (ws->reshdrs == NULL) { + ws->reshdrs = nni_http_res_headers(ws->res); + } + nni_mtx_unlock(&ws->mtx); + return (nni_copyout_str(ws->reshdrs, buf, szp, t)); +} + +static int +ws_get_request_uri(void *arg, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + return (nni_copyout_str(nni_http_req_get_uri(ws->req), buf, szp, t)); +} + +static const nni_option ws_options[] = { + { + .o_name = NNG_OPT_WS_REQUEST_HEADERS, + .o_get = ws_get_request_headers, + }, + { + .o_name = NNG_OPT_WS_RESPONSE_HEADERS, + .o_get = ws_get_response_headers, + }, + { + .o_name = NNG_OPT_WS_REQUEST_URI, + .o_get = ws_get_request_uri, + }, + { + .o_name = NULL, + }, +}; + +static int +ws_str_setx(void *arg, const char *nm, const void *buf, size_t sz, nni_type t) +{ + nni_ws *ws = arg; + int rv; + + // Headers can only be set. + nni_mtx_lock(&ws->mtx); + if (ws->closed) { + nni_mtx_unlock(&ws->mtx); + return (NNG_ECLOSED); + } + nni_mtx_unlock(&ws->mtx); + rv = nni_http_conn_setopt(ws->http, nm, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_setopt(ws_options, nm, ws, buf, sz, t); + } + if (rv == NNG_ENOTSUP) { + if (startswith(nm, NNG_OPT_WS_REQUEST_HEADER) || + startswith(nm, NNG_OPT_WS_RESPONSE_HEADER)) { + return (NNG_EREADONLY); + } + } + + return (rv); +} + +static int +ws_get_req_header( + nni_ws *ws, const char *nm, void *buf, size_t *szp, nni_type t) +{ + const char *s; + nm += strlen(NNG_OPT_WS_REQUEST_HEADER); + s = nni_http_req_get_header(ws->req, nm); + if (s == NULL) { + return (NNG_ENOENT); + } + return (nni_copyout_str(s, buf, szp, t)); +} + +static int +ws_get_res_header( + nni_ws *ws, const char *nm, void *buf, size_t *szp, nni_type t) +{ + const char *s; + nm += strlen(NNG_OPT_WS_RESPONSE_HEADER); + s = nni_http_res_get_header(ws->res, nm); + if (s == NULL) { + return (NNG_ENOENT); + } + return (nni_copyout_str(s, buf, szp, t)); +} + +static int +ws_str_getx(void *arg, const char *nm, void *buf, size_t *szp, nni_type t) +{ + nni_ws *ws = arg; + int rv; + + nni_mtx_lock(&ws->mtx); + if (ws->closed) { + nni_mtx_unlock(&ws->mtx); + return (NNG_ECLOSED); + } + nni_mtx_unlock(&ws->mtx); + rv = nni_http_conn_getopt(ws->http, nm, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_getopt(ws_options, nm, ws, buf, szp, t); + } + // Check for generic headers... + if (rv == NNG_ENOTSUP) { + if (startswith(nm, NNG_OPT_WS_REQUEST_HEADER)) { + rv = ws_get_req_header(ws, nm, buf, szp, t); + } else if (startswith(nm, NNG_OPT_WS_RESPONSE_HEADER)) { + rv = ws_get_res_header(ws, nm, buf, szp, t); + } + } + return (rv); +} + +static int +ws_check_size(const void *buf, size_t sz, nni_type t) +{ + return (nni_copyin_size(NULL, buf, sz, 0, NNI_MAXSZ, t)); +} + +static const nni_chkoption ws_chkopts[] = { + { + .o_name = NNG_OPT_WS_SENDMAXFRAME, + .o_check = ws_check_size, + }, + { + .o_name = NNG_OPT_WS_RECVMAXFRAME, + .o_check = ws_check_size, + }, + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_check = ws_check_size, + }, + { + .o_name = NNG_OPT_WS_PROTOCOL, + .o_check = ws_check_string, + }, + { + .o_name = NNG_OPT_WS_REQUEST_HEADERS, + .o_check = ws_check_string, + }, + { + .o_name = NNG_OPT_WS_RESPONSE_HEADERS, + .o_check = ws_check_string, + }, + { + .o_name = NULL, + }, +}; + +int +nni_ws_checkopt(const char *name, const void *data, size_t sz, nni_type t) +{ + int rv; + + rv = nni_chkopt(ws_chkopts, name, data, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_stream_checkopt("tcp", name, data, sz, t); + } + if (rv == NNG_ENOTSUP) { + rv = nni_stream_checkopt("tls+tcp", name, data, sz, t); + } + if (rv == NNG_ENOTSUP) { + if (startswith(name, NNG_OPT_WS_REQUEST_HEADER) || + startswith(name, NNG_OPT_WS_RESPONSE_HEADER)) { + rv = ws_check_string(data, sz, t); + } + } + // Potentially, add checks for header options. + return (rv); +} |
