summaryrefslogtreecommitdiff
path: root/src/supplemental/http/http.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/supplemental/http/http.c')
-rw-r--r--src/supplemental/http/http.c604
1 files changed, 604 insertions, 0 deletions
diff --git a/src/supplemental/http/http.c b/src/supplemental/http/http.c
new file mode 100644
index 00000000..3958a738
--- /dev/null
+++ b/src/supplemental/http/http.c
@@ -0,0 +1,604 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+// We insist that individual headers fit in 8K.
+// If you need more than that, you need something we can't do.
+#define HTTP_BUFSIZE 8192
+
+// types of reads
+enum read_flavor {
+ HTTP_RD_RAW,
+ HTTP_RD_FULL,
+ HTTP_RD_REQ,
+ HTTP_RD_RES,
+};
+
+enum write_flavor {
+ HTTP_WR_RAW,
+ HTTP_WR_FULL,
+ HTTP_WR_REQ,
+ HTTP_WR_RES,
+};
+
+#define SET_RD_FLAVOR(aio, f) (aio)->a_prov_extra[0] = ((void *) (intptr_t)(f))
+#define GET_RD_FLAVOR(aio) (int) ((intptr_t) aio->a_prov_extra[0])
+#define SET_WR_FLAVOR(aio, f) (aio)->a_prov_extra[0] = ((void *) (intptr_t)(f))
+#define GET_WR_FLAVOR(aio) (int) ((intptr_t) aio->a_prov_extra[0])
+
+struct nni_http {
+ void *sock;
+ void (*rd)(void *, nni_aio *);
+ void (*wr)(void *, nni_aio *);
+ void (*close)(void *);
+ void (*fini)(void *);
+
+ bool closed;
+
+ nni_list rdq; // high level http read requests
+ nni_list wrq; // high level http write requests
+
+ nni_aio *rd_aio; // bottom half read operations
+ nni_aio *wr_aio; // bottom half write operations
+
+ nni_mtx mtx;
+
+ uint8_t *rd_buf;
+ size_t rd_get;
+ size_t rd_put;
+ size_t rd_bufsz;
+};
+
+static void
+http_close(nni_http *http)
+{
+ // Call with lock held.
+ nni_aio *aio;
+
+ if (http->closed) {
+ return;
+ }
+
+ http->closed = true;
+ if (nni_list_first(&http->wrq)) {
+ nni_aio_cancel(http->wr_aio, NNG_ECLOSED);
+ while ((aio = nni_list_first(&http->wrq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ if (nni_list_first(&http->rdq)) {
+ nni_aio_cancel(http->rd_aio, NNG_ECLOSED);
+ while ((aio = nni_list_first(&http->rdq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+
+ if (http->sock != NULL) {
+ http->close(http->sock);
+ }
+}
+
+void
+nni_http_close(nni_http *http)
+{
+ nni_mtx_lock(&http->mtx);
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+}
+
+// http_rd_buf attempts to satisfy the read from data in the buffer.
+static int
+http_rd_buf(nni_http *http, nni_aio *aio)
+{
+ size_t cnt = http->rd_put - http->rd_get;
+ size_t n;
+ uint8_t *rbuf = http->rd_buf;
+ int i;
+ int rv;
+ bool raw = false;
+
+ rbuf += http->rd_get;
+
+ switch (GET_RD_FLAVOR(aio)) {
+ case HTTP_RD_RAW:
+ raw = true; // FALLTHROUGH
+ case HTTP_RD_FULL:
+ for (i = 0; (aio->a_niov != 0) && (cnt != 0); i++) {
+ // Pull up data from the buffer if possible.
+ n = aio->a_iov[0].iov_len;
+ if (n > cnt) {
+ n = cnt;
+ }
+ memcpy(aio->a_iov[0].iov_buf, rbuf, n);
+ aio->a_iov[0].iov_len -= n;
+ aio->a_iov[0].iov_buf += n;
+ http->rd_get += n;
+ rbuf += n;
+ aio->a_count += n;
+ cnt -= n;
+
+ if (aio->a_iov[0].iov_len == 0) {
+ aio->a_niov--;
+ for (i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
+ }
+ }
+
+ if ((aio->a_niov == 0) || (raw && (aio->a_count != 0))) {
+ // Finished the read. (We are finished if we either
+ // got *all* the data, or we got *some* data for
+ // a raw read.)
+ return (0);
+ }
+
+ // No more data left in the buffer, so use a physio.
+ // (Note that we get here if we either have not completed
+ // a full transaction on a FULL read, or were not even able
+ // to get *any* data for a partial RAW read.)
+ for (i = 0; i < aio->a_niov; i++) {
+ http->rd_aio->a_iov[i] = aio->a_iov[i];
+ }
+ nni_aio_set_data(http->rd_aio, 1, NULL);
+ http->rd_aio->a_niov = aio->a_niov;
+ http->rd(http->sock, http->rd_aio);
+ return (NNG_EAGAIN);
+
+ case HTTP_RD_REQ:
+ rv = nni_http_req_parse(aio->a_prov_extra[1], rbuf, cnt, &n);
+ http->rd_get += n;
+ if (http->rd_get == http->rd_put) {
+ http->rd_get = http->rd_put = 0;
+ }
+ if (rv == NNG_EAGAIN) {
+ http->rd_aio->a_niov = 1;
+ http->rd_aio->a_iov[0].iov_buf =
+ http->rd_buf + http->rd_put;
+ http->rd_aio->a_iov[0].iov_len =
+ http->rd_bufsz - http->rd_put;
+ nni_aio_set_data(http->rd_aio, 1, aio);
+ http->rd(http->sock, http->rd_aio);
+ }
+ return (rv);
+
+ case HTTP_RD_RES:
+ rv = nni_http_res_parse(aio->a_prov_extra[1], rbuf, cnt, &n);
+ http->rd_get += n;
+ if (http->rd_get == http->rd_put) {
+ http->rd_get = http->rd_put = 0;
+ }
+ if (rv == NNG_EAGAIN) {
+ http->rd_aio->a_niov = 1;
+ http->rd_aio->a_iov[0].iov_buf =
+ http->rd_buf + http->rd_put;
+ http->rd_aio->a_iov[0].iov_len =
+ http->rd_bufsz - http->rd_put;
+ nni_aio_set_data(http->rd_aio, 1, aio);
+ http->rd(http->sock, http->rd_aio);
+ }
+ return (rv);
+ }
+ return (NNG_EINVAL);
+}
+
+static void
+http_rd_start(nni_http *http)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&http->rdq)) != NULL) {
+ int rv;
+
+ if (http->closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = http_rd_buf(http, aio);
+ }
+ switch (rv) {
+ case NNG_EAGAIN:
+ return;
+ case 0:
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, aio->a_count);
+ break;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ http_close(http);
+ break;
+ }
+ }
+}
+
+static void
+http_rd_cb(void *arg)
+{
+ nni_http *http = arg;
+ nni_aio * aio = http->rd_aio;
+ nni_aio * uaio;
+ size_t cnt;
+ int rv;
+
+ nni_mtx_lock(&http->mtx);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ if ((uaio = nni_list_first(&http->rdq)) != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ cnt = nni_aio_count(aio);
+
+ // If we were reading into the buffer, then advance location(s).
+ if ((uaio = nni_aio_get_data(aio, 1)) != NULL) {
+ http->rd_put += cnt;
+ NNI_ASSERT(http->rd_put <= http->rd_bufsz);
+ http_rd_start(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ // Otherwise we are completing a USER request, and there should
+ // be no data left in the user buffer.
+ NNI_ASSERT(http->rd_get == http->rd_put);
+
+ uaio = nni_list_first(&http->rdq);
+ NNI_ASSERT(uaio != NULL);
+
+ for (int i = 0; (uaio->a_niov != 0) && (cnt != 0); i++) {
+ // Pull up data from the buffer if possible.
+ size_t n = uaio->a_iov[0].iov_len;
+ if (n > cnt) {
+ n = cnt;
+ }
+ uaio->a_iov[0].iov_len -= n;
+ uaio->a_iov[0].iov_buf += n;
+ uaio->a_count += n;
+ cnt -= n;
+
+ if (uaio->a_iov[0].iov_len == 0) {
+ uaio->a_niov--;
+ for (i = 0; i < uaio->a_niov; i++) {
+ uaio->a_iov[i] = uaio->a_iov[i + 1];
+ }
+ }
+ }
+
+ // Resubmit the start. This will attempt to consume data
+ // from the read buffer (there won't be any), and then either
+ // complete the I/O (for HTTP_RD_RAW, or if there is nothing left),
+ // or submit another physio.
+ http_rd_start(http);
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_rd_cancel(nni_aio *aio, int rv)
+{
+ nni_http *http = aio->a_prov_data;
+
+ nni_mtx_lock(&http->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ if (aio == nni_list_first(&http->rdq)) {
+ http_close(http);
+ }
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_rd_submit(nni_http *http, nni_aio *aio)
+{
+ if (nni_aio_start(aio, http_rd_cancel, http) != 0) {
+ return;
+ }
+ if (http->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&http->rdq, aio);
+ if (nni_list_first(&http->rdq) == aio) {
+ http_rd_start(http);
+ }
+}
+
+static void
+http_wr_start(nni_http *http)
+{
+ nni_aio *aio;
+
+ if ((aio = nni_list_first(&http->wrq)) != NULL) {
+
+ for (int i = 0; i < aio->a_niov; i++) {
+ http->wr_aio->a_iov[i] = aio->a_iov[i];
+ }
+ http->wr_aio->a_niov = aio->a_niov;
+ http->wr(http->sock, http->wr_aio);
+ }
+}
+
+static void
+http_wr_cb(void *arg)
+{
+ nni_http *http = arg;
+ nni_aio * aio = http->wr_aio;
+ nni_aio * uaio;
+ int rv;
+ size_t n;
+
+ nni_mtx_lock(&http->mtx);
+
+ uaio = nni_list_first(&http->wrq);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ // We failed to complete the aio.
+ if (uaio != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ if (uaio == NULL) {
+ // write canceled?
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ n = nni_aio_count(aio);
+ uaio->a_count += n;
+ if (GET_WR_FLAVOR(uaio) == HTTP_WR_RAW) {
+ // For raw data, we just send partial completion
+ // notices to the consumer.
+ goto done;
+ }
+ while (n) {
+ NNI_ASSERT(aio->a_niov != 0);
+
+ if (aio->a_iov[0].iov_len > n) {
+ aio->a_iov[0].iov_len -= n;
+ aio->a_iov[0].iov_buf += n;
+ break;
+ }
+ n -= aio->a_iov[0].iov_len;
+ for (int i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
+ aio->a_niov--;
+ }
+ if ((aio->a_niov != 0) && (aio->a_iov[0].iov_len != 0)) {
+ // We have more to transmit.
+ http->wr(http->sock, aio);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+done:
+ nni_aio_list_remove(uaio);
+ nni_aio_finish(uaio, 0, uaio->a_count);
+
+ // Start next write if another is ready.
+ http_wr_start(http);
+
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_wr_cancel(nni_aio *aio, int rv)
+{
+ nni_http *http = aio->a_prov_data;
+
+ nni_mtx_lock(&http->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ if (aio == nni_list_first(&http->wrq)) {
+ http_close(http);
+ }
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_wr_submit(nni_http *http, nni_aio *aio)
+{
+ if (nni_aio_start(aio, http_wr_cancel, http) != 0) {
+ return;
+ }
+ if (http->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&http->wrq, aio);
+ if (nni_list_first(&http->wrq) == aio) {
+ http_wr_start(http);
+ }
+}
+
+void
+nni_http_read_req(nni_http *http, nni_http_req *req, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_REQ);
+ aio->a_prov_extra[1] = req;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read_res(nni_http *http, nni_http_res *res, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_RES);
+ aio->a_prov_extra[1] = res;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read_full(nni_http *http, nni_aio *aio)
+{
+ aio->a_count = 0;
+ SET_RD_FLAVOR(aio, HTTP_RD_FULL);
+ aio->a_prov_extra[1] = NULL;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read(nni_http *http, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_RAW);
+ aio->a_prov_extra[1] = NULL;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_req(nni_http *http, nni_http_req *req, nni_aio *aio)
+{
+ int rv;
+ void * buf;
+ size_t bufsz;
+
+ if ((rv = nni_http_req_get_buf(req, &buf, &bufsz)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = bufsz;
+ aio->a_iov[0].iov_buf = buf;
+ SET_WR_FLAVOR(aio, HTTP_WR_REQ);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_res(nni_http *http, nni_http_res *res, nni_aio *aio)
+{
+ int rv;
+ void * buf;
+ size_t bufsz;
+
+ if ((rv = nni_http_res_get_buf(res, &buf, &bufsz)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = bufsz;
+ aio->a_iov[0].iov_buf = buf;
+ SET_WR_FLAVOR(aio, HTTP_WR_RES);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+// Writer. As with nni_http_conn_write, this is used to write data on
+// a connection that has been "upgraded" (e.g. transformed to
+// websocket). It is an error to perform other HTTP exchanges on an
+// connection after this method is called. (This mostly exists to
+// support websocket.)
+void
+nni_http_write(nni_http *http, nni_aio *aio)
+{
+ SET_WR_FLAVOR(aio, HTTP_WR_RAW);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_full(nni_http *http, nni_aio *aio)
+{
+ SET_WR_FLAVOR(aio, HTTP_WR_FULL);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_fini(nni_http *http)
+{
+ nni_mtx_lock(&http->mtx);
+ http_close(http);
+ if ((http->sock != NULL) && (http->fini != NULL)) {
+ http->fini(http->sock);
+ http->sock = NULL;
+ }
+ nni_mtx_unlock(&http->mtx);
+ nni_aio_stop(http->wr_aio);
+ nni_aio_stop(http->rd_aio);
+ nni_aio_fini(http->wr_aio);
+ nni_aio_fini(http->rd_aio);
+ nni_free(http->rd_buf, http->rd_bufsz);
+ nni_mtx_fini(&http->mtx);
+ NNI_FREE_STRUCT(http);
+}
+
+int
+nni_http_init(nni_http **httpp, nni_http_tran *tran)
+{
+ nni_http *http;
+ int rv;
+
+ if ((http = NNI_ALLOC_STRUCT(http)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ http->rd_bufsz = HTTP_BUFSIZE;
+ if ((http->rd_buf = nni_alloc(http->rd_bufsz)) == NULL) {
+ NNI_FREE_STRUCT(http);
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&http->mtx);
+ nni_aio_list_init(&http->rdq);
+ nni_aio_list_init(&http->wrq);
+
+ if (((rv = nni_aio_init(&http->wr_aio, http_wr_cb, http)) != 0) ||
+ ((rv = nni_aio_init(&http->rd_aio, http_rd_cb, http)) != 0)) {
+ nni_http_fini(http);
+ return (rv);
+ }
+ http->rd_bufsz = HTTP_BUFSIZE;
+ http->rd = tran->h_read;
+ http->wr = tran->h_write;
+ http->close = tran->h_close;
+ http->fini = tran->h_fini;
+ http->sock = tran->h_data;
+
+ *httpp = http;
+
+ return (0);
+}