aboutsummaryrefslogtreecommitdiff
path: root/demo/raw
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-12 11:37:26 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-12 15:36:12 -0700
commit7e4fa0fcc2135e7d19410c1e642656180de04bb0 (patch)
tree0af320b0e781a45201d743151eb94a21f846efda /demo/raw
parentac917f68ef6291c0c07f4d5bd7b29eec867434bf (diff)
downloadnng-7e4fa0fcc2135e7d19410c1e642656180de04bb0.tar.gz
nng-7e4fa0fcc2135e7d19410c1e642656180de04bb0.tar.bz2
nng-7e4fa0fcc2135e7d19410c1e642656180de04bb0.zip
Use contexts for new async demo.
Diffstat (limited to 'demo/raw')
-rw-r--r--demo/raw/README.adoc53
-rw-r--r--demo/raw/async.c224
2 files changed, 277 insertions, 0 deletions
diff --git a/demo/raw/README.adoc b/demo/raw/README.adoc
new file mode 100644
index 00000000..35290616
--- /dev/null
+++ b/demo/raw/README.adoc
@@ -0,0 +1,53 @@
+= async
+
+This is a simple asynchronous demo, that demonstrates use of the RAW
+option with a server, along with async message handling, to obtain a
+very high level of asynchronous operation, suitable for use in a highly
+concurrent server application.
+
+== Compiling
+
+You can override the level of concurrency with the `PARALLEL`
+define. This determines how many requests the server will accept
+at a time, and keep outstanding. Note that for our toy
+implementation, we create this many "logical" flows of execution
+(these are _NOT_ threads), where a request is followed by a reply.
+
+The value of `PARALLEL` must be at least one, and may be as large
+as your memory will permit. (The default value is 32.)
+
+On UNIX-style systems:
+
+[source, bash]
+----
+% export CPPFLAGS="-D PARALLEL=32 -I /usr/local/include"
+% export LDFLAGS="-L /usr/local/lib -lnng"
+% export CC="cc"
+% ${CC} ${CPPFLAGS} async.c -o async ${LDFLAGS}
+----
+
+== Running
+
+To run the server, use the arguments `__url__ -s`.
+
+To run the client, use the arguments `__url__ __msec__`.
+
+The _msec_ is a "delay" time that server will wait before responding.
+We have these delays so simulate long running work.
+
+In the following example, all of the clients should complete within
+2 seconds. (Assuming `PARALLEL` is defined to be large enough.)
+
+[source,bash]
+----
+% export URL="tcp://127.0.0.1:55995"
+# start the server
+% ./async $URL -s &
+# start a bunch of clients
+# Note that these all run concurrently!
+% ./async $URL 2 &
+% ./async $URL 2 &
+% ./async $URL 2 &
+% ./async $URL 2 &
+% ./async $URL 2 &
+----
diff --git a/demo/raw/async.c b/demo/raw/async.c
new file mode 100644
index 00000000..0285d8d2
--- /dev/null
+++ b/demo/raw/async.c
@@ -0,0 +1,224 @@
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitoar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+// This program serves as an example for how to write an async RPC service,
+// using the RAW request/reply pattern and nn_poll. The server receives
+// messages and keeps them on a list, replying to them.
+
+// Our demonstration application layer protocol is simple. The client sends
+// a number of milliseconds to wait before responding. The server just gives
+// back an empty reply after waiting that long.
+
+// To run this program, start the server as async_demo <url> -s
+// Then connect to it with the client as async_client <url> <msec>.
+//
+// For example:
+//
+// % ./async tcp://127.0.0.1:5555 -s &
+// % ./async tcp://127.0.0.1:5555 323
+// Request took 324 milliseconds.
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include <nng/nng.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/supplemental/util/platform.h>
+
+// Parallel is the maximum number of outstanding requests we can handle.
+// This is *NOT* the number of threads in use, but instead represents
+// outstanding work items. Select a small number to reduce memory size.
+// (Each one of these can be thought of as a request-reply loop.)
+#ifndef PARALLEL
+#define PARALLEL 32
+#endif
+
+// The server keeps a list of work items, sorted by expiration time,
+// so that we can use this to set the timeout to the correct value for
+// use in poll.
+struct work {
+ enum { INIT, RECV, WAIT, SEND } state;
+ nng_aio * aio;
+ nng_socket sock;
+ nng_msg * msg;
+};
+
+void
+fatal(const char *func, int rv)
+{
+ fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
+ exit(1);
+}
+
+void
+server_cb(void *arg)
+{
+ struct work *work = arg;
+ nng_msg * msg;
+ int rv;
+ uint32_t when;
+
+ switch (work->state) {
+ case INIT:
+ work->state = RECV;
+ nng_recv_aio(work->sock, work->aio);
+ break;
+ case RECV:
+ if ((rv = nng_aio_result(work->aio)) != 0) {
+ fatal("nng_recv_aio", rv);
+ }
+ msg = nng_aio_get_msg(work->aio);
+ if ((rv = nng_msg_trim_u32(msg, &when)) != 0) {
+ // bad message, just ignore it.
+ nng_msg_free(msg);
+ nng_recv_aio(work->sock, work->aio);
+ return;
+ }
+ work->msg = msg;
+ work->state = WAIT;
+ nng_sleep_aio(when, work->aio);
+ break;
+ case WAIT:
+ // We could add more data to the message here.
+ nng_aio_set_msg(work->aio, work->msg);
+ work->msg = NULL;
+ work->state = SEND;
+ nng_send_aio(work->sock, work->aio);
+ break;
+ case SEND:
+ if ((rv = nng_aio_result(work->aio)) != 0) {
+ nng_msg_free(work->msg);
+ fatal("nng_send_aio", rv);
+ }
+ work->state = RECV;
+ nng_recv_aio(work->sock, work->aio);
+ break;
+ default:
+ fatal("bad state!", NNG_ESTATE);
+ break;
+ }
+}
+
+struct work *
+alloc_work(nng_socket sock)
+{
+ struct work *w;
+ int rv;
+
+ if ((w = nng_alloc(sizeof(*w))) == NULL) {
+ fatal("nng_alloc", NNG_ENOMEM);
+ }
+ if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
+ fatal("nng_aio_alloc", rv);
+ }
+ w->state = INIT;
+ w->sock = sock;
+ return (w);
+}
+
+// The server runs forever.
+int
+server(const char *url)
+{
+ nng_socket sock;
+ struct work *works[PARALLEL];
+ int rv;
+ int i;
+
+ /* Create the socket. */
+ rv = nng_rep0_open(&sock);
+ if (rv != 0) {
+ fatal("nng_rep0_open", rv);
+ }
+ if ((rv = nng_setopt_int(sock, NNG_OPT_RAW, 1)) != 0) {
+ fatal("nng_setopt_int", rv);
+ }
+
+ for (i = 0; i < PARALLEL; i++) {
+ works[i] = alloc_work(sock);
+ }
+
+ if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
+ fatal("nng_listen", rv);
+ }
+
+ for (i = 0; i < PARALLEL; i++) {
+ server_cb(works[i]); // this starts them going (INIT state)
+ }
+
+ for (;;) {
+ nng_msleep(3600000); // neither pause() nor sleep() portable
+ }
+}
+
+/* The client runs just once, and then returns. */
+int
+client(const char *url, const char *msecstr)
+{
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ unsigned msec;
+
+ msec = atoi(msecstr) * 1000;
+
+ if ((rv = nng_req0_open(&sock)) != 0) {
+ fatal("nng_req0_open", rv);
+ }
+
+ if ((rv = nng_dial(sock, url, NULL, 0)) < 0) {
+ fatal("nng_dial", rv);
+ }
+
+ start = nng_clock();
+
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ fatal("nng_msg_alloc", rv);
+ }
+ if ((rv = nng_msg_append_u32(msg, msec)) != 0) {
+ fatal("nng_msg_append_u32", rv);
+ }
+
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("nng_send", rv);
+ }
+
+ if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
+ fatal("nng_recvmsg", rv);
+ }
+ end = nng_clock();
+ nng_msg_free(msg);
+ nng_close(sock);
+
+ printf("Request took %u milliseconds.\n", (uint32_t)(end - start));
+ return (0);
+}
+
+int
+main(int argc, char **argv)
+{
+ int rc;
+
+ if (argc < 3) {
+ fprintf(stderr, "Usage: %s <url> [-s|<secs>]\n", argv[0]);
+ exit(EXIT_FAILURE);
+ }
+ if (strcmp(argv[2], "-s") == 0) {
+ rc = server(argv[1]);
+ } else {
+ rc = client(argv[1], argv[2]);
+ }
+ exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
+}