aboutsummaryrefslogtreecommitdiff
path: root/demo/raw/raw.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-24 14:59:39 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-25 08:11:26 -0700
commit301de3ac5c7cf8a5eaaf3c58157251db781841d6 (patch)
treed0a19062d01de0df130a59613134330809b2a5ae /demo/raw/raw.c
parentb36dadf267842fb2fad7596f90f8f0cd78ac4af5 (diff)
downloadnng-301de3ac5c7cf8a5eaaf3c58157251db781841d6.tar.gz
nng-301de3ac5c7cf8a5eaaf3c58157251db781841d6.tar.bz2
nng-301de3ac5c7cf8a5eaaf3c58157251db781841d6.zip
fixes #486 Revisit SOVERSION and VERSION
fixes #485 Honor BUILD_SHARED_LIBS fixes #483 Don't expose private symbols in shared library fixes #481 Export CMake target This is a "large" commit involving changes that don't affect the code directly, but which have an impact on how we package and build our project. The most significant of these changes is that we now build only either a shared or a static library, depending on the setting of the BUILD_SHARED_LIBS option. We also suppress private symbols from being exposed when the underlying toolchain lets us do so. Minor updates to the way we version the ABI are used, and we now have a nice exported CMake project. To import this project in another, simply do find_package(nng) and you can add target_link_libraries(nng::nng) to your targets. CMake does the rest for you.
Diffstat (limited to 'demo/raw/raw.c')
-rw-r--r--demo/raw/raw.c224
1 files changed, 224 insertions, 0 deletions
diff --git a/demo/raw/raw.c b/demo/raw/raw.c
new file mode 100644
index 00000000..0285d8d2
--- /dev/null
+++ b/demo/raw/raw.c
@@ -0,0 +1,224 @@
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitoar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+// This program serves as an example for how to write an async RPC service,
+// using the RAW request/reply pattern and nn_poll. The server receives
+// messages and keeps them on a list, replying to them.
+
+// Our demonstration application layer protocol is simple. The client sends
+// a number of milliseconds to wait before responding. The server just gives
+// back an empty reply after waiting that long.
+
+// To run this program, start the server as async_demo <url> -s
+// Then connect to it with the client as async_client <url> <msec>.
+//
+// For example:
+//
+// % ./async tcp://127.0.0.1:5555 -s &
+// % ./async tcp://127.0.0.1:5555 323
+// Request took 324 milliseconds.
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include <nng/nng.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/supplemental/util/platform.h>
+
+// Parallel is the maximum number of outstanding requests we can handle.
+// This is *NOT* the number of threads in use, but instead represents
+// outstanding work items. Select a small number to reduce memory size.
+// (Each one of these can be thought of as a request-reply loop.)
+#ifndef PARALLEL
+#define PARALLEL 32
+#endif
+
+// The server keeps a list of work items, sorted by expiration time,
+// so that we can use this to set the timeout to the correct value for
+// use in poll.
+struct work {
+ enum { INIT, RECV, WAIT, SEND } state;
+ nng_aio * aio;
+ nng_socket sock;
+ nng_msg * msg;
+};
+
+void
+fatal(const char *func, int rv)
+{
+ fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
+ exit(1);
+}
+
+void
+server_cb(void *arg)
+{
+ struct work *work = arg;
+ nng_msg * msg;
+ int rv;
+ uint32_t when;
+
+ switch (work->state) {
+ case INIT:
+ work->state = RECV;
+ nng_recv_aio(work->sock, work->aio);
+ break;
+ case RECV:
+ if ((rv = nng_aio_result(work->aio)) != 0) {
+ fatal("nng_recv_aio", rv);
+ }
+ msg = nng_aio_get_msg(work->aio);
+ if ((rv = nng_msg_trim_u32(msg, &when)) != 0) {
+ // bad message, just ignore it.
+ nng_msg_free(msg);
+ nng_recv_aio(work->sock, work->aio);
+ return;
+ }
+ work->msg = msg;
+ work->state = WAIT;
+ nng_sleep_aio(when, work->aio);
+ break;
+ case WAIT:
+ // We could add more data to the message here.
+ nng_aio_set_msg(work->aio, work->msg);
+ work->msg = NULL;
+ work->state = SEND;
+ nng_send_aio(work->sock, work->aio);
+ break;
+ case SEND:
+ if ((rv = nng_aio_result(work->aio)) != 0) {
+ nng_msg_free(work->msg);
+ fatal("nng_send_aio", rv);
+ }
+ work->state = RECV;
+ nng_recv_aio(work->sock, work->aio);
+ break;
+ default:
+ fatal("bad state!", NNG_ESTATE);
+ break;
+ }
+}
+
+struct work *
+alloc_work(nng_socket sock)
+{
+ struct work *w;
+ int rv;
+
+ if ((w = nng_alloc(sizeof(*w))) == NULL) {
+ fatal("nng_alloc", NNG_ENOMEM);
+ }
+ if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
+ fatal("nng_aio_alloc", rv);
+ }
+ w->state = INIT;
+ w->sock = sock;
+ return (w);
+}
+
+// The server runs forever.
+int
+server(const char *url)
+{
+ nng_socket sock;
+ struct work *works[PARALLEL];
+ int rv;
+ int i;
+
+ /* Create the socket. */
+ rv = nng_rep0_open(&sock);
+ if (rv != 0) {
+ fatal("nng_rep0_open", rv);
+ }
+ if ((rv = nng_setopt_int(sock, NNG_OPT_RAW, 1)) != 0) {
+ fatal("nng_setopt_int", rv);
+ }
+
+ for (i = 0; i < PARALLEL; i++) {
+ works[i] = alloc_work(sock);
+ }
+
+ if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
+ fatal("nng_listen", rv);
+ }
+
+ for (i = 0; i < PARALLEL; i++) {
+ server_cb(works[i]); // this starts them going (INIT state)
+ }
+
+ for (;;) {
+ nng_msleep(3600000); // neither pause() nor sleep() portable
+ }
+}
+
+/* The client runs just once, and then returns. */
+int
+client(const char *url, const char *msecstr)
+{
+ nng_socket sock;
+ int rv;
+ nng_msg * msg;
+ nng_time start;
+ nng_time end;
+ unsigned msec;
+
+ msec = atoi(msecstr) * 1000;
+
+ if ((rv = nng_req0_open(&sock)) != 0) {
+ fatal("nng_req0_open", rv);
+ }
+
+ if ((rv = nng_dial(sock, url, NULL, 0)) < 0) {
+ fatal("nng_dial", rv);
+ }
+
+ start = nng_clock();
+
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ fatal("nng_msg_alloc", rv);
+ }
+ if ((rv = nng_msg_append_u32(msg, msec)) != 0) {
+ fatal("nng_msg_append_u32", rv);
+ }
+
+ if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
+ fatal("nng_send", rv);
+ }
+
+ if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
+ fatal("nng_recvmsg", rv);
+ }
+ end = nng_clock();
+ nng_msg_free(msg);
+ nng_close(sock);
+
+ printf("Request took %u milliseconds.\n", (uint32_t)(end - start));
+ return (0);
+}
+
+int
+main(int argc, char **argv)
+{
+ int rc;
+
+ if (argc < 3) {
+ fprintf(stderr, "Usage: %s <url> [-s|<secs>]\n", argv[0]);
+ exit(EXIT_FAILURE);
+ }
+ if (strcmp(argv[2], "-s") == 0) {
+ rc = server(argv[1]);
+ } else {
+ rc = client(argv[1], argv[2]);
+ }
+ exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
+}