diff options
Diffstat (limited to 'demo')
| -rw-r--r-- | demo/rest/README.adoc | 25 | ||||
| -rw-r--r-- | demo/rest/server.c | 372 |
2 files changed, 397 insertions, 0 deletions
diff --git a/demo/rest/README.adoc b/demo/rest/README.adoc new file mode 100644 index 00000000..6a363329 --- /dev/null +++ b/demo/rest/README.adoc @@ -0,0 +1,25 @@ += REST API Gateway demo + +This is a somewhat contrived demonstration, but may be useful +in a pattern for solving real world problems. + +There is a single "server" program, that does these: + +. REST API at /api/rest/rot13 - this API takes data from HTTP POST commands, + and forwards them to an NNG REQ socket. When the REQ response comes, + the reply is redirected back to the server. (For the purposes of the + demonstration, our server just performs ROT13 on input.) + +. REP server (implemented in the same program using inproc, for demonstration + purposes. In a real world scenario this might instead go to another + process on another computer.) + +[source, bash] +---- +% env PORT=8888 # default +% ./server & +% curl -d ABC http://127.0.0.1:8888/api/rest/rot13; echo +NOP +% curl -d ABC http://127.0.0.1:8888/api/rest/rot13; echo +ABC +---- diff --git a/demo/rest/server.c b/demo/rest/server.c new file mode 100644 index 00000000..9fbcfa34 --- /dev/null +++ b/demo/rest/server.c @@ -0,0 +1,372 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#define INPROC_URL "inproc://rot13" +#define REST_URL "http://127.0.0.1:%u/api/rest/rot13" + +// REST API -> NNG REP server demonstration. + +// This is a silly demo -- it listens on port 8888 (or $PORT if present), +// and accepts HTTP POST requests at /api/rest/rot13 +// +// These requests are converted into an NNG REQ message, and sent to an +// NNG REP server (builtin inproc_server, for demonstration purposes only). +// The reply is obtained from the server, and sent back to the client via +// the HTTP server framework. + +// Example usage: +// +// % export CPPFLAGS="-I /usr/local/include" +// % export LDFLAGS="-L /usr/local/lib -lnng" +// % export CC="cc" +// % ${CC} ${CPPFLAGS} server.c -o server ${LDFLAGS} +// % ./server & +// % curl -d TEST http://127.0.0.1:8888/api/rest/rot13 +// GRFG +// + +#include <nng/nng.h> +#include <nng/protocol/reqrep0/rep.h> +#include <nng/protocol/reqrep0/req.h> +#include <nng/supplemental/http/http.h> +#include <nng/supplemental/util/platform.h> + +#include <ctype.h> +#include <stdio.h> +#include <stdlib.h> + +// utility function +void +fatal(const char *what, int rv) +{ + fprintf(stderr, "%s: %s\n", what, nng_strerror(rv)); + exit(1); +} + +// This server acts as a proxy. We take HTTP POST requests, convert them to +// REQ messages, and when the reply is received, send the reply back to +// the original HTTP client. +// +// The state flow looks like: +// +// 1. Receive HTTP request & headers +// 2. Receive HTTP request (POST) data +// 3. Send POST payload as REQ body +// 4. Receive REP reply (including payload) +// 5. Return REP message body to the HTTP server (which forwards to client) +// 6. Restart at step 1. +// +// The above flow is pretty linear, and so we use contexts (nng_ctx) to +// obtain parallelism. + +typedef enum { + READ_DATA, // Reading HTTP post payload + SEND_REQ, // Sending REQ request + RECV_REP, // Receiving REQ reply +} job_state; + +typedef struct rest_job { + nng_aio * http_aio; // aio from HTTP we must reply to + nng_http_res *http_res; // HTTP response object + job_state state; // 0 = sending, 1 = receiving + nng_msg * msg; // request message + nng_aio * aio; // request flow + nng_ctx ctx; // context on the request socket +} rest_job; + +nng_socket req_sock; + +void +rest_free_job(rest_job *job) +{ + if (job == NULL) { + return; + } + if (job->http_res != 0) { + nng_http_res_free(job->http_res); + } + if (job->aio != NULL) { + nng_aio_free(job->aio); + } + if (job->msg != NULL) { + nng_msg_free(job->msg); + } + if (job->ctx != 0) { + nng_ctx_close(job->ctx); + } + free(job); +} + +static void +rest_http_fatal(rest_job *job, const char *fmt, int rv) +{ + char buf[128]; + nng_aio * aio = job->http_aio; + nng_http_res *res = job->http_res; + + job->http_res = NULL; + job->http_aio = NULL; + snprintf(buf, sizeof(buf), fmt, nng_strerror(rv)); + nng_http_res_set_status(res, NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR); + nng_http_res_set_reason(res, buf); + nng_aio_set_output(aio, 0, res); + nng_aio_finish(aio, 0); + rest_free_job(job); +} + +void +rest_job_cb(void *arg) +{ + rest_job *job = arg; + nng_aio * aio = job->aio; + int rv; + + switch (job->state) { + case READ_DATA: + if ((rv = nng_aio_result(aio)) != 0) { + rest_http_fatal(job, "read POST data failed: %s", rv); + return; + } + // We got good data. The message should already be set up, + // so at this point we need to just update the state and + // start the send. + nng_aio_set_msg(aio, job->msg); + job->state = SEND_REQ; + nng_ctx_send(job->ctx, aio); + break; + case SEND_REQ: + if ((rv = nng_aio_result(aio)) != 0) { + rest_http_fatal(job, "send REQ failed: %s", rv); + return; + } + job->msg = NULL; + // Message was sent, so now wait for the reply. + nng_aio_set_msg(aio, NULL); + job->state = RECV_REP; + nng_ctx_recv(job->ctx, aio); + break; + case RECV_REP: + if ((rv = nng_aio_result(aio)) != 0) { + rest_http_fatal(job, "recv reply failed: %s", rv); + return; + } + job->msg = nng_aio_get_msg(aio); + // We got a reply, so give it back to the server. + rv = nng_http_res_copy_data(job->http_res, + nng_msg_body(job->msg), nng_msg_len(job->msg)); + if (rv != 0) { + rest_http_fatal(job, "nng_http_res_copy_data: %s", rv); + return; + } + // Set the output - the HTTP server will send it back to the + // user agent with a 200 response. + nng_aio_set_output(job->http_aio, 0, job->http_res); + nng_aio_finish(job->http_aio, 0); + job->http_aio = NULL; + job->http_res = NULL; + // We are done with the job. + rest_free_job(job); + return; + default: + fatal("bad case", NNG_ESTATE); + break; + } +} + +// Our rest server just takes the message body, creates a request ID +// for it, and sends it on. This runs in raw mode, so +void +rest_handle(nng_aio *aio) +{ + struct rest_job * job; + nng_http_req * req = nng_aio_get_input(aio, 0); + nng_http_handler *h = nng_aio_get_input(aio, 1); + nng_http_conn * conn = nng_aio_get_input(aio, 2); + nng_http_res * res; + nng_msg * msg; + const char * clen; + size_t sz; + nng_iov iov; + int rv; + + if ((job = malloc(sizeof(*job))) == NULL) { + nng_aio_finish(aio, NNG_ENOMEM); + return; + } + if (((rv = nng_aio_alloc(&job->aio, rest_job_cb, job)) != 0) || + ((rv = nng_http_res_alloc(&job->http_res)) != 0) || + ((rv = nng_ctx_open(&job->ctx, req_sock)) != 0)) { + rest_free_job(job); + nng_aio_finish(aio, rv); + return; + } + + job->http_aio = aio; + if ((clen = nng_http_req_get_header(req, "Content-Length")) == NULL) { + nng_http_res *res = job->http_res; + job->http_res = NULL; + nng_http_res_set_status(res, NNG_HTTP_STATUS_LENGTH_REQUIRED); + nng_http_res_set_reason(res, NULL); + nng_aio_set_output(aio, 0, res); + nng_aio_finish(aio, 0); + rest_free_job(job); + return; + } + // Arbitrary limit, reject jobs with no data, or more than 128KB. + // Note that normally REQ/REP sockets don't transport over 1MB, so + // if you adjust this to be more than that, you'll need to also + // set the NNG_OPT_RECVMAXSIZE option. + sz = atoi(clen); + if ((sz < 1) || (sz > 128 * 1024)) { + nng_http_res *res = job->http_res; + job->http_res = NULL; + nng_http_res_set_status(res, NNG_HTTP_STATUS_BAD_REQUEST); + nng_aio_set_output(aio, 0, res); + nng_aio_finish(aio, 0); + rest_free_job(job); + return; + } + + if ((rv = nng_msg_alloc(&job->msg, sz)) != 0) { + rest_http_fatal(job, "nng_msg_alloc: %s", rv); + return; + } + + iov.iov_buf = nng_msg_body(job->msg); + iov.iov_len = nng_msg_len(job->msg); + if ((rv = nng_aio_set_iov(job->aio, 1, &iov)) != 0) { + rest_http_fatal(job, "nng_set_iov: %s", rv); + return; + } + + job->state = READ_DATA; + // This submits the request, and the state machine takes over + // all further processing. + nng_http_conn_read_all(conn, job->aio); +} + +void +rest_start(uint16_t port) +{ + nng_http_server * server; + nng_http_handler *handler; + char rest_addr[128]; + nng_url * url; + int rv; + + // Set up some strings, etc. We use the port number + // from the argument list. + snprintf(rest_addr, sizeof(rest_addr), REST_URL, port); + if ((rv = nng_url_parse(&url, rest_addr)) != 0) { + fatal("nng_url_parse", rv); + } + + // Create the REQ socket, and put it in raw mode, connected to + // the remote REP server (our inproc server in this case). + if ((rv = nng_req0_open(&req_sock)) != 0) { + fatal("nng_req0_open", rv); + } + if ((rv = nng_dial(req_sock, INPROC_URL, NULL, NNG_FLAG_NONBLOCK)) != + 0) { + fatal("nng_dial(" INPROC_URL ")", rv); + } + + // Get a suitable HTTP server instance. This creates one + // if it doesn't already exist. + if ((rv = nng_http_server_hold(&server, url)) != 0) { + fatal("nng_http_server_hold", rv); + } + + // Allocate the handler - we usea dynamic handler for REST + // using the function "rest_handle" declared above. + rv = nng_http_handler_alloc(&handler, url->u_path, rest_handle); + if (rv != 0) { + fatal("nng_http_handler_alloc", rv); + } + + if ((rv = nng_http_handler_set_method(handler, "POST")) != 0) { + fatal("nng_http_handler_set_method", rv); + } + if ((rv = nng_http_server_add_handler(server, handler)) != 0) { + fatal("nng_http_handler_add_handler", rv); + } + if ((rv = nng_http_server_start(server)) != 0) { + fatal("nng_http_server_start", rv); + } + + nng_url_free(url); +} + +// +// inproc_server - this just is a simple REP server that listens for +// messages, and performs ROT13 on them before sending them. This +// doesn't have to be in the same process -- it is hear for demonstration +// simplicity only. (Most likely this would be somewhere else.) Note +// especially that this uses inproc, so nothing can get to it directly +// from outside the process. +// +void +inproc_server(void *arg) +{ + nng_socket s; + int rv; + nng_msg * msg; + + if (((rv = nng_rep0_open(&s)) != 0) || + ((rv = nng_listen(s, INPROC_URL, NULL, 0)) != 0)) { + fatal("unable to set up inproc", rv); + } + // This is simple enough that we don't need concurrency. Plus it + // makes for an easier demo. + for (;;) { + char *body; + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + fatal("inproc recvmsg", rv); + } + body = nng_msg_body(msg); + for (int i = 0; i < nng_msg_len(msg); i++) { + // Table lookup would be faster, but this works. + if (isupper(body[i])) { + char base = body[i] - 'A'; + base = (base + 13) % 26; + body[i] = base + 'A'; + } else if (islower(body[i])) { + char base = body[i] - 'a'; + base = (base + 13) % 26; + body[i] = base + 'a'; + } + } + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + fatal("inproc sendmsg", rv); + } + } +} + +int +main(int argc, char **argv) +{ + int rv; + nng_thread *inproc_thr; + uint16_t port = 0; + + rv = nng_thread_create(&inproc_thr, inproc_server, NULL); + if (rv != 0) { + fatal("cannot start inproc server", rv); + } + if (getenv("PORT") != NULL) { + port = (uint16_t) atoi(getenv("PORT")); + } + port = port ? port : 8888; + rest_start(port); + + // This runs forever. The inproc_thr never exits, so we + // just block behind its condition variable. + nng_thread_destroy(inproc_thr); +} |
