summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-11-27 14:21:20 -0800
committerGarrett D'Amore <garrett@damore.org>2017-12-26 15:31:53 -0800
commit93db6fe3aaff421d61a15993ba6827b742ab00d1 (patch)
treed4d6372cb5d606ba9bcdb60b88b6271086940895
parentc9bf5a76b0d6aead6ae91af71ada51a17881ac0a (diff)
downloadnng-93db6fe3aaff421d61a15993ba6827b742ab00d1.tar.gz
nng-93db6fe3aaff421d61a15993ba6827b742ab00d1.tar.bz2
nng-93db6fe3aaff421d61a15993ba6827b742ab00d1.zip
fixes #2 Websocket transport
This is a rather large changeset -- it fundamentally adds websocket transport, but as part of this changeset we added a generic framework for both HTTP and websocket. We also made some supporting changes to the core, such as changing the way timeouts work for AIOs and adding additional state keeping for AIOs, and adding a common framework for deferred finalization (to avoid certain kinds of circular deadlocks during resource cleanup). We also invented a new initialization framework so that we can avoid wiring in knowledge about them into the master initialization framework. The HTTP framework is not yet complete, but it is good enough for simple static serving and building additional services on top of -- including websocket. We expect both websocket and HTTP support to evolve considerably, and so these are not part of the public API yet. Property support for the websocket transport (in particular address properties) is still missing, as is support for TLS. The websocket transport here is a bit more robust than the original nanomsg implementation, as it supports multiple sockets listening at the same port sharing the same HTTP server instance, discriminating between them based on URI (and possibly the virtual host). Websocket is enabled by default at present, and work to conditionalize HTTP and websocket further (to minimize bloat) is still pending.
-rw-r--r--CMakeLists.txt5
-rw-r--r--src/CMakeLists.txt5
-rw-r--r--src/core/aio.c70
-rw-r--r--src/core/aio.h50
-rw-r--r--src/core/defs.h3
-rw-r--r--src/core/device.c2
-rw-r--r--src/core/endpt.c4
-rw-r--r--src/core/init.c52
-rw-r--r--src/core/init.h23
-rw-r--r--src/core/list.c7
-rw-r--r--src/core/message.c1
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/reap.c89
-rw-r--r--src/core/reap.h36
-rw-r--r--src/core/socket.c30
-rw-r--r--src/core/socket.h18
-rw-r--r--src/core/strs.c91
-rw-r--r--src/core/strs.h3
-rw-r--r--src/core/transport.c72
-rw-r--r--src/core/transport.h3
-rw-r--r--src/nng.c10
-rw-r--r--src/platform/posix/posix_tcp.c4
-rw-r--r--src/supplemental/base64/base64.c2
-rw-r--r--src/supplemental/base64/base64.h2
-rw-r--r--src/supplemental/http/CMakeLists.txt17
-rw-r--r--src/supplemental/http/client.c147
-rw-r--r--src/supplemental/http/http.c604
-rw-r--r--src/supplemental/http/http.h290
-rw-r--r--src/supplemental/http/http_msg.c956
-rw-r--r--src/supplemental/http/server.c1103
-rw-r--r--src/supplemental/mbedtls/tls.c6
-rw-r--r--src/supplemental/websocket/CMakeLists.txt14
-rw-r--r--src/supplemental/websocket/websocket.c1935
-rw-r--r--src/supplemental/websocket/websocket.h63
-rw-r--r--src/transport/tls/tls.c70
-rw-r--r--src/transport/ws/CMakeLists.txt19
-rw-r--r--src/transport/ws/README.adoc38
-rw-r--r--src/transport/ws/websocket.c533
-rw-r--r--src/transport/ws/websocket.h62
-rw-r--r--src/transport/zerotier/zerotier.c15
-rw-r--r--tests/CMakeLists.txt3
-rw-r--r--tests/base64.c2
-rw-r--r--tests/httpclient.c120
-rw-r--r--tests/httpserver.c146
-rw-r--r--tests/platform.c6
-rw-r--r--tests/sha1.c2
-rw-r--r--tests/tls.c5
-rw-r--r--tests/trantest.h17
-rw-r--r--tests/ws.c97
-rw-r--r--tests/zt.c2
50 files changed, 6709 insertions, 146 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 27f0321e..e123c3fd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -168,6 +168,11 @@ if (NNG_TRANSPORT_TCP)
add_definitions (-DNNG_HAVE_TCP)
endif ()
+option (NNG_TRANSPORT_WS "Enable WebSocket transport." ON)
+if (NNG_TRANSPORT_WS)
+ add_definitions (-DNNG_HAVE_WEBSOCKET)
+endif ()
+
option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF)
if (NNG_TRANSPORT_ZEROTIER)
add_definitions (-DNNG_HAVE_ZEROTIER)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 93fa390d..6fae89e0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -62,6 +62,8 @@ set (NNG_SOURCES
core/protocol.h
core/random.c
core/random.h
+ core/reap.c
+ core/reap.h
core/socket.c
core/socket.h
core/strs.c
@@ -120,8 +122,10 @@ if (NNG_PLATFORM_WINDOWS)
endif()
add_subdirectory(supplemental/base64)
+add_subdirectory(supplemental/http)
add_subdirectory(supplemental/mbedtls)
add_subdirectory(supplemental/sha1)
+add_subdirectory(supplemental/websocket)
add_subdirectory(protocol/bus0)
add_subdirectory(protocol/pair0)
@@ -135,6 +139,7 @@ add_subdirectory(transport/inproc)
add_subdirectory(transport/ipc)
add_subdirectory(transport/tcp)
add_subdirectory(transport/tls)
+add_subdirectory(transport/ws)
add_subdirectory(transport/zerotier)
include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src
diff --git a/src/core/aio.c b/src/core/aio.c
index cec2ff7c..6ce5641d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -1,6 +1,7 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -66,7 +67,8 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
}
memset(aio, 0, sizeof(*aio));
nni_cv_init(&aio->a_cv, &nni_aio_lk);
- aio->a_expire = NNI_TIME_NEVER;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_timeout = NNG_DURATION_INFINITE;
if (arg == NULL) {
arg = aio;
}
@@ -116,9 +118,9 @@ nni_aio_stop(nni_aio *aio)
}
void
-nni_aio_set_timeout(nni_aio *aio, nni_time when)
+nni_aio_set_timeout(nni_aio *aio, nni_duration when)
{
- aio->a_expire = when;
+ aio->a_timeout = when;
}
void
@@ -158,15 +160,54 @@ nni_aio_get_ep(nni_aio *aio)
}
void
-nni_aio_set_data(nni_aio *aio, void *data)
+nni_aio_set_data(nni_aio *aio, int index, void *data)
{
- aio->a_data = data;
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_user_data))) {
+ aio->a_user_data[index] = data;
+ }
}
void *
-nni_aio_get_data(nni_aio *aio)
+nni_aio_get_data(nni_aio *aio, int index)
+{
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_user_data))) {
+ return (aio->a_user_data[index]);
+ }
+ return (NULL);
+}
+
+void
+nni_aio_set_input(nni_aio *aio, int index, void *data)
{
- return (aio->a_data);
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_inputs))) {
+ aio->a_inputs[index] = data;
+ }
+}
+
+void *
+nni_aio_get_input(nni_aio *aio, int index)
+{
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_inputs))) {
+ return (aio->a_inputs[index]);
+ }
+ return (NULL);
+}
+
+void
+nni_aio_set_output(nni_aio *aio, int index, void *data)
+{
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_outputs))) {
+ aio->a_outputs[index] = data;
+ }
+}
+
+void *
+nni_aio_get_output(nni_aio *aio, int index)
+{
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_outputs))) {
+ return (aio->a_outputs[index]);
+ }
+ return (NULL);
}
int
@@ -219,8 +260,21 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
aio->a_active = 1;
- if (aio->a_expire != NNI_TIME_NEVER) {
+
+ // Convert the relative timeout to an absolute timeout.
+ switch (aio->a_timeout) {
+ case NNG_DURATION_ZERO:
+ aio->a_expire = NNI_TIME_ZERO;
+ nni_aio_expire_add(aio);
+ break;
+ case NNG_DURATION_INFINITE:
+ case NNG_DURATION_DEFAULT:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_timeout;
nni_aio_expire_add(aio);
+ break;
}
nni_mtx_unlock(&nni_aio_lk);
return (0);
diff --git a/src/core/aio.h b/src/core/aio.h
index 3bdcf433..c4c09421 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -1,6 +1,7 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -22,9 +23,10 @@ typedef void (*nni_aio_cancelfn)(nni_aio *, int);
// An nni_aio is an async I/O handle.
struct nni_aio {
- int a_result; // Result code (nng_errno)
- size_t a_count; // Bytes transferred (I/O only)
- nni_time a_expire;
+ int a_result; // Result code (nng_errno)
+ size_t a_count; // Bytes transferred (I/O only)
+ nni_time a_expire; // Absolute timeout
+ nni_duration a_timeout; // Relative timeout
// These fields are private to the aio framework.
nni_cv a_cv;
@@ -35,8 +37,6 @@ struct nni_aio {
unsigned a_expiring : 1; // expiration callback in progress
unsigned a_waiting : 1; // a thread is waiting for this to finish
unsigned a_synch : 1; // run completion synchronously
- unsigned a_reltime : 1; // expiration time is relative
- unsigned a_pad : 25; // ensure 32-bit alignment
nni_task a_task;
// Read/write operations.
@@ -53,13 +53,21 @@ struct nni_aio {
// Resolver operations.
nni_sockaddr *a_addr;
- // Extra user data.
- void *a_data;
+ // User scratch data. Consumers may store values here, which
+ // must be preserved by providers and the framework.
+ void *a_user_data[4];
+
+ // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be
+ // specified. The semantics of these will vary, and depend on the
+ // specific operation.
+ void *a_inputs[4];
+ void *a_outputs[4];
// Provider-use fields.
nni_aio_cancelfn a_prov_cancel;
void * a_prov_data;
nni_list_node a_prov_node;
+ void * a_prov_extra[4]; // Extra data used by provider
// Expire node.
nni_list_node a_expire_node;
@@ -96,12 +104,32 @@ extern void nni_aio_stop(nni_aio *);
// nni_aio_set_data sets user data. This should only be done by the
// consumer, initiating the I/O. The intention is to be able to store
// additional data for use when the operation callback is executed.
-extern void nni_aio_set_data(nni_aio *, void *);
+// The index represents the "index" at which to store the data. A maximum
+// of 4 elements can be stored with the (index >= 0 && index < 4).
+extern void nni_aio_set_data(nni_aio *, int, void *);
// nni_aio_get_data returns the user data that was previously stored
// with nni_aio_set_data.
-extern void *nni_aio_get_data(nni_aio *);
+extern void *nni_aio_get_data(nni_aio *, int);
+
+// nni_set_input sets input parameters on the AIO. The semantic details
+// of this will be determined by the specific AIO operation. AIOs can
+// carry up to 4 input parameters.
+extern void nni_aio_set_input(nni_aio *, int, void *);
+
+// nni_get_input returns the input value stored by nni_aio_set_input.
+extern void *nni_aio_get_input(nni_aio *, int);
+
+// nni_set_output sets output results on the AIO, allowing providers to
+// return results to consumers. The semantic details are determined by
+// the AIO operation. Up to 4 outputs can be carried on an AIO.
+extern void nni_aio_set_output(nni_aio *, int, void *);
+
+// nni_get_output returns an output previously stored on the AIO.
+extern void *nni_aio_get_output(nni_aio *, int);
+// XXX: These should be refactored in terms of the generic inputs and
+// outputs.
extern void nni_aio_set_msg(nni_aio *, nni_msg *);
extern nni_msg *nni_aio_get_msg(nni_aio *);
extern void nni_aio_set_pipe(nni_aio *, void *);
@@ -122,10 +150,10 @@ extern void * nni_aio_get_ep(nni_aio *);
// completion callback.
void nni_aio_set_synch(nni_aio *);
-// nni_aio_set_timeout sets the timeout (absolute) when the AIO will
+// nni_aio_set_timeout sets the timeout (relative) when the AIO will
// be canceled. The cancelation does not happen until after nni_aio_start
// is called.
-extern void nni_aio_set_timeout(nni_aio *, nni_time);
+extern void nni_aio_set_timeout(nni_aio *, nni_duration);
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
diff --git a/src/core/defs.h b/src/core/defs.h
index 5a9ded92..3a714f85 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -25,6 +25,9 @@
#define NNI_ASSERT(x)
#endif
+// Returns the size of an array in elements. (Convenience.)
+#define NNI_NUM_ELEMENTS(x) (sizeof(x) / sizeof((x)[0]))
+
// These types are common but have names shared with user space.
typedef struct nng_msg nni_msg;
typedef struct nng_sockaddr nni_sockaddr;
diff --git a/src/core/device.c b/src/core/device.c
index 2e000d7e..e6b75897 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -159,7 +159,7 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
return (rv);
}
- nni_aio_set_timeout(p->aio, NNI_TIME_NEVER);
+ nni_aio_set_timeout(p->aio, NNG_DURATION_INFINITE);
}
dd->npath = npath;
*dp = dd;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index fa30bf77..3058f5c0 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -333,8 +333,8 @@ nni_ep_tmo_start(nni_ep *ep)
// have a statistically perfect distribution with the modulo of
// the random number, but this really doesn't matter.
- nni_aio_set_timeout(ep->ep_tmo_aio,
- nni_clock() + (backoff ? nni_random() % backoff : 0));
+ nni_aio_set_timeout(
+ ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
ep->ep_tmo_run = 1;
if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
diff --git a/src/core/init.c b/src/core/init.c
index 4a7fd974..cb6dbeee 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -1,6 +1,7 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -10,15 +11,25 @@
#include "core/nng_impl.h"
+#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
+static nni_mtx nni_init_mtx;
+static nni_list nni_init_list;
+static bool nni_inited = false;
+
static int
nni_init_helper(void)
{
int rv;
+ nni_mtx_init(&nni_init_mtx);
+ NNI_LIST_INIT(&nni_init_list, nni_initializer, i_node);
+ nni_inited = true;
+
if (((rv = nni_taskq_sys_init()) != 0) ||
+ ((rv = nni_reap_sys_init()) != 0) ||
((rv = nni_timer_sys_init()) != 0) ||
((rv = nni_aio_sys_init()) != 0) ||
((rv = nni_random_sys_init()) != 0) ||
@@ -29,6 +40,7 @@ nni_init_helper(void)
((rv = nni_tran_sys_init()) != 0)) {
nni_fini();
}
+
return (rv);
}
@@ -41,14 +53,54 @@ nni_init(void)
void
nni_fini(void)
{
+ if (!nni_inited) {
+ return;
+ }
+ if (!nni_list_empty(&nni_init_list)) {
+ nni_initializer *init;
+
+ nni_mtx_lock(&nni_init_mtx);
+ while ((init = nni_list_first(&nni_init_list)) != NULL) {
+ if (init->i_fini != NULL) {
+ init->i_fini();
+ }
+ init->i_once = 0;
+ nni_list_remove(&nni_init_list, init);
+ }
+ nni_mtx_unlock(&nni_init_mtx);
+ }
nni_tran_sys_fini();
nni_proto_sys_fini();
nni_pipe_sys_fini();
nni_ep_sys_fini();
nni_sock_sys_fini();
nni_random_sys_fini();
+ nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_aio_sys_fini();
nni_timer_sys_fini();
nni_taskq_sys_fini();
+
+ nni_mtx_fini(&nni_init_mtx);
nni_plat_fini();
+ nni_inited = false;
+}
+
+int
+nni_initialize(nni_initializer *init)
+{
+ int rv;
+ if (init->i_once) {
+ return (0);
+ }
+ nni_mtx_lock(&nni_init_mtx);
+ if (init->i_once) {
+ nni_mtx_unlock(&nni_init_mtx);
+ return (0);
+ }
+ if ((rv = init->i_init()) == 0) {
+ init->i_once = 1;
+ nni_list_append(&nni_init_list, init);
+ }
+ nni_mtx_unlock(&nni_init_mtx);
+ return (rv);
}
diff --git a/src/core/init.h b/src/core/init.h
index ffcebf64..d21bb4c5 100644
--- a/src/core/init.h
+++ b/src/core/init.h
@@ -1,5 +1,7 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -21,4 +23,23 @@ int nni_init(void);
// that all resources used by the library are released back to the system.
void nni_fini(void);
+typedef struct nni_initializer {
+ int (*i_init)(void); // i_init is called exactly once
+ void (*i_fini)(void); // i_fini is called on shutdown
+ int i_once; // private -- initialize to zero
+ nni_list_node i_node; // private -- initialize to zero
+} nni_initializer;
+
+// nni_initialize will call the initialization routine exactly once. This is
+// done efficiently, so that if the caller has initialized already, then
+// subsequent calls are "cheap" (no synchronization cost). The initialization
+// function must not itself cause any further calls to nni_initialize; the
+// function should limit itself to initialization of locks and static data
+// structures. When shutting down, the finalizer will be called. The
+// order in which finalizers are called is unspecified.
+//
+// An initializer may fail (due to resource exhaustion), in which case the
+// return value of nni_initialize will be non-zero.
+int nni_initialize(nni_initializer *);
+
#endif // CORE_INIT_H
diff --git a/src/core/list.c b/src/core/list.c
index 8b26b64a..f489a705 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -1,5 +1,7 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -151,7 +153,10 @@ nni_list_active(nni_list *list, void *item)
int
nni_list_empty(nni_list *list)
{
- return (list->ll_head.ln_next == &list->ll_head);
+ // The first check ensures that we treat an uninitialized list
+ // as empty. This use useful for statically initialized lists.
+ return ((list->ll_head.ln_next == NULL) ||
+ (list->ll_head.ln_next == &list->ll_head));
}
int
diff --git a/src/core/message.c b/src/core/message.c
index b44dfdf6..35153f01 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -9,7 +9,6 @@
//
#include <stdio.h>
-#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 87dd2de1..1f2297c1 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -38,6 +38,7 @@
#include "core/panic.h"
#include "core/protocol.h"
#include "core/random.h"
+#include "core/reap.h"
#include "core/strs.h"
#include "core/taskq.h"
#include "core/thread.h"
diff --git a/src/core/reap.c b/src/core/reap.c
new file mode 100644
index 00000000..8191dba3
--- /dev/null
+++ b/src/core/reap.c
@@ -0,0 +1,89 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include "reap.h"
+
+#include <stdbool.h>
+
+static nni_list nni_reap_list;
+static nni_mtx nni_reap_mtx;
+static nni_cv nni_reap_cv;
+static bool nni_reap_exit = false;
+static nni_thr nni_reap_thr;
+
+static void
+nni_reap_stuff(void *notused)
+{
+ NNI_ARG_UNUSED(notused);
+
+ nni_mtx_lock(&nni_reap_mtx);
+ for (;;) {
+ nni_reap_item *item;
+ if ((item = nni_list_first(&nni_reap_list)) != NULL) {
+ nni_list_remove(&nni_reap_list, item);
+ nni_mtx_unlock(&nni_reap_mtx);
+
+ item->r_func(item->r_ptr);
+ nni_mtx_lock(&nni_reap_mtx);
+ continue;
+ }
+
+ if (nni_reap_exit) {
+ break;
+ }
+
+ nni_cv_wait(&nni_reap_cv);
+ }
+ nni_mtx_unlock(&nni_reap_mtx);
+}
+
+void
+nni_reap(nni_reap_item *item, nni_cb func, void *ptr)
+{
+ nni_mtx_lock(&nni_reap_mtx);
+ item->r_func = func;
+ item->r_ptr = ptr;
+ nni_list_append(&nni_reap_list, item);
+ nni_cv_wake(&nni_reap_cv);
+ nni_mtx_unlock(&nni_reap_mtx);
+}
+
+int
+nni_reap_sys_init(void)
+{
+ int rv;
+
+ NNI_LIST_INIT(&nni_reap_list, nni_reap_item, r_link);
+ nni_mtx_init(&nni_reap_mtx);
+ nni_cv_init(&nni_reap_cv, &nni_reap_mtx);
+ nni_reap_exit = false;
+
+ // If this fails, we don't fail init, instead we will try to
+ // start up at reap time.
+ if ((rv = nni_thr_init(&nni_reap_thr, nni_reap_stuff, NULL)) != 0) {
+ nni_cv_fini(&nni_reap_cv);
+ nni_mtx_fini(&nni_reap_mtx);
+ return (rv);
+ }
+ nni_thr_run(&nni_reap_thr);
+ return (0);
+}
+
+void
+nni_reap_sys_fini(void)
+{
+ nni_mtx_lock(&nni_reap_mtx);
+ nni_reap_exit = true;
+ nni_cv_wake(&nni_reap_cv);
+ nni_mtx_unlock(&nni_reap_mtx);
+ nni_thr_fini(&nni_reap_thr);
+}
diff --git a/src/core/reap.h b/src/core/reap.h
new file mode 100644
index 00000000..fbc008b2
--- /dev/null
+++ b/src/core/reap.h
@@ -0,0 +1,36 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_REAP_H
+#define CORE_REAP_H
+
+#include "core/defs.h"
+#include "core/list.h"
+
+// nni_reap_item is defined here so that it can be inlined into
+// structures. Callers must access its members directly.
+typedef struct nni_reap_item {
+ nni_list_node r_link;
+ void * r_ptr;
+ nni_cb r_func;
+} nni_reap_item;
+
+// nni_reap performs an asynchronous reap of an item. This allows functions
+// it calls to acquire locks or resources without worrying about deadlocks
+// (such as from a completion callback.) The called function should avoid
+// blocking for too long if possible, since only one reap thread is present
+// in the system. The intended usage is for an nni_reap_item to be a member
+// of the structure to be reaped, and and then this function is called to
+// finalize it.
+extern void nni_reap(nni_reap_item *, nni_cb, void *);
+extern int nni_reap_sys_init(void);
+extern void nni_reap_sys_fini(void);
+
+#endif // CORE_REAP_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 54e1bd6a..409e4f66 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -775,22 +775,8 @@ nni_sock_closeall(void)
static void
nni_sock_normalize_expiration(nni_aio *aio, nni_duration def)
{
- if (aio->a_reltime) {
- if (aio->a_expire == (nni_time) -2) {
- aio->a_expire = def;
- }
- switch (aio->a_expire) {
- case (nni_time) 0:
- aio->a_expire = NNI_TIME_ZERO;
- break;
- case (nni_time) -1:
- aio->a_expire = NNI_TIME_NEVER;
- break;
- default:
- aio->a_expire = nni_clock() + aio->a_expire;
- break;
- }
- aio->a_reltime = 0;
+ if (aio->a_timeout == (nni_duration) -2) {
+ aio->a_timeout = def;
}
}
@@ -821,6 +807,18 @@ nni_sock_peer(nni_sock *sock)
return (sock->s_peer_id.p_id);
}
+const char *
+nni_sock_proto_name(nni_sock *sock)
+{
+ return (sock->s_self_id.p_name);
+}
+
+const char *
+nni_sock_peer_name(nni_sock *sock)
+{
+ return (sock->s_peer_id.p_name);
+}
+
void
nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 52310ef3..37c67436 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -14,14 +14,16 @@
extern int nni_sock_sys_init(void);
extern void nni_sock_sys_fini(void);
-extern int nni_sock_find(nni_sock **, uint32_t);
-extern void nni_sock_rele(nni_sock *);
-extern int nni_sock_open(nni_sock **, const nni_proto *);
-extern void nni_sock_close(nni_sock *);
-extern void nni_sock_closeall(void);
-extern int nni_sock_shutdown(nni_sock *);
-extern uint16_t nni_sock_proto(nni_sock *);
-extern uint16_t nni_sock_peer(nni_sock *);
+extern int nni_sock_find(nni_sock **, uint32_t);
+extern void nni_sock_rele(nni_sock *);
+extern int nni_sock_open(nni_sock **, const nni_proto *);
+extern void nni_sock_close(nni_sock *);
+extern void nni_sock_closeall(void);
+extern int nni_sock_shutdown(nni_sock *);
+extern uint16_t nni_sock_proto(nni_sock *);
+extern uint16_t nni_sock_peer(nni_sock *);
+extern const char *nni_sock_proto_name(nni_sock *);
+extern const char *nni_sock_peer_name(nni_sock *);
extern int nni_sock_setopt(nni_sock *, const char *, const void *, size_t);
extern int nni_sock_getopt(nni_sock *, const char *, void *, size_t *);
extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int);
diff --git a/src/core/strs.c b/src/core/strs.c
index 6cee605b..a03c0bb5 100644
--- a/src/core/strs.c
+++ b/src/core/strs.c
@@ -8,6 +8,9 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -17,35 +20,28 @@
// part of standard C99. (C11 has added some things here, but we cannot
// count on them.)
+// Note that we supply our own version of strdup and strfree unconditionally,
+// so that these can be freed with nni_free(strlen(s)+1) if desired. (Likewise
+// a string buffer allocated with nni_alloc can be freed with nni_strfree
+// provided the length is correct.)
+
char *
nni_strdup(const char *src)
{
-#ifdef NNG_HAVE_STRDUP
-#ifdef _WIN32
- return (_strdup(src));
-#else
- return (strdup(src));
-#endif
-#else
char * dst;
- size_t len = strlen(src);
+ size_t len = strlen(src) + 1;
if ((dst = nni_alloc(len)) != NULL) {
memcpy(dst, src, len);
}
return (dst);
-#endif
}
void
nni_strfree(char *s)
{
if (s != NULL) {
-#ifdef NNG_HAVE_STRDUP
- free(s);
-#else
nni_free(s, strlen(s) + 1);
-#endif
}
}
@@ -114,3 +110,72 @@ nni_strnlen(const char *s, size_t len)
return (n);
#endif
}
+
+char *
+nni_strcasestr(const char *s1, const char *s2)
+{
+#ifdef NNG_HAVE_STRCASESTR
+ return (strcasestr(s1, s2));
+#else
+ const char *t1, *t2;
+ while (*s1) {
+ for (t1 = s1, t2 = s2; *t1 && *t2; t2++, t1++) {
+ if (tolower(*t1) != tolower(*t2)) {
+ break;
+ }
+ }
+ if (*t2 == 0) {
+ return ((char *) s1);
+ }
+ s1++;
+ }
+ return (NULL);
+#endif
+}
+
+int
+nni_strncasecmp(const char *s1, const char *s2, size_t n)
+{
+#ifdef NNG_HAVE_STRNCASECMP
+#ifdef _WIN32
+ return (_strnicmp(s1, s2, n));
+#else
+ return (strncasecmp(s1, s2, n));
+#endif
+#else
+ for (int i = 0; i < n; i++) {
+ uint8_t c1 = (uint8_t) tolower(*s1++);
+ uint8_t c2 = (uint8_t) tolower(*s2++);
+ if (c1 == c2) {
+ if (c1 == 0) {
+ return (0);
+ }
+ continue;
+ }
+ return (c1 < c2 ? -1 : 1);
+ }
+ return (0);
+#endif
+}
+
+int
+nni_asprintf(char **sp, const char *fmt, ...)
+{
+ va_list ap;
+ size_t len;
+ char * s;
+
+ va_start(ap, fmt);
+ len = vsnprintf(NULL, 0, fmt, ap);
+ va_end(ap);
+ len++;
+
+ if ((s = nni_alloc(len)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ va_start(ap, fmt);
+ (void) vsnprintf(s, len, fmt, ap);
+ va_end(ap);
+ *sp = s;
+ return (0);
+} \ No newline at end of file
diff --git a/src/core/strs.h b/src/core/strs.h
index 42ec4997..3b369fe4 100644
--- a/src/core/strs.h
+++ b/src/core/strs.h
@@ -18,5 +18,8 @@ extern void nni_strfree(char *);
extern size_t nni_strlcpy(char *, const char *, size_t);
extern size_t nni_strlcat(char *, const char *, size_t);
extern size_t nni_strnlen(const char *, size_t);
+extern char * nni_strcasestr(const char *, const char *);
+extern int nni_strncasecmp(const char *, const char *, size_t);
+extern int nni_asprintf(char **, const char *, ...);
#endif // CORE_STRS_H
diff --git a/src/core/transport.c b/src/core/transport.c
index 359b03fd..31da773f 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -13,6 +13,7 @@
#include "transport/ipc/ipc.h"
#include "transport/tcp/tcp.h"
#include "transport/tls/tls.h"
+#include "transport/ws/websocket.h"
#include "transport/zerotier/zerotier.h"
#include <stdio.h>
@@ -105,6 +106,72 @@ nni_tran_find(const char *addr)
return (NULL);
}
+// nni_tran_parse_host_port is a convenience routine to parse the host portion
+// of a URL (which includes a DNS name or IP address and an optional service
+// name or port, separated by a colon) into its host and port name parts. It
+// understands IPv6 address literals when surrounded by brackets ([]).
+// If either component is empty, then NULL is passed back for the value,
+// otherwise a string suitable for freeing with nni_strfree is supplied.
+int
+nni_tran_parse_host_port(const char *pair, char **hostp, char **portp)
+{
+ const char *hstart;
+ const char *pstart;
+ char * host;
+ char * port;
+ size_t hlen, plen;
+
+ if (pair[0] == '[') {
+ hstart = pair + 1;
+ hlen = 0;
+ while (hstart[hlen] != ']') {
+ if (hstart[hlen] == '\0') {
+ return (NNG_EADDRINVAL);
+ }
+ hlen++;
+ }
+ pstart = hstart + hlen + 1; // skip over the trailing ']'
+ } else {
+ // Normal thing.
+ hstart = pair;
+ hlen = 0;
+ while ((hstart[hlen] != ':') && (hstart[hlen] != '\0')) {
+ hlen++;
+ }
+ pstart = hstart + hlen;
+ }
+ if (pstart[0] == ':') {
+ pstart++;
+ }
+ plen = strlen(pstart);
+
+ host = NULL;
+ if (hostp) {
+ if ((hlen > 1) || ((hlen == 1) && (*hstart != '*'))) {
+ if ((host = nni_alloc(hlen + 1)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ memcpy(host, hstart, hlen);
+ host[hlen] = '\0';
+ }
+ }
+
+ port = NULL;
+ if ((plen != 0) && (portp)) {
+ if ((port = nni_strdup(pstart)) == NULL) {
+ nni_strfree(host);
+ return (NNG_ENOMEM);
+ }
+ }
+ if (hostp) {
+ *hostp = host;
+ }
+ if (portp) {
+ *portp = port;
+ }
+ return (0);
+}
+
int
nni_tran_chkopt(const char *name, const void *v, size_t sz)
{
@@ -154,9 +221,12 @@ static nni_tran_ctor nni_tran_ctors[] = {
#ifdef NNG_HAVE_TLS
nng_tls_register,
#endif
-#ifdef NNI_HAVE_ZEROTIER
+#ifdef NNG_HAVE_ZEROTIER
nng_zt_register,
#endif
+#ifdef NNG_HAVE_WEBSOCKET
+ nng_ws_register,
+#endif
NULL,
};
diff --git a/src/core/transport.h b/src/core/transport.h
index b82e2c92..e8a1f620 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -163,6 +163,9 @@ struct nni_tran_pipe {
nni_tran_pipe_option *p_options;
};
+// Utility for transports.
+extern int nni_tran_parse_host_port(const char *, char **, char **);
+
// These APIs are used by the framework internally, and not for use by
// transport implementations.
extern nni_tran *nni_tran_find(const char *);
diff --git a/src/nng.c b/src/nng.c
index db4a5c7c..6cd78e1d 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -969,8 +969,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg)
if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) {
*app = (nng_aio *) aio;
}
- aio->a_expire = (nni_time) NNG_DURATION_DEFAULT;
- aio->a_reltime = 1;
+ aio->a_timeout = NNG_DURATION_DEFAULT;
return (rv);
}
@@ -1020,11 +1019,8 @@ void
nng_aio_set_timeout(nng_aio *ap, nng_duration dur)
{
// Durations here are relative, since we have no notion of a
- // common clock. But underlying aio uses absolute times normally.
- // Fortunately the absolute times are big enough; we just have to
- // make sure that we "convert" the timeout from relative time to
- // absolute time when submitting operations.
- nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur);
+ // common clock..
+ nni_aio_set_timeout((nni_aio *) ap, dur);
}
#if 0
diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c
index e03f8056..69d7e7ce 100644
--- a/src/platform/posix/posix_tcp.c
+++ b/src/platform/posix/posix_tcp.c
@@ -38,11 +38,11 @@ nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa,
return (rv);
}
- if (rsa->s_un.s_family != NNG_AF_UNSPEC) {
+ if ((rsa != NULL) && (rsa->s_un.s_family != NNG_AF_UNSPEC)) {
len = nni_posix_nn2sockaddr((void *) &ss, rsa);
nni_posix_epdesc_set_remote(ed, &ss, len);
}
- if (lsa->s_un.s_family != NNG_AF_UNSPEC) {
+ if ((lsa != NULL) && (lsa->s_un.s_family != NNG_AF_UNSPEC)) {
len = nni_posix_nn2sockaddr((void *) &ss, lsa);
nni_posix_epdesc_set_local(ed, &ss, len);
}
diff --git a/src/supplemental/base64/base64.c b/src/supplemental/base64/base64.c
index 95a4a491..2f6b4da2 100644
--- a/src/supplemental/base64/base64.c
+++ b/src/supplemental/base64/base64.c
@@ -1,6 +1,6 @@
//
// Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
-// Copyright 2017 Staysail Systems, Inc.
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"),
diff --git a/src/supplemental/base64/base64.h b/src/supplemental/base64/base64.h
index eb186ae0..68346435 100644
--- a/src/supplemental/base64/base64.h
+++ b/src/supplemental/base64/base64.h
@@ -1,6 +1,6 @@
//
// Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
-// Copyright 2017 Staysail Systems, Inc.
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"),
diff --git a/src/supplemental/http/CMakeLists.txt b/src/supplemental/http/CMakeLists.txt
new file mode 100644
index 00000000..dff06d70
--- /dev/null
+++ b/src/supplemental/http/CMakeLists.txt
@@ -0,0 +1,17 @@
+#
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+set(HTTP_SOURCES
+ supplemental/http/http.c
+ supplemental/http/http_msg.c
+ supplemental/http/server.c
+ supplemental/http/client.c
+ supplemental/http/http.h)
+set(NNG_SOURCES ${NNG_SOURCES} ${HTTP_SOURCES} PARENT_SCOPE)
diff --git a/src/supplemental/http/client.c b/src/supplemental/http/client.c
new file mode 100644
index 00000000..374ab5bb
--- /dev/null
+++ b/src/supplemental/http/client.c
@@ -0,0 +1,147 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+struct nni_http_client {
+ nng_sockaddr addr;
+ nni_list aios;
+ nni_mtx mtx;
+ bool closed;
+ bool tls;
+ nni_aio * connaio;
+ nni_plat_tcp_ep *tep;
+};
+
+static void
+http_conn_start(nni_http_client *c)
+{
+ nni_plat_tcp_ep_connect(c->tep, c->connaio);
+}
+
+static void
+http_conn_done(void *arg)
+{
+ nni_http_client * c = arg;
+ nni_aio * aio;
+ int rv;
+ nni_plat_tcp_pipe *p;
+ nni_http_tran t;
+ nni_http * http;
+
+ nni_mtx_lock(&c->mtx);
+ rv = nni_aio_result(c->connaio);
+ p = rv == 0 ? nni_aio_get_pipe(c->connaio) : NULL;
+ if ((aio = nni_list_first(&c->aios)) == NULL) {
+ if (p != NULL) {
+ nni_plat_tcp_pipe_fini(p);
+ }
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+
+ if (rv != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+
+ t.h_data = p;
+ t.h_write = (void *) nni_plat_tcp_pipe_send;
+ t.h_read = (void *) nni_plat_tcp_pipe_recv;
+ t.h_close = (void *) nni_plat_tcp_pipe_close;
+ t.h_fini = (void *) nni_plat_tcp_pipe_fini;
+
+ if ((rv = nni_http_init(&http, &t)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_plat_tcp_pipe_fini(p);
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+
+ nni_aio_set_output(aio, 0, http);
+ nni_aio_finish(aio, 0, 0);
+
+ if (!nni_list_empty(&c->aios)) {
+ http_conn_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_http_client_fini(nni_http_client *c)
+{
+ nni_aio_fini(c->connaio);
+ nni_plat_tcp_ep_fini(c->tep);
+ nni_mtx_fini(&c->mtx);
+ NNI_FREE_STRUCT(c);
+}
+
+int
+nni_http_client_init(nni_http_client **cp, nng_sockaddr *sa)
+{
+ int rv;
+
+ nni_http_client *c;
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->addr = *sa;
+ rv = nni_plat_tcp_ep_init(&c->tep, NULL, &c->addr, NNI_EP_MODE_DIAL);
+ if (rv != 0) {
+ NNI_FREE_STRUCT(c);
+ return (rv);
+ }
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->aios);
+
+ if ((rv = nni_aio_init(&c->connaio, http_conn_done, c)) != 0) {
+ nni_http_client_fini(c);
+ return (rv);
+ }
+ *cp = c;
+ return (0);
+}
+
+static void
+http_connect_cancel(nni_aio *aio, int rv)
+{
+ nni_http_client *c = aio->a_prov_data;
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ if (nni_list_empty(&c->aios)) {
+ nni_aio_cancel(c->connaio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_http_client_connect(nni_http_client *c, nni_aio *aio)
+{
+ if (nni_aio_start(aio, http_connect_cancel, aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ nni_list_append(&c->aios, aio);
+ if (nni_list_first(&c->aios) == aio) {
+ http_conn_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+} \ No newline at end of file
diff --git a/src/supplemental/http/http.c b/src/supplemental/http/http.c
new file mode 100644
index 00000000..3958a738
--- /dev/null
+++ b/src/supplemental/http/http.c
@@ -0,0 +1,604 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+// We insist that individual headers fit in 8K.
+// If you need more than that, you need something we can't do.
+#define HTTP_BUFSIZE 8192
+
+// types of reads
+enum read_flavor {
+ HTTP_RD_RAW,
+ HTTP_RD_FULL,
+ HTTP_RD_REQ,
+ HTTP_RD_RES,
+};
+
+enum write_flavor {
+ HTTP_WR_RAW,
+ HTTP_WR_FULL,
+ HTTP_WR_REQ,
+ HTTP_WR_RES,
+};
+
+#define SET_RD_FLAVOR(aio, f) (aio)->a_prov_extra[0] = ((void *) (intptr_t)(f))
+#define GET_RD_FLAVOR(aio) (int) ((intptr_t) aio->a_prov_extra[0])
+#define SET_WR_FLAVOR(aio, f) (aio)->a_prov_extra[0] = ((void *) (intptr_t)(f))
+#define GET_WR_FLAVOR(aio) (int) ((intptr_t) aio->a_prov_extra[0])
+
+struct nni_http {
+ void *sock;
+ void (*rd)(void *, nni_aio *);
+ void (*wr)(void *, nni_aio *);
+ void (*close)(void *);
+ void (*fini)(void *);
+
+ bool closed;
+
+ nni_list rdq; // high level http read requests
+ nni_list wrq; // high level http write requests
+
+ nni_aio *rd_aio; // bottom half read operations
+ nni_aio *wr_aio; // bottom half write operations
+
+ nni_mtx mtx;
+
+ uint8_t *rd_buf;
+ size_t rd_get;
+ size_t rd_put;
+ size_t rd_bufsz;
+};
+
+static void
+http_close(nni_http *http)
+{
+ // Call with lock held.
+ nni_aio *aio;
+
+ if (http->closed) {
+ return;
+ }
+
+ http->closed = true;
+ if (nni_list_first(&http->wrq)) {
+ nni_aio_cancel(http->wr_aio, NNG_ECLOSED);
+ while ((aio = nni_list_first(&http->wrq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ if (nni_list_first(&http->rdq)) {
+ nni_aio_cancel(http->rd_aio, NNG_ECLOSED);
+ while ((aio = nni_list_first(&http->rdq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+
+ if (http->sock != NULL) {
+ http->close(http->sock);
+ }
+}
+
+void
+nni_http_close(nni_http *http)
+{
+ nni_mtx_lock(&http->mtx);
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+}
+
+// http_rd_buf attempts to satisfy the read from data in the buffer.
+static int
+http_rd_buf(nni_http *http, nni_aio *aio)
+{
+ size_t cnt = http->rd_put - http->rd_get;
+ size_t n;
+ uint8_t *rbuf = http->rd_buf;
+ int i;
+ int rv;
+ bool raw = false;
+
+ rbuf += http->rd_get;
+
+ switch (GET_RD_FLAVOR(aio)) {
+ case HTTP_RD_RAW:
+ raw = true; // FALLTHROUGH
+ case HTTP_RD_FULL:
+ for (i = 0; (aio->a_niov != 0) && (cnt != 0); i++) {
+ // Pull up data from the buffer if possible.
+ n = aio->a_iov[0].iov_len;
+ if (n > cnt) {
+ n = cnt;
+ }
+ memcpy(aio->a_iov[0].iov_buf, rbuf, n);
+ aio->a_iov[0].iov_len -= n;
+ aio->a_iov[0].iov_buf += n;
+ http->rd_get += n;
+ rbuf += n;
+ aio->a_count += n;
+ cnt -= n;
+
+ if (aio->a_iov[0].iov_len == 0) {
+ aio->a_niov--;
+ for (i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
+ }
+ }
+
+ if ((aio->a_niov == 0) || (raw && (aio->a_count != 0))) {
+ // Finished the read. (We are finished if we either
+ // got *all* the data, or we got *some* data for
+ // a raw read.)
+ return (0);
+ }
+
+ // No more data left in the buffer, so use a physio.
+ // (Note that we get here if we either have not completed
+ // a full transaction on a FULL read, or were not even able
+ // to get *any* data for a partial RAW read.)
+ for (i = 0; i < aio->a_niov; i++) {
+ http->rd_aio->a_iov[i] = aio->a_iov[i];
+ }
+ nni_aio_set_data(http->rd_aio, 1, NULL);
+ http->rd_aio->a_niov = aio->a_niov;
+ http->rd(http->sock, http->rd_aio);
+ return (NNG_EAGAIN);
+
+ case HTTP_RD_REQ:
+ rv = nni_http_req_parse(aio->a_prov_extra[1], rbuf, cnt, &n);
+ http->rd_get += n;
+ if (http->rd_get == http->rd_put) {
+ http->rd_get = http->rd_put = 0;
+ }
+ if (rv == NNG_EAGAIN) {
+ http->rd_aio->a_niov = 1;
+ http->rd_aio->a_iov[0].iov_buf =
+ http->rd_buf + http->rd_put;
+ http->rd_aio->a_iov[0].iov_len =
+ http->rd_bufsz - http->rd_put;
+ nni_aio_set_data(http->rd_aio, 1, aio);
+ http->rd(http->sock, http->rd_aio);
+ }
+ return (rv);
+
+ case HTTP_RD_RES:
+ rv = nni_http_res_parse(aio->a_prov_extra[1], rbuf, cnt, &n);
+ http->rd_get += n;
+ if (http->rd_get == http->rd_put) {
+ http->rd_get = http->rd_put = 0;
+ }
+ if (rv == NNG_EAGAIN) {
+ http->rd_aio->a_niov = 1;
+ http->rd_aio->a_iov[0].iov_buf =
+ http->rd_buf + http->rd_put;
+ http->rd_aio->a_iov[0].iov_len =
+ http->rd_bufsz - http->rd_put;
+ nni_aio_set_data(http->rd_aio, 1, aio);
+ http->rd(http->sock, http->rd_aio);
+ }
+ return (rv);
+ }
+ return (NNG_EINVAL);
+}
+
+static void
+http_rd_start(nni_http *http)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&http->rdq)) != NULL) {
+ int rv;
+
+ if (http->closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = http_rd_buf(http, aio);
+ }
+ switch (rv) {
+ case NNG_EAGAIN:
+ return;
+ case 0:
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, aio->a_count);
+ break;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ http_close(http);
+ break;
+ }
+ }
+}
+
+static void
+http_rd_cb(void *arg)
+{
+ nni_http *http = arg;
+ nni_aio * aio = http->rd_aio;
+ nni_aio * uaio;
+ size_t cnt;
+ int rv;
+
+ nni_mtx_lock(&http->mtx);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ if ((uaio = nni_list_first(&http->rdq)) != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ cnt = nni_aio_count(aio);
+
+ // If we were reading into the buffer, then advance location(s).
+ if ((uaio = nni_aio_get_data(aio, 1)) != NULL) {
+ http->rd_put += cnt;
+ NNI_ASSERT(http->rd_put <= http->rd_bufsz);
+ http_rd_start(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ // Otherwise we are completing a USER request, and there should
+ // be no data left in the user buffer.
+ NNI_ASSERT(http->rd_get == http->rd_put);
+
+ uaio = nni_list_first(&http->rdq);
+ NNI_ASSERT(uaio != NULL);
+
+ for (int i = 0; (uaio->a_niov != 0) && (cnt != 0); i++) {
+ // Pull up data from the buffer if possible.
+ size_t n = uaio->a_iov[0].iov_len;
+ if (n > cnt) {
+ n = cnt;
+ }
+ uaio->a_iov[0].iov_len -= n;
+ uaio->a_iov[0].iov_buf += n;
+ uaio->a_count += n;
+ cnt -= n;
+
+ if (uaio->a_iov[0].iov_len == 0) {
+ uaio->a_niov--;
+ for (i = 0; i < uaio->a_niov; i++) {
+ uaio->a_iov[i] = uaio->a_iov[i + 1];
+ }
+ }
+ }
+
+ // Resubmit the start. This will attempt to consume data
+ // from the read buffer (there won't be any), and then either
+ // complete the I/O (for HTTP_RD_RAW, or if there is nothing left),
+ // or submit another physio.
+ http_rd_start(http);
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_rd_cancel(nni_aio *aio, int rv)
+{
+ nni_http *http = aio->a_prov_data;
+
+ nni_mtx_lock(&http->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ if (aio == nni_list_first(&http->rdq)) {
+ http_close(http);
+ }
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_rd_submit(nni_http *http, nni_aio *aio)
+{
+ if (nni_aio_start(aio, http_rd_cancel, http) != 0) {
+ return;
+ }
+ if (http->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&http->rdq, aio);
+ if (nni_list_first(&http->rdq) == aio) {
+ http_rd_start(http);
+ }
+}
+
+static void
+http_wr_start(nni_http *http)
+{
+ nni_aio *aio;
+
+ if ((aio = nni_list_first(&http->wrq)) != NULL) {
+
+ for (int i = 0; i < aio->a_niov; i++) {
+ http->wr_aio->a_iov[i] = aio->a_iov[i];
+ }
+ http->wr_aio->a_niov = aio->a_niov;
+ http->wr(http->sock, http->wr_aio);
+ }
+}
+
+static void
+http_wr_cb(void *arg)
+{
+ nni_http *http = arg;
+ nni_aio * aio = http->wr_aio;
+ nni_aio * uaio;
+ int rv;
+ size_t n;
+
+ nni_mtx_lock(&http->mtx);
+
+ uaio = nni_list_first(&http->wrq);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ // We failed to complete the aio.
+ if (uaio != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ if (uaio == NULL) {
+ // write canceled?
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ n = nni_aio_count(aio);
+ uaio->a_count += n;
+ if (GET_WR_FLAVOR(uaio) == HTTP_WR_RAW) {
+ // For raw data, we just send partial completion
+ // notices to the consumer.
+ goto done;
+ }
+ while (n) {
+ NNI_ASSERT(aio->a_niov != 0);
+
+ if (aio->a_iov[0].iov_len > n) {
+ aio->a_iov[0].iov_len -= n;
+ aio->a_iov[0].iov_buf += n;
+ break;
+ }
+ n -= aio->a_iov[0].iov_len;
+ for (int i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
+ aio->a_niov--;
+ }
+ if ((aio->a_niov != 0) && (aio->a_iov[0].iov_len != 0)) {
+ // We have more to transmit.
+ http->wr(http->sock, aio);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+done:
+ nni_aio_list_remove(uaio);
+ nni_aio_finish(uaio, 0, uaio->a_count);
+
+ // Start next write if another is ready.
+ http_wr_start(http);
+
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_wr_cancel(nni_aio *aio, int rv)
+{
+ nni_http *http = aio->a_prov_data;
+
+ nni_mtx_lock(&http->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ if (aio == nni_list_first(&http->wrq)) {
+ http_close(http);
+ }
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_wr_submit(nni_http *http, nni_aio *aio)
+{
+ if (nni_aio_start(aio, http_wr_cancel, http) != 0) {
+ return;
+ }
+ if (http->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&http->wrq, aio);
+ if (nni_list_first(&http->wrq) == aio) {
+ http_wr_start(http);
+ }
+}
+
+void
+nni_http_read_req(nni_http *http, nni_http_req *req, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_REQ);
+ aio->a_prov_extra[1] = req;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read_res(nni_http *http, nni_http_res *res, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_RES);
+ aio->a_prov_extra[1] = res;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read_full(nni_http *http, nni_aio *aio)
+{
+ aio->a_count = 0;
+ SET_RD_FLAVOR(aio, HTTP_RD_FULL);
+ aio->a_prov_extra[1] = NULL;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read(nni_http *http, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_RAW);
+ aio->a_prov_extra[1] = NULL;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_req(nni_http *http, nni_http_req *req, nni_aio *aio)
+{
+ int rv;
+ void * buf;
+ size_t bufsz;
+
+ if ((rv = nni_http_req_get_buf(req, &buf, &bufsz)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = bufsz;
+ aio->a_iov[0].iov_buf = buf;
+ SET_WR_FLAVOR(aio, HTTP_WR_REQ);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_res(nni_http *http, nni_http_res *res, nni_aio *aio)
+{
+ int rv;
+ void * buf;
+ size_t bufsz;
+
+ if ((rv = nni_http_res_get_buf(res, &buf, &bufsz)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = bufsz;
+ aio->a_iov[0].iov_buf = buf;
+ SET_WR_FLAVOR(aio, HTTP_WR_RES);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+// Writer. As with nni_http_conn_write, this is used to write data on
+// a connection that has been "upgraded" (e.g. transformed to
+// websocket). It is an error to perform other HTTP exchanges on an
+// connection after this method is called. (This mostly exists to
+// support websocket.)
+void
+nni_http_write(nni_http *http, nni_aio *aio)
+{
+ SET_WR_FLAVOR(aio, HTTP_WR_RAW);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_full(nni_http *http, nni_aio *aio)
+{
+ SET_WR_FLAVOR(aio, HTTP_WR_FULL);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_fini(nni_http *http)
+{
+ nni_mtx_lock(&http->mtx);
+ http_close(http);
+ if ((http->sock != NULL) && (http->fini != NULL)) {
+ http->fini(http->sock);
+ http->sock = NULL;
+ }
+ nni_mtx_unlock(&http->mtx);
+ nni_aio_stop(http->wr_aio);
+ nni_aio_stop(http->rd_aio);
+ nni_aio_fini(http->wr_aio);
+ nni_aio_fini(http->rd_aio);
+ nni_free(http->rd_buf, http->rd_bufsz);
+ nni_mtx_fini(&http->mtx);
+ NNI_FREE_STRUCT(http);
+}
+
+int
+nni_http_init(nni_http **httpp, nni_http_tran *tran)
+{
+ nni_http *http;
+ int rv;
+
+ if ((http = NNI_ALLOC_STRUCT(http)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ http->rd_bufsz = HTTP_BUFSIZE;
+ if ((http->rd_buf = nni_alloc(http->rd_bufsz)) == NULL) {
+ NNI_FREE_STRUCT(http);
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&http->mtx);
+ nni_aio_list_init(&http->rdq);
+ nni_aio_list_init(&http->wrq);
+
+ if (((rv = nni_aio_init(&http->wr_aio, http_wr_cb, http)) != 0) ||
+ ((rv = nni_aio_init(&http->rd_aio, http_rd_cb, http)) != 0)) {
+ nni_http_fini(http);
+ return (rv);
+ }
+ http->rd_bufsz = HTTP_BUFSIZE;
+ http->rd = tran->h_read;
+ http->wr = tran->h_write;
+ http->close = tran->h_close;
+ http->fini = tran->h_fini;
+ http->sock = tran->h_data;
+
+ *httpp = http;
+
+ return (0);
+}
diff --git a/src/supplemental/http/http.h b/src/supplemental/http/http.h
new file mode 100644
index 00000000..09c72f8a
--- /dev/null
+++ b/src/supplemental/http/http.h
@@ -0,0 +1,290 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_SUPPLEMENTAL_HTTP_HTTP_H
+#define NNG_SUPPLEMENTAL_HTTP_HTTP_H
+
+#include <stdbool.h>
+
+// nni_http_msg represents an HTTP request or response message.
+typedef struct nni_http_msg nni_http_msg;
+typedef struct nni_http_res nni_http_res;
+typedef struct nni_http_entity nni_http_entity;
+
+typedef struct nni_http_tran {
+ void *h_data;
+ void (*h_read)(void *, nni_aio *);
+ void (*h_write)(void *, nni_aio *);
+ void (*h_close)(void *);
+ void (*h_fini)(void *);
+} nni_http_tran;
+
+typedef struct nni_http_req nni_http_req;
+
+extern int nni_http_req_init(nni_http_req **);
+extern void nni_http_req_fini(nni_http_req *);
+extern void nni_http_req_reset(nni_http_req *);
+extern int nni_http_req_set_header(nni_http_req *, const char *, const char *);
+extern int nni_http_req_add_header(nni_http_req *, const char *, const char *);
+extern int nni_http_req_del_header(nni_http_req *, const char *);
+extern int nni_http_req_get_buf(nni_http_req *, void **, size_t *);
+extern int nni_http_req_set_method(nni_http_req *, const char *);
+extern int nni_http_req_set_version(nni_http_req *, const char *);
+extern int nni_http_req_set_uri(nni_http_req *, const char *);
+extern const char *nni_http_req_get_header(nni_http_req *, const char *);
+extern const char *nni_http_req_get_header(nni_http_req *, const char *);
+extern const char *nni_http_req_get_version(nni_http_req *);
+extern const char *nni_http_req_get_uri(nni_http_req *);
+extern const char *nni_http_req_get_method(nni_http_req *);
+extern int nni_http_req_parse(nni_http_req *, void *, size_t, size_t *);
+
+extern int nni_http_res_init(nni_http_res **);
+extern void nni_http_res_fini(nni_http_res *);
+extern void nni_http_res_reset(nni_http_res *);
+extern int nni_http_res_get_buf(nni_http_res *, void **, size_t *);
+extern int nni_http_res_set_header(nni_http_res *, const char *, const char *);
+extern int nni_http_res_add_header(nni_http_res *, const char *, const char *);
+extern int nni_http_res_del_header(nni_http_res *, const char *);
+extern int nni_http_res_set_version(nni_http_res *, const char *);
+extern int nni_http_res_set_status(nni_http_res *, int, const char *);
+extern const char *nni_http_res_get_header(nni_http_res *, const char *);
+extern const char *nni_http_res_get_version(nni_http_res *);
+extern const char *nni_http_res_get_reason(nni_http_res *);
+extern int nni_http_res_get_status(nni_http_res *);
+extern int nni_http_res_parse(nni_http_res *, void *, size_t, size_t *);
+extern int nni_http_res_set_data(nni_http_res *, const void *, size_t);
+extern int nni_http_res_copy_data(nni_http_res *, const void *, size_t);
+extern int nni_http_res_alloc_data(nni_http_res *, size_t);
+extern void nni_http_res_get_data(nni_http_res *, void **, size_t *);
+extern int nni_http_res_init_error(nni_http_res **, uint16_t);
+
+// HTTP status codes. This list is not exhaustive.
+enum { NNI_HTTP_STATUS_CONTINUE = 100,
+ NNI_HTTP_STATUS_SWITCHING = 101,
+ NNI_HTTP_STATUS_PROCESSING = 102,
+ NNI_HTTP_STATUS_OK = 200,
+ NNI_HTTP_STATUS_CREATED = 201,
+ NNI_HTTP_STATUS_ACCEPTED = 202,
+ NNI_HTTP_STATUS_NOT_AUTHORITATIVE = 203,
+ NNI_HTTP_STATUS_NO_CONTENT = 204,
+ NNI_HTTP_STATUS_RESET_CONTENT = 205,
+ NNI_HTTP_STATUS_PARTIAL_CONTENT = 206,
+ NNI_HTTP_STATUS_MULTI_STATUS = 207,
+ NNI_HTTP_STATUS_ALREADY_REPORTED = 208,
+ NNI_HTTP_STATUS_IM_USED = 226,
+ NNI_HTTP_STATUS_MULTIPLE_CHOICES = 300,
+ NNI_HTTP_STATUS_STATUS_MOVED_PERMANENTLY = 301,
+ NNI_HTTP_STATUS_FOUND = 302,
+ NNI_HTTP_STATUS_SEE_OTHER = 303,
+ NNI_HTTP_STATUS_NOT_MODIFIED = 304,
+ NNI_HTTP_STATUS_USE_PROXY = 305,
+ NNI_HTTP_STATUS_TEMPORARY_REDIRECT = 307,
+ NNI_HTTP_STATUS_PERMANENT_REDIRECT = 308,
+ NNI_HTTP_STATUS_BAD_REQUEST = 400,
+ NNI_HTTP_STATUS_UNAUTHORIZED = 401,
+ NNI_HTTP_STATUS_PAYMENT_REQUIRED = 402,
+ NNI_HTTP_STATUS_FORBIDDEN = 403,
+ NNI_HTTP_STATUS_NOT_FOUND = 404,
+ NNI_HTTP_STATUS_METHOD_NOT_ALLOWED = 405,
+ NNI_HTTP_STATUS_NOT_ACCEPTABLE = 406,
+ NNI_HTTP_STATUS_PROXY_AUTH_REQUIRED = 407,
+ NNI_HTTP_STATUS_REQUEST_TIMEOUT = 408,
+ NNI_HTTP_STATUS_CONFLICT = 409,
+ NNI_HTTP_STATUS_GONE = 410,
+ NNI_HTTP_STATUS_LENGTH_REQUIRED = 411,
+ NNI_HTTP_STATUS_PRECONDITION_FAILED = 412,
+ NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE = 413,
+ NNI_HTTP_STATUS_URI_TOO_LONG = 414,
+ NNI_HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE = 415,
+ NNI_HTTP_STATUS_RANGE_NOT_SATISFIABLE = 416,
+ NNI_HTTP_STATUS_EXPECTATION_FAILED = 417,
+ NNI_HTTP_STATUS_TEAPOT = 418,
+ NNI_HTTP_STATUS_UNPROCESSABLE_ENTITY = 422,
+ NNI_HTTP_STATUS_LOCKED = 423,
+ NNI_HTTP_STATUS_FAILED_DEPENDENCY = 424,
+ NNI_HTTP_STATUS_UPGRADE_REQUIRED = 426,
+ NNI_HTTP_STATUS_PRECONDITION_REQUIRED = 428,
+ NNI_HTTP_STATUS_TOO_MANY_REQUESTS = 429,
+ NNI_HTTP_STATUS_HEADERS_TOO_LARGE = 431,
+ NNI_HTTP_STATUS_UNAVAIL_LEGAL_REASONS = 451,
+ NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR = 500,
+ NNI_HTTP_STATUS_NOT_IMPLEMENTED = 501,
+ NNI_HTTP_STATUS_BAD_GATEWAY = 502,
+ NNI_HTTP_STATUS_SERVICE_UNAVAILABLE = 503,
+ NNI_HTTP_STATUS_GATEWAY_TIMEOUT = 504,
+ NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP = 505,
+ NNI_HTTP_STATUS_VARIANT_ALSO_NEGOTIATES = 506,
+ NNI_HTTP_STATUS_INSUFFICIENT_STORAGE = 507,
+ NNI_HTTP_STATUS_LOOP_DETECTED = 508,
+ NNI_HTTP_STATUS_NOT_EXTENDED = 510,
+ NNI_HTTP_STATUS_NETWORK_AUTH_REQUIRED = 511,
+};
+
+// An HTTP connection is a connection over which messages are exchanged.
+// Generally, clients send request messages, and then read responses.
+// Servers, read requests, and write responses. However, we do not
+// require a 1:1 mapping between request and response here -- the application
+// is responsible for dealing with that.
+//
+// We only support HTTP/1.1, though using the nni_http_conn_read and
+// nni_http_conn_write low level methods, it is possible to write an upgrader
+// (such as websocket!) that might support e.g. HTTP/2 or reading data that
+// follows a legacy HTTP/1.0 message.
+//
+// Any error on the connection, including cancellation of a request, is fatal
+// the connection.
+typedef struct nni_http nni_http;
+
+extern int nni_http_init(nni_http **, nni_http_tran *);
+extern void nni_http_close(nni_http *);
+extern void nni_http_fini(nni_http *);
+
+// Reading messages -- the caller must supply a preinitialized (but otherwise
+// idle) message. We recommend the caller store this in the aio's user data.
+// Note that the iovs of the aio's are clobbered by these methods -- callers
+// must not use them for any other purpose.
+
+extern void nni_http_write_req(nni_http *, nni_http_req *, nni_aio *);
+extern void nni_http_write_res(nni_http *, nni_http_res *, nni_aio *);
+extern void nni_http_read_req(nni_http *, nni_http_req *, nni_aio *);
+extern void nni_http_read_res(nni_http *, nni_http_res *, nni_aio *);
+
+extern void nni_http_read(nni_http *, nni_aio *);
+extern void nni_http_read_full(nni_http *, nni_aio *);
+extern void nni_http_write(nni_http *, nni_aio *);
+extern void nni_http_write_full(nni_http *, nni_aio *);
+
+typedef struct nni_http_server nni_http_server;
+
+typedef struct {
+ // h_path is the relative URI that we are going to match against.
+ // Must not be NULL. Note that query parameters (things following
+ // a "?" at the end of the path) are ignored when matching. This
+ // field may not be NULL.
+ const char *h_path;
+
+ // h_method is the HTTP method to handle such as "GET" or "POST".
+ // Must not be empty or NULL. If the incoming method is HEAD, then
+ // the server will process HEAD the same as GET, but will not send
+ // any response body.
+ const char *h_method;
+
+ // h_host is used to match on a specific Host: entry. If left NULL,
+ // then this handler will match regardless of the Host: value.
+ const char *h_host;
+
+ // h_is_dir indicates that the path represents a directory, and
+ // any path which is a logically below it should also be matched.
+ // This means that "/phone" will match for "/phone/bob" but not
+ // "/phoneme/ma". Be advised that it is not possible to register
+ // a handler for a parent and a different handler for children.
+ // (This restriction may be lifted in the future.)
+ bool h_is_dir;
+
+ // h_is_upgrader is used for callbacks that "upgrade" (or steal)
+ // their connection. When this is true, the server framework
+ // assumes that the handler takes over *all* of the details of
+ // the connection. Consequently, the connection is disassociated
+ // from the framework, and no response is sent. (Upgraders are
+ // responsible for adopting the connection, including closing it
+ // when they are done, and for sending any HTTP response message.
+ // This is true even if an error occurs.)
+ bool h_is_upgrader;
+
+ // h_cb is a callback that handles the request. The conventions
+ // are as follows:
+ //
+ // inputs:
+ // 0 - nni_http * for the actual underlying HTTP channel
+ // 1 - nni_http_req * for the HTTP request object
+ // 2 - void * for the opaque pointer supplied at registration
+ //
+ // outputs:
+ // 0 - (optional) nni_http_res * for an HTTP response (see below)
+ //
+ // The callback may choose to return the a response object in output 0,
+ // in which case the framework will handle sending the reply.
+ // (Response entity content is also sent if the response data
+ // is not NULL.) The callback may instead do it's own replies, in
+ // which case the response output should be NULL.
+ //
+ // Note that any request entity data is *NOT* supplied automatically
+ // with the request object; the callback is expected to call the
+ // nni_http_read_data method to retrieve any message data based upon
+ // the presence of headers. (It may also call nni_http_read or
+ // nni_http_write on the channel as it sees fit.)
+ //
+ // Upgraders should call the completion routine immediately,
+ // once they have collected the request object and HTTP channel.
+ void (*h_cb)(nni_aio *);
+} nni_http_handler;
+
+// nni_http_server will look for an existing server with the same
+// socket address, or create one if one does not exist. The servers
+// are reference counted to permit sharing the server object across
+// multiple subsystems. The sockaddr matching is very limited though,
+// and the addresses must match *exactly*.
+extern int nni_http_server_init(nni_http_server **, nng_sockaddr *);
+
+// nni_http_server_fini drops the reference count on the server, and
+// if this was the last reference, closes down the server and frees
+// all related resources. It will not affect hijacked connections.
+extern void nni_http_server_fini(nni_http_server *);
+
+// nni_http_server_add_handler registers a new handler on the server.
+// This function will return NNG_EADDRINUSE if a conflicting handler
+// is already registered (i.e. a handler with the same value for Host,
+// Method, and URL.) The first parameter receives an opaque handle to
+// the handler, that can be used to unregister the handler later.
+extern int nni_http_server_add_handler(
+ void **, nni_http_server *, nni_http_handler *, void *);
+
+extern void nni_http_server_del_handler(nni_http_server *, void *);
+
+// nni_http_server_start starts listening on the supplied port.
+extern int nni_http_server_start(nni_http_server *);
+
+// nni_http_server_stop stops the server, closing the listening socket.
+// Connections that have been "upgraded" are unaffected. Connections
+// associated with a callback will complete their callback, and then close.
+extern void nni_http_server_stop(nni_http_server *);
+
+// nni_http_server_add_static is a short cut to add static
+// content handler to the server. The host may be NULL, and the
+// ctype (aka Content-Type) may be NULL. If the Content-Type is NULL,
+// then application/octet stream will be the (probably bad) default.
+// The actual data is copied, and so the caller may discard it once
+// this function returns.
+extern int nni_http_server_add_static(nni_http_server *, const char *host,
+ const char *ctype, const char *uri, const void *, size_t);
+
+// nni_http_server_add file is a short cut to add a file-backed static
+// content handler to the server. The host may be NULL, and the
+// ctype (aka Content-Type) may be NULL. If the Content-Type is NULL,
+// then the server will try to guess it based on the filename -- but only
+// a small number of file types are builtin. URI is the absolute URI
+// (sans hostname and scheme), and the path is the path on the local
+// filesystem where the file can be found.
+extern int nni_http_server_add_file(nni_http_server *, const char *host,
+ const char *ctype, const char *uri, const char *path);
+
+// TLS will use
+// extern int nni_http_server_start_tls(nni_http_server *, nng_sockaddr *,
+// nni_tls_config *);
+
+// Client stuff.
+
+typedef struct nni_http_client nni_http_client;
+
+extern int nni_http_client_init(nni_http_client **, nng_sockaddr *);
+extern void nni_http_client_fini(nni_http_client *);
+extern void nni_http_client_connect(nni_http_client *, nni_aio *);
+
+#endif // NNG_SUPPLEMENTAL_HTTP_HTTP_H
diff --git a/src/supplemental/http/http_msg.c b/src/supplemental/http/http_msg.c
new file mode 100644
index 00000000..2e245868
--- /dev/null
+++ b/src/supplemental/http/http_msg.c
@@ -0,0 +1,956 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+// Note that as we parse headers, the rule is that if a header is already
+// present, then we can append it to the existing header, separated by
+// a comma. From experience, for example, Firefox uses a Connection:
+// header with two values, "keepalive", and "upgrade".
+typedef struct http_header {
+ char * name;
+ char * value;
+ nni_list_node node;
+} http_header;
+
+struct nni_http_entity {
+ char * data;
+ size_t size; // allocated/expected size
+ size_t len; // current length
+ bool own; // if true, data is "ours", and should be freed
+};
+
+struct nni_http_req {
+ nni_list hdrs;
+ nni_http_entity data;
+ char * meth;
+ char * uri;
+ char * vers;
+ char * buf;
+ size_t bufsz;
+};
+
+struct nni_http_res {
+ nni_list hdrs;
+ nni_http_entity data;
+ int code;
+ char * rsn;
+ char * vers;
+ char * buf;
+ size_t bufsz;
+};
+
+static int
+http_set_string(char **strp, const char *val)
+{
+ char *news;
+ if ((news = nni_strdup(val)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_strfree(*strp);
+ *strp = news;
+ return (0);
+}
+
+static void
+http_headers_reset(nni_list *hdrs)
+{
+ http_header *h;
+ while ((h = nni_list_first(hdrs)) != NULL) {
+ nni_list_remove(hdrs, h);
+ if (h->name != NULL) {
+ nni_strfree(h->name);
+ }
+ if (h->value != NULL) {
+ nni_free(h->value, strlen(h->value) + 1);
+ }
+ NNI_FREE_STRUCT(h);
+ }
+}
+
+static void
+http_entity_reset(nni_http_entity *entity)
+{
+ if (entity->own && entity->size) {
+ nni_free(entity->data, entity->size);
+ }
+ entity->data = NULL;
+ entity->size = 0;
+ entity->own = false;
+}
+
+void
+nni_http_req_reset(nni_http_req *req)
+{
+ http_headers_reset(&req->hdrs);
+ http_entity_reset(&req->data);
+ nni_strfree(req->vers);
+ nni_strfree(req->meth);
+ nni_strfree(req->uri);
+ req->vers = req->meth = req->uri = NULL;
+ if (req->bufsz) {
+ req->buf[0] = '\0';
+ }
+}
+
+void
+nni_http_res_reset(nni_http_res *res)
+{
+ http_headers_reset(&res->hdrs);
+ http_entity_reset(&res->data);
+ nni_strfree(res->rsn);
+ nni_strfree(res->vers);
+ res->code = 0;
+ if (res->bufsz) {
+ res->buf[0] = '\0';
+ }
+}
+
+void
+nni_http_req_fini(nni_http_req *req)
+{
+ nni_http_req_reset(req);
+ if (req->bufsz) {
+ nni_free(req->buf, req->bufsz);
+ }
+ NNI_FREE_STRUCT(req);
+}
+
+void
+nni_http_res_fini(nni_http_res *res)
+{
+ nni_http_res_reset(res);
+ if (res->bufsz) {
+ nni_free(res->buf, res->bufsz);
+ }
+ NNI_FREE_STRUCT(res);
+}
+
+static int
+http_del_header(nni_list *hdrs, const char *key)
+{
+ http_header *h;
+ NNI_LIST_FOREACH (hdrs, h) {
+ if (strcasecmp(key, h->name) == 0) {
+ nni_list_remove(hdrs, h);
+ nni_strfree(h->name);
+ nni_free(h->value, strlen(h->value) + 1);
+ NNI_FREE_STRUCT(h);
+ return (0);
+ }
+ }
+ return (NNG_ENOENT);
+}
+
+int
+nni_req_del_header(nni_http_req *req, const char *key)
+{
+ return (http_del_header(&req->hdrs, key));
+}
+
+int
+nni_res_del_header(nni_http_res *res, const char *key)
+{
+ return (http_del_header(&res->hdrs, key));
+}
+
+static int
+http_set_header(nni_list *hdrs, const char *key, const char *val)
+{
+ http_header *h;
+ NNI_LIST_FOREACH (hdrs, h) {
+ if (strcasecmp(key, h->name) == 0) {
+ char * news;
+ size_t len = strlen(val) + 1;
+ if ((news = nni_alloc(len)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ snprintf(news, len, "%s", val);
+ nni_free(h->value, strlen(h->value) + 1);
+ h->value = news;
+ return (0);
+ }
+ }
+
+ if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((h->name = nni_strdup(key)) == NULL) {
+ NNI_FREE_STRUCT(h);
+ return (NNG_ENOMEM);
+ }
+ if ((h->value = nni_alloc(strlen(val) + 1)) == NULL) {
+ nni_strfree(h->name);
+ NNI_FREE_STRUCT(h);
+ return (NNG_ENOMEM);
+ }
+ strncpy(h->value, val, strlen(val) + 1);
+ nni_list_append(hdrs, h);
+ return (0);
+}
+
+int
+nni_http_req_set_header(nni_http_req *req, const char *key, const char *val)
+{
+ return (http_set_header(&req->hdrs, key, val));
+}
+
+int
+nni_http_res_set_header(nni_http_res *res, const char *key, const char *val)
+{
+ return (http_set_header(&res->hdrs, key, val));
+}
+
+static int
+http_add_header(nni_list *hdrs, const char *key, const char *val)
+{
+ http_header *h;
+ NNI_LIST_FOREACH (hdrs, h) {
+ if (strcasecmp(key, h->name) == 0) {
+ char * news;
+ size_t len = strlen(h->value) + strlen(val) + 3;
+ if ((news = nni_alloc(len)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ snprintf(news, len, "%s, %s", h->value, val);
+ nni_free(h->value, strlen(h->value) + 1);
+ h->value = news;
+ return (0);
+ }
+ }
+
+ if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((h->name = nni_strdup(key)) == NULL) {
+ NNI_FREE_STRUCT(h);
+ return (NNG_ENOMEM);
+ }
+ if ((h->value = nni_alloc(strlen(val) + 1)) == NULL) {
+ nni_strfree(h->name);
+ NNI_FREE_STRUCT(h);
+ return (NNG_ENOMEM);
+ }
+ strncpy(h->value, val, strlen(val) + 1);
+ nni_list_append(hdrs, h);
+ return (0);
+}
+
+int
+nni_http_req_add_header(nni_http_req *req, const char *key, const char *val)
+{
+ return (http_add_header(&req->hdrs, key, val));
+}
+
+int
+nni_http_res_add_header(nni_http_res *res, const char *key, const char *val)
+{
+ return (http_add_header(&res->hdrs, key, val));
+}
+
+static const char *
+http_get_header(nni_list *hdrs, const char *key)
+{
+ http_header *h;
+ NNI_LIST_FOREACH (hdrs, h) {
+ if (strcasecmp(h->name, key) == 0) {
+ return (h->value);
+ }
+ }
+ return (NULL);
+}
+
+const char *
+nni_http_req_get_header(nni_http_req *req, const char *key)
+{
+ return (http_get_header(&req->hdrs, key));
+}
+
+const char *
+nni_http_res_get_header(nni_http_res *res, const char *key)
+{
+ return (http_get_header(&res->hdrs, key));
+}
+
+// http_entity_set_data sets the entity, but does not update the
+// content-length header.
+static void
+http_entity_set_data(nni_http_entity *entity, const void *data, size_t size)
+{
+ if (entity->own) {
+ nni_free(entity->data, entity->size);
+ }
+ entity->data = (void *) data;
+ entity->size = size;
+ entity->own = false;
+}
+
+static int
+http_entity_alloc_data(nni_http_entity *entity, size_t size)
+{
+ void *newdata;
+ if ((newdata = nni_alloc(size)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ http_entity_set_data(entity, newdata, size);
+ entity->own = true;
+ return (0);
+}
+
+static int
+http_entity_copy_data(nni_http_entity *entity, const void *data, size_t size)
+{
+ int rv;
+ if ((rv = http_entity_alloc_data(entity, size)) == 0) {
+ memcpy(entity->data, data, size);
+ }
+ return (rv);
+}
+
+static int
+http_set_content_length(nni_http_entity *entity, nni_list *hdrs)
+{
+ char buf[16];
+ (void) snprintf(buf, sizeof(buf), "%u", (unsigned) entity->size);
+ return (http_set_header(hdrs, "Content-Length", buf));
+}
+
+static void
+http_entity_get_data(nni_http_entity *entity, void **datap, size_t *sizep)
+{
+ *datap = entity->data;
+ *sizep = entity->size;
+}
+
+void
+nni_http_req_get_data(nni_http_req *req, void **datap, size_t *sizep)
+{
+ http_entity_get_data(&req->data, datap, sizep);
+}
+
+void
+nni_http_res_get_data(nni_http_res *res, void **datap, size_t *sizep)
+{
+ http_entity_get_data(&res->data, datap, sizep);
+}
+
+int
+nni_http_req_set_data(nni_http_req *req, const void *data, size_t size)
+{
+ int rv;
+
+ http_entity_set_data(&req->data, data, size);
+ if ((rv = http_set_content_length(&req->data, &req->hdrs)) != 0) {
+ http_entity_set_data(&req->data, NULL, 0);
+ }
+ return (rv);
+}
+
+int
+nni_http_res_set_data(nni_http_res *res, const void *data, size_t size)
+{
+ int rv;
+
+ http_entity_set_data(&res->data, data, size);
+ if ((rv = http_set_content_length(&res->data, &res->hdrs)) != 0) {
+ http_entity_set_data(&res->data, NULL, 0);
+ }
+ return (rv);
+}
+
+int
+nni_http_req_copy_data(nni_http_req *req, const void *data, size_t size)
+{
+ int rv;
+
+ if (((rv = http_entity_copy_data(&req->data, data, size)) != 0) ||
+ ((rv = http_set_content_length(&req->data, &req->hdrs)) != 0)) {
+ http_entity_set_data(&req->data, NULL, 0);
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_http_res_copy_data(nni_http_res *res, const void *data, size_t size)
+{
+ int rv;
+
+ if (((rv = http_entity_copy_data(&res->data, data, size)) != 0) ||
+ ((rv = http_set_content_length(&res->data, &res->hdrs)) != 0)) {
+ http_entity_set_data(&res->data, NULL, 0);
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_http_res_alloc_data(nni_http_res *res, size_t size)
+{
+ return (http_entity_alloc_data(&res->data, size));
+}
+
+static int
+http_parse_header(nni_list *hdrs, void *line)
+{
+ http_header *h;
+ char * key = line;
+ char * val;
+ char * end;
+
+ // Find separation between key and value
+ if ((val = strchr(key, ':')) == NULL) {
+ return (NNG_EPROTO);
+ }
+
+ // Trim leading and trailing whitespace from header
+ *val = '\0';
+ val++;
+ while (*val == ' ' || *val == '\t') {
+ val++;
+ }
+ end = val + strlen(val);
+ end--;
+ while ((end > val) && (*end == ' ' || *end == '\t')) {
+ *end = '\0';
+ end--;
+ }
+
+ return (http_add_header(hdrs, key, val));
+}
+
+// http_sprintf_headers makes headers for an HTTP request or an HTTP response
+// object. Each header is dumped from the list. If the buf is NULL,
+// or the sz is 0, then a dryrun is done, in order to allow the caller to
+// determine how much space is needed. Returns the size of the space needed,
+// not including the terminating NULL byte. Truncation occurs if the size
+// returned is >= the requested size.
+static size_t
+http_sprintf_headers(char *buf, size_t sz, nni_list *list)
+{
+ size_t rv = 0;
+ http_header *h;
+
+ if (buf == NULL) {
+ sz = 0;
+ }
+
+ NNI_LIST_FOREACH (list, h) {
+ size_t l;
+ l = snprintf(buf, sz, "%s: %s\r\n", h->name, h->value);
+ if (buf != NULL) {
+ buf += l;
+ }
+ sz = (sz > l) ? sz - l : 0;
+ rv += l;
+ }
+ return (rv);
+}
+
+static int
+http_asprintf(char **bufp, size_t *szp, nni_list *hdrs, const char *fmt, ...)
+{
+ va_list ap;
+ size_t len;
+ size_t n;
+ char * buf;
+
+ va_start(ap, fmt);
+ len = vsnprintf(NULL, 0, fmt, ap);
+ va_end(ap);
+
+ len += http_sprintf_headers(NULL, 0, hdrs);
+ len += 5; // \r\n\r\n\0
+
+ if (len <= *szp) {
+ buf = *bufp;
+ } else {
+ if ((buf = nni_alloc(len)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_free(*bufp, *szp);
+ *bufp = buf;
+ *szp = len;
+ }
+ va_start(ap, fmt);
+ n = vsnprintf(buf, len, fmt, ap);
+ va_end(ap);
+ buf += n;
+ len -= n;
+ n = http_sprintf_headers(buf, len, hdrs);
+ buf += n;
+ len -= n;
+ snprintf(buf, len, "\r\n");
+ return (0);
+}
+
+static int
+http_req_prepare(nni_http_req *req)
+{
+ int rv;
+ if ((req->uri == NULL) || (req->meth == NULL)) {
+ return (NNG_EINVAL);
+ }
+ rv = http_asprintf(&req->buf, &req->bufsz, &req->hdrs, "%s %s %s\r\n",
+ req->meth, req->uri, req->vers != NULL ? req->vers : "HTTP/1.1");
+ return (rv);
+}
+
+static int
+http_res_prepare(nni_http_res *res)
+{
+ int rv;
+ rv = http_asprintf(&res->buf, &res->bufsz, &res->hdrs, "%s %d %s\r\n",
+ res->vers != NULL ? res->vers : "HTTP/1.1", res->code,
+ res->rsn != NULL ? res->rsn : "Unknown Error");
+ return (rv);
+}
+
+int
+nni_http_req_get_buf(nni_http_req *req, void **data, size_t *szp)
+{
+ int rv;
+
+ if ((req->buf == NULL) && (rv = http_req_prepare(req)) != 0) {
+ return (rv);
+ }
+ *data = req->buf;
+ *szp = strlen(req->buf);
+ return (0);
+}
+
+int
+nni_http_res_get_buf(nni_http_res *res, void **data, size_t *szp)
+{
+ int rv;
+
+ if ((res->buf == NULL) && (rv = http_res_prepare(res)) != 0) {
+ return (rv);
+ }
+ *data = res->buf;
+ *szp = strlen(res->buf);
+ return (0);
+}
+
+int
+nni_http_req_init(nni_http_req **reqp)
+{
+ nni_http_req *req;
+ if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&req->hdrs, http_header, node);
+ req->buf = NULL;
+ req->bufsz = 0;
+ req->data.data = NULL;
+ req->data.size = 0;
+ req->data.own = false;
+ req->vers = NULL;
+ req->meth = NULL;
+ req->uri = NULL;
+ *reqp = req;
+ return (0);
+}
+
+int
+nni_http_res_init(nni_http_res **resp)
+{
+ nni_http_res *res;
+ if ((res = NNI_ALLOC_STRUCT(res)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&res->hdrs, http_header, node);
+ res->buf = NULL;
+ res->bufsz = 0;
+ res->data.data = NULL;
+ res->data.size = 0;
+ res->data.own = false;
+ res->vers = NULL;
+ res->rsn = NULL;
+ res->code = 0;
+ *resp = res;
+ return (0);
+}
+
+const char *
+nni_http_req_get_method(nni_http_req *req)
+{
+ return (req->meth);
+}
+
+const char *
+nni_http_req_get_uri(nni_http_req *req)
+{
+ return (req->uri);
+}
+
+const char *
+nni_http_req_get_version(nni_http_req *req)
+{
+ return (req->vers);
+}
+
+const char *
+nni_http_res_get_version(nni_http_res *res)
+{
+ return (res->vers);
+}
+
+int
+nni_http_req_set_version(nni_http_req *req, const char *vers)
+{
+ return (http_set_string(&req->vers, vers));
+}
+
+int
+nni_http_res_set_version(nni_http_res *res, const char *vers)
+{
+ return (http_set_string(&res->vers, vers));
+}
+
+int
+nni_http_req_set_uri(nni_http_req *req, const char *uri)
+{
+ return (http_set_string(&req->uri, uri));
+}
+
+int
+nni_http_req_set_method(nni_http_req *req, const char *meth)
+{
+ return (http_set_string(&req->meth, meth));
+}
+
+int
+nni_http_res_set_status(nni_http_res *res, int status, const char *reason)
+{
+ int rv;
+ if ((rv = http_set_string(&res->rsn, reason)) == 0) {
+ res->code = status;
+ }
+ return (rv);
+}
+
+int
+nni_http_res_get_status(nni_http_res *res)
+{
+ return (res->code);
+}
+
+const char *
+nni_http_res_get_reason(nni_http_res *res)
+{
+ return (res->rsn);
+}
+
+static int
+http_scan_line(void *vbuf, size_t n, size_t *lenp)
+{
+ size_t len;
+ char lc;
+ char * buf = vbuf;
+
+ lc = 0;
+ for (len = 0; len < n; len++) {
+ char c = buf[len];
+ if (c == '\n') {
+ // Technically we should be receiving CRLF, but
+ // debugging is easier with just LF, so we behave
+ // following Postel's Law.
+ if (lc != '\r') {
+ buf[len] = '\0';
+ } else {
+ buf[len - 1] = '\0';
+ }
+ *lenp = len + 1;
+ return (0);
+ }
+ // If we have a control character (other than CR), or a CR
+ // followed by anything other than LF, then its an error.
+ if (((c < ' ') && (c != '\r')) || (lc == '\r')) {
+ return (NNG_EPROTO);
+ }
+ lc = c;
+ }
+ // Scanned the entire content, but did not find a line.
+ return (NNG_EAGAIN);
+}
+
+static int
+http_req_parse_line(nni_http_req *req, void *line)
+{
+ int rv;
+ char *method;
+ char *uri;
+ char *version;
+
+ method = line;
+ if ((uri = strchr(method, ' ')) == NULL) {
+ return (NNG_EPROTO);
+ }
+ *uri = '\0';
+ uri++;
+
+ if ((version = strchr(uri, ' ')) == NULL) {
+ return (NNG_EPROTO);
+ }
+ *version = '\0';
+ version++;
+
+ if (((rv = nni_http_req_set_method(req, method)) != 0) ||
+ ((rv = nni_http_req_set_uri(req, uri)) != 0) ||
+ ((rv = nni_http_req_set_version(req, version)) != 0)) {
+ return (rv);
+ }
+ return (0);
+}
+
+static int
+http_res_parse_line(nni_http_res *res, uint8_t *line)
+{
+ int rv;
+ char *reason;
+ char *codestr;
+ char *version;
+ int status;
+
+ version = (char *) line;
+ if ((codestr = strchr(version, ' ')) == NULL) {
+ return (NNG_EPROTO);
+ }
+ *codestr = '\0';
+ codestr++;
+
+ if ((reason = strchr(codestr, ' ')) == NULL) {
+ return (NNG_EPROTO);
+ }
+ *reason = '\0';
+ reason++;
+
+ status = atoi(codestr);
+ if ((status < 100) || (status > 999)) {
+ return (NNG_EPROTO);
+ }
+
+ if (((rv = nni_http_res_set_status(res, status, reason)) != 0) ||
+ ((rv = nni_http_res_set_version(res, version)) != 0)) {
+ return (rv);
+ }
+ return (0);
+}
+
+// nni_http_req_parse parses a request (but not any attached entity data).
+// The amount of data consumed is returned in lenp. Returns zero on
+// success, NNG_EPROTO on parse failure, NNG_EAGAIN if more data is
+// required, or NNG_ENOMEM on memory exhaustion. Note that lenp may
+// be updated even in the face of errors (esp. NNG_EAGAIN, which is
+// not an error so much as a request for more data.)
+int
+nni_http_req_parse(nni_http_req *req, void *buf, size_t n, size_t *lenp)
+{
+
+ size_t len = 0;
+ size_t cnt;
+ int rv = 0;
+
+ for (;;) {
+ uint8_t *line;
+ if ((rv = http_scan_line(buf, n, &cnt)) != 0) {
+ break;
+ }
+
+ len += cnt;
+ line = buf;
+ buf = line + cnt;
+ n -= cnt;
+
+ if (*line == '\0') {
+ break;
+ }
+
+ if (req->vers != NULL) {
+ rv = http_parse_header(&req->hdrs, line);
+ } else {
+ rv = http_req_parse_line(req, line);
+ }
+
+ if (rv != 0) {
+ break;
+ }
+ }
+
+ *lenp = len;
+ return (rv);
+}
+
+int
+nni_http_res_parse(nni_http_res *res, void *buf, size_t n, size_t *lenp)
+{
+
+ size_t len = 0;
+ size_t cnt;
+ int rv = 0;
+ for (;;) {
+ uint8_t *line;
+ if ((rv = http_scan_line(buf, n, &cnt)) != 0) {
+ break;
+ }
+
+ len += cnt;
+ line = buf;
+ buf = line + cnt;
+ n -= cnt;
+
+ if (*line == '\0') {
+ break;
+ }
+
+ if (res->vers != NULL) {
+ rv = http_parse_header(&res->hdrs, line);
+ } else {
+ rv = http_res_parse_line(res, line);
+ }
+
+ if (rv != 0) {
+ break;
+ }
+ }
+
+ *lenp = len;
+ return (rv);
+}
+
+int
+nni_http_res_init_error(nni_http_res **resp, uint16_t err)
+{
+ char * rsn;
+ char rsnbuf[80];
+ char html[1024];
+ nni_http_res *res;
+
+ if ((nni_http_res_init(&res)) != 0) {
+ return (NNG_ENOMEM);
+ }
+
+ // Note that it is expected that redirect URIs will update the
+ // payload to reflect the target location.
+ switch (err) {
+ case NNI_HTTP_STATUS_STATUS_MOVED_PERMANENTLY:
+ rsn = "Moved Permanently";
+ break;
+ case NNI_HTTP_STATUS_MULTIPLE_CHOICES:
+ rsn = "Multiple Choices";
+ break;
+ case NNI_HTTP_STATUS_FOUND:
+ rsn = "Found";
+ break;
+ case NNI_HTTP_STATUS_SEE_OTHER:
+ rsn = "See Other";
+ break;
+ case NNI_HTTP_STATUS_TEMPORARY_REDIRECT:
+ rsn = "Temporary Redirect";
+ break;
+ case NNI_HTTP_STATUS_BAD_REQUEST:
+ rsn = "Bad Request";
+ break;
+ case NNI_HTTP_STATUS_UNAUTHORIZED:
+ rsn = "Unauthorized";
+ break;
+ case NNI_HTTP_STATUS_PAYMENT_REQUIRED:
+ rsn = "Payment Required";
+ break;
+ case NNI_HTTP_STATUS_NOT_FOUND:
+ rsn = "Not Found";
+ break;
+ case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED:
+ // Caller must also supply an Allow: header
+ rsn = "Method Not Allowed";
+ break;
+ case NNI_HTTP_STATUS_NOT_ACCEPTABLE:
+ rsn = "Not Acceptable";
+ break;
+ case NNI_HTTP_STATUS_REQUEST_TIMEOUT:
+ rsn = "Request Timeout";
+ break;
+ case NNI_HTTP_STATUS_CONFLICT:
+ rsn = "Conflict";
+ break;
+ case NNI_HTTP_STATUS_GONE:
+ rsn = "Gone";
+ break;
+ case NNI_HTTP_STATUS_LENGTH_REQUIRED:
+ rsn = "Length Required";
+ break;
+ case NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE:
+ rsn = "Payload Too Large";
+ break;
+ case NNI_HTTP_STATUS_FORBIDDEN:
+ rsn = "Forbidden";
+ break;
+ case NNI_HTTP_STATUS_URI_TOO_LONG:
+ rsn = "URI Too Long";
+ break;
+ case NNI_HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE:
+ rsn = "Unsupported Media Type";
+ break;
+ case NNI_HTTP_STATUS_EXPECTATION_FAILED:
+ rsn = "Expectation Failed";
+ break;
+ case NNI_HTTP_STATUS_UPGRADE_REQUIRED:
+ // Caller must add "Upgrade:" header.
+ rsn = "Upgrade Required";
+ break;
+ case NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR:
+ rsn = "Internal Server Error";
+ break;
+ case NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP:
+ rsn = "HTTP version not supported";
+ break;
+ case NNI_HTTP_STATUS_NOT_IMPLEMENTED:
+ rsn = "Not Implemented";
+ break;
+ case NNI_HTTP_STATUS_SERVICE_UNAVAILABLE:
+ rsn = "Service Unavailable";
+ break;
+ default:
+ snprintf(rsnbuf, sizeof(rsnbuf), "HTTP error code %d", err);
+ rsn = rsnbuf;
+ break;
+ }
+
+ // very simple builtin error page
+ snprintf(html, sizeof(html),
+ "<head><title>%d %s</title></head>"
+ "<body><p/><h1 align=\"center\">"
+ "<span style=\"font-size: 36px; border-radius: 5px; "
+ "background-color: black; color: white; padding: 7px; "
+ "font-family: Arial, sans serif;\">%d</span></h1>"
+ "<p align=\"center\">"
+ "<span style=\"font-size: 24px; font-family: Arial, sans serif;\">"
+ "%s</span></p></body>",
+ err, rsn, err, rsn);
+
+ nni_http_res_set_status(res, err, rsn);
+ nni_http_res_copy_data(res, html, strlen(html));
+ nni_http_res_set_version(res, "HTTP/1.1");
+ nni_http_res_set_header(
+ res, "Content-Type", "text/html; charset=UTF-8");
+ // We could set the date, but we don't necessarily have a portable
+ // way to get the time of day.
+
+ *resp = res;
+ return (0);
+}
diff --git a/src/supplemental/http/server.c b/src/supplemental/http/server.c
new file mode 100644
index 00000000..15b04f80
--- /dev/null
+++ b/src/supplemental/http/server.c
@@ -0,0 +1,1103 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+static int http_server_sys_init(void);
+static void http_server_sys_fini(void);
+
+static nni_initializer http_server_initializer = {
+ .i_init = http_server_sys_init,
+ .i_fini = http_server_sys_fini,
+ .i_once = 0,
+};
+
+typedef struct http_handler {
+ nni_list_node node;
+ void * h_arg;
+ char * h_path;
+ char * h_method;
+ char * h_host;
+ bool h_is_upgrader;
+ bool h_is_dir;
+ void (*h_cb)(nni_aio *);
+ void (*h_free)(void *);
+} http_handler;
+
+typedef struct http_sconn {
+ nni_list_node node;
+ nni_http * http;
+ nni_http_server *server;
+ nni_http_req * req;
+ nni_http_res * res;
+ bool close;
+ bool closed;
+ bool finished;
+ nni_aio * cbaio;
+ nni_aio * rxaio;
+ nni_aio * txaio;
+ nni_aio * txdataio;
+ nni_http_tran tran;
+} http_sconn;
+
+struct nni_http_server {
+ nng_sockaddr addr;
+ nni_list_node node;
+ int refcnt;
+ int starts;
+ nni_list handlers;
+ nni_list conns;
+ nni_list reaps;
+ nni_mtx mtx;
+ nni_cv cv;
+ bool closed;
+ bool tls;
+ nni_task cleanup;
+ nni_aio * accaio;
+ nni_plat_tcp_ep *tep;
+};
+
+static nni_list http_servers;
+static nni_mtx http_servers_lk;
+
+static void
+http_sconn_fini(void *arg)
+{
+ http_sconn *sc = arg;
+ NNI_ASSERT(!sc->finished);
+ sc->finished = true;
+ nni_aio_stop(sc->rxaio);
+ nni_aio_stop(sc->txaio);
+ nni_aio_stop(sc->txdataio);
+ nni_aio_stop(sc->cbaio);
+ if (sc->http != NULL) {
+ nni_http_fini(sc->http);
+ }
+ if (sc->req != NULL) {
+ nni_http_req_fini(sc->req);
+ }
+ if (sc->res != NULL) {
+ nni_http_res_fini(sc->res);
+ }
+ nni_aio_fini(sc->rxaio);
+ nni_aio_fini(sc->txaio);
+ nni_aio_fini(sc->txdataio);
+ nni_aio_fini(sc->cbaio);
+ NNI_FREE_STRUCT(sc);
+}
+
+static void
+http_sconn_close(http_sconn *sc)
+{
+ nni_http_server *s;
+ s = sc->server;
+
+ NNI_ASSERT(!sc->finished);
+ nni_mtx_lock(&s->mtx);
+ if (!sc->closed) {
+ nni_http *h;
+ sc->closed = true;
+ // Close the underlying transport.
+ if ((h = sc->http) != NULL) {
+ nni_http_close(h);
+ }
+ if (nni_list_node_active(&sc->node)) {
+ nni_list_remove(&s->conns, sc);
+ nni_list_append(&s->reaps, sc);
+ }
+ nni_task_dispatch(&s->cleanup);
+ } else {
+ nni_panic("DOUBLE CLOSE!\n");
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+http_sconn_txdatdone(void *arg)
+{
+ http_sconn *sc = arg;
+ nni_aio * aio = sc->txdataio;
+
+ if (nni_aio_result(aio) != 0) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ if (sc->res != NULL) {
+ nni_http_res_fini(sc->res);
+ sc->res = NULL;
+ }
+
+ if (sc->close) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ nni_http_req_reset(sc->req);
+ nni_http_read_req(sc->http, sc->req, sc->rxaio);
+}
+
+static void
+http_sconn_txdone(void *arg)
+{
+ http_sconn *sc = arg;
+ nni_aio * aio = sc->txaio;
+ int rv;
+ void * data;
+ size_t size;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ // For HEAD requests, we just treat like "GET" but don't send
+ // the data. (Required per HTTP.)
+ if (strcmp(nni_http_req_get_method(sc->req), "HEAD") == 0) {
+ size = 0;
+ } else {
+ nni_http_res_get_data(sc->res, &data, &size);
+ }
+ if (size) {
+ // Submit data.
+ sc->txdataio->a_niov = 1;
+ sc->txdataio->a_iov[0].iov_buf = data;
+ sc->txdataio->a_iov[0].iov_len = size;
+ nni_http_write_full(sc->http, sc->txdataio);
+ return;
+ }
+
+ if (sc->close) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ if (sc->res != NULL) {
+ nni_http_res_fini(sc->res);
+ sc->res = NULL;
+ }
+ nni_http_req_reset(sc->req);
+ nni_http_read_req(sc->http, sc->req, sc->rxaio);
+}
+
+static char
+http_hexval(char c)
+{
+ if ((c >= '0') && (c <= '9')) {
+ return (c - '0');
+ }
+ if ((c >= 'a') && (c <= 'f')) {
+ return ((c - 'a') + 10);
+ }
+ if ((c >= 'A') && (c <= 'F')) {
+ return ((c - 'A') + 10);
+ }
+ return (0);
+}
+
+static char *
+http_uri_canonify(char *path)
+{
+ char *tmp;
+ char *dst;
+
+ // Chomp off query string.
+ if ((tmp = strchr(path, '?')) != NULL) {
+ *tmp = '\0';
+ }
+ // If the URI was absolute, make it relative.
+ if ((strncasecmp(path, "http://", strlen("http://")) == 0) ||
+ (strncasecmp(path, "https://", strlen("https://")) == 0)) {
+ // Skip past the ://
+ path = strchr(path, ':');
+ path += 3;
+
+ // scan for the end of the host, distinguished by a /
+ // path delimiter. There might not be one, in which case
+ // the whole thing is the host and we assume the path is
+ // just /.
+ if ((path = strchr(path, '/')) == NULL) {
+ return ("/");
+ }
+ }
+
+ // Now we have to unescape things. Unescaping is a shrinking
+ // operation (strictly), so this is safe. This is just URL decode.
+ // Note that paths with an embedded NUL are going to be treated as
+ // though truncated. Don't be that guy that sends %00 in a URL.
+ tmp = path;
+ dst = path;
+ while (*tmp != '\0') {
+ char c;
+ if ((c = *tmp) != '%') {
+ *dst++ = c;
+ tmp++;
+ continue;
+ }
+ if (isxdigit(tmp[1]) && isxdigit(tmp[2])) {
+ c = http_hexval(tmp[1]);
+ c *= 16;
+ c += http_hexval(tmp[2]);
+ *dst++ = c;
+ tmp += 3;
+ }
+ // garbage in, garbage out
+ *dst++ = c;
+ tmp++;
+ }
+ *dst = '\0';
+ return (path);
+}
+
+static void
+http_sconn_error(http_sconn *sc, uint16_t err)
+{
+ nni_http_res *res;
+
+ if (nni_http_res_init_error(&res, err) != 0) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ sc->res = res;
+ nni_http_write_res(sc->http, res, sc->txaio);
+}
+
+static void
+http_sconn_rxdone(void *arg)
+{
+ http_sconn * sc = arg;
+ nni_http_server *s = sc->server;
+ nni_aio * aio = sc->rxaio;
+ int rv;
+ http_handler * h;
+ const char * val;
+ nni_http_req * req = sc->req;
+ char * uri;
+ size_t urisz;
+ char * path;
+ char * tmp;
+ bool badmeth = false;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ // Validate the request -- it has to at least look like HTTP 1.x
+ // We flatly refuse to deal with HTTP 0.9, and we can't cope with
+ // HTTP/2.
+ if ((val = nni_http_req_get_version(req)) == NULL) {
+ sc->close = true;
+ http_sconn_error(sc, NNI_HTTP_STATUS_BAD_REQUEST);
+ return;
+ }
+ if (strncmp(val, "HTTP/1.", 7) != 0) {
+ sc->close = true;
+ http_sconn_error(sc, NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP);
+ return;
+ }
+ if (strcmp(val, "HTTP/1.1") != 0) {
+ // We treat HTTP/1.0 connections as non-persistent.
+ // No effort is made to handle "persistent" HTTP/1.0
+ // since that was not standard. (Everyone is at 1.1 now
+ // anyways.)
+ sc->close = true;
+ }
+
+ // If the connection was 1.0, or a connection: close was requested,
+ // then mark this close on our end.
+ if ((val = nni_http_req_get_header(req, "Connection")) != NULL) {
+ // HTTP 1.1 says these have to be case insensitive (7230)
+ if (nni_strcasestr(val, "close") != NULL) {
+ // In theory this could falsely match some other weird
+ // connection header that included the word close not
+ // as part of a whole token. No such legal definitions
+ // exist, and so anyone who does that gets what they
+ // deserve. (Fairly harmless actually, since it only
+ // prevents persistent connections.)
+ sc->close = true;
+ }
+ }
+
+ val = nni_http_req_get_uri(req);
+ urisz = strlen(val) + 1;
+ if ((uri = nni_alloc(urisz)) == NULL) {
+ http_sconn_close(sc); // out of memory
+ return;
+ }
+ strncpy(uri, val, urisz);
+ path = http_uri_canonify(uri);
+
+ NNI_LIST_FOREACH (&s->handlers, h) {
+ size_t len;
+ if (h->h_host != NULL) {
+ val = nni_http_req_get_header(req, "Host");
+ if (val == NULL) {
+ // We insist on a matching Host: line for
+ // virtual hosting. This leaves HTTP/1.0
+ // out in the cold basically.
+ continue;
+ }
+
+ // A few ways hosts can match. They might have
+ // a port attached -- we ignore that. (We don't
+ // run multiple ports, so if you got here, presumably
+ // the port at least is correct!) It might also have
+ // a lone trailing dot, so that is ok too.
+
+ // Ignore the trailing dot if the handler supplied it.
+ len = strlen(h->h_host);
+ if ((len > 0) && (h->h_host[len - 1] == '.')) {
+ len--;
+ }
+ if ((nni_strncasecmp(val, h->h_host, len) != 0)) {
+ continue;
+ }
+ if ((val[len] != '\0') && (val[len] != ':') &&
+ ((val[len] != '.') || (val[len + 1] != '\0'))) {
+ continue;
+ }
+ }
+
+ NNI_ASSERT(h->h_method != NULL);
+
+ len = strlen(h->h_path);
+ if (strncmp(path, h->h_path, len) != 0) {
+ continue;
+ }
+ switch (path[len]) {
+ case '\0':
+ break;
+ case '/':
+ if ((path[len + 1] != '\0') && (!h->h_is_dir)) {
+ // trailing component and not a directory.
+ // Note that this should force a failure.
+ continue;
+ }
+ break;
+ default:
+ continue; // some other substring, not matched.
+ }
+
+ // So, what about the method?
+ val = nni_http_req_get_method(req);
+ if (strcmp(val, h->h_method) == 0) {
+ break;
+ }
+ // HEAD is remapped to GET.
+ if ((strcmp(val, "HEAD") == 0) &&
+ (strcmp(h->h_method, "GET") == 0)) {
+ break;
+ }
+ badmeth = 1;
+ }
+
+ nni_free(uri, urisz);
+ if (h == NULL) {
+ if (badmeth) {
+ http_sconn_error(
+ sc, NNI_HTTP_STATUS_METHOD_NOT_ALLOWED);
+ } else {
+ http_sconn_error(sc, NNI_HTTP_STATUS_NOT_FOUND);
+ }
+ return;
+ }
+
+ nni_aio_set_input(sc->cbaio, 0, sc->http);
+ nni_aio_set_input(sc->cbaio, 1, sc->req);
+ nni_aio_set_input(sc->cbaio, 2, h->h_arg);
+ nni_aio_set_data(sc->cbaio, 1, h);
+
+ // Technically, probably callback should initialize this with
+ // start, but we do it instead.
+
+ if (nni_aio_start(sc->cbaio, NULL, NULL) == 0) {
+ h->h_cb(sc->cbaio);
+ }
+}
+
+static void
+http_sconn_cbdone(void *arg)
+{
+ http_sconn * sc = arg;
+ nni_aio * aio = sc->cbaio;
+ nni_http_res *res;
+ http_handler *h;
+
+ if (nni_aio_result(aio) != 0) {
+ // Hard close, no further feedback.
+ http_sconn_close(sc);
+ return;
+ }
+
+ h = nni_aio_get_data(aio, 1);
+ res = nni_aio_get_output(aio, 0);
+
+ // If its an upgrader, and they didn't give us back a response, it
+ // means that they took over, and we should just discard this session,
+ // without closing the underlying channel.
+ if ((h->h_is_upgrader) && (res == NULL)) {
+ sc->http = NULL; // the underlying HTTP is not closed
+ sc->req = NULL;
+ sc->res = NULL;
+ http_sconn_close(sc); // discard server session though
+ return;
+ }
+ if (res != NULL) {
+
+ const char *val;
+ val = nni_http_res_get_header(res, "Connection");
+ if ((val != NULL) && (strstr(val, "close") != NULL)) {
+ sc->close = true;
+ }
+ if (sc->close) {
+ nni_http_res_set_header(res, "Connection", "close");
+ }
+ sc->res = res;
+ nni_http_write_res(sc->http, res, sc->txaio);
+ } else if (sc->close) {
+ http_sconn_close(sc);
+ } else {
+ // Presumably client already sent a response.
+ // Wait for another request.
+ nni_http_req_reset(sc->req);
+ nni_http_read_req(sc->http, sc->req, sc->rxaio);
+ }
+}
+
+static int
+http_sconn_init(http_sconn **scp, nni_plat_tcp_pipe *tcp)
+{
+ http_sconn *sc;
+ int rv;
+
+ if ((sc = NNI_ALLOC_STRUCT(sc)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (((rv = nni_http_req_init(&sc->req)) != 0) ||
+ ((rv = nni_aio_init(&sc->rxaio, http_sconn_rxdone, sc)) != 0) ||
+ ((rv = nni_aio_init(&sc->txaio, http_sconn_txdone, sc)) != 0) ||
+ ((rv = nni_aio_init(&sc->txdataio, http_sconn_txdatdone, sc)) !=
+ 0) ||
+ ((rv = nni_aio_init(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) {
+ // Can't even accept the incoming request. Hard close.
+ http_sconn_close(sc);
+ return (rv);
+ }
+ // XXX: for HTTPS we would then try to do the TLS negotiation here.
+ // That would use a different set of tran values.
+
+ sc->tran.h_data = tcp;
+ sc->tran.h_read = (void *) nni_plat_tcp_pipe_recv;
+ sc->tran.h_write = (void *) nni_plat_tcp_pipe_send;
+ sc->tran.h_close = (void *) nni_plat_tcp_pipe_close; // close implied
+ sc->tran.h_fini = (void *) nni_plat_tcp_pipe_fini;
+
+ if ((rv = nni_http_init(&sc->http, &sc->tran)) != 0) {
+ http_sconn_close(sc);
+ return (rv);
+ }
+ *scp = sc;
+ return (0);
+}
+
+static void
+http_server_acccb(void *arg)
+{
+ nni_http_server * s = arg;
+ nni_aio * aio = s->accaio;
+ nni_plat_tcp_pipe *tcp;
+ http_sconn * sc;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ if (rv == NNG_ECLOSED) {
+ return;
+ }
+ // try again?
+ nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ return;
+ }
+ tcp = nni_aio_get_pipe(aio);
+ if (http_sconn_init(&sc, tcp) != 0) {
+ nni_plat_tcp_pipe_close(tcp);
+ nni_plat_tcp_pipe_fini(tcp);
+
+ nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ return;
+ }
+ nni_mtx_lock(&s->mtx);
+ sc->server = s;
+ if (s->closed) {
+ nni_http_close(sc->http);
+ sc->closed = true;
+ nni_list_append(&s->reaps, sc);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_list_append(&s->conns, sc);
+ nni_mtx_unlock(&s->mtx);
+
+ nni_http_read_req(sc->http, sc->req, sc->rxaio);
+ nni_plat_tcp_ep_accept(s->tep, s->accaio);
+}
+
+static void
+http_server_cleanup(void *arg)
+{
+ nni_http_server *s = arg;
+ http_sconn * sc;
+ nni_mtx_lock(&s->mtx);
+ while ((sc = nni_list_first(&s->reaps)) != NULL) {
+ nni_list_remove(&s->reaps, sc);
+ nni_mtx_unlock(&s->mtx);
+ http_sconn_fini(sc);
+ nni_mtx_lock(&s->mtx);
+ }
+ if (nni_list_empty(&s->reaps) && nni_list_empty(&s->conns)) {
+ nni_cv_wake(&s->cv);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+http_handler_fini(http_handler *h)
+{
+ nni_strfree(h->h_path);
+ nni_strfree(h->h_host);
+ nni_strfree(h->h_method);
+ if (h->h_free != NULL) {
+ h->h_free(h->h_arg);
+ }
+ NNI_FREE_STRUCT(h);
+}
+
+static void
+http_server_fini(nni_http_server *s)
+{
+ http_handler *h;
+
+ nni_mtx_lock(&s->mtx);
+ while ((!nni_list_empty(&s->conns)) || (!nni_list_empty(&s->reaps))) {
+ nni_cv_wait(&s->cv);
+ }
+ if (s->tep != NULL) {
+ nni_plat_tcp_ep_fini(s->tep);
+ }
+ while ((h = nni_list_first(&s->handlers)) != NULL) {
+ nni_list_remove(&s->handlers, h);
+ http_handler_fini(h);
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_task_wait(&s->cleanup);
+ nni_aio_fini(s->accaio);
+ nni_cv_fini(&s->cv);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
+void
+nni_http_server_fini(nni_http_server *s)
+{
+ nni_mtx_lock(&http_servers_lk);
+ s->refcnt--;
+ if (s->refcnt == 0) {
+ nni_list_remove(&http_servers, s);
+ http_server_fini(s);
+ }
+ nni_mtx_unlock(&http_servers_lk);
+}
+
+static int
+http_server_init(nni_http_server **serverp, nng_sockaddr *sa)
+{
+ nni_http_server *s;
+ int rv;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&s->mtx);
+ nni_cv_init(&s->cv, &s->mtx);
+ nni_task_init(NULL, &s->cleanup, http_server_cleanup, s);
+ NNI_LIST_INIT(&s->handlers, http_handler, node);
+ NNI_LIST_INIT(&s->conns, http_sconn, node);
+ NNI_LIST_INIT(&s->reaps, http_sconn, node);
+ if ((rv = nni_aio_init(&s->accaio, http_server_acccb, s)) != 0) {
+ http_server_fini(s);
+ return (rv);
+ }
+ s->addr = *sa;
+ *serverp = s;
+ return (0);
+}
+
+int
+nni_http_server_init(nni_http_server **serverp, nng_sockaddr *sa)
+{
+ int rv;
+ nni_http_server *s;
+
+ nni_initialize(&http_server_initializer);
+
+ nni_mtx_lock(&http_servers_lk);
+ NNI_LIST_FOREACH (&http_servers, s) {
+ switch (sa->s_un.s_family) {
+ case NNG_AF_INET:
+ if (memcmp(&s->addr.s_un.s_in, &sa->s_un.s_in,
+ sizeof(sa->s_un.s_in)) == 0) {
+ *serverp = s;
+ s->refcnt++;
+ nni_mtx_unlock(&http_servers_lk);
+ return (0);
+ }
+ break;
+ case NNG_AF_INET6:
+ if (memcmp(&s->addr.s_un.s_in6, &sa->s_un.s_in6,
+ sizeof(sa->s_un.s_in6)) == 0) {
+ *serverp = s;
+ s->refcnt++;
+ nni_mtx_unlock(&http_servers_lk);
+ return (0);
+ }
+ break;
+ }
+ }
+
+ // We didn't find a server, try to make a new one.
+ if ((rv = http_server_init(&s, sa)) == 0) {
+ s->addr = *sa;
+ s->refcnt = 1;
+ nni_list_append(&http_servers, s);
+ *serverp = s;
+ }
+
+ nni_mtx_unlock(&http_servers_lk);
+ return (rv);
+}
+
+static int
+http_server_start(nni_http_server *s)
+{
+ int rv;
+
+ rv = nni_plat_tcp_ep_init(&s->tep, &s->addr, NULL, NNI_EP_MODE_LISTEN);
+ if (rv != 0) {
+ return (rv);
+ }
+ if ((rv = nni_plat_tcp_ep_listen(s->tep)) != 0) {
+ nni_plat_tcp_ep_fini(s->tep);
+ s->tep = NULL;
+ return (rv);
+ }
+ nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ return (0);
+}
+
+int
+nni_http_server_start(nni_http_server *s)
+{
+ int rv = 0;
+
+ nni_mtx_lock(&s->mtx);
+ if (s->starts == 0) {
+ rv = http_server_start(s);
+ }
+ if (rv == 0) {
+ s->starts++;
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static void
+http_server_stop(nni_http_server *s)
+{
+ http_sconn *sc;
+
+ if (s->closed) {
+ return;
+ }
+
+ s->closed = true;
+ // Close the TCP endpoint that is listening.
+ if (s->tep) {
+ nni_plat_tcp_ep_close(s->tep);
+ }
+
+ // This marks the server as "shutting down" -- existing
+ // connections finish their activity and close.
+ //
+ // XXX: figure out how to shut down connections that are
+ // blocked waiting to receive a request. We won't do this for
+ // upgraded connections...
+ NNI_LIST_FOREACH (&s->conns, sc) {
+ sc->close = true;
+ }
+}
+
+void
+nni_http_server_stop(nni_http_server *s)
+{
+ nni_mtx_lock(&s->mtx);
+ s->starts--;
+ if (s->starts == 0) {
+ http_server_stop(s);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+int
+http_server_add_handler(void **hp, nni_http_server *s, nni_http_handler *hh,
+ void *arg, void (*freeit)(void *))
+{
+ http_handler *h, *h2;
+ size_t l1, l2;
+
+ // Must have a legal method (and not one that is HEAD), path,
+ // and handler. (The reason HEAD is verboten is that we supply
+ // it automatically as part of GET support.)
+ if ((hh->h_method == NULL) || (hh->h_path == NULL) ||
+ (hh->h_cb == NULL) || (strcmp(hh->h_method, "HEAD") == 0)) {
+ return (NNG_EINVAL);
+ }
+ if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ h->h_arg = arg;
+ h->h_cb = hh->h_cb;
+ h->h_is_dir = hh->h_is_dir;
+ h->h_is_upgrader = hh->h_is_upgrader;
+ h->h_free = freeit;
+
+ // Ignore the port part of the host.
+ if (hh->h_host != NULL) {
+ int rv;
+ rv = nni_tran_parse_host_port(hh->h_host, &h->h_host, NULL);
+ if (rv != 0) {
+ http_handler_fini(h);
+ return (rv);
+ }
+ }
+
+ if (((h->h_method = nni_strdup(hh->h_method)) == NULL) ||
+ ((h->h_path = nni_strdup(hh->h_path)) == NULL)) {
+ http_handler_fini(h);
+ return (NNG_ENOMEM);
+ }
+
+ l1 = strlen(h->h_path);
+ // Chop off trailing "/"
+ while (l1 > 0) {
+ if (h->h_path[l1 - 1] != '/') {
+ break;
+ }
+ l1--;
+ h->h_path[l1] = '\0';
+ }
+
+ nni_mtx_lock(&s->mtx);
+ // General rule for finding a conflict is that if either string
+ // is a strict substring of the other, then we have a
+ // collision. (But only if the methods match, and the host
+ // matches.) Note that a wild card host matches both.
+ NNI_LIST_FOREACH (&s->handlers, h2) {
+ if ((h2->h_host != NULL) && (h->h_host != NULL) &&
+ (strcasecmp(h2->h_host, h->h_host) != 0)) {
+ // Hosts don't match, so we are safe.
+ continue;
+ }
+ if (strcmp(h2->h_method, h->h_method) != 0) {
+ // Different methods, so again we are fine.
+ continue;
+ }
+ l2 = strlen(h2->h_path);
+ if (l1 < l2) {
+ l2 = l1;
+ }
+ if (strncmp(h2->h_path, h->h_path, l2) == 0) {
+ // Path collision. NNG_EADDRINUSE.
+ nni_mtx_unlock(&s->mtx);
+ http_handler_fini(h);
+ return (NNG_EADDRINUSE);
+ }
+ }
+ nni_list_append(&s->handlers, h);
+ nni_mtx_unlock(&s->mtx);
+ if (hp != NULL) {
+ *hp = h;
+ }
+ return (0);
+}
+
+int
+nni_http_server_add_handler(
+ void **hp, nni_http_server *s, nni_http_handler *hh, void *arg)
+{
+ return (http_server_add_handler(hp, s, hh, arg, NULL));
+}
+
+void
+nni_http_server_del_handler(nni_http_server *s, void *harg)
+{
+ http_handler *h = harg;
+
+ nni_mtx_lock(&s->mtx);
+ nni_list_node_remove(&h->node);
+ nni_mtx_unlock(&s->mtx);
+
+ http_handler_fini(h);
+}
+
+// Very limited MIME type map. Used only if the handler does not
+// supply it's own.
+static struct content_map {
+ const char *ext;
+ const char *typ;
+} content_map[] = {
+ // clang-format off
+ { ".ai", "application/postscript" },
+ { ".aif", "audio/aiff" },
+ { ".aiff", "audio/aiff" },
+ { ".avi", "video/avi" },
+ { ".au", "audio/basic" },
+ { ".bin", "application/octet-stream" },
+ { ".bmp", "image/bmp" },
+ { ".css", "text/css" },
+ { ".eps", "application/postscript" },
+ { ".gif", "image/gif" },
+ { ".htm", "text/html" },
+ { ".html", "text/html" },
+ { ".ico", "image/x-icon" },
+ { ".jpeg", "image/jpeg" },
+ { ".jpg", "image/jpeg" },
+ { ".js", "application/javascript" },
+ { ".md", "text/markdown" },
+ { ".mp2", "video/mpeg" },
+ { ".mp3", "audio/mpeg3" },
+ { ".mpeg", "video/mpeg" },
+ { ".mpg", "video/mpeg" },
+ { ".pdf", "application/pdf" },
+ { ".png", "image/png" },
+ { ".ps", "application/postscript" },
+ { ".rtf", "text/rtf" },
+ { ".text", "text/plain" },
+ { ".tif", "image/tiff" },
+ { ".tiff", "image/tiff" },
+ { ".txt", "text/plain" },
+ { ".wav", "audio/wav"},
+ { "README", "text/plain" },
+ { NULL, NULL },
+ // clang-format on
+};
+
+const char *
+http_lookup_type(const char *path)
+{
+ size_t l1 = strlen(path);
+ for (int i = 0; content_map[i].ext != NULL; i++) {
+ size_t l2 = strlen(content_map[i].ext);
+ if (l2 > l1) {
+ continue;
+ }
+ if (strcasecmp(&path[l1 - l2], content_map[i].ext) == 0) {
+ return (content_map[i].typ);
+ }
+ }
+ return (NULL);
+}
+
+typedef struct http_file {
+ char *typ;
+ char *pth;
+} http_file;
+
+static void
+http_handle_file(nni_aio *aio)
+{
+ http_file * f = nni_aio_get_input(aio, 2);
+ nni_http_res *res = NULL;
+ void * data;
+ size_t size;
+ int rv;
+
+ if ((rv = nni_plat_file_get(f->pth, &data, &size)) != 0) {
+ uint16_t status;
+ switch (rv) {
+ case NNG_ENOMEM:
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ break;
+ case NNG_ENOENT:
+ status = NNI_HTTP_STATUS_NOT_FOUND;
+ break;
+ case NNG_EPERM:
+ status = NNI_HTTP_STATUS_FORBIDDEN;
+ break;
+ default:
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ break;
+ }
+ if ((rv = nni_http_res_init_error(&res, status)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ } else {
+ if (((rv = nni_http_res_init(&res)) != 0) ||
+ ((rv = nni_http_res_set_status(
+ res, NNI_HTTP_STATUS_OK, "OK")) != 0) ||
+ ((rv = nni_http_res_set_header(
+ res, "Content-Type", f->typ)) != 0) ||
+ ((rv = nni_http_res_set_data(res, data, size)) != 0)) {
+ nni_free(data, size);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ }
+ nni_aio_set_output(aio, 0, res);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+http_free_file(void *arg)
+{
+ http_file *f = arg;
+ nni_strfree(f->pth);
+ nni_strfree(f->typ);
+ NNI_FREE_STRUCT(f);
+}
+
+int
+nni_http_server_add_file(nni_http_server *s, const char *host,
+ const char *ctype, const char *uri, const char *path)
+{
+ nni_http_handler h;
+ http_file * f;
+ int rv;
+
+ if ((f = NNI_ALLOC_STRUCT(f)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (ctype == NULL) {
+ ctype = http_lookup_type(path);
+ }
+
+ if (((f->pth = nni_strdup(path)) == NULL) ||
+ ((ctype != NULL) && ((f->typ = nni_strdup(ctype)) == NULL))) {
+ http_free_file(f);
+ return (NNG_ENOMEM);
+ }
+ h.h_method = "GET";
+ h.h_path = uri;
+ h.h_host = host;
+ h.h_cb = http_handle_file;
+ h.h_is_dir = false;
+ h.h_is_upgrader = false;
+
+ if ((rv = http_server_add_handler(NULL, s, &h, f, http_free_file)) !=
+ 0) {
+ http_free_file(f);
+ return (rv);
+ }
+ return (0);
+}
+
+typedef struct http_static {
+ char * typ;
+ void * data;
+ size_t size;
+} http_static;
+
+static void
+http_handle_static(nni_aio *aio)
+{
+ http_static * s = nni_aio_get_input(aio, 2);
+ nni_http_res *r = NULL;
+ int rv;
+
+ if (((rv = nni_http_res_init(&r)) != 0) ||
+ ((rv = nni_http_res_set_header(r, "Content-Type", s->typ)) != 0) ||
+ ((rv = nni_http_res_set_status(r, NNI_HTTP_STATUS_OK, "OK")) !=
+ 0) ||
+ ((rv = nni_http_res_set_data(r, s->data, s->size)) != 0)) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_aio_set_output(aio, 0, r);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+http_free_static(void *arg)
+{
+ http_static *s = arg;
+ nni_strfree(s->typ);
+ nni_free(s->data, s->size);
+ NNI_FREE_STRUCT(s);
+}
+
+int
+nni_http_server_add_static(nni_http_server *s, const char *host,
+ const char *ctype, const char *uri, const void *data, size_t size)
+{
+ nni_http_handler h;
+ http_static * f;
+ int rv;
+
+ if ((f = NNI_ALLOC_STRUCT(f)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (ctype == NULL) {
+ ctype = "application/octet-stream";
+ }
+ if (((f->data = nni_alloc(size)) == NULL) ||
+ ((f->typ = nni_strdup(ctype)) == NULL)) {
+ http_free_static(f);
+ return (NNG_ENOMEM);
+ }
+
+ f->size = size;
+ memcpy(f->data, data, size);
+
+ h.h_method = "GET";
+ h.h_path = uri;
+ h.h_host = host;
+ h.h_cb = http_handle_static;
+ h.h_is_dir = false;
+ h.h_is_upgrader = false;
+
+ if ((rv = http_server_add_handler(NULL, s, &h, f, http_free_static)) !=
+ 0) {
+ http_free_static(f);
+ return (rv);
+ }
+ return (0);
+}
+
+static int
+http_server_sys_init(void)
+{
+ NNI_LIST_INIT(&http_servers, nni_http_server, node);
+ nni_mtx_init(&http_servers_lk);
+ return (0);
+}
+
+static void
+http_server_sys_fini(void)
+{
+ nni_mtx_fini(&http_servers_lk);
+} \ No newline at end of file
diff --git a/src/supplemental/mbedtls/tls.c b/src/supplemental/mbedtls/tls.c
index a33c72b4..94503df5 100644
--- a/src/supplemental/mbedtls/tls.c
+++ b/src/supplemental/mbedtls/tls.c
@@ -417,7 +417,7 @@ nni_tls_send_cb(void *ctx)
aio->a_niov = 1;
aio->a_iov[0].iov_buf = tp->sendbuf + tp->sendoff;
aio->a_iov[0].iov_len = tp->sendlen;
- nni_aio_set_timeout(aio, -1); // No timeout.
+ nni_aio_set_timeout(aio, NNG_DURATION_INFINITE);
nni_plat_tcp_pipe_send(tp->tcp, aio);
nni_mtx_unlock(&tp->lk);
return;
@@ -455,7 +455,7 @@ nni_tls_recv_start(nni_tls *tp)
aio->a_niov = 1;
aio->a_iov[0].iov_buf = tp->recvbuf;
aio->a_iov[0].iov_len = NNG_TLS_MAX_RECV_SIZE;
- nni_aio_set_timeout(tp->tcp_recv, -1); // No timeout.
+ nni_aio_set_timeout(tp->tcp_recv, NNG_DURATION_INFINITE);
nni_plat_tcp_pipe_recv(tp->tcp, aio);
}
@@ -526,7 +526,7 @@ nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len)
tp->tcp_send->a_niov = 1;
tp->tcp_send->a_iov[0].iov_buf = tp->sendbuf;
tp->tcp_send->a_iov[0].iov_len = len;
- nni_aio_set_timeout(tp->tcp_send, -1); // No timeout.
+ nni_aio_set_timeout(tp->tcp_send, NNG_DURATION_INFINITE);
nni_plat_tcp_pipe_send(tp->tcp, tp->tcp_send);
return (len);
}
diff --git a/src/supplemental/websocket/CMakeLists.txt b/src/supplemental/websocket/CMakeLists.txt
new file mode 100644
index 00000000..f3283257
--- /dev/null
+++ b/src/supplemental/websocket/CMakeLists.txt
@@ -0,0 +1,14 @@
+#
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+set(WEBSOCKET_SOURCES
+ supplemental/websocket/websocket.c
+ supplemental/websocket/websocket.h)
+set(NNG_SOURCES ${NNG_SOURCES} ${WEBSOCKET_SOURCES} PARENT_SCOPE)
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
new file mode 100644
index 00000000..fe0a9bd9
--- /dev/null
+++ b/src/supplemental/websocket/websocket.c
@@ -0,0 +1,1935 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "supplemental/base64/base64.h"
+#include "supplemental/http/http.h"
+#include "supplemental/sha1/sha1.h"
+
+#include "websocket.h"
+
+// Pre-defined types for some prototypes. These are from other subsystems.
+typedef struct ws_frame ws_frame;
+typedef struct ws_msg ws_msg;
+
+struct nni_ws {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ nni_list_node node;
+ nni_reap_item reap;
+ bool closed;
+ bool ready;
+ bool wclose;
+ nni_mtx mtx;
+ nni_list txmsgs;
+ nni_list rxmsgs;
+ ws_frame * txframe;
+ ws_frame * rxframe;
+ nni_aio * txaio; // physical aios
+ nni_aio * rxaio;
+ nni_aio * closeaio;
+ nni_aio * httpaio; // server only, HTTP reply pending
+ nni_http * http;
+ nni_http_req *req;
+ nni_http_res *res;
+ size_t maxframe;
+ size_t fragsize;
+};
+
+struct nni_ws_listener {
+ nni_tls_config * tls;
+ nni_http_server * server;
+ char * proto;
+ char * url;
+ char * host;
+ char * serv;
+ char * path;
+ nni_mtx mtx;
+ nni_list pend;
+ nni_list reply;
+ nni_list aios;
+ bool started;
+ bool closed;
+ void * hp; // handler pointer
+ nni_http_handler handler;
+ nni_ws_listen_hook hookfn;
+ void * hookarg;
+};
+
+// The dialer tracks user aios in two lists. The first list is for aios
+// waiting for the http connection to be established, while the second
+// are waiting for the HTTP negotiation to complete. We keep two lists
+// so we know whether to initiate another outgoing connection after the
+// completion of an earlier connection. (We don't want to establish
+// requests when we already have connects negotiating.)
+struct nni_ws_dialer {
+ nni_tls_config * tls;
+ nni_http_req * req;
+ nni_http_res * res;
+ nni_http_client *client;
+ nni_mtx mtx;
+ nni_aio * conaio;
+ char * proto;
+ char * host;
+ char * serv;
+ char * path;
+ char * qinfo;
+ char * addr; // full address (a URL really)
+ char * uri; // path + query
+ nni_list conaios; // user aios waiting for connect.
+ nni_list httpaios; // user aios waiting for HTTP nego.
+ bool started;
+ bool closed;
+ nng_sockaddr sa;
+};
+
+typedef enum ws_type {
+ WS_CONT = 0x0,
+ WS_TEXT = 0x1,
+ WS_BINARY = 0x2,
+ WS_CLOSE = 0x8,
+ WS_PING = 0x9,
+ WS_PONG = 0xA,
+} ws_type;
+
+typedef enum ws_reason {
+ WS_CLOSE_NORMAL_CLOSE = 1000,
+ WS_CLOSE_GOING_AWAY = 1001,
+ WS_CLOSE_PROTOCOL_ERR = 1002,
+ WS_CLOSE_UNSUPP_FORMAT = 1003,
+ WS_CLOSE_INVALID_DATA = 1007,
+ WS_CLOSE_POLICY = 1008,
+ WS_CLOSE_TOO_BIG = 1009,
+ WS_CLOSE_NO_EXTENSION = 1010,
+ WS_CLOSE_INTERNAL = 1011,
+} ws_reason;
+
+struct ws_frame {
+ nni_list_node node;
+ uint8_t head[14]; // maximum header size
+ uint8_t mask[4]; // read by server, sent by client
+ uint8_t sdata[125]; // short data (for short frames only)
+ size_t hlen; // header length
+ size_t len; // payload length
+ enum ws_type op;
+ bool final;
+ bool masked;
+ size_t bufsz; // allocated size
+ uint8_t * buf;
+ ws_msg * wmsg;
+};
+
+struct ws_msg {
+ nni_list frames;
+ nni_list_node node;
+ nni_ws * ws;
+ nni_msg * msg;
+ nni_aio * aio;
+};
+
+static void ws_send_close(nni_ws *ws, uint16_t code);
+
+// This looks, case independently for a word in a list, which is either
+// space or comma separated.
+static bool
+ws_contains_word(const char *phrase, const char *word)
+{
+ size_t len = strlen(word);
+
+ while ((phrase != NULL) && (*phrase != '\0')) {
+ if ((nni_strncasecmp(phrase, word, len) == 0) &&
+ ((phrase[len] == 0) || (phrase[len] == ' ') ||
+ (phrase[len] == ','))) {
+ return (true);
+ }
+ // Skip to next word.
+ if ((phrase = strchr(phrase, ' ')) != NULL) {
+ while ((*phrase == ' ') || (*phrase == ',')) {
+ phrase++;
+ }
+ }
+ }
+ return (false);
+}
+
+// input is base64 challenge, output is the accepted. input should be
+// 24 character base 64, output is 28 character base64 reply. (output
+// must be large enough to hold 29 bytes to allow for termination.)
+// Returns 0 on success, NNG_EINVAL if the input is malformed somehow.
+static int
+ws_make_accept(const char *key, char *accept)
+{
+ uint8_t rawkey[16];
+ uint8_t digest[20];
+ nni_sha1_ctx ctx;
+
+#define WS_KEY_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+#define WS_KEY_GUIDLEN 36
+
+ if ((strlen(key) != 24) ||
+ (nni_base64_decode(key, 24, rawkey, 16) != 16)) {
+ return (NNG_EINVAL);
+ }
+
+ nni_sha1_init(&ctx);
+ nni_sha1_update(&ctx, rawkey, 16);
+ nni_sha1_update(&ctx, (uint8_t *) WS_KEY_GUID, WS_KEY_GUIDLEN);
+ nni_sha1_final(&ctx, digest);
+
+ nni_base64_encode(digest, 20, accept, 28);
+ accept[28] = '\0';
+ return (0);
+}
+
+static void
+ws_frame_fini(ws_frame *frame)
+{
+ if (frame->bufsz) {
+ nni_free(frame->buf, frame->bufsz);
+ }
+ NNI_FREE_STRUCT(frame);
+}
+
+static void
+ws_msg_fini(ws_msg *wm)
+{
+ ws_frame *frame;
+
+ NNI_ASSERT(!nni_list_node_active(&wm->node));
+ while ((frame = nni_list_first(&wm->frames)) != NULL) {
+ nni_list_remove(&wm->frames, frame);
+ ws_frame_fini(frame);
+ }
+
+ if (wm->msg != NULL) {
+ nni_msg_free(wm->msg);
+ }
+ NNI_FREE_STRUCT(wm);
+}
+
+static void
+ws_mask_frame(ws_frame *frame)
+{
+ uint32_t r;
+ // frames sent by client need mask.
+ if (frame->masked) {
+ return;
+ }
+ r = nni_random();
+ NNI_PUT32(frame->mask, r);
+ for (int i = 0; i < frame->len; i++) {
+ frame->buf[i] ^= frame->mask[i % 4];
+ }
+ memcpy(frame->head + frame->hlen, frame->mask, 4);
+ frame->hlen += 4;
+ frame->head[1] |= 0x80; // set masked bit
+ frame->masked = true;
+}
+
+static void
+ws_unmask_frame(ws_frame *frame)
+{
+ // frames sent by client need mask.
+ if (!frame->masked) {
+ return;
+ }
+ for (int i = 0; i < frame->len; i++) {
+ frame->buf[i] ^= frame->mask[i % 4];
+ }
+ frame->hlen -= 4;
+ frame->head[1] &= 0x7f; // clear masked bit
+ frame->masked = false;
+}
+
+static int
+ws_msg_init_control(
+ ws_msg **wmp, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len)
+{
+ ws_msg * wm;
+ ws_frame *frame;
+
+ if (len > 125) {
+ return (NNG_EINVAL);
+ }
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ ws_msg_fini(wm);
+ return (NNG_ENOMEM);
+ }
+
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+ memcpy(frame->sdata, buf, len);
+
+ nni_list_append(&wm->frames, frame);
+ frame->wmsg = wm;
+ frame->len = len;
+ frame->final = true;
+ frame->op = op;
+ frame->head[0] = op | 0x80; // final frame (control)
+ frame->head[1] = len & 0x7F;
+ frame->hlen = 2;
+ frame->buf = frame->sdata;
+ frame->bufsz = 0;
+
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_mask_frame(frame);
+ } else {
+ frame->masked = false;
+ }
+
+ wm->aio = NULL;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static int
+ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio)
+{
+ ws_msg * wm;
+ size_t len;
+ size_t maxfrag = ws->fragsize; // make this tunable. (1MB default)
+ uint8_t *buf;
+ uint8_t op;
+
+ // If the message has a header, move it to front of body. Most of
+ // the time this will not cause a reallocation (there should be
+ // headroom). Doing this simplifies our framing, and avoids sending
+ // tiny frames for headers.
+ if ((len = nni_msg_header_len(msg)) != 0) {
+ int rv;
+ buf = nni_msg_header(msg);
+ if ((rv = nni_msg_insert(msg, buf, len)) != 0) {
+ return (rv);
+ }
+ nni_msg_header_clear(msg);
+ }
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+
+ len = nni_msg_len(msg);
+ buf = nni_msg_body(msg);
+ op = WS_BINARY; // to start -- no support for sending TEXT frames
+
+ // do ... while because we want at least one frame (even for empty
+ // messages.) Headers get their own frame, if present. Best bet
+ // is to try not to have a header when coming here.
+ do {
+ ws_frame *frame;
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ ws_msg_fini(wm);
+ return (NNG_ENOMEM);
+ }
+ nni_list_append(&wm->frames, frame);
+ frame->wmsg = wm;
+ frame->len = len > maxfrag ? maxfrag : len;
+ frame->buf = buf;
+ frame->op = op;
+
+ buf += frame->len;
+ len -= frame->len;
+ op = WS_CONT;
+
+ if (len == 0) {
+ frame->final = true;
+ }
+ frame->head[0] = frame->op;
+ frame->hlen = 2;
+ if (frame->final) {
+ frame->head[0] |= 0x80; // final frame bit
+ }
+ if (frame->len < 126) {
+ frame->head[1] = frame->len & 0x7f;
+ } else if (frame->len < 65536) {
+ frame->head[1] = 126;
+ NNI_PUT16(frame->head + 2, (frame->len & 0xffff));
+ frame->hlen += 2;
+ } else {
+ frame->head[1] = 127;
+ NNI_PUT64(frame->head + 2, (uint64_t) frame->len);
+ frame->hlen += 8;
+ }
+
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_mask_frame(frame);
+ } else {
+ frame->masked = false;
+ }
+
+ } while (len);
+
+ wm->msg = msg;
+ wm->aio = aio;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static int
+ws_msg_init_rx(ws_msg **wmp, nni_ws *ws, nni_aio *aio)
+{
+ ws_msg *wm;
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+ wm->aio = aio;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static void
+ws_close_cb(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_msg *wm;
+
+ // Either we sent a close frame, or we didn't. Either way,
+ // we are done, and its time to abort everything else.
+ nni_mtx_lock(&ws->mtx);
+
+ nni_http_close(ws->http);
+ nni_aio_cancel(ws->txaio, NNG_ECLOSED);
+ nni_aio_cancel(ws->rxaio, NNG_ECLOSED);
+
+ // This list (receive) should be empty.
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ if (ws->rxframe != NULL) {
+ ws_frame_fini(ws->rxframe);
+ ws->rxframe = NULL;
+ }
+
+ // Any txframe should have been killed with its wmsg.
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_close(nni_ws *ws, uint16_t code)
+{
+ ws_msg *wm;
+
+ // Receive stuff gets aborted always. No further receives
+ // once we get a close.
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ // If were closing "gracefully", then don't abort in-flight
+ // stuff yet. Note that reads should have stopped already.
+ if (!ws->closed) {
+ ws_send_close(ws, code);
+ return;
+ }
+}
+
+static void
+ws_start_write(nni_ws *ws)
+{
+ ws_frame *frame;
+ ws_msg * wm;
+
+ if ((ws->txframe != NULL) || (!ws->ready)) {
+ return; // busy
+ }
+
+ if ((wm = nni_list_first(&ws->txmsgs)) == NULL) {
+ // Nothing to send.
+ return;
+ }
+
+ frame = nni_list_first(&wm->frames);
+ NNI_ASSERT(frame != NULL);
+
+ // Push it out.
+ ws->txframe = frame;
+ ws->txaio->a_niov = frame->len > 0 ? 2 : 1;
+ ws->txaio->a_iov[0].iov_len = frame->hlen;
+ ws->txaio->a_iov[0].iov_buf = frame->head;
+ if (frame->len > 0) {
+ ws->txaio->a_iov[1].iov_len = frame->len;
+ ws->txaio->a_iov[1].iov_buf = frame->buf;
+ }
+ nni_http_write_full(ws->http, ws->txaio);
+}
+
+static void
+ws_cancel_close(nni_aio *aio, int rv)
+{
+ nni_ws *ws = aio->a_prov_data;
+ nni_mtx_lock(&ws->mtx);
+ if (ws->wclose) {
+ ws->wclose = false;
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_write_cb(void *arg)
+{
+ nni_ws * ws = arg;
+ ws_frame *frame;
+ ws_msg * wm;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&ws->mtx);
+
+ if (ws->txframe->op == WS_CLOSE) {
+ // If this was a close frame, we are done.
+ // No other messages may succeed..
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio != NULL) {
+ nni_aio_set_msg(wm->aio, NULL);
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+ if (ws->wclose) {
+ ws->wclose = false;
+ nni_aio_finish(ws->closeaio, 0, 0);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ frame = ws->txframe;
+ wm = frame->wmsg;
+ aio = wm->aio;
+
+ if ((rv = nni_aio_result(ws->txaio)) != 0) {
+
+ ws_msg_fini(wm);
+ if (aio != NULL) {
+ nni_aio_finish_error(aio, rv);
+ }
+
+ ws->closed = true;
+ nni_http_close(ws->http);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // good frame, was it the last
+ nni_list_remove(&wm->frames, frame);
+ ws_frame_fini(frame);
+ if (nni_list_empty(&wm->frames)) {
+ nni_list_remove(&ws->txmsgs, wm);
+ ws_msg_fini(wm);
+ if (aio != NULL) {
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
+
+ // Write the next frame.
+ ws_start_write(ws);
+
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_write_cancel(nni_aio *aio, int rv)
+{
+ nni_ws * ws;
+ ws_msg * wm;
+ ws_frame *frame;
+
+ // Is this aio active? We can tell by looking at the
+ // active tx frame.
+ wm = aio->a_prov_data;
+ ws = wm->ws;
+ nni_mtx_lock(&ws->mtx);
+ if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) {
+ nni_aio_cancel(ws->txaio, rv);
+ // We will wait for callback on the txaio to finish aio.
+ } else if (nni_list_active(&ws->txmsgs, wm)) {
+ // If scheduled, just need to remove node and complete it.
+ nni_list_remove(&ws->txmsgs, wm);
+ wm->aio = NULL;
+ nni_aio_finish_error(aio, rv);
+ ws_msg_fini(wm);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_send_close(nni_ws *ws, uint16_t code)
+{
+ ws_msg * wm;
+ uint8_t buf[sizeof(uint16_t)];
+ int rv;
+ nni_aio *aio;
+
+ NNI_PUT16(buf, code);
+
+ if (ws->closed) {
+ return;
+ }
+ ws->closed = true;
+ aio = ws->closeaio;
+
+ // We don't care about cancellation here. If this times out,
+ // we will still shut all the physical I/O down in the callback.
+ if (nni_aio_start(aio, ws_cancel_close, ws) != 0) {
+ return;
+ }
+ ws->wclose = true;
+ rv = ws_msg_init_control(&wm, ws, WS_CLOSE, buf, sizeof(buf));
+ if (rv != 0) {
+ ws->wclose = false;
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ // Close frames get priority!
+ nni_list_prepend(&ws->txmsgs, wm);
+ ws_start_write(ws);
+}
+
+static void
+ws_send_control(nni_ws *ws, uint8_t op, uint8_t *buf, size_t len)
+{
+ ws_msg *wm;
+
+ // Note that we do not care if this works or not. So no AIO needed.
+
+ nni_mtx_lock(&ws->mtx);
+ if ((ws->closed) ||
+ (ws_msg_init_control(&wm, ws, op, buf, sizeof(buf)) != 0)) {
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // Control frames at head of list. (Note that this may preempt
+ // the close frame or other ping/pong requests. Oh well.)
+ nni_list_prepend(&ws->txmsgs, wm);
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
+{
+ ws_msg * wm;
+ nni_msg *msg;
+ int rv;
+
+ msg = nni_aio_get_msg(aio);
+
+ if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) {
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
+ return;
+ }
+
+ nni_mtx_lock(&ws->mtx);
+ nni_aio_set_msg(aio, NULL);
+
+ if (ws->closed) {
+ ws_msg_fini(wm);
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ if (nni_aio_start(aio, ws_write_cancel, wm) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ ws_msg_fini(wm);
+ return;
+ }
+ nni_list_append(&ws->txmsgs, wm);
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_start_read(nni_ws *ws)
+{
+ ws_frame *frame;
+ ws_msg * wm;
+ nni_aio * aio;
+
+ if ((ws->rxframe != NULL) || ws->closed) {
+ return; // already reading or closed
+ }
+
+ if ((wm = nni_list_first(&ws->rxmsgs)) == NULL) {
+ return; // no body expecting a message.
+ }
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio != NULL) {
+ nni_aio_finish_error(wm->aio, NNG_ENOMEM);
+ }
+ ws_msg_fini(wm);
+ // XXX: NOW WHAT?
+ return;
+ }
+
+ // Note that the frame is *not* associated with the message
+ // as yet, because we don't know if that's right until we receive it.
+ frame->hlen = 0;
+ frame->len = 0;
+ ws->rxframe = frame;
+
+ aio = ws->rxaio;
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = 2; // We want the first two bytes.
+ aio->a_iov[0].iov_buf = frame->head;
+ nni_http_read_full(ws->http, aio);
+}
+
+static void
+ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
+{
+ ws_msg *wm = nni_list_first(&ws->rxmsgs);
+
+ switch (frame->op) {
+ case WS_CONT:
+ if (wm == NULL) {
+ ws_close(ws, WS_CLOSE_GOING_AWAY);
+ return;
+ }
+ if (nni_list_empty(&wm->frames)) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ nni_list_append(&wm->frames, frame);
+ break;
+ case WS_BINARY:
+ if (wm == NULL) {
+ ws_close(ws, WS_CLOSE_GOING_AWAY);
+ return;
+ }
+ if (!nni_list_empty(&wm->frames)) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ nni_list_append(&wm->frames, frame);
+ break;
+ case WS_TEXT:
+ // No support for text mode at present.
+ ws_close(ws, WS_CLOSE_UNSUPP_FORMAT);
+ return;
+
+ case WS_PING:
+ if (frame->len > 125) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws_send_control(ws, WS_PONG, frame->buf, frame->len);
+ ws->rxframe = NULL;
+ ws_frame_fini(frame);
+ break;
+ case WS_PONG:
+ if (frame->len > 125) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ ws_frame_fini(frame);
+ break;
+ case WS_CLOSE:
+ ws->closed = true; // no need to send close reply
+ ws_close(ws, 0);
+ return;
+ default:
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+
+ // If this was the last (final) frame, then complete it. But
+ // we have to look at the msg, since we might have got a
+ // control frame.
+ if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) {
+ size_t len = 0;
+ nni_msg *msg;
+ uint8_t *body;
+ int rv;
+
+ nni_list_remove(&ws->rxmsgs, wm);
+ NNI_LIST_FOREACH (&wm->frames, frame) {
+ len += frame->len;
+ }
+ if ((rv = nni_msg_alloc(&msg, len)) != 0) {
+ nni_aio_finish_error(wm->aio, rv);
+ ws_msg_fini(wm);
+ ws_close(ws, WS_CLOSE_INTERNAL);
+ return;
+ }
+ body = nni_msg_body(msg);
+ NNI_LIST_FOREACH (&wm->frames, frame) {
+ memcpy(body, frame->buf, frame->len);
+ body += frame->len;
+ }
+ nni_aio_finish_msg(wm->aio, msg);
+ wm->aio = NULL;
+ ws_msg_fini(wm);
+ }
+}
+
+static void
+ws_read_cb(void *arg)
+{
+ nni_ws * ws = arg;
+ nni_aio * aio = ws->rxaio;
+ ws_frame *frame;
+ int rv;
+
+ nni_mtx_lock(&ws->mtx);
+ if ((frame = ws->rxframe) == NULL) {
+ nni_mtx_unlock(&ws->mtx); // canceled during close
+ return;
+ }
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ ws->closed = true; // do not send a close frame
+ ws_close(ws, 0);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ if (frame->hlen == 0) {
+ frame->hlen = 2;
+ frame->op = frame->head[0] & 0x7f;
+ frame->final = (frame->head[0] & 0x80) ? 1 : 0;
+ frame->masked = (frame->head[1] & 0x80) ? 1 : 0;
+ if (frame->masked) {
+ frame->hlen += 4;
+ }
+ if ((frame->head[1] & 0x7F) == 127) {
+ frame->hlen += 8;
+ } else if ((frame->head[1] & 0x7F) == 126) {
+ frame->hlen += 2;
+ }
+
+ // If we didn't read the full header yet, then read
+ // the rest of it.
+ if (frame->hlen != 2) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_buf = frame->head + 2;
+ aio->a_iov[0].iov_len = frame->hlen - 2;
+ nni_http_read_full(ws->http, aio);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ }
+
+ // If we are returning from a read of additional data, then
+ // the buf will be set. Otherwise we need to determine
+ // how much data to read. As our headers are complete, we take
+ // this time to do some protocol checks -- no point in waiting
+ // to read data. (Frame size check needs to be done first
+ // anyway to prevent DoS.)
+
+ if (frame->buf == NULL) {
+
+ // Determine expected frame size.
+ switch ((frame->len = (frame->head[1] & 0x7F))) {
+ case 127:
+ NNI_GET64(frame->head + 2, frame->len);
+ if (frame->len < 65536) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ break;
+ case 126:
+ NNI_GET16(frame->head + 2, frame->len);
+ if (frame->len < 126) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ break;
+ }
+
+ if (frame->len > ws->maxframe) {
+ ws_close(ws, WS_CLOSE_TOO_BIG);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // Check for masking. (We don't actually do the unmask
+ // here, because we don't have data yet.)
+ if (frame->masked) {
+ memcpy(frame->mask, frame->head + frame->hlen - 4, 4);
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ } else if (ws->mode == NNI_EP_MODE_LISTEN) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // If we expected data, then ask for it.
+ if (frame->len != 0) {
+
+ // Short frames can avoid an alloc
+ if (frame->len < 126) {
+ frame->buf = frame->sdata;
+ frame->bufsz = 0;
+ } else {
+ frame->buf = nni_alloc(frame->len);
+ if (frame->buf == NULL) {
+ ws_close(ws, WS_CLOSE_INTERNAL);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ frame->bufsz = frame->len;
+ }
+
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_buf = frame->buf;
+ aio->a_iov[0].iov_len = frame->len;
+ nni_http_read_full(ws->http, aio);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ }
+
+ // At this point, we have a complete frame.
+ ws_unmask_frame(frame); // idempotent
+
+ ws_read_frame_cb(ws, frame);
+ ws_start_read(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_read_cancel(nni_aio *aio, int rv)
+{
+ ws_msg *wm = aio->a_prov_data;
+ nni_ws *ws = wm->ws;
+
+ nni_mtx_lock(&ws->mtx);
+ if (wm == nni_list_first(&ws->rxmsgs)) {
+ // Cancellation will percolate back up.
+ nni_aio_cancel(ws->rxaio, rv);
+ } else if (nni_list_active(&ws->rxmsgs, wm)) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ ws_msg_fini(wm);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
+{
+ ws_msg *wm;
+ int rv;
+ nni_mtx_lock(&ws->mtx);
+ if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
+ if (nni_aio_start(aio, NULL, NULL)) {
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ if (nni_aio_start(aio, ws_read_cancel, wm) == 0) {
+ nni_list_append(&ws->rxmsgs, wm);
+ ws_start_read(ws);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_close_error(nni_ws *ws, uint16_t code)
+{
+ nni_mtx_lock(&ws->mtx);
+ ws_close(ws, code);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_close(nni_ws *ws)
+{
+ nni_ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
+}
+
+nni_http_res *
+nni_ws_response(nni_ws *ws)
+{
+ return (ws->res);
+}
+
+nni_http_req *
+nni_ws_request(nni_ws *ws)
+{
+ return (ws->req);
+}
+
+static void
+ws_fini(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_msg *wm;
+
+ nni_ws_close(ws);
+
+ // Give a chance for the close frame to drain.
+ if (ws->closeaio) {
+ nni_aio_wait(ws->closeaio);
+ }
+
+ nni_aio_stop(ws->rxaio);
+ nni_aio_stop(ws->txaio);
+ nni_aio_stop(ws->closeaio);
+ nni_aio_stop(ws->httpaio);
+
+ nni_mtx_lock(&ws->mtx);
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ if (ws->rxframe) {
+ ws_frame_fini(ws->rxframe);
+ }
+ nni_mtx_unlock(&ws->mtx);
+
+ if (ws->req) {
+ nni_http_req_fini(ws->req);
+ }
+ if (ws->res) {
+ nni_http_res_fini(ws->res);
+ }
+
+ nni_http_fini(ws->http);
+ nni_aio_fini(ws->rxaio);
+ nni_aio_fini(ws->txaio);
+ nni_aio_fini(ws->closeaio);
+ nni_aio_fini(ws->httpaio);
+ nni_mtx_fini(&ws->mtx);
+ NNI_FREE_STRUCT(ws);
+}
+
+void
+nni_ws_fini(nni_ws *ws)
+{
+ nni_reap(&ws->reap, ws_fini, ws);
+}
+
+static void
+ws_http_cb_listener(nni_ws *ws, nni_aio *aio)
+{
+ // This is only
+ nni_ws_listener *l;
+ l = nni_aio_get_data(aio, 0);
+
+ nni_mtx_lock(&l->mtx);
+ nni_list_remove(&l->reply, ws);
+ if (nni_aio_result(aio) != 0) {
+ nni_ws_fini(ws);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ ws->ready = true;
+ if ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_list_remove(&l->aios, aio);
+ nni_aio_finish_pipe(aio, ws);
+ } else {
+ nni_list_append(&l->pend, ws);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
+{
+ nni_ws_dialer *d;
+ nni_aio * uaio;
+ int rv;
+ uint16_t status;
+ char wskey[29];
+ const char * ptr;
+
+ d = nni_aio_get_data(aio, 0);
+
+ nni_mtx_lock(&d->mtx);
+ uaio = nni_list_first(&d->httpaios);
+ NNI_ASSERT(uaio != NULL);
+ // We have two steps. In step 1, we just sent the request,
+ // and need to retrieve the reply. In step two we have
+ // received the reply, and need to validate it.
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto err;
+ }
+
+ // If we have no response structure, then this was completion of the
+ // send of the request. Prepare an empty response, and read it.
+ if (ws->res == NULL) {
+ if ((rv = nni_http_res_init(&ws->res)) != 0) {
+ goto err;
+ }
+ nni_http_read_res(ws->http, ws->res, ws->httpaio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ status = nni_http_res_get_status(ws->res);
+ switch (status) {
+ case NNI_HTTP_STATUS_SWITCHING:
+ break;
+ case NNI_HTTP_STATUS_FORBIDDEN:
+ case NNI_HTTP_STATUS_UNAUTHORIZED:
+ rv = NNG_EPERM;
+ goto err;
+ case NNI_HTTP_STATUS_NOT_FOUND:
+ case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED:
+ rv = NNG_ECONNREFUSED; // Treat these as refusals.
+ goto err;
+ case NNI_HTTP_STATUS_BAD_REQUEST:
+ default:
+ // Perhaps we should use NNG_ETRANERR...
+ rv = NNG_EPROTO;
+ goto err;
+ }
+
+ // Check that the server gave us back the right key.
+ rv = ws_make_accept(
+ nni_http_req_get_header(ws->req, "Sec-WebSocket-Key"), wskey);
+ if (rv != 0) {
+ goto err;
+ }
+
+#define GETH(h) nni_http_res_get_header(ws->res, h)
+
+ if (((ptr = GETH("Sec-WebSocket-Accept")) == NULL) ||
+ (strcmp(ptr, wskey) != 0) ||
+ ((ptr = GETH("Connection")) == NULL) ||
+ (!ws_contains_word(ptr, "upgrade")) ||
+ ((ptr = GETH("Upgrade")) == NULL) ||
+ (strcmp(ptr, "websocket") != 0)) {
+ nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ rv = NNG_EPROTO;
+ goto err;
+ }
+ if (d->proto != NULL) {
+ if (((ptr = GETH("Sec-WebSocket-Protocol")) == NULL) ||
+ (!ws_contains_word(d->proto, ptr))) {
+ nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ rv = NNG_EPROTO;
+ goto err;
+ }
+ }
+#undef GETH
+
+ // At this point, we are in business!
+ ws->ready = true;
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_pipe(uaio, ws);
+ nni_mtx_unlock(&d->mtx);
+ return;
+err:
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ nni_ws_fini(ws);
+ nni_mtx_unlock(&d->mtx);
+}
+
+static void
+ws_http_cb(void *arg)
+{
+ // This is only done on the server/listener side.
+ nni_ws * ws = arg;
+ nni_aio *aio = ws->httpaio;
+
+ switch (ws->mode) {
+ case NNI_EP_MODE_LISTEN:
+ ws_http_cb_listener(ws, aio);
+ break;
+ case NNI_EP_MODE_DIAL:
+ ws_http_cb_dialer(ws, aio);
+ break;
+ }
+}
+
+static int
+ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res)
+{
+ nni_ws *ws;
+ int rv;
+
+ if ((ws = NNI_ALLOC_STRUCT(ws)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&ws->mtx);
+ NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node);
+ NNI_LIST_INIT(&ws->txmsgs, ws_msg, node);
+
+ if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0)) {
+ nni_ws_fini(ws);
+ return (rv);
+ }
+
+ nni_aio_set_timeout(ws->closeaio, 100);
+ nni_aio_set_timeout(ws->httpaio, 1000);
+
+ ws->fragsize = 1 << 20; // we won't send a frame larger than this
+ ws->maxframe = (1 << 20) * 10; // default limit on incoming frame size
+ ws->http = http;
+ ws->req = req;
+ ws->res = res;
+ *wsp = ws;
+ return (0);
+}
+
+void
+nni_ws_listener_fini(nni_ws_listener *l)
+{
+ nni_mtx_fini(&l->mtx);
+ nni_strfree(l->url);
+ nni_strfree(l->proto);
+ nni_strfree(l->host);
+ nni_strfree(l->serv);
+ nni_strfree(l->path);
+ NNI_FREE_STRUCT(l);
+}
+
+static void
+ws_handler(nni_aio *aio)
+{
+ nni_ws_listener *l;
+ nni_ws * ws;
+ nni_http * http;
+ nni_http_req * req;
+ nni_http_res * res;
+ const char * ptr;
+ const char * proto;
+ uint16_t status;
+ int rv;
+ char key[29];
+
+ http = nni_aio_get_input(aio, 0);
+ req = nni_aio_get_input(aio, 1);
+ l = nni_aio_get_input(aio, 2);
+
+ // Now check the headers, etc.
+ if (strcmp(nni_http_req_get_version(req), "HTTP/1.1") != 0) {
+ status = NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP;
+ goto err;
+ }
+
+ if (strcmp(nni_http_req_get_method(req), "GET") != 0) {
+ // HEAD request. We can't really deal with it.
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+#define GETH(h) nni_http_req_get_header(req, h)
+#define SETH(h, v) nni_http_res_set_header(res, h, v)
+
+ if ((((ptr = GETH("Content-Length")) != NULL) && (atoi(ptr) > 0)) ||
+ (((ptr = GETH("Transfer-Encoding")) != NULL) &&
+ (nni_strcasestr(ptr, "chunked") != NULL))) {
+ status = NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE;
+ goto err;
+ }
+
+ // These headers have to be present.
+ if (((ptr = GETH("Upgrade")) == NULL) ||
+ (!ws_contains_word(ptr, "websocket")) ||
+ ((ptr = GETH("Connection")) == NULL) ||
+ (!ws_contains_word(ptr, "upgrade")) ||
+ ((ptr = GETH("Sec-WebSocket-Version")) == NULL) ||
+ (strcmp(ptr, "13") != 0)) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ if (((ptr = GETH("Sec-WebSocket-Key")) == NULL) ||
+ (ws_make_accept(ptr, key) != 0)) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ // If the client has requested a specific subprotocol, then
+ // we need to try to match it to what the handler says we support.
+ // (If no suitable option is found in the handler, we fail the
+ // request.)
+ proto = GETH("Sec-WebSocket-Protocol");
+ if (proto == NULL) {
+ if (l->proto != NULL) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+ } else if ((l->proto == NULL) ||
+ (!ws_contains_word(l->proto, proto))) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ if ((rv = nni_http_res_init(&res)) != 0) {
+ // Give a chance to reply to client.
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+
+ if (nni_http_res_set_status(
+ res, NNI_HTTP_STATUS_SWITCHING, "Switching Protocols") != 0) {
+ nni_http_res_fini(res);
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+
+ if ((SETH("Connection", "Upgrade") != 0) ||
+ (SETH("Upgrade", "websocket") != 0) ||
+ (SETH("Sec-WebSocket-Accept", key) != 0)) {
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_fini(res);
+ goto err;
+ }
+ if ((proto != NULL) && (SETH("Sec-WebSocket-Protocol", proto) != 0)) {
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_fini(res);
+ goto err;
+ }
+
+ if (l->hookfn != NULL) {
+ rv = l->hookfn(l->hookarg, req, res);
+ if (rv != 0) {
+ nni_http_res_fini(res);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (nni_http_res_get_status(res) !=
+ NNI_HTTP_STATUS_SWITCHING) {
+ // The hook has decided to give back a different
+ // reply and we are not upgrading anymore. For
+ // example the Origin might not be permitted, or
+ // another level of authentication may be required.
+ // (Note that the hook can also give back various
+ // other headers, but it would be bad for it to
+ // alter the websocket mandated headers.)
+ nni_http_req_fini(req);
+ nni_aio_set_output(aio, 0, res);
+ nni_aio_finish(aio, 0, 0);
+ return;
+ }
+ }
+
+#undef GETH
+#undef SETH
+
+ // We are good to go, provided we can get the websocket struct,
+ // and send the reply.
+ if ((rv = ws_init(&ws, http, req, res)) != 0) {
+ nni_http_res_fini(res);
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+ ws->mode = NNI_EP_MODE_LISTEN;
+
+ // XXX: Inherit fragmentation and message size limits!
+
+ nni_list_append(&l->reply, ws);
+ nni_aio_set_data(ws->httpaio, 0, l);
+ nni_http_write_res(http, res, ws->httpaio);
+ nni_aio_set_output(aio, 0, NULL);
+ nni_aio_set_input(aio, 1, NULL);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+err:
+ nni_http_req_fini(req);
+ if ((rv = nni_http_res_init_error(&res, status)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nni_aio_set_output(aio, 0, res);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+static int
+ws_parse_url(const char *url, char **schemep, char **hostp, char **servp,
+ char **pathp, char **queryp)
+{
+ size_t scrlen;
+ char * scr;
+ char * pair;
+ char * scheme = NULL;
+ char * path = NULL;
+ char * query = NULL;
+ char * host = NULL;
+ char * serv = NULL;
+ int rv;
+
+ // We need a scratch copy of the url to parse.
+ scrlen = strlen(url) + 1;
+ if ((scr = nni_alloc(scrlen)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_strlcpy(scr, url, scrlen);
+ scheme = scr;
+ pair = strchr(scr, ':');
+ if ((pair == NULL) || (pair[1] != '/') || (pair[2] != '/')) {
+ nni_free(scr, scrlen);
+ return (NNG_EADDRINVAL);
+ }
+
+ *pair = '\0';
+ pair += 3;
+
+ path = strchr(pair, '/');
+ if (path != NULL) {
+ *path = '\0'; // We will restore it shortly.
+ }
+ if ((rv = nni_tran_parse_host_port(pair, hostp, servp)) != 0) {
+ nni_free(scr, scrlen);
+ return (rv);
+ }
+
+ // If service was missing, assume normal defaults.
+ if (*servp == NULL) {
+ if (strcmp(scheme, "wss")) {
+ *servp = nni_strdup("443");
+ } else {
+ *servp = nni_strdup("80");
+ }
+ }
+
+ if (path) {
+ // Restore the path, and trim off the query parameter.
+ *path = '/';
+ if ((query = strchr(path, '?')) != NULL) {
+ *query = '\0';
+ query++;
+ } else {
+ query = "";
+ }
+ } else {
+ path = "/";
+ query = "";
+ }
+
+ if (schemep) {
+ *schemep = nni_strdup(scheme);
+ }
+ if (pathp) {
+ *pathp = nni_strdup(path);
+ }
+ if (queryp) {
+ *queryp = nni_strdup(query);
+ }
+ nni_free(scr, scrlen);
+
+ if ((schemep && (*schemep == NULL)) || (*pathp == NULL) ||
+ (*servp == NULL) || (queryp && (*queryp == NULL))) {
+ nni_strfree(*hostp);
+ nni_strfree(*servp);
+ nni_strfree(*pathp);
+ if (schemep) {
+ nni_strfree(*schemep);
+ }
+ if (queryp) {
+ nni_strfree(*queryp);
+ }
+ return (NNG_ENOMEM);
+ }
+
+ return (0);
+}
+
+int
+nni_ws_listener_init(nni_ws_listener **wslp, const char *url)
+{
+ nni_ws_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&l->mtx);
+ nni_aio_list_init(&l->aios);
+
+ NNI_LIST_INIT(&l->pend, nni_ws, node);
+ NNI_LIST_INIT(&l->reply, nni_ws, node);
+
+ rv = ws_parse_url(url, NULL, &l->host, &l->serv, &l->path, NULL);
+ if (rv != 0) {
+ nni_ws_listener_fini(l);
+ return (rv);
+ }
+ l->handler.h_is_dir = false;
+ l->handler.h_is_upgrader = true;
+ l->handler.h_method = "GET";
+ l->handler.h_path = l->path;
+ l->handler.h_host = l->host;
+ l->handler.h_cb = ws_handler;
+
+ *wslp = l;
+ return (0);
+}
+
+int
+nni_ws_listener_proto(nni_ws_listener *l, const char *proto)
+{
+ int rv = 0;
+ char *ns;
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ rv = NNG_EBUSY;
+ } else if ((ns = nni_strdup(proto)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ if (l->proto != NULL) {
+ nni_strfree(l->proto);
+ }
+ l->proto = ns;
+ }
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static void
+ws_accept_cancel(nni_aio *aio, int rv)
+{
+ nni_ws_listener *l = aio->a_prov_data;
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
+{
+ nni_ws *ws;
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_start(aio, ws_accept_cancel, l) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (l->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (!l->started) {
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if ((ws = nni_list_first(&l->pend)) != NULL) {
+ nni_list_remove(&l->pend, ws);
+ nni_aio_finish_pipe(aio, ws);
+ } else {
+ nni_list_append(&l->aios, aio);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_close(nni_ws_listener *l)
+{
+ nni_aio *aio;
+ nni_ws * ws;
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ l->closed = true;
+ if (l->server != NULL) {
+ nni_http_server_del_handler(l->server, l->hp);
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ }
+ NNI_LIST_FOREACH (&l->pend, ws) {
+ nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ }
+ NNI_LIST_FOREACH (&l->reply, ws) {
+ nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_ws_listener_listen(nni_ws_listener *l)
+{
+ nng_sockaddr sa;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv(l->host, l->serv, NNG_AF_UNSPEC, true, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ if ((rv = nni_http_server_init(&l->server, &sa)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ rv = nni_http_server_add_handler(&l->hp, l->server, &l->handler, l);
+ if (rv != 0) {
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ // XXX: DEAL WITH HTTPS here.
+
+ if ((rv = nni_http_server_start(l->server)) != 0) {
+ nni_http_server_del_handler(l->server, l->hp);
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ }
+
+ l->started = true;
+
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+void
+nni_ws_listener_hook(
+ nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg)
+{
+ nni_mtx_lock(&l->mtx);
+ l->hookfn = hookfn;
+ l->hookarg = hookarg;
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_tls(nni_ws_listener *l, nni_tls_config *tls)
+{
+ // We need to add this later.
+}
+
+void
+ws_conn_cb(void *arg)
+{
+ nni_ws_dialer *d = arg;
+ nni_aio * aio = d->conaio;
+ nni_aio * uaio;
+ nni_http * http;
+ nni_http_req * req = NULL;
+ int rv;
+ uint8_t raw[16];
+ char wskey[25];
+ nni_ws * ws;
+
+ nni_mtx_lock(&d->mtx);
+ uaio = nni_list_first(&d->conaios);
+ rv = nni_aio_result(aio);
+ http = rv == 0 ? nni_aio_get_output(aio, 0) : NULL;
+
+ if (uaio == NULL) {
+ if (http) {
+ // Nobody listening anymore - hard abort.
+ nni_http_fini(http);
+ }
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ nni_aio_list_remove(uaio);
+ nni_aio_set_output(aio, 0, NULL);
+
+ // We are done with this aio, start another connection request while
+ // we finish up, if we have more clients waiting.
+ if (!nni_list_empty(&d->conaios)) {
+ nni_http_client_connect(d->client, aio);
+ }
+
+ if (rv != 0) {
+ goto err;
+ }
+
+ for (int i = 0; i < 16; i++) {
+ raw[i] = nni_random();
+ }
+ nni_base64_encode(raw, 16, wskey, 24);
+ wskey[24] = '\0';
+
+ if (d->qinfo && d->qinfo[0] != '\0') {
+ rv = nni_asprintf(&d->uri, "%s?%s", d->path, d->qinfo);
+ } else if ((d->uri = nni_strdup(d->path)) == NULL) {
+ rv = NNG_ENOMEM;
+ }
+
+#define SETH(h, v) nni_http_req_set_header(req, h, v)
+ if ((rv != 0) || ((rv = nni_http_req_init(&req)) != 0) ||
+ ((rv = nni_http_req_set_uri(req, d->uri)) != 0) ||
+ ((rv = nni_http_req_set_version(req, "HTTP/1.1")) != 0) ||
+ ((rv = nni_http_req_set_method(req, "GET")) != 0) ||
+ ((rv = SETH("Host", d->host)) != 0) ||
+ ((rv = SETH("Upgrade", "websocket")) != 0) ||
+ ((rv = SETH("Connection", "Upgrade")) != 0) ||
+ ((rv = SETH("Sec-WebSocket-Key", wskey)) != 0) ||
+ ((rv = SETH("Sec-WebSocket-Version", "13")) != 0)) {
+ goto err;
+ }
+
+ // If consumer asked for protocol, pass it on.
+ if ((d->proto != NULL) &&
+ ((rv = SETH("Sec-WebSocket-Protocol", d->proto)) != 0)) {
+ goto err;
+ }
+#undef SETH
+
+ if ((rv = ws_init(&ws, http, req, NULL)) != 0) {
+ goto err;
+ }
+ ws->mode = NNI_EP_MODE_DIAL;
+
+ // Move this uaio to the http wait list. Note that it is not
+ // required that the uaio will be completed by this connection.
+ // If another connection attempt completes first, then the first
+ // aio queued will get the result.
+ nni_list_append(&d->httpaios, uaio);
+ nni_aio_set_data(ws->httpaio, 0, d);
+ nni_http_write_req(http, req, ws->httpaio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+
+err:
+ nni_aio_finish_error(uaio, rv);
+ if (http != NULL) {
+ nni_http_fini(http);
+ }
+ if (req != NULL) {
+ nni_http_req_fini(req);
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ws_dialer_fini(nni_ws_dialer *d)
+{
+ nni_aio_fini(d->conaio);
+ nni_strfree(d->proto);
+ nni_strfree(d->addr);
+ nni_strfree(d->uri);
+ nni_strfree(d->host);
+ nni_strfree(d->serv);
+ nni_strfree(d->path);
+ nni_strfree(d->qinfo);
+ if (d->client) {
+ nni_http_client_fini(d->client);
+ }
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+int
+nni_ws_dialer_init(nni_ws_dialer **dp, const char *url)
+{
+ nni_ws_dialer *d;
+ int rv;
+ nni_aio * aio;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ nni_aio_list_init(&d->conaios);
+ nni_aio_list_init(&d->httpaios);
+
+ if ((d->addr = nni_strdup(url)) == NULL) {
+ nni_ws_dialer_fini(d);
+ return (NNG_ENOMEM);
+ }
+ if ((rv = ws_parse_url(
+ url, NULL, &d->host, &d->serv, &d->path, &d->qinfo)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+ if ((rv = nni_aio_init(&d->conaio, ws_conn_cb, d)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+ // XXX: this is synchronous. We should fix this in the HTTP layer.
+ aio->a_addr = &d->sa;
+ nni_plat_tcp_resolv(d->host, d->serv, NNG_AF_UNSPEC, false, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
+ if (rv != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ if ((rv = nni_http_client_init(&d->client, &d->sa)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ *dp = d;
+ return (0);
+}
+
+void
+nni_ws_dialer_close(nni_ws_dialer *d)
+{
+ // XXX: what to do here?
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ d->closed = true;
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_cancel(d->conaio, NNG_ECLOSED);
+}
+
+int
+nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto)
+{
+ int rv = 0;
+ char *ns;
+ nni_mtx_lock(&d->mtx);
+ if ((ns = nni_strdup(proto)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ if (d->proto != NULL) {
+ nni_strfree(d->proto);
+ }
+ d->proto = ns;
+ }
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
+static void
+ws_dial_cancel(nni_aio *aio, int rv)
+{
+ nni_ws_dialer *d = aio->a_prov_data;
+ nni_mtx_lock(&d->mtx);
+ // If we are waiting, then we can cancel. Otherwise we need
+ // to abort.
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ // This does not cancel in-flight client negotiations with HTTP.
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
+{
+ nni_mtx_lock(&d->mtx);
+ // First look up the host.
+ if (nni_aio_start(aio, ws_dial_cancel, d) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ if (d->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_list_append(&d->conaios, aio);
+
+ if (!d->started) {
+ d->started = true;
+ nni_http_client_connect(d->client, d->conaio);
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *);
+
+// Dialer does not get a hook chance, as it can examine the request
+// and reply after dial is done; this is not a 3-way handshake, so
+// the dialer does not confirm the server's response at the HTTP
+// level. (It can still issue a websocket close).
+
+// The implementation will send periodic PINGs, and respond with
+// PONGs.
diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h
new file mode 100644
index 00000000..25add55e
--- /dev/null
+++ b/src/supplemental/websocket/websocket.h
@@ -0,0 +1,63 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H
+#define NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H
+
+// Pre-defined types for some prototypes. These are from other subsystems.
+typedef struct nni_tls_config nni_tls_config;
+typedef struct nni_http_req nni_http_req;
+typedef struct nni_http_res nni_http_res;
+
+typedef struct nni_ws nni_ws;
+typedef struct nni_ws_listener nni_ws_listener;
+typedef struct nni_ws_dialer nni_ws_dialer;
+
+typedef int (*nni_ws_listen_hook)(void *, nni_http_req *, nni_http_res *);
+
+// Specify URL as ws://[<host>][:port][/path]
+// If host is missing, INADDR_ANY is assumed. If port is missing,
+// then either 80 or 443 are assumed. Note that ws:// means listen
+// on INADDR_ANY port 80, with path "/". For connect side, INADDR_ANY
+// makes no sense. (TBD: return NNG_EADDRINVAL, or try loopback?)
+
+extern int nni_ws_listener_init(nni_ws_listener **, const char *);
+extern void nni_ws_listener_fini(nni_ws_listener *);
+extern void nni_ws_listener_close(nni_ws_listener *);
+extern int nni_ws_listener_proto(nni_ws_listener *, const char *);
+extern int nni_ws_listener_listen(nni_ws_listener *);
+extern void nni_ws_listener_accept(nni_ws_listener *, nni_aio *);
+extern void nni_ws_listener_hook(
+ nni_ws_listener *, nni_ws_listen_hook, void *);
+extern void nni_ws_listener_tls(nni_ws_listener *, nni_tls_config *);
+
+extern int nni_ws_dialer_init(nni_ws_dialer **, const char *);
+extern void nni_ws_dialer_fini(nni_ws_dialer *);
+extern void nni_ws_dialer_close(nni_ws_dialer *);
+extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *);
+extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *);
+extern void nni_ws_dialer_dial(nni_ws_dialer *, nni_aio *);
+
+// Dialer does not get a hook chance, as it can examine the request and reply
+// after dial is done; this is not a 3-way handshake, so the dialer does
+// not confirm the server's response at the HTTP level. (It can still issue
+// a websocket close).
+
+extern void nni_ws_send_msg(nni_ws *, nni_aio *);
+extern void nni_ws_recv_msg(nni_ws *, nni_aio *);
+extern nni_http_res *nni_ws_response(nni_ws *);
+extern nni_http_req *nni_ws_request(nni_ws *);
+extern void nni_ws_close(nni_ws *);
+extern void nni_ws_close_error(nni_ws *, uint16_t);
+extern void nni_ws_fini(nni_ws *);
+
+// The implementation will send periodic PINGs, and respond with PONGs.
+
+#endif // NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H \ No newline at end of file
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 8dcb3f60..9c794e64 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -10,7 +10,6 @@
#include <stdbool.h>
#include <stdio.h>
-#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -498,46 +497,6 @@ nni_tls_pipe_getopt_remaddr(void *arg, void *v, size_t *szp)
return (rv);
}
-static int
-nni_tls_parse_pair(char *pair, char **hostp, char **servp)
-{
- char *host, *serv, *end;
-
- if (pair[0] == '[') {
- host = pair + 1;
- // IP address enclosed ... for IPv6 usually.
- if ((end = strchr(host, ']')) == NULL) {
- return (NNG_EADDRINVAL);
- }
- *end = '\0';
- serv = end + 1;
- if (*serv == ':') {
- serv++;
- } else if (*serv != '\0') {
- return (NNG_EADDRINVAL);
- }
- } else {
- host = pair;
- serv = strchr(host, ':');
- if (serv != NULL) {
- *serv = '\0';
- serv++;
- }
- }
- if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) {
- *hostp = NULL;
- } else {
- *hostp = host;
- }
- if ((serv == NULL) || (strlen(serv) == 0)) {
- *servp = NULL;
- } else {
- *servp = serv;
- }
- // Stash the port in big endian (network) byte order.
- return (0);
-}
-
// Note that the url *must* be in a modifiable buffer.
int
nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
@@ -555,8 +514,9 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
// is the second part.
*h1 = '\0';
h1++;
- if (((rv = nni_tls_parse_pair(h1, rhost, rserv)) != 0) ||
- ((rv = nni_tls_parse_pair(url, lhost, lserv)) != 0)) {
+ if (((rv = nni_tran_parse_host_port(h1, rhost, rserv)) != 0) ||
+ ((rv = nni_tran_parse_host_port(url, lhost, lserv)) !=
+ 0)) {
return (rv);
}
if ((*rserv == NULL) || (*rhost == NULL)) {
@@ -566,7 +526,7 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
} else if (mode == NNI_EP_MODE_DIAL) {
*lhost = NULL;
*lserv = NULL;
- if ((rv = nni_tls_parse_pair(url, rhost, rserv)) != 0) {
+ if ((rv = nni_tran_parse_host_port(url, rhost, rserv)) != 0) {
return (rv);
}
if ((*rserv == NULL) || (*rhost == NULL)) {
@@ -577,7 +537,7 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
NNI_ASSERT(mode == NNI_EP_MODE_LISTEN);
*rhost = NULL;
*rserv = NULL;
- if ((rv = nni_tls_parse_pair(url, lhost, lserv)) != 0) {
+ if ((rv = nni_tran_parse_host_port(url, lhost, lserv)) != 0) {
return (rv);
}
// We have to have a port to listen on!
@@ -657,9 +617,13 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
return (NNG_EADDRINVAL);
}
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
// Parse the URLs first.
rv = nni_tls_parse_url(buf, &lhost, &lserv, &rhost, &rserv, mode);
if (rv != 0) {
+ nni_aio_fini(aio);
return (rv);
}
if (mode == NNI_EP_MODE_DIAL) {
@@ -672,10 +636,6 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
authmode = NNI_TLS_CONFIG_AUTH_MODE_NONE;
}
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
-
// XXX: arguably we could defer this part to the point we do a bind
// or connect!
@@ -683,7 +643,11 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
aio->a_addr = &rsa;
nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio);
nni_aio_wait(aio);
+ nni_strfree(rserv);
if ((rv = nni_aio_result(aio)) != 0) {
+ nni_strfree(rhost);
+ nni_strfree(lhost);
+ nni_strfree(lserv);
nni_aio_fini(aio);
return (rv);
}
@@ -695,7 +659,10 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
aio->a_addr = &lsa;
nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio);
nni_aio_wait(aio);
+ nni_strfree(lhost);
+ nni_strfree(lserv);
if ((rv = nni_aio_result(aio)) != 0) {
+ nni_strfree(rhost);
nni_aio_fini(aio);
return (rv);
}
@@ -705,12 +672,14 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
nni_aio_fini(aio);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ nni_strfree(rhost);
return (NNG_ENOMEM);
}
nni_mtx_init(&ep->mtx);
if (nni_strlcpy(ep->addr, url, sizeof(ep->addr)) >= sizeof(ep->addr)) {
NNI_FREE_STRUCT(ep);
+ nni_strfree(rhost);
return (NNG_EADDRINVAL);
}
@@ -718,15 +687,18 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
((rv = nni_tls_config_init(&ep->cfg, tlsmode)) != 0) ||
((rv = nni_tls_config_auth_mode(ep->cfg, authmode)) != 0) ||
((rv = nni_aio_init(&ep->aio, nni_tls_ep_cb, ep)) != 0)) {
+ nni_strfree(rhost);
nni_tls_ep_fini(ep);
return (rv);
}
if ((tlsmode == NNI_TLS_CONFIG_CLIENT) && (rhost != NULL)) {
if ((rv = nni_tls_config_server_name(ep->cfg, rhost)) != 0) {
+ nni_strfree(rhost);
nni_tls_ep_fini(ep);
return (rv);
}
}
+ nni_strfree(rhost);
ep->proto = nni_sock_proto(sock);
ep->authmode = authmode;
diff --git a/src/transport/ws/CMakeLists.txt b/src/transport/ws/CMakeLists.txt
new file mode 100644
index 00000000..18842df9
--- /dev/null
+++ b/src/transport/ws/CMakeLists.txt
@@ -0,0 +1,19 @@
+#
+# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# WebSocket transport
+
+if (NNG_TRANSPORT_WS)
+ set(WS_SOURCES transport/ws/websocket.c transport/ws/websocket.h)
+ install(FILES websocket.h DESTINATION include/nng/transport/ws)
+
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${WS_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/ws/README.adoc b/src/transport/ws/README.adoc
new file mode 100644
index 00000000..e3101297
--- /dev/null
+++ b/src/transport/ws/README.adoc
@@ -0,0 +1,38 @@
+= websocket transport
+
+This transport provides support for SP over websocket using TCP or TLS.
+When using TCP, it is compatible with the libnanomsg legacy transport.
+It also is compatible with mangos (both TCP and TLS).
+
+TLS support requires the mbedTLS library.
+
+We set the "protocol" such as "pair.sp.nanomsg.org" in the
+Sec-WebSocket-Protocol field -- the client sets to the the server's
+protocol - i.e. the protocol that the server speaks. For example,
+if the the server is a REP, then a REQ client would send "rep.sp.nanomsg.org".
+
+The server sends the same value (it's own), per the WebSocket specs. (Note
+that the client's protocol is never sent, but assumed to be complementary
+to the protocol in the Sec-WebSocket-Protocol field.)
+
+Each SP message is a WebSocket message.
+
+WebSocket is defined in RFC 6455.
+
+== Design
+
+We unfortunately need to implement our own design for this -- the only
+reasonable client library would be libcurl, and there is a dearth of
+suitable server libraries. Since we don't have to support full HTTP, but
+just the initial handshake, this isn't too tragic.
+
+== Multiple Server Sockets
+
+In order to support Multiple Server sockets listening on the same port,
+the application must be long lived. We will set up a listener on the
+configured TCP (or TLS) port, and examine the PATH supplied in the GET.
+This will be used to match against the URL requested, and if the URL
+matches we will create the appropriate pipe.
+
+If no server endpoint at that address can be found, we return an
+HTTP error, and close the socket.
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
new file mode 100644
index 00000000..8a73bcfb
--- /dev/null
+++ b/src/transport/ws/websocket.c
@@ -0,0 +1,533 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "supplemental/websocket/websocket.h"
+
+typedef struct ws_ep ws_ep;
+typedef struct ws_pipe ws_pipe;
+
+struct ws_ep {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ char * addr;
+ uint16_t lproto; // local protocol
+ uint16_t rproto; // remote protocol
+ size_t rcvmax;
+ char * protoname;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_aio * connaio;
+ nni_aio * accaio;
+ nni_ws_listener *listener;
+ nni_ws_dialer * dialer;
+};
+
+struct ws_pipe {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ nni_mtx mtx;
+ size_t rcvmax; // inherited from EP
+ bool closed;
+ uint16_t rproto;
+ uint16_t lproto;
+ nni_aio *user_txaio;
+ nni_aio *user_rxaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_ws * ws;
+};
+
+static void
+ws_pipe_send_cb(void *arg)
+{
+ ws_pipe *p = arg;
+ nni_aio *taio;
+ nni_aio *uaio;
+
+ nni_mtx_lock(&p->mtx);
+ taio = p->txaio;
+ uaio = p->user_txaio;
+ p->user_txaio = NULL;
+
+ if (uaio != NULL) {
+ int rv;
+ if ((rv = nni_aio_result(taio)) != 0) {
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish(uaio, 0, 0);
+ }
+ }
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv_cb(void *arg)
+{
+ ws_pipe *p = arg;
+ nni_aio *raio = p->rxaio;
+ nni_aio *uaio;
+ int rv;
+
+ nni_mtx_lock(&p->mtx);
+ uaio = p->user_rxaio;
+ p->user_rxaio = NULL;
+ if ((rv = nni_aio_result(raio)) != 0) {
+ if (uaio != NULL) {
+ nni_aio_finish_error(uaio, rv);
+ }
+ } else {
+ nni_msg *msg = nni_aio_get_msg(raio);
+ if (uaio != NULL) {
+ nni_aio_finish_msg(uaio, msg);
+ } else {
+ nni_msg_free(msg);
+ }
+ }
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv_cancel(nni_aio *aio, int rv)
+{
+ ws_pipe *p = aio->a_prov_data;
+ nni_mtx_lock(&p->mtx);
+ if (p->user_rxaio != aio) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_cancel(p->rxaio, rv);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv(void *arg, nni_aio *aio)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ p->user_rxaio = aio;
+
+ nni_ws_recv_msg(p->ws, p->rxaio);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_send_cancel(nni_aio *aio, int rv)
+{
+ ws_pipe *p = aio->a_prov_data;
+ nni_mtx_lock(&p->mtx);
+ if (p->user_txaio != aio) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ // This aborts the upper send, which will call back with an error
+ // when it is done.
+ nni_aio_cancel(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_send(void *arg, nni_aio *aio)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ p->user_txaio = aio;
+ nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
+
+ nni_ws_send_msg(p->ws, p->txaio);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_fini(void *arg)
+{
+ ws_pipe *p = arg;
+
+ nni_aio_stop(p->rxaio);
+ nni_aio_stop(p->txaio);
+
+ nni_aio_fini(p->rxaio);
+ nni_aio_fini(p->txaio);
+
+ if (p->ws) {
+ nni_ws_fini(p->ws);
+ }
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
+}
+
+static void
+ws_pipe_close(void *arg)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ nni_ws_close(p->ws);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static int
+ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws)
+{
+ ws_pipe *p;
+ int rv;
+ nni_aio *aio;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&p->mtx);
+
+ // Initialize AIOs.
+ if (((rv = nni_aio_init(&p->txaio, ws_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, ws_pipe_recv_cb, p)) != 0)) {
+ ws_pipe_fini(p);
+ return (rv);
+ }
+
+ p->mode = ep->mode;
+ p->rcvmax = ep->rcvmax;
+ // p->addr = ep->addr;
+ p->rproto = ep->rproto;
+ p->lproto = ep->lproto;
+ p->ws = ws;
+
+ if ((aio = nni_list_first(&ep->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_pipe(aio, p);
+ }
+
+ *pipep = p;
+ return (0);
+}
+
+static uint16_t
+ws_pipe_peer(void *arg)
+{
+ ws_pipe *p = arg;
+
+ return (p->rproto);
+}
+
+static void
+ws_pipe_start(void *arg, nni_aio *aio)
+{
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+// We have very different approaches for server and client.
+// Servers use the HTTP server framework, and a request methodology.
+
+static int
+ws_ep_bind(void *arg)
+{
+ ws_ep *ep = arg;
+ return (nni_ws_listener_listen(ep->listener));
+}
+
+static void
+ws_ep_cancel(nni_aio *aio, int rv)
+{
+ ws_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_accept(void *arg, nni_aio *aio)
+{
+ ws_ep *ep = arg;
+
+ // We already bound, so we just need to look for an available
+ // pipe (created by the handler), and match it.
+ // Otherwise we stick the AIO in the accept list.
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_list_append(&ep->aios, aio);
+ if (aio == nni_list_first(&ep->aios)) {
+ nni_ws_listener_accept(ep->listener, ep->accaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_connect(void *arg, nni_aio *aio)
+{
+ ws_ep *ep = arg;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(nni_list_empty(&ep->aios));
+
+ // If we can't start, then its dying and we can't report
+ // either.
+ if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_list_append(&ep->aios, aio);
+ nni_ws_dialer_dial(ep->dialer, ep->connaio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+ws_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz)
+{
+ ws_ep *ep = arg;
+ if (ep == NULL) {
+ return (nni_chkopt_size(v, sz, 0, NNI_MAXSZ));
+ }
+ return (nni_setopt_size(&ep->rcvmax, v, sz, 0, NNI_MAXSZ));
+}
+
+static int
+ws_ep_getopt_recvmaxsz(void *arg, void *v, size_t *szp)
+{
+ ws_ep *ep = arg;
+ return (nni_getopt_size(ep->rcvmax, v, szp));
+}
+
+static nni_tran_pipe_option ws_pipe_options[] = {
+#if 0
+ // clang-format off
+ { NNG_OPT_LOCADDR, ws_pipe_getopt_locaddr },
+ { NNG_OPT_REMADDR, ws_pipe_getopt_remaddr },
+ // clang-format on
+#endif
+ // terminate list
+ { NULL, NULL }
+};
+
+static nni_tran_pipe ws_pipe_ops = {
+ .p_fini = ws_pipe_fini,
+ .p_start = ws_pipe_start,
+ .p_send = ws_pipe_send,
+ .p_recv = ws_pipe_recv,
+ .p_close = ws_pipe_close,
+ .p_peer = ws_pipe_peer,
+ .p_options = ws_pipe_options,
+};
+
+static nni_tran_ep_option ws_ep_options[] = {
+ {
+ .eo_name = NNG_OPT_RECVMAXSZ,
+ .eo_getopt = ws_ep_getopt_recvmaxsz,
+ .eo_setopt = ws_ep_setopt_recvmaxsz,
+ },
+ // terminate list
+ { NULL, NULL, NULL },
+};
+
+static void
+ws_ep_fini(void *arg)
+{
+ ws_ep *ep = arg;
+
+ nni_aio_stop(ep->accaio);
+ nni_aio_stop(ep->connaio);
+ nni_aio_fini(ep->accaio);
+ nni_aio_fini(ep->connaio);
+ if (ep->listener != NULL) {
+ nni_ws_listener_fini(ep->listener);
+ }
+ if (ep->dialer != NULL) {
+ nni_ws_dialer_fini(ep->dialer);
+ }
+ nni_strfree(ep->addr);
+ nni_strfree(ep->protoname);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
+}
+
+static void
+ws_ep_conn_cb(void *arg)
+{
+ ws_ep * ep = arg;
+ ws_pipe *p;
+ nni_aio *caio = ep->connaio;
+ nni_aio *uaio;
+ int rv;
+ nni_ws * ws = NULL;
+
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_result(caio) == 0) {
+ ws = nni_aio_get_pipe(caio);
+ }
+ if ((uaio = nni_list_first(&ep->aios)) == NULL) {
+ // The client stopped caring about this!
+ if (ws != NULL) {
+ nni_ws_fini(ws);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_aio_list_remove(uaio);
+ NNI_ASSERT(nni_list_empty(&ep->aios));
+ if ((rv = nni_aio_result(caio)) != 0) {
+ nni_aio_finish_error(uaio, rv);
+ } else if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ nni_ws_fini(ws);
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish_pipe(uaio, p);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_close(void *arg)
+{
+ ws_ep *ep = arg;
+
+ if (ep->mode == NNI_EP_MODE_LISTEN) {
+ nni_ws_listener_close(ep->listener);
+ } else {
+ nni_ws_dialer_close(ep->dialer);
+ }
+}
+
+static void
+ws_ep_acc_cb(void *arg)
+{
+ ws_ep * ep = arg;
+ nni_aio *aaio = ep->accaio;
+ nni_aio *uaio;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ uaio = nni_list_first(&ep->aios);
+ if ((rv = nni_aio_result(aaio)) != 0) {
+ if (uaio != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ } else {
+ nni_ws *ws = nni_aio_get_pipe(aaio);
+ if (uaio != NULL) {
+ ws_pipe *p;
+ // Make a pipe
+ nni_aio_list_remove(uaio);
+ if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ nni_ws_close(ws);
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish_pipe(uaio, p);
+ }
+ }
+ }
+ if (!nni_list_empty(&ep->aios)) {
+ nni_ws_listener_accept(ep->listener, aaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+ws_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
+{
+ ws_ep * ep;
+ const char *pname;
+ int rv;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&ep->mtx);
+
+ // List of pipes (server only).
+ nni_aio_list_init(&ep->aios);
+
+ ep->mode = mode;
+ ep->lproto = nni_sock_proto(sock);
+ ep->rproto = nni_sock_peer(sock);
+
+ if (mode == NNI_EP_MODE_DIAL) {
+ pname = nni_sock_peer_name(sock);
+ rv = nni_ws_dialer_init(&ep->dialer, url);
+ } else {
+ pname = nni_sock_proto_name(sock);
+ rv = nni_ws_listener_init(&ep->listener, url);
+ }
+
+ if ((rv == 0) && ((ep->addr = nni_strdup(url)) == NULL)) {
+ rv = NNG_ENOMEM;
+ }
+ if ((rv != 0) ||
+ ((rv = nni_aio_init(&ep->connaio, ws_ep_conn_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->accaio, ws_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_asprintf(&ep->protoname, "%s.sp.nanomsg.org", pname)) !=
+ 0)) {
+ ws_ep_fini(ep);
+ return (rv);
+ }
+
+ *epp = ep;
+ return (0);
+}
+static int
+ws_tran_init(void)
+{
+ return (0);
+}
+
+static void
+ws_tran_fini(void)
+{
+}
+
+static nni_tran_ep ws_ep_ops = {
+ .ep_init = ws_ep_init,
+ .ep_fini = ws_ep_fini,
+ .ep_connect = ws_ep_connect,
+ .ep_bind = ws_ep_bind,
+ .ep_accept = ws_ep_accept,
+ .ep_close = ws_ep_close,
+ .ep_options = ws_ep_options,
+};
+
+static nni_tran ws_tran = {
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "ws",
+ .tran_ep = &ws_ep_ops,
+ .tran_pipe = &ws_pipe_ops,
+ .tran_init = ws_tran_init,
+ .tran_fini = ws_tran_fini,
+};
+
+int
+nng_ws_register(void)
+{
+ return (nni_tran_register(&ws_tran));
+}
diff --git a/src/transport/ws/websocket.h b/src/transport/ws/websocket.h
new file mode 100644
index 00000000..1beb6156
--- /dev/null
+++ b/src/transport/ws/websocket.h
@@ -0,0 +1,62 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_TRANSPORT_WS_WEBSOCKET_H
+#define NNG_TRANSPORT_WS_WEBSOCKET_H
+
+// TLS transport. This is used for communication via TLS v1.2 over TCP/IP.
+
+NNG_DECL int nng_ws_register(void);
+
+// TLS options. Note that these can only be set *before* the endpoint is
+// started. Once started, it is no longer possible to alter the TLS
+// configuration.
+
+// NNG_OPT_TLS_CA_CERT is a string with one or more X.509 certificates,
+// representing the entire CA chain. The content may be either PEM or DER
+// encoded.
+#define NNG_OPT_TLS_CA_CERT "tls:ca-cert"
+
+// NNG_OPT_TLS_CRL is a PEM encoded CRL (revocation list). Multiple lists
+// may be loaded by using this option multiple times.
+#define NNG_OPT_TLS_CRL "tls:crl"
+
+// NNG_OPT_TLS_CERT is used to specify our own certificate. At present
+// only one certificate may be supplied. (In the future it may be
+// possible to call this multiple times, for servers that select different
+// certificates depending upon client capabilities.)
+#define NNG_OPT_TLS_CERT "tls:cert"
+
+// NNG_OPT_TLS_PRIVATE_KEY is used to specify the private key used
+// with the given certificate. This should be called after setting
+// the certificate. The private key may be in PEM or DER format.
+// If in PEM encoded, a terminating ZERO byte should be included.
+#define NNG_OPT_TLS_PRIVATE_KEY "tls:private-key"
+
+// NNG_OPT_TLS_PRIVATE_KEY_PASSWORD is used to specify a password
+// used for the private key. The value is an ASCIIZ string.
+#define NNG_OPT_TLS_PRIVATE_KEY_PASSWORD "tls:private-key-password"
+
+// NNG_OPT_TLS_AUTH_MODE is an integer indicating whether our
+// peer should be verified or not. It is required on clients/dialers,
+// and off on servers/listeners, by default.
+#define NNG_OPT_TLS_AUTH_MODE "tls:auth-mode"
+
+extern int nng_tls_auth_mode_required;
+extern int nng_tls_auth_mode_none;
+extern int nng_tls_auth_mode_optional;
+
+// NNG_OPT_TLS_AUTH_VERIFIED is a boolean that can be read on pipes,
+// indicating whether the peer certificate is verified.
+#define NNG_OPT_TLS_AUTH_VERIFIED "tls:auth-verified"
+
+// XXX: TBD: Ciphersuite selection and reporting. Session reuse?
+
+#endif // NNG_TRANSPORT_WS_WEBSOCKET_H
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 607c353c..16a29da1 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1292,7 +1292,7 @@ zt_wire_packet_send_cb(void *arg)
nni_aio * aio = arg;
zt_send_hdr *hdr;
- hdr = nni_aio_get_data(aio);
+ hdr = nni_aio_get_data(aio, 0);
nni_free(hdr, hdr->len + sizeof(*hdr));
nni_aio_fini_cb(aio);
}
@@ -1355,7 +1355,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
buf += sizeof(*hdr);
memcpy(buf, data, len);
- nni_aio_set_data(aio, hdr);
+ nni_aio_set_data(aio, hdr, 0);
hdr->sa = addr;
hdr->len = len;
@@ -2001,7 +2001,7 @@ zt_pipe_ping_cb(void *arg)
}
if (p->zp_ping_try < p->zp_ping_count) {
nni_time now = nni_clock();
- nni_aio_set_timeout(aio, now + p->zp_ping_time);
+ nni_aio_set_timeout(aio, p->zp_ping_time);
// We want pings. We only send one if needed, but we
// use the the timer to wake us up even if we aren't
// going to send a ping. (We don't increment the try count
@@ -2034,7 +2034,7 @@ zt_pipe_start(void *arg, nni_aio *aio)
if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNI_TIME_ZERO) &&
(p->zp_ping_time != NNI_TIME_NEVER) && (p->zp_ping_aio != NULL)) {
p->zp_ping_try = 0;
- nni_aio_set_timeout(aio, nni_clock() + p->zp_ping_time);
+ nni_aio_set_timeout(aio, p->zp_ping_time);
if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) ==
0) {
p->zp_ping_active = 1;
@@ -2456,7 +2456,7 @@ zt_ep_conn_req_cb(void *arg)
}
if (nni_list_first(&ep->ze_aios) != NULL) {
- nni_aio_set_timeout(aio, nni_clock() + zt_conn_interval);
+ nni_aio_set_timeout(aio, zt_conn_interval);
if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) {
ep->ze_creq_active = 1;
ep->ze_creq_try++;
@@ -2479,8 +2479,7 @@ zt_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&zt_lk);
if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- nni_time now = nni_clock();
- int rv;
+ int rv;
// Clear the port so we get an ephemeral port.
ep->ze_laddr &= ~((uint64_t) zt_port_mask);
@@ -2498,7 +2497,7 @@ zt_ep_connect(void *arg, nni_aio *aio)
ep->ze_running = 1;
- nni_aio_set_timeout(ep->ze_creq_aio, now + zt_conn_interval);
+ nni_aio_set_timeout(ep->ze_creq_aio, zt_conn_interval);
if (nni_aio_start(
ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) {
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 77eb0bc5..921b5925 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -152,6 +152,9 @@ add_nng_test(pair1 5)
add_nng_test(udp 5)
add_nng_test(zt 60)
add_nng_test(multistress 60)
+add_nng_test(httpclient 30)
+add_nng_test(httpserver 30)
+add_nng_test(ws 30)
# compatbility tests
# We only support these if ALL the legacy protocols are supported. This
diff --git a/tests/base64.c b/tests/base64.c
index 9682475c..714bd4b2 100644
--- a/tests/base64.c
+++ b/tests/base64.c
@@ -8,7 +8,7 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include <strings.h>
+#include <string.h>
#include "convey.h"
diff --git a/tests/httpclient.c b/tests/httpclient.c
new file mode 100644
index 00000000..ab4f46a2
--- /dev/null
+++ b/tests/httpclient.c
@@ -0,0 +1,120 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "trantest.h"
+
+#ifndef _WIN32
+#include <arpa/inet.h>
+#endif
+
+// Basic HTTP client tests.
+#include "core/nng_impl.h"
+#include "supplemental/http/http.h"
+#include "supplemental/sha1/sha1.h"
+
+const uint8_t utf8_sha1sum[20] = { 0x54, 0xf3, 0xb8, 0xbb, 0xfe, 0xda, 0x6f,
+ 0xb4, 0x96, 0xdd, 0xc9, 0x8b, 0x8c, 0x41, 0xf4, 0xfe, 0xe5, 0xa9, 0x7d,
+ 0xa9 };
+
+TestMain("HTTP Client", {
+
+ nni_init();
+ atexit(nng_fini);
+
+ Convey("Given a TCP connection to httpbin.org", {
+ nni_plat_tcp_ep * ep;
+ nni_plat_tcp_pipe *p;
+ nng_aio * aio;
+ nni_aio * iaio;
+ nng_sockaddr rsa;
+ nni_http_client * cli;
+ nni_http * http;
+
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ iaio = (nni_aio *) aio;
+ iaio->a_addr = &rsa;
+
+ nng_aio_set_timeout(aio, 1000);
+ nni_plat_tcp_resolv("httpbin.org", "80", NNG_AF_INET, 0, iaio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ So(rsa.s_un.s_in.sa_port == htons(80));
+
+ So(nni_http_client_init(&cli, &rsa) == 0);
+ nni_http_client_connect(cli, iaio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ http = nni_aio_get_output(iaio, 0);
+ Reset({
+ nni_http_client_fini(cli);
+ nni_http_fini(http);
+ nng_aio_free(aio);
+ });
+
+ Convey("We can initiate a message", {
+ nni_http_req *req;
+ nni_http_res *res;
+ So(http != NULL);
+
+ So(nni_http_req_init(&req) == 0);
+ So(nni_http_res_init(&res) == 0);
+ Reset({
+ nni_http_close(http);
+ nni_http_req_fini(req);
+ nni_http_res_fini(res);
+ });
+ So(nni_http_req_set_method(req, "GET") == 0);
+ So(nni_http_req_set_version(req, "HTTP/1.1") == 0);
+ So(nni_http_req_set_uri(req, "/encoding/utf8") == 0);
+ So(nni_http_req_set_header(
+ req, "Host", "httpbin.org") == 0);
+ nni_http_write_req(http, req, iaio);
+
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ nni_http_read_res(http, res, iaio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ So(nni_http_res_get_status(res) == 200);
+
+ Convey("The message contents are correct", {
+ uint8_t digest[20];
+ void * data;
+ const char *cstr;
+ size_t sz;
+
+ cstr = nni_http_res_get_header(
+ res, "Content-Length");
+ So(cstr != NULL);
+ sz = atoi(cstr);
+ So(sz > 0);
+
+ data = nni_alloc(sz);
+ So(data != NULL);
+ Reset({ nni_free(data, sz); });
+
+ iaio->a_niov = 1;
+ iaio->a_iov[0].iov_len = sz;
+ iaio->a_iov[0].iov_buf = data;
+
+ nni_aio_wait(iaio);
+ So(nng_aio_result(aio) == 0);
+
+ nni_http_read_full(http, iaio);
+ nni_aio_wait(iaio);
+ So(nni_aio_result(iaio) == 0);
+
+ nni_sha1(data, sz, digest);
+ So(memcmp(digest, utf8_sha1sum, 20) == 0);
+ });
+ });
+ });
+});
diff --git a/tests/httpserver.c b/tests/httpserver.c
new file mode 100644
index 00000000..fa2753ea
--- /dev/null
+++ b/tests/httpserver.c
@@ -0,0 +1,146 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "trantest.h"
+
+#ifndef _WIN32
+#include <arpa/inet.h>
+#endif
+
+// Basic HTTP server tests.
+#include "core/nng_impl.h"
+#include "supplemental/http/http.h"
+#include "supplemental/sha1/sha1.h"
+
+const uint8_t utf8_sha1sum[20] = { 0x54, 0xf3, 0xb8, 0xbb, 0xfe, 0xda, 0x6f,
+ 0xb4, 0x96, 0xdd, 0xc9, 0x8b, 0x8c, 0x41, 0xf4, 0xfe, 0xe5, 0xa9, 0x7d,
+ 0xa9 };
+
+void
+cleanup(void)
+{
+ nng_fini();
+}
+
+TestMain("HTTP Client", {
+
+ nni_http_server *s;
+
+ nni_init();
+ atexit(cleanup);
+
+ Convey("We can start an HTTP server", {
+ nng_sockaddr sa;
+ nni_aio * aio;
+ char portbuf[16];
+ char *doc = "<html><body>Someone <b>is</b> home!</body</html>";
+
+ trantest_next_address(portbuf, "%u");
+
+ So(nni_aio_init(&aio, NULL, NULL) == 0);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("127.0.0.1", portbuf, NNG_AF_INET, 0, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
+
+ So(nni_http_server_init(&s, &sa) == 0);
+
+ Reset({
+ nni_aio_fini(aio);
+ nni_http_server_fini(s);
+ });
+
+ So(nni_http_server_add_static(s, NULL, "text/html",
+ "/home.html", doc, strlen(doc)) == 0);
+ So(nni_http_server_start(s) == 0);
+
+ Convey("We can connect a client to it", {
+ nni_http_client *cli;
+ nni_http * h;
+ nni_http_req * req;
+ nni_http_res * res;
+
+ So(nni_http_client_init(&cli, &sa) == 0);
+ nni_http_client_connect(cli, aio);
+ nni_aio_wait(aio);
+
+ So(nni_aio_result(aio) == 0);
+ h = nni_aio_get_output(aio, 0);
+ So(h != NULL);
+ So(nni_http_req_init(&req) == 0);
+ So(nni_http_res_init(&res) == 0);
+
+ Reset({
+ nni_http_client_fini(cli);
+ nni_http_fini(h);
+ nni_http_req_fini(req);
+ nni_http_res_fini(res);
+ });
+
+ Convey("404 works", {
+ So(nni_http_req_set_method(req, "GET") == 0);
+ So(nni_http_req_set_version(req, "HTTP/1.1") ==
+ 0);
+ So(nni_http_req_set_uri(req, "/bogus") == 0);
+ So(nni_http_req_set_header(
+ req, "Host", "localhost") == 0);
+ nni_http_write_req(h, req, aio);
+
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
+
+ nni_http_read_res(h, res, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
+
+ So(nni_http_res_get_status(res) == 404);
+ });
+
+ Convey("Valid data works", {
+ char chunk[256];
+ const void *ptr;
+
+ So(nni_http_req_set_method(req, "GET") == 0);
+ So(nni_http_req_set_version(req, "HTTP/1.1") ==
+ 0);
+ So(nni_http_req_set_uri(req, "/home.html") ==
+ 0);
+ So(nni_http_req_set_header(
+ req, "Host", "localhost") == 0);
+ nni_http_write_req(h, req, aio);
+
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
+
+ nni_http_read_res(h, res, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
+
+ So(nni_http_res_get_status(res) == 200);
+
+ ptr = nni_http_res_get_header(
+ res, "Content-Length");
+ So(ptr != NULL);
+ So(atoi(ptr) == strlen(doc));
+
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = strlen(doc);
+ aio->a_iov[0].iov_buf = (void *) chunk;
+ nni_http_read_full(h, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
+ So(nni_aio_count(aio) == strlen(doc));
+ So(memcmp(chunk, doc, strlen(doc)) == 0);
+ });
+
+ });
+ });
+});
diff --git a/tests/platform.c b/tests/platform.c
index 74dbab50..10278e7d 100644
--- a/tests/platform.c
+++ b/tests/platform.c
@@ -52,7 +52,7 @@ TestMain("Platform Operations", {
nng_msleep(100);
So((getms() - now) >= 100); // cannot be *shorter*!!
- So((getms() - now) < 150); // crummy clock resolution?
+ So((getms() - now) < 200); // crummy clock resolution?
});
Convey("times work", {
uint64_t msend;
@@ -69,8 +69,8 @@ TestMain("Platform Operations", {
usdelta = (int) (usend - usnow);
msdelta = (int) (msend - now);
So(usdelta >= 200);
- So(usdelta < 220);
- So(abs(msdelta - usdelta) < 20);
+ So(usdelta < 250); // increased tolerance for CIs
+ So(abs(msdelta - usdelta) < 50);
});
});
Convey("Mutexes work", {
diff --git a/tests/sha1.c b/tests/sha1.c
index f26a5b9c..9f1901e0 100644
--- a/tests/sha1.c
+++ b/tests/sha1.c
@@ -9,7 +9,7 @@
//
#include <stdint.h>
-#include <strings.h>
+#include <string.h>
#include "convey.h"
diff --git a/tests/tls.c b/tests/tls.c
index 6ec249cf..fa44d9c9 100644
--- a/tests/tls.c
+++ b/tests/tls.c
@@ -128,6 +128,7 @@ TestMain("TLS Transport", {
tt.init = init_tls;
tt.tmpl = "tls+tcp://127.0.0.1:%u";
+ atexit(nng_fini);
trantest_test(&tt);
@@ -170,7 +171,8 @@ TestMain("TLS Transport", {
So(nng_tls_register() == 0);
So(nng_pair_open(&s1) == 0);
Reset({ nng_close(s1); });
- So(nng_dial(s1, "tls+tcp://127.0.0.1", NULL, 0) == NNG_EADDRINVAL);
+ So(nng_dial(s1, "tls+tcp://127.0.0.1", NULL, 0) ==
+ NNG_EADDRINVAL);
So(nng_dial(s1, "tls+tcp://127.0.0.1.32", NULL, 0) ==
NNG_EADDRINVAL);
So(nng_dial(s1, "tls+tcp://127.0.x.1.32", NULL, 0) ==
@@ -183,5 +185,4 @@ TestMain("TLS Transport", {
NNG_EADDRINVAL);
});
- nng_fini();
})
diff --git a/tests/trantest.h b/tests/trantest.h
index 4f6dfe7f..b1b0ff80 100644
--- a/tests/trantest.h
+++ b/tests/trantest.h
@@ -58,6 +58,9 @@ unsigned trantest_port = 0;
#ifndef NNG_HAVE_TLS
#define nng_tls_register notransport
#endif
+#ifndef NNG_HAVE_WEBSOCKET
+#define nng_ws_register notransport
+#endif
int
notransport(void)
@@ -88,6 +91,9 @@ trantest_checktran(const char *url)
#ifndef NNG_HAVE_TLS
CHKTRAN(url, "tls+tcp:");
#endif
+#ifndef NNG_HAVE_WEBSOCKET
+ CHKTRAN(url, "ws:");
+#endif
(void) url;
}
@@ -99,7 +105,10 @@ trantest_next_address(char *out, const char *template)
if (trantest_port == 0) {
char *pstr;
- trantest_port = 5555;
+
+ // start at a different port each time -- 5000 - 10000 --
+ // unless a specific port is given.
+ trantest_port = nni_clock() % 5000 + 5000;
if (((pstr = ConveyGetEnv("TEST_PORT")) != NULL) &&
(atoi(pstr) != 0)) {
trantest_port = atoi(pstr);
@@ -218,7 +227,7 @@ trantest_send_recv(trantest *tt)
So(l != 0);
So(trantest_dial(tt) == 0);
- nng_msleep(20); // listener may be behind slightly
+ nng_msleep(200); // listener may be behind slightly
send = NULL;
So(nng_msg_alloc(&send, 0) == 0);
@@ -265,7 +274,7 @@ trantest_check_properties(trantest *tt, trantest_proptest_t f)
So(nng_dial(tt->reqsock, tt->addr, &d, 0) == 0);
So(d != 0);
- nng_msleep(10); // listener may be behind slightly
+ nng_msleep(200); // listener may be behind slightly
send = NULL;
So(nng_msg_alloc(&send, 0) == 0);
@@ -307,7 +316,7 @@ trantest_send_recv_large(trantest *tt)
So(nng_dial(tt->reqsock, tt->addr, &d, 0) == 0);
So(d != 0);
- nng_msleep(10); // listener may be behind slightly
+ nng_msleep(200); // listener may be behind slightly
send = NULL;
So(nng_msg_alloc(&send, size) == 0);
diff --git a/tests/ws.c b/tests/ws.c
new file mode 100644
index 00000000..f6eac685
--- /dev/null
+++ b/tests/ws.c
@@ -0,0 +1,97 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/pair1/pair.h"
+#include "trantest.h"
+
+#include "stubs.h"
+// TCP tests.
+
+#ifndef _WIN32
+#include <arpa/inet.h>
+#endif
+
+static int
+check_props_v4(nng_msg *msg, nng_listener l, nng_dialer d)
+{
+#if 0
+ nng_pipe p;
+ size_t z;
+ p = nng_msg_get_pipe(msg);
+ So(p > 0);
+
+ Convey("Local address property works", {
+ nng_sockaddr la;
+ z = sizeof(nng_sockaddr);
+ So(nng_pipe_getopt(p, NNG_OPT_LOCADDR, &la, &z) == 0);
+ So(z == sizeof(la));
+ So(la.s_un.s_family == NNG_AF_INET);
+ So(la.s_un.s_in.sa_port == htons(trantest_port - 1));
+ So(la.s_un.s_in.sa_port != 0);
+ So(la.s_un.s_in.sa_addr == htonl(0x7f000001));
+ });
+
+ Convey("Remote address property works", {
+ nng_sockaddr ra;
+ z = sizeof(nng_sockaddr);
+ So(nng_pipe_getopt(p, NNG_OPT_REMADDR, &ra, &z) == 0);
+ So(z == sizeof(ra));
+ So(ra.s_un.s_family == NNG_AF_INET);
+ So(ra.s_un.s_in.sa_port != 0);
+ So(ra.s_un.s_in.sa_addr == htonl(0x7f000001));
+ });
+#endif
+ return (0);
+}
+
+TestMain("WebSocket Transport", {
+
+ trantest_test_extended("ws://127.0.0.1:%u/test", check_props_v4);
+
+ Convey("Wild cards work", {
+ nng_socket s1;
+ nng_socket s2;
+ char addr[NNG_MAXADDRLEN];
+
+ So(nng_pair_open(&s1) == 0);
+ So(nng_pair_open(&s2) == 0);
+ Reset({
+ nng_close(s2);
+ nng_close(s1);
+ });
+ trantest_next_address(addr, "ws://*:%u/test");
+ So(nng_listen(s1, addr, NULL, 0) == 0);
+ // reset port back one
+ trantest_prev_address(addr, "ws://127.0.0.1:%u/test");
+ So(nng_dial(s2, addr, NULL, 0) == 0);
+ });
+
+ Convey("Incorrect URL paths do not work", {
+ nng_socket s1;
+ nng_socket s2;
+ char addr[NNG_MAXADDRLEN];
+
+ So(nng_pair_open(&s1) == 0);
+ So(nng_pair_open(&s2) == 0);
+ Reset({
+ nng_close(s2);
+ nng_close(s1);
+ });
+ trantest_next_address(addr, "ws://*:%u/test");
+ So(nng_listen(s1, addr, NULL, 0) == 0);
+ // reset port back one
+ trantest_prev_address(addr, "ws://127.0.0.1:%u/nothere");
+ So(nng_dial(s2, addr, NULL, 0) == NNG_ECONNREFUSED);
+ });
+
+ nng_fini();
+})
diff --git a/tests/zt.c b/tests/zt.c
index 9680f82d..bf423df0 100644
--- a/tests/zt.c
+++ b/tests/zt.c
@@ -194,6 +194,7 @@ TestMain("ZeroTier Transport", {
unsigned port;
port = 5555;
+ atexit(nng_fini);
Convey("We can register the zero tier transport",
{ So(nng_zt_register() == 0); });
@@ -353,5 +354,4 @@ TestMain("ZeroTier Transport", {
trantest_test_extended("zt://" NWID "/*:%u", check_props);
- nng_fini();
})