author     Garrett D'Amore <garrett@damore.org>    2018-04-12 11:37:26 -0700
committer  Garrett D'Amore <garrett@damore.org>    2018-04-12 15:36:12 -0700
commit     7e4fa0fcc2135e7d19410c1e642656180de04bb0 (patch)
tree       0af320b0e781a45201d743151eb94a21f846efda /demo/async
parent     ac917f68ef6291c0c07f4d5bd7b29eec867434bf (diff)
Use contexts for new async demo.
Diffstat (limited to 'demo/async')
-rw-r--r--  demo/async/README.adoc                                  50
-rw-r--r--  demo/async/client.c                                     96
-rwxr-xr-x  demo/async/run.sh                                       27
-rw-r--r--  demo/async/server.c (renamed from demo/async/async.c)   99
4 files changed, 171 insertions, 101 deletions
diff --git a/demo/async/README.adoc b/demo/async/README.adoc
index 35290616..ec91c722 100644
--- a/demo/async/README.adoc
+++ b/demo/async/README.adoc
@@ -1,20 +1,24 @@
= async
-This is a simple asynchronous demo, that demonstrates use of the RAW
-option with a server, along with async message handling, to obtain a
-very high level of asynchronous operation, suitable for use in a highly
-concurrent server application.
+This is a simple asynchronous demo that demonstrates the use of contexts
+and asynchronous message handling to obtain highly concurrent
+processing with minimal fuss.
== Compiling
You can override the level of concurrency with the `PARALLEL`
define. This determines how many requests the server will accept
-at a time, and keep outstanding. Note that for our toy
-implementation, we create this many "logical" flows of execution
-(these are _NOT_ threads), where a request is followed by a reply.
+at a time, and keep outstanding. Note that for our toy implementation,
+we create this many "logical" flows of execution (contexts, which are
+_NOT_ threads), where a request is followed by a reply.
The value of `PARALLEL` must be at least one, and may be as large
-as your memory will permit. (The default value is 32.)
+as your memory will permit. (The default value is 128.) You will
+probably want the value to be small enough to ensure that you have enough
+file descriptors. (You can create more contexts than this, but generally
+you can't have more than one client per descriptor. On the client side,
+however, contexts can be used to support many thousands of concurrent
+requests over even just a single TCP connection.)
On UNIX-style systems:
@@ -23,31 +27,19 @@ On UNIX-style systems:
% export CPPFLAGS="-D PARALLEL=32 -I /usr/local/include"
% export LDFLAGS="-L /usr/local/lib -lnng"
% export CC="cc"
-% ${CC} ${CPPFLAGS} async.c -o async ${LDFLAGS}
+% ${CC} ${CPPFLAGS} server.c -o server ${LDFLAGS}
+% ${CC} ${CPPFLAGS} client.c -o client ${LDFLAGS}
----
== Running
-To run the server, use the arguments `__url__ -s`.
+The easiest way to try the demo is to use the `run.sh` script, which
+sends `COUNT` (10) jobs with random delays to the server in parallel.
-To run the client, use the arguments `__url__ __msec__`.
+You can of course run the client and server manually instead.
-The _msec_ is a "delay" time that server will wait before responding.
-We have these delays so simulate long running work.
+The server takes the address (URL) as its only argument.
-In the following example, all of the clients should complete within
-2 seconds. (Assuming `PARALLEL` is defined to be large enough.)
-
-[source,bash]
-----
-% export URL="tcp://127.0.0.1:55995"
-# start the server
-% ./async $URL -s &
-# start a bunch of clients
-# Note that these all run concurrently!
-% ./async $URL 2 &
-% ./async $URL 2 &
-% ./async $URL 2 &
-% ./async $URL 2 &
-% ./async $URL 2 &
-----
+The client takes the address (URL), followed by the number of
+milliseconds the server should "wait" before responding (to simulate
+an expensive operation).
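
The README text above mentions that contexts can also be used on the client side to keep many requests outstanding over a single connection. The commit itself does not include such a client, so the following is only a rough sketch of that idea, assuming the same nng context calls the server uses (nng_ctx_open, nng_ctx_send, nng_ctx_recv); the NCTX count and the 500 msec delay payload are arbitrary illustration values, not part of this commit.

[source,c]
----
// ctx_client_sketch.c - NOT part of this commit; a hedged illustration of
// several contexts sharing one REQ socket, so that NCTX requests are in
// flight at once over a single connection.
#include <stdio.h>
#include <stdlib.h>

#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>

#define NCTX 8 // hypothetical; pick any modest number

static void
fatal(const char *func, int rv)
{
	fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
	exit(1);
}

int
main(int argc, char **argv)
{
	nng_socket sock;
	nng_ctx    ctx[NCTX];
	nng_aio *  aio[NCTX];
	nng_msg *  msg;
	int        rv;

	if (argc != 2) {
		fprintf(stderr, "Usage: %s <url>\n", argv[0]);
		exit(EXIT_FAILURE);
	}
	if ((rv = nng_req0_open(&sock)) != 0) {
		fatal("nng_req0_open", rv);
	}
	if ((rv = nng_dial(sock, argv[1], NULL, 0)) != 0) {
		fatal("nng_dial", rv);
	}
	// One context and one aio per in-flight request.  A NULL callback
	// lets us wait on each aio synchronously with nng_aio_wait().
	for (int i = 0; i < NCTX; i++) {
		if ((rv = nng_ctx_open(&ctx[i], sock)) != 0) {
			fatal("nng_ctx_open", rv);
		}
		if ((rv = nng_aio_alloc(&aio[i], NULL, NULL)) != 0) {
			fatal("nng_aio_alloc", rv);
		}
	}
	// Fire every request before collecting any reply, so all NCTX
	// requests are outstanding at the same time on one socket.
	for (int i = 0; i < NCTX; i++) {
		if ((rv = nng_msg_alloc(&msg, 0)) != 0 ||
		    (rv = nng_msg_append_u32(msg, 500)) != 0) {
			fatal("nng_msg_alloc", rv);
		}
		nng_aio_set_msg(aio[i], msg);
		nng_ctx_send(ctx[i], aio[i]);
	}
	for (int i = 0; i < NCTX; i++) {
		nng_aio_wait(aio[i]);
		if ((rv = nng_aio_result(aio[i])) != 0) {
			fatal("nng_ctx_send", rv);
		}
		nng_ctx_recv(ctx[i], aio[i]);
	}
	for (int i = 0; i < NCTX; i++) {
		nng_aio_wait(aio[i]);
		if ((rv = nng_aio_result(aio[i])) != 0) {
			fatal("nng_ctx_recv", rv);
		}
		nng_msg_free(nng_aio_get_msg(aio[i]));
		printf("reply %d received\n", i);
	}
	for (int i = 0; i < NCTX; i++) {
		nng_aio_free(aio[i]);
		nng_ctx_close(ctx[i]);
	}
	nng_close(sock);
	return (0);
}
----

Each context keeps its own request state, so the requests do not serialize the way they would on a bare REQ socket, yet they all share one connection (and one file descriptor).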
diff --git a/demo/async/client.c b/demo/async/client.c
new file mode 100644
index 00000000..f236cc92
--- /dev/null
+++ b/demo/async/client.c
@@ -0,0 +1,96 @@
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+// This program is just a simple client application for our demo server.
+// It is in a separate file to keep the server code clearer to understand.
+//
+// Our demonstration application layer protocol is simple. The client sends
+// a number of milliseconds to wait before responding. The server just gives
+// back an empty reply after waiting that long.
+
+// For example:
+//
+// % ./server tcp://127.0.0.1:5555 &
+// % ./client tcp://127.0.0.1:5555 323
+// Request took 324 milliseconds.
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include <nng/nng.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/supplemental/util/platform.h>
+
+void
+fatal(const char *func, int rv)
+{
+ fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
+ exit(1);
+}
+
+/* The client runs just once, and then returns. */
+int
+client(const char *url, const char *msecstr)
+{
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ unsigned msec;
+
+ msec = atoi(msecstr);
+
+ if ((rv = nng_req0_open(&sock)) != 0) {
+ fatal("nng_req0_open", rv);
+ }
+
+ if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
+ fatal("nng_dial", rv);
+ }
+
+ start = nng_clock();
+
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ fatal("nng_msg_alloc", rv);
+ }
+ if ((rv = nng_msg_append_u32(msg, msec)) != 0) {
+ fatal("nng_msg_append_u32", rv);
+ }
+
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("nng_send", rv);
+ }
+
+ if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
+ fatal("nng_recvmsg", rv);
+ }
+ end = nng_clock();
+ nng_msg_free(msg);
+ nng_close(sock);
+
+ printf("Request took %u milliseconds.\n", (uint32_t)(end - start));
+ return (0);
+}
+
+int
+main(int argc, char **argv)
+{
+ int rc;
+
+ if (argc != 3) {
+ fprintf(stderr, "Usage: %s <url> <secs>\n", argv[0]);
+ exit(EXIT_FAILURE);
+ }
+ rc = client(argv[1], argv[2]);
+ exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
+}
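
As the comments at the top of client.c say, the application protocol is nothing more than a single unsigned 32-bit delay (in milliseconds) carried in the message body: the client adds it with nng_msg_append_u32() and the server strips it back off with nng_msg_trim_u32() (visible in the server.c hunks below). A tiny self-contained sketch of that round trip on a message, using the 323 msec value from the example comment:

[source,c]
----
// Not part of this commit: shows only the append/trim pairing used by the
// demo protocol, with no sockets involved.
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#include <nng/nng.h>

int
main(void)
{
	nng_msg *msg;
	uint32_t when;

	// Client side: start with an empty body and append the delay.
	assert(nng_msg_alloc(&msg, 0) == 0);
	assert(nng_msg_append_u32(msg, 323) == 0);

	// Server side: trim the delay back off the front of the body.
	assert(nng_msg_trim_u32(msg, &when) == 0);
	printf("delay %u msec, %u bytes left\n", (unsigned) when,
	    (unsigned) nng_msg_len(msg)); // prints: delay 323 msec, 0 bytes left

	nng_msg_free(msg);
	return (0);
}
----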
diff --git a/demo/async/run.sh b/demo/async/run.sh
new file mode 100755
index 00000000..856fea9f
--- /dev/null
+++ b/demo/async/run.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+ADDR=ipc:///tmp/async_demo
+COUNT=10
+
+./server $ADDR &
+SERVER_PID=$!
+trap "kill $SERVER_PID" 0
+typeset -a CLIENT_PID
+i=0
+sleep 1
+while (( i < COUNT ))
+do
+ i=$(( i + 1 ))
+ rnd=$(( RANDOM % 1000 + 500 ))
+ echo "Starting client $i: server replies after $rnd msec"
+ ./client $ADDR $rnd &
+ CLIENT_PID[$i]=$!
+done
+
+i=0
+while (( i < COUNT ))
+do
+ i=$(( i + 1 ))
+ wait ${CLIENT_PID[$i]}
+done
+kill $SERVER_PID
diff --git a/demo/async/async.c b/demo/async/server.c
index 0285d8d2..fe884b3e 100644
--- a/demo/async/async.c
+++ b/demo/async/server.c
@@ -8,8 +8,11 @@
//
// This program serves as an example for how to write an async RPC service,
-// using the RAW request/reply pattern and nn_poll. The server receives
-// messages and keeps them on a list, replying to them.
+// using the request/reply pattern and contexts (nng_ctx(5)). The server
+// allocates a number of contexts up front, which determines the amount of
+// parallelism possible. The work is driven by asynchronous callbacks;
+// the same job could be done with threads or something similar, but here
+// we rely on the event-driven machinery that nng already provides.
// Our demonstration application layer protocol is simple. The client sends
// a number of milliseconds to wait before responding. The server just gives
@@ -20,8 +23,8 @@
//
// For example:
//
-// % ./async tcp://127.0.0.1:5555 -s &
-// % ./async tcp://127.0.0.1:5555 323
+// % ./server tcp://127.0.0.1:5555 &
+// % ./client tcp://127.0.0.1:5555 323
// Request took 324 milliseconds.
#include <stdio.h>
@@ -32,15 +35,17 @@
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
-#include <nng/protocol/reqrep0/req.h>
#include <nng/supplemental/util/platform.h>
// Parallel is the maximum number of outstanding requests we can handle.
// This is *NOT* the number of threads in use, but instead represents
// outstanding work items. Select a small number to reduce memory size.
-// (Each one of these can be thought of as a request-reply loop.)
+// (Each one of these can be thought of as a request-reply loop.) Note
+// that you will probably run into limitations on the number of open file
+// descriptors if you set this too high. (If not for that limit, this could
+// be set in the thousands, as each context consumes only a couple of KB.)
#ifndef PARALLEL
-#define PARALLEL 32
+#define PARALLEL 128
#endif
// The server keeps a list of work items, sorted by expiration time,
@@ -48,9 +53,9 @@
// use in poll.
struct work {
enum { INIT, RECV, WAIT, SEND } state;
- nng_aio * aio;
- nng_socket sock;
- nng_msg * msg;
+ nng_aio *aio;
+ nng_msg *msg;
+ nng_ctx ctx;
};
void
@@ -71,17 +76,17 @@ server_cb(void *arg)
switch (work->state) {
case INIT:
work->state = RECV;
- nng_recv_aio(work->sock, work->aio);
+ nng_ctx_recv(work->ctx, work->aio);
break;
case RECV:
if ((rv = nng_aio_result(work->aio)) != 0) {
- fatal("nng_recv_aio", rv);
+ fatal("nng_ctx_recv", rv);
}
msg = nng_aio_get_msg(work->aio);
if ((rv = nng_msg_trim_u32(msg, &when)) != 0) {
// bad message, just ignore it.
nng_msg_free(msg);
- nng_recv_aio(work->sock, work->aio);
+ nng_ctx_recv(work->ctx, work->aio);
return;
}
work->msg = msg;
@@ -93,15 +98,15 @@ server_cb(void *arg)
nng_aio_set_msg(work->aio, work->msg);
work->msg = NULL;
work->state = SEND;
- nng_send_aio(work->sock, work->aio);
+ nng_ctx_send(work->ctx, work->aio);
break;
case SEND:
if ((rv = nng_aio_result(work->aio)) != 0) {
nng_msg_free(work->msg);
- fatal("nng_send_aio", rv);
+ fatal("nng_ctx_send", rv);
}
work->state = RECV;
- nng_recv_aio(work->sock, work->aio);
+ nng_ctx_recv(work->ctx, work->aio);
break;
default:
fatal("bad state!", NNG_ESTATE);
@@ -121,8 +126,10 @@ alloc_work(nng_socket sock)
if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
fatal("nng_aio_alloc", rv);
}
+ if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
+ fatal("nng_ctx_open", rv);
+ }
w->state = INIT;
- w->sock = sock;
return (w);
}
@@ -140,9 +147,6 @@ server(const char *url)
if (rv != 0) {
fatal("nng_rep0_open", rv);
}
- if ((rv = nng_setopt_int(sock, NNG_OPT_RAW, 1)) != 0) {
- fatal("nng_setopt_int", rv);
- }
for (i = 0; i < PARALLEL; i++) {
works[i] = alloc_work(sock);
@@ -161,64 +165,15 @@ server(const char *url)
}
}
-/* The client runs just once, and then returns. */
-int
-client(const char *url, const char *msecstr)
-{
- nng_socket sock;
- int rv;
- nng_msg * msg;
- nng_time start;
- nng_time end;
- unsigned msec;
-
- msec = atoi(msecstr) * 1000;
-
- if ((rv = nng_req0_open(&sock)) != 0) {
- fatal("nng_req0_open", rv);
- }
-
- if ((rv = nng_dial(sock, url, NULL, 0)) < 0) {
- fatal("nng_dial", rv);
- }
-
- start = nng_clock();
-
- if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
- fatal("nng_msg_alloc", rv);
- }
- if ((rv = nng_msg_append_u32(msg, msec)) != 0) {
- fatal("nng_msg_append_u32", rv);
- }
-
- if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
- fatal("nng_send", rv);
- }
-
- if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
- fatal("nng_recvmsg", rv);
- }
- end = nng_clock();
- nng_msg_free(msg);
- nng_close(sock);
-
- printf("Request took %u milliseconds.\n", (uint32_t)(end - start));
- return (0);
-}
-
int
main(int argc, char **argv)
{
int rc;
- if (argc < 3) {
- fprintf(stderr, "Usage: %s <url> [-s|<secs>]\n", argv[0]);
+ if (argc != 2) {
+ fprintf(stderr, "Usage: %s <url>\n", argv[0]);
exit(EXIT_FAILURE);
}
- if (strcmp(argv[2], "-s") == 0) {
- rc = server(argv[1]);
- } else {
- rc = client(argv[1], argv[2]);
- }
+ rc = server(argv[1]);
exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
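
Since only the changed hunks of server.c appear above, here is a condensed, hedged restatement of the context-per-work-item pattern this commit introduces. It is a sketch rather than a copy of the file: the WAIT state (the artificial delay the real server performs before replying) and the message parsing are elided, and the reply simply echoes the request.

[source,c]
----
// Condensed sketch of the pattern in server.c; NOT the actual file.
#include <stdio.h>
#include <stdlib.h>

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/supplemental/util/platform.h>

#ifndef PARALLEL
#define PARALLEL 128 // same default as the commit
#endif

struct work {
	enum { INIT, RECV, SEND } state; // real server also has a WAIT state
	nng_aio *aio;
	nng_ctx  ctx;
};

static void
fatal(const char *func, int rv)
{
	fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
	exit(1);
}

// Each callback invocation advances one work item by a single step.
static void
server_cb(void *arg)
{
	struct work *w = arg;
	nng_msg *    msg;
	int          rv;

	switch (w->state) {
	case INIT:
		w->state = RECV;
		nng_ctx_recv(w->ctx, w->aio); // post a receive on our context
		break;
	case RECV:
		if ((rv = nng_aio_result(w->aio)) != 0) {
			fatal("nng_ctx_recv", rv);
		}
		msg = nng_aio_get_msg(w->aio);
		// Real server: trim the u32 delay here and sleep before replying.
		nng_aio_set_msg(w->aio, msg); // reuse the request body as the reply
		w->state = SEND;
		nng_ctx_send(w->ctx, w->aio);
		break;
	case SEND:
		if ((rv = nng_aio_result(w->aio)) != 0) {
			fatal("nng_ctx_send", rv);
		}
		w->state = RECV;
		nng_ctx_recv(w->ctx, w->aio); // go wait for the next request
		break;
	}
}

int
main(int argc, char **argv)
{
	nng_socket   sock;
	struct work *works[PARALLEL];
	int          rv;

	if (argc != 2) {
		fprintf(stderr, "Usage: %s <url>\n", argv[0]);
		exit(EXIT_FAILURE);
	}
	if ((rv = nng_rep0_open(&sock)) != 0) {
		fatal("nng_rep0_open", rv);
	}
	for (int i = 0; i < PARALLEL; i++) {
		if ((works[i] = calloc(1, sizeof(struct work))) == NULL) {
			fatal("calloc", NNG_ENOMEM);
		}
		if ((rv = nng_aio_alloc(&works[i]->aio, server_cb, works[i])) != 0) {
			fatal("nng_aio_alloc", rv);
		}
		if ((rv = nng_ctx_open(&works[i]->ctx, sock)) != 0) {
			fatal("nng_ctx_open", rv);
		}
		works[i]->state = INIT;
	}
	if ((rv = nng_listen(sock, argv[1], NULL, 0)) != 0) {
		fatal("nng_listen", rv);
	}
	// Kick each work item once; from then on progress is driven entirely
	// by aio completion callbacks, with no per-request threads.
	for (int i = 0; i < PARALLEL; i++) {
		server_cb(works[i]);
	}
	for (;;) {
		nng_msleep(3600 * 1000); // main has nothing left to do
	}
}
----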