aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt5
-rw-r--r--src/core/aio.c70
-rw-r--r--src/core/aio.h50
-rw-r--r--src/core/defs.h3
-rw-r--r--src/core/device.c2
-rw-r--r--src/core/endpt.c4
-rw-r--r--src/core/init.c52
-rw-r--r--src/core/init.h23
-rw-r--r--src/core/list.c7
-rw-r--r--src/core/message.c1
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/reap.c89
-rw-r--r--src/core/reap.h36
-rw-r--r--src/core/socket.c30
-rw-r--r--src/core/socket.h18
-rw-r--r--src/core/strs.c91
-rw-r--r--src/core/strs.h3
-rw-r--r--src/core/transport.c72
-rw-r--r--src/core/transport.h3
-rw-r--r--src/nng.c10
-rw-r--r--src/platform/posix/posix_tcp.c4
-rw-r--r--src/supplemental/base64/base64.c2
-rw-r--r--src/supplemental/base64/base64.h2
-rw-r--r--src/supplemental/http/CMakeLists.txt17
-rw-r--r--src/supplemental/http/client.c147
-rw-r--r--src/supplemental/http/http.c604
-rw-r--r--src/supplemental/http/http.h290
-rw-r--r--src/supplemental/http/http_msg.c956
-rw-r--r--src/supplemental/http/server.c1103
-rw-r--r--src/supplemental/mbedtls/tls.c6
-rw-r--r--src/supplemental/websocket/CMakeLists.txt14
-rw-r--r--src/supplemental/websocket/websocket.c1935
-rw-r--r--src/supplemental/websocket/websocket.h63
-rw-r--r--src/transport/tls/tls.c70
-rw-r--r--src/transport/ws/CMakeLists.txt19
-rw-r--r--src/transport/ws/README.adoc38
-rw-r--r--src/transport/ws/websocket.c533
-rw-r--r--src/transport/ws/websocket.h62
-rw-r--r--src/transport/zerotier/zerotier.c15
39 files changed, 6316 insertions, 134 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 93fa390d..6fae89e0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -62,6 +62,8 @@ set (NNG_SOURCES
core/protocol.h
core/random.c
core/random.h
+ core/reap.c
+ core/reap.h
core/socket.c
core/socket.h
core/strs.c
@@ -120,8 +122,10 @@ if (NNG_PLATFORM_WINDOWS)
endif()
add_subdirectory(supplemental/base64)
+add_subdirectory(supplemental/http)
add_subdirectory(supplemental/mbedtls)
add_subdirectory(supplemental/sha1)
+add_subdirectory(supplemental/websocket)
add_subdirectory(protocol/bus0)
add_subdirectory(protocol/pair0)
@@ -135,6 +139,7 @@ add_subdirectory(transport/inproc)
add_subdirectory(transport/ipc)
add_subdirectory(transport/tcp)
add_subdirectory(transport/tls)
+add_subdirectory(transport/ws)
add_subdirectory(transport/zerotier)
include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src
diff --git a/src/core/aio.c b/src/core/aio.c
index cec2ff7c..6ce5641d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -1,6 +1,7 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -66,7 +67,8 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
}
memset(aio, 0, sizeof(*aio));
nni_cv_init(&aio->a_cv, &nni_aio_lk);
- aio->a_expire = NNI_TIME_NEVER;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_timeout = NNG_DURATION_INFINITE;
if (arg == NULL) {
arg = aio;
}
@@ -116,9 +118,9 @@ nni_aio_stop(nni_aio *aio)
}
void
-nni_aio_set_timeout(nni_aio *aio, nni_time when)
+nni_aio_set_timeout(nni_aio *aio, nni_duration when)
{
- aio->a_expire = when;
+ aio->a_timeout = when;
}
void
@@ -158,15 +160,54 @@ nni_aio_get_ep(nni_aio *aio)
}
void
-nni_aio_set_data(nni_aio *aio, void *data)
+nni_aio_set_data(nni_aio *aio, int index, void *data)
{
- aio->a_data = data;
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_user_data))) {
+ aio->a_user_data[index] = data;
+ }
}
void *
-nni_aio_get_data(nni_aio *aio)
+nni_aio_get_data(nni_aio *aio, int index)
+{
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_user_data))) {
+ return (aio->a_user_data[index]);
+ }
+ return (NULL);
+}
+
+void
+nni_aio_set_input(nni_aio *aio, int index, void *data)
{
- return (aio->a_data);
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_inputs))) {
+ aio->a_inputs[index] = data;
+ }
+}
+
+void *
+nni_aio_get_input(nni_aio *aio, int index)
+{
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_inputs))) {
+ return (aio->a_inputs[index]);
+ }
+ return (NULL);
+}
+
+void
+nni_aio_set_output(nni_aio *aio, int index, void *data)
+{
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_outputs))) {
+ aio->a_outputs[index] = data;
+ }
+}
+
+void *
+nni_aio_get_output(nni_aio *aio, int index)
+{
+ if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_outputs))) {
+ return (aio->a_outputs[index]);
+ }
+ return (NULL);
}
int
@@ -219,8 +260,21 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
aio->a_active = 1;
- if (aio->a_expire != NNI_TIME_NEVER) {
+
+ // Convert the relative timeout to an absolute timeout.
+ switch (aio->a_timeout) {
+ case NNG_DURATION_ZERO:
+ aio->a_expire = NNI_TIME_ZERO;
+ nni_aio_expire_add(aio);
+ break;
+ case NNG_DURATION_INFINITE:
+ case NNG_DURATION_DEFAULT:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_timeout;
nni_aio_expire_add(aio);
+ break;
}
nni_mtx_unlock(&nni_aio_lk);
return (0);
diff --git a/src/core/aio.h b/src/core/aio.h
index 3bdcf433..c4c09421 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -1,6 +1,7 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -22,9 +23,10 @@ typedef void (*nni_aio_cancelfn)(nni_aio *, int);
// An nni_aio is an async I/O handle.
struct nni_aio {
- int a_result; // Result code (nng_errno)
- size_t a_count; // Bytes transferred (I/O only)
- nni_time a_expire;
+ int a_result; // Result code (nng_errno)
+ size_t a_count; // Bytes transferred (I/O only)
+ nni_time a_expire; // Absolute timeout
+ nni_duration a_timeout; // Relative timeout
// These fields are private to the aio framework.
nni_cv a_cv;
@@ -35,8 +37,6 @@ struct nni_aio {
unsigned a_expiring : 1; // expiration callback in progress
unsigned a_waiting : 1; // a thread is waiting for this to finish
unsigned a_synch : 1; // run completion synchronously
- unsigned a_reltime : 1; // expiration time is relative
- unsigned a_pad : 25; // ensure 32-bit alignment
nni_task a_task;
// Read/write operations.
@@ -53,13 +53,21 @@ struct nni_aio {
// Resolver operations.
nni_sockaddr *a_addr;
- // Extra user data.
- void *a_data;
+ // User scratch data. Consumers may store values here, which
+ // must be preserved by providers and the framework.
+ void *a_user_data[4];
+
+ // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be
+ // specified. The semantics of these will vary, and depend on the
+ // specific operation.
+ void *a_inputs[4];
+ void *a_outputs[4];
// Provider-use fields.
nni_aio_cancelfn a_prov_cancel;
void * a_prov_data;
nni_list_node a_prov_node;
+ void * a_prov_extra[4]; // Extra data used by provider
// Expire node.
nni_list_node a_expire_node;
@@ -96,12 +104,32 @@ extern void nni_aio_stop(nni_aio *);
// nni_aio_set_data sets user data. This should only be done by the
// consumer, initiating the I/O. The intention is to be able to store
// additional data for use when the operation callback is executed.
-extern void nni_aio_set_data(nni_aio *, void *);
+// The index represents the "index" at which to store the data. A maximum
+// of 4 elements can be stored with the (index >= 0 && index < 4).
+extern void nni_aio_set_data(nni_aio *, int, void *);
// nni_aio_get_data returns the user data that was previously stored
// with nni_aio_set_data.
-extern void *nni_aio_get_data(nni_aio *);
+extern void *nni_aio_get_data(nni_aio *, int);
+
+// nni_set_input sets input parameters on the AIO. The semantic details
+// of this will be determined by the specific AIO operation. AIOs can
+// carry up to 4 input parameters.
+extern void nni_aio_set_input(nni_aio *, int, void *);
+
+// nni_get_input returns the input value stored by nni_aio_set_input.
+extern void *nni_aio_get_input(nni_aio *, int);
+
+// nni_set_output sets output results on the AIO, allowing providers to
+// return results to consumers. The semantic details are determined by
+// the AIO operation. Up to 4 outputs can be carried on an AIO.
+extern void nni_aio_set_output(nni_aio *, int, void *);
+
+// nni_get_output returns an output previously stored on the AIO.
+extern void *nni_aio_get_output(nni_aio *, int);
+// XXX: These should be refactored in terms of the generic inputs and
+// outputs.
extern void nni_aio_set_msg(nni_aio *, nni_msg *);
extern nni_msg *nni_aio_get_msg(nni_aio *);
extern void nni_aio_set_pipe(nni_aio *, void *);
@@ -122,10 +150,10 @@ extern void * nni_aio_get_ep(nni_aio *);
// completion callback.
void nni_aio_set_synch(nni_aio *);
-// nni_aio_set_timeout sets the timeout (absolute) when the AIO will
+// nni_aio_set_timeout sets the timeout (relative) when the AIO will
// be canceled. The cancelation does not happen until after nni_aio_start
// is called.
-extern void nni_aio_set_timeout(nni_aio *, nni_time);
+extern void nni_aio_set_timeout(nni_aio *, nni_duration);
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
diff --git a/src/core/defs.h b/src/core/defs.h
index 5a9ded92..3a714f85 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -25,6 +25,9 @@
#define NNI_ASSERT(x)
#endif
+// Returns the size of an array in elements. (Convenience.)
+#define NNI_NUM_ELEMENTS(x) (sizeof(x) / sizeof((x)[0]))
+
// These types are common but have names shared with user space.
typedef struct nng_msg nni_msg;
typedef struct nng_sockaddr nni_sockaddr;
diff --git a/src/core/device.c b/src/core/device.c
index 2e000d7e..e6b75897 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -159,7 +159,7 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
return (rv);
}
- nni_aio_set_timeout(p->aio, NNI_TIME_NEVER);
+ nni_aio_set_timeout(p->aio, NNG_DURATION_INFINITE);
}
dd->npath = npath;
*dp = dd;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index fa30bf77..3058f5c0 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -333,8 +333,8 @@ nni_ep_tmo_start(nni_ep *ep)
// have a statistically perfect distribution with the modulo of
// the random number, but this really doesn't matter.
- nni_aio_set_timeout(ep->ep_tmo_aio,
- nni_clock() + (backoff ? nni_random() % backoff : 0));
+ nni_aio_set_timeout(
+ ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
ep->ep_tmo_run = 1;
if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
diff --git a/src/core/init.c b/src/core/init.c
index 4a7fd974..cb6dbeee 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -1,6 +1,7 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -10,15 +11,25 @@
#include "core/nng_impl.h"
+#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
+static nni_mtx nni_init_mtx;
+static nni_list nni_init_list;
+static bool nni_inited = false;
+
static int
nni_init_helper(void)
{
int rv;
+ nni_mtx_init(&nni_init_mtx);
+ NNI_LIST_INIT(&nni_init_list, nni_initializer, i_node);
+ nni_inited = true;
+
if (((rv = nni_taskq_sys_init()) != 0) ||
+ ((rv = nni_reap_sys_init()) != 0) ||
((rv = nni_timer_sys_init()) != 0) ||
((rv = nni_aio_sys_init()) != 0) ||
((rv = nni_random_sys_init()) != 0) ||
@@ -29,6 +40,7 @@ nni_init_helper(void)
((rv = nni_tran_sys_init()) != 0)) {
nni_fini();
}
+
return (rv);
}
@@ -41,14 +53,54 @@ nni_init(void)
void
nni_fini(void)
{
+ if (!nni_inited) {
+ return;
+ }
+ if (!nni_list_empty(&nni_init_list)) {
+ nni_initializer *init;
+
+ nni_mtx_lock(&nni_init_mtx);
+ while ((init = nni_list_first(&nni_init_list)) != NULL) {
+ if (init->i_fini != NULL) {
+ init->i_fini();
+ }
+ init->i_once = 0;
+ nni_list_remove(&nni_init_list, init);
+ }
+ nni_mtx_unlock(&nni_init_mtx);
+ }
nni_tran_sys_fini();
nni_proto_sys_fini();
nni_pipe_sys_fini();
nni_ep_sys_fini();
nni_sock_sys_fini();
nni_random_sys_fini();
+ nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_aio_sys_fini();
nni_timer_sys_fini();
nni_taskq_sys_fini();
+
+ nni_mtx_fini(&nni_init_mtx);
nni_plat_fini();
+ nni_inited = false;
+}
+
+int
+nni_initialize(nni_initializer *init)
+{
+ int rv;
+ if (init->i_once) {
+ return (0);
+ }
+ nni_mtx_lock(&nni_init_mtx);
+ if (init->i_once) {
+ nni_mtx_unlock(&nni_init_mtx);
+ return (0);
+ }
+ if ((rv = init->i_init()) == 0) {
+ init->i_once = 1;
+ nni_list_append(&nni_init_list, init);
+ }
+ nni_mtx_unlock(&nni_init_mtx);
+ return (rv);
}
diff --git a/src/core/init.h b/src/core/init.h
index ffcebf64..d21bb4c5 100644
--- a/src/core/init.h
+++ b/src/core/init.h
@@ -1,5 +1,7 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -21,4 +23,23 @@ int nni_init(void);
// that all resources used by the library are released back to the system.
void nni_fini(void);
+typedef struct nni_initializer {
+ int (*i_init)(void); // i_init is called exactly once
+ void (*i_fini)(void); // i_fini is called on shutdown
+ int i_once; // private -- initialize to zero
+ nni_list_node i_node; // private -- initialize to zero
+} nni_initializer;
+
+// nni_initialize will call the initialization routine exactly once. This is
+// done efficiently, so that if the caller has initialized already, then
+// subsequent calls are "cheap" (no synchronization cost). The initialization
+// function must not itself cause any further calls to nni_initialize; the
+// function should limit itself to initialization of locks and static data
+// structures. When shutting down, the finalizer will be called. The
+// order in which finalizers are called is unspecified.
+//
+// An initializer may fail (due to resource exhaustion), in which case the
+// return value of nni_initialize will be non-zero.
+int nni_initialize(nni_initializer *);
+
#endif // CORE_INIT_H
diff --git a/src/core/list.c b/src/core/list.c
index 8b26b64a..f489a705 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -1,5 +1,7 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -151,7 +153,10 @@ nni_list_active(nni_list *list, void *item)
int
nni_list_empty(nni_list *list)
{
- return (list->ll_head.ln_next == &list->ll_head);
+ // The first check ensures that we treat an uninitialized list
+ // as empty. This use useful for statically initialized lists.
+ return ((list->ll_head.ln_next == NULL) ||
+ (list->ll_head.ln_next == &list->ll_head));
}
int
diff --git a/src/core/message.c b/src/core/message.c
index b44dfdf6..35153f01 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -9,7 +9,6 @@
//
#include <stdio.h>
-#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 87dd2de1..1f2297c1 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -38,6 +38,7 @@
#include "core/panic.h"
#include "core/protocol.h"
#include "core/random.h"
+#include "core/reap.h"
#include "core/strs.h"
#include "core/taskq.h"
#include "core/thread.h"
diff --git a/src/core/reap.c b/src/core/reap.c
new file mode 100644
index 00000000..8191dba3
--- /dev/null
+++ b/src/core/reap.c
@@ -0,0 +1,89 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#include "reap.h"
+
+#include <stdbool.h>
+
+static nni_list nni_reap_list;
+static nni_mtx nni_reap_mtx;
+static nni_cv nni_reap_cv;
+static bool nni_reap_exit = false;
+static nni_thr nni_reap_thr;
+
+static void
+nni_reap_stuff(void *notused)
+{
+ NNI_ARG_UNUSED(notused);
+
+ nni_mtx_lock(&nni_reap_mtx);
+ for (;;) {
+ nni_reap_item *item;
+ if ((item = nni_list_first(&nni_reap_list)) != NULL) {
+ nni_list_remove(&nni_reap_list, item);
+ nni_mtx_unlock(&nni_reap_mtx);
+
+ item->r_func(item->r_ptr);
+ nni_mtx_lock(&nni_reap_mtx);
+ continue;
+ }
+
+ if (nni_reap_exit) {
+ break;
+ }
+
+ nni_cv_wait(&nni_reap_cv);
+ }
+ nni_mtx_unlock(&nni_reap_mtx);
+}
+
+void
+nni_reap(nni_reap_item *item, nni_cb func, void *ptr)
+{
+ nni_mtx_lock(&nni_reap_mtx);
+ item->r_func = func;
+ item->r_ptr = ptr;
+ nni_list_append(&nni_reap_list, item);
+ nni_cv_wake(&nni_reap_cv);
+ nni_mtx_unlock(&nni_reap_mtx);
+}
+
+int
+nni_reap_sys_init(void)
+{
+ int rv;
+
+ NNI_LIST_INIT(&nni_reap_list, nni_reap_item, r_link);
+ nni_mtx_init(&nni_reap_mtx);
+ nni_cv_init(&nni_reap_cv, &nni_reap_mtx);
+ nni_reap_exit = false;
+
+ // If this fails, we don't fail init, instead we will try to
+ // start up at reap time.
+ if ((rv = nni_thr_init(&nni_reap_thr, nni_reap_stuff, NULL)) != 0) {
+ nni_cv_fini(&nni_reap_cv);
+ nni_mtx_fini(&nni_reap_mtx);
+ return (rv);
+ }
+ nni_thr_run(&nni_reap_thr);
+ return (0);
+}
+
+void
+nni_reap_sys_fini(void)
+{
+ nni_mtx_lock(&nni_reap_mtx);
+ nni_reap_exit = true;
+ nni_cv_wake(&nni_reap_cv);
+ nni_mtx_unlock(&nni_reap_mtx);
+ nni_thr_fini(&nni_reap_thr);
+}
diff --git a/src/core/reap.h b/src/core/reap.h
new file mode 100644
index 00000000..fbc008b2
--- /dev/null
+++ b/src/core/reap.h
@@ -0,0 +1,36 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_REAP_H
+#define CORE_REAP_H
+
+#include "core/defs.h"
+#include "core/list.h"
+
+// nni_reap_item is defined here so that it can be inlined into
+// structures. Callers must access its members directly.
+typedef struct nni_reap_item {
+ nni_list_node r_link;
+ void * r_ptr;
+ nni_cb r_func;
+} nni_reap_item;
+
+// nni_reap performs an asynchronous reap of an item. This allows functions
+// it calls to acquire locks or resources without worrying about deadlocks
+// (such as from a completion callback.) The called function should avoid
+// blocking for too long if possible, since only one reap thread is present
+// in the system. The intended usage is for an nni_reap_item to be a member
+// of the structure to be reaped, and and then this function is called to
+// finalize it.
+extern void nni_reap(nni_reap_item *, nni_cb, void *);
+extern int nni_reap_sys_init(void);
+extern void nni_reap_sys_fini(void);
+
+#endif // CORE_REAP_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 54e1bd6a..409e4f66 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -775,22 +775,8 @@ nni_sock_closeall(void)
static void
nni_sock_normalize_expiration(nni_aio *aio, nni_duration def)
{
- if (aio->a_reltime) {
- if (aio->a_expire == (nni_time) -2) {
- aio->a_expire = def;
- }
- switch (aio->a_expire) {
- case (nni_time) 0:
- aio->a_expire = NNI_TIME_ZERO;
- break;
- case (nni_time) -1:
- aio->a_expire = NNI_TIME_NEVER;
- break;
- default:
- aio->a_expire = nni_clock() + aio->a_expire;
- break;
- }
- aio->a_reltime = 0;
+ if (aio->a_timeout == (nni_duration) -2) {
+ aio->a_timeout = def;
}
}
@@ -821,6 +807,18 @@ nni_sock_peer(nni_sock *sock)
return (sock->s_peer_id.p_id);
}
+const char *
+nni_sock_proto_name(nni_sock *sock)
+{
+ return (sock->s_self_id.p_name);
+}
+
+const char *
+nni_sock_peer_name(nni_sock *sock)
+{
+ return (sock->s_peer_id.p_name);
+}
+
void
nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 52310ef3..37c67436 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -14,14 +14,16 @@
extern int nni_sock_sys_init(void);
extern void nni_sock_sys_fini(void);
-extern int nni_sock_find(nni_sock **, uint32_t);
-extern void nni_sock_rele(nni_sock *);
-extern int nni_sock_open(nni_sock **, const nni_proto *);
-extern void nni_sock_close(nni_sock *);
-extern void nni_sock_closeall(void);
-extern int nni_sock_shutdown(nni_sock *);
-extern uint16_t nni_sock_proto(nni_sock *);
-extern uint16_t nni_sock_peer(nni_sock *);
+extern int nni_sock_find(nni_sock **, uint32_t);
+extern void nni_sock_rele(nni_sock *);
+extern int nni_sock_open(nni_sock **, const nni_proto *);
+extern void nni_sock_close(nni_sock *);
+extern void nni_sock_closeall(void);
+extern int nni_sock_shutdown(nni_sock *);
+extern uint16_t nni_sock_proto(nni_sock *);
+extern uint16_t nni_sock_peer(nni_sock *);
+extern const char *nni_sock_proto_name(nni_sock *);
+extern const char *nni_sock_peer_name(nni_sock *);
extern int nni_sock_setopt(nni_sock *, const char *, const void *, size_t);
extern int nni_sock_getopt(nni_sock *, const char *, void *, size_t *);
extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int);
diff --git a/src/core/strs.c b/src/core/strs.c
index 6cee605b..a03c0bb5 100644
--- a/src/core/strs.c
+++ b/src/core/strs.c
@@ -8,6 +8,9 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include <ctype.h>
+#include <stdarg.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -17,35 +20,28 @@
// part of standard C99. (C11 has added some things here, but we cannot
// count on them.)
+// Note that we supply our own version of strdup and strfree unconditionally,
+// so that these can be freed with nni_free(strlen(s)+1) if desired. (Likewise
+// a string buffer allocated with nni_alloc can be freed with nni_strfree
+// provided the length is correct.)
+
char *
nni_strdup(const char *src)
{
-#ifdef NNG_HAVE_STRDUP
-#ifdef _WIN32
- return (_strdup(src));
-#else
- return (strdup(src));
-#endif
-#else
char * dst;
- size_t len = strlen(src);
+ size_t len = strlen(src) + 1;
if ((dst = nni_alloc(len)) != NULL) {
memcpy(dst, src, len);
}
return (dst);
-#endif
}
void
nni_strfree(char *s)
{
if (s != NULL) {
-#ifdef NNG_HAVE_STRDUP
- free(s);
-#else
nni_free(s, strlen(s) + 1);
-#endif
}
}
@@ -114,3 +110,72 @@ nni_strnlen(const char *s, size_t len)
return (n);
#endif
}
+
+char *
+nni_strcasestr(const char *s1, const char *s2)
+{
+#ifdef NNG_HAVE_STRCASESTR
+ return (strcasestr(s1, s2));
+#else
+ const char *t1, *t2;
+ while (*s1) {
+ for (t1 = s1, t2 = s2; *t1 && *t2; t2++, t1++) {
+ if (tolower(*t1) != tolower(*t2)) {
+ break;
+ }
+ }
+ if (*t2 == 0) {
+ return ((char *) s1);
+ }
+ s1++;
+ }
+ return (NULL);
+#endif
+}
+
+int
+nni_strncasecmp(const char *s1, const char *s2, size_t n)
+{
+#ifdef NNG_HAVE_STRNCASECMP
+#ifdef _WIN32
+ return (_strnicmp(s1, s2, n));
+#else
+ return (strncasecmp(s1, s2, n));
+#endif
+#else
+ for (int i = 0; i < n; i++) {
+ uint8_t c1 = (uint8_t) tolower(*s1++);
+ uint8_t c2 = (uint8_t) tolower(*s2++);
+ if (c1 == c2) {
+ if (c1 == 0) {
+ return (0);
+ }
+ continue;
+ }
+ return (c1 < c2 ? -1 : 1);
+ }
+ return (0);
+#endif
+}
+
+int
+nni_asprintf(char **sp, const char *fmt, ...)
+{
+ va_list ap;
+ size_t len;
+ char * s;
+
+ va_start(ap, fmt);
+ len = vsnprintf(NULL, 0, fmt, ap);
+ va_end(ap);
+ len++;
+
+ if ((s = nni_alloc(len)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ va_start(ap, fmt);
+ (void) vsnprintf(s, len, fmt, ap);
+ va_end(ap);
+ *sp = s;
+ return (0);
+} \ No newline at end of file
diff --git a/src/core/strs.h b/src/core/strs.h
index 42ec4997..3b369fe4 100644
--- a/src/core/strs.h
+++ b/src/core/strs.h
@@ -18,5 +18,8 @@ extern void nni_strfree(char *);
extern size_t nni_strlcpy(char *, const char *, size_t);
extern size_t nni_strlcat(char *, const char *, size_t);
extern size_t nni_strnlen(const char *, size_t);
+extern char * nni_strcasestr(const char *, const char *);
+extern int nni_strncasecmp(const char *, const char *, size_t);
+extern int nni_asprintf(char **, const char *, ...);
#endif // CORE_STRS_H
diff --git a/src/core/transport.c b/src/core/transport.c
index 359b03fd..31da773f 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -13,6 +13,7 @@
#include "transport/ipc/ipc.h"
#include "transport/tcp/tcp.h"
#include "transport/tls/tls.h"
+#include "transport/ws/websocket.h"
#include "transport/zerotier/zerotier.h"
#include <stdio.h>
@@ -105,6 +106,72 @@ nni_tran_find(const char *addr)
return (NULL);
}
+// nni_tran_parse_host_port is a convenience routine to parse the host portion
+// of a URL (which includes a DNS name or IP address and an optional service
+// name or port, separated by a colon) into its host and port name parts. It
+// understands IPv6 address literals when surrounded by brackets ([]).
+// If either component is empty, then NULL is passed back for the value,
+// otherwise a string suitable for freeing with nni_strfree is supplied.
+int
+nni_tran_parse_host_port(const char *pair, char **hostp, char **portp)
+{
+ const char *hstart;
+ const char *pstart;
+ char * host;
+ char * port;
+ size_t hlen, plen;
+
+ if (pair[0] == '[') {
+ hstart = pair + 1;
+ hlen = 0;
+ while (hstart[hlen] != ']') {
+ if (hstart[hlen] == '\0') {
+ return (NNG_EADDRINVAL);
+ }
+ hlen++;
+ }
+ pstart = hstart + hlen + 1; // skip over the trailing ']'
+ } else {
+ // Normal thing.
+ hstart = pair;
+ hlen = 0;
+ while ((hstart[hlen] != ':') && (hstart[hlen] != '\0')) {
+ hlen++;
+ }
+ pstart = hstart + hlen;
+ }
+ if (pstart[0] == ':') {
+ pstart++;
+ }
+ plen = strlen(pstart);
+
+ host = NULL;
+ if (hostp) {
+ if ((hlen > 1) || ((hlen == 1) && (*hstart != '*'))) {
+ if ((host = nni_alloc(hlen + 1)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ memcpy(host, hstart, hlen);
+ host[hlen] = '\0';
+ }
+ }
+
+ port = NULL;
+ if ((plen != 0) && (portp)) {
+ if ((port = nni_strdup(pstart)) == NULL) {
+ nni_strfree(host);
+ return (NNG_ENOMEM);
+ }
+ }
+ if (hostp) {
+ *hostp = host;
+ }
+ if (portp) {
+ *portp = port;
+ }
+ return (0);
+}
+
int
nni_tran_chkopt(const char *name, const void *v, size_t sz)
{
@@ -154,9 +221,12 @@ static nni_tran_ctor nni_tran_ctors[] = {
#ifdef NNG_HAVE_TLS
nng_tls_register,
#endif
-#ifdef NNI_HAVE_ZEROTIER
+#ifdef NNG_HAVE_ZEROTIER
nng_zt_register,
#endif
+#ifdef NNG_HAVE_WEBSOCKET
+ nng_ws_register,
+#endif
NULL,
};
diff --git a/src/core/transport.h b/src/core/transport.h
index b82e2c92..e8a1f620 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -163,6 +163,9 @@ struct nni_tran_pipe {
nni_tran_pipe_option *p_options;
};
+// Utility for transports.
+extern int nni_tran_parse_host_port(const char *, char **, char **);
+
// These APIs are used by the framework internally, and not for use by
// transport implementations.
extern nni_tran *nni_tran_find(const char *);
diff --git a/src/nng.c b/src/nng.c
index db4a5c7c..6cd78e1d 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -969,8 +969,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg)
if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) {
*app = (nng_aio *) aio;
}
- aio->a_expire = (nni_time) NNG_DURATION_DEFAULT;
- aio->a_reltime = 1;
+ aio->a_timeout = NNG_DURATION_DEFAULT;
return (rv);
}
@@ -1020,11 +1019,8 @@ void
nng_aio_set_timeout(nng_aio *ap, nng_duration dur)
{
// Durations here are relative, since we have no notion of a
- // common clock. But underlying aio uses absolute times normally.
- // Fortunately the absolute times are big enough; we just have to
- // make sure that we "convert" the timeout from relative time to
- // absolute time when submitting operations.
- nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur);
+ // common clock..
+ nni_aio_set_timeout((nni_aio *) ap, dur);
}
#if 0
diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c
index e03f8056..69d7e7ce 100644
--- a/src/platform/posix/posix_tcp.c
+++ b/src/platform/posix/posix_tcp.c
@@ -38,11 +38,11 @@ nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa,
return (rv);
}
- if (rsa->s_un.s_family != NNG_AF_UNSPEC) {
+ if ((rsa != NULL) && (rsa->s_un.s_family != NNG_AF_UNSPEC)) {
len = nni_posix_nn2sockaddr((void *) &ss, rsa);
nni_posix_epdesc_set_remote(ed, &ss, len);
}
- if (lsa->s_un.s_family != NNG_AF_UNSPEC) {
+ if ((lsa != NULL) && (lsa->s_un.s_family != NNG_AF_UNSPEC)) {
len = nni_posix_nn2sockaddr((void *) &ss, lsa);
nni_posix_epdesc_set_local(ed, &ss, len);
}
diff --git a/src/supplemental/base64/base64.c b/src/supplemental/base64/base64.c
index 95a4a491..2f6b4da2 100644
--- a/src/supplemental/base64/base64.c
+++ b/src/supplemental/base64/base64.c
@@ -1,6 +1,6 @@
//
// Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
-// Copyright 2017 Staysail Systems, Inc.
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"),
diff --git a/src/supplemental/base64/base64.h b/src/supplemental/base64/base64.h
index eb186ae0..68346435 100644
--- a/src/supplemental/base64/base64.h
+++ b/src/supplemental/base64/base64.h
@@ -1,6 +1,6 @@
//
// Copyright (c) 2014 Wirebird Labs LLC. All rights reserved.
-// Copyright 2017 Staysail Systems, Inc.
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"),
diff --git a/src/supplemental/http/CMakeLists.txt b/src/supplemental/http/CMakeLists.txt
new file mode 100644
index 00000000..dff06d70
--- /dev/null
+++ b/src/supplemental/http/CMakeLists.txt
@@ -0,0 +1,17 @@
+#
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+set(HTTP_SOURCES
+ supplemental/http/http.c
+ supplemental/http/http_msg.c
+ supplemental/http/server.c
+ supplemental/http/client.c
+ supplemental/http/http.h)
+set(NNG_SOURCES ${NNG_SOURCES} ${HTTP_SOURCES} PARENT_SCOPE)
diff --git a/src/supplemental/http/client.c b/src/supplemental/http/client.c
new file mode 100644
index 00000000..374ab5bb
--- /dev/null
+++ b/src/supplemental/http/client.c
@@ -0,0 +1,147 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+struct nni_http_client {
+ nng_sockaddr addr;
+ nni_list aios;
+ nni_mtx mtx;
+ bool closed;
+ bool tls;
+ nni_aio * connaio;
+ nni_plat_tcp_ep *tep;
+};
+
+static void
+http_conn_start(nni_http_client *c)
+{
+ nni_plat_tcp_ep_connect(c->tep, c->connaio);
+}
+
+static void
+http_conn_done(void *arg)
+{
+ nni_http_client * c = arg;
+ nni_aio * aio;
+ int rv;
+ nni_plat_tcp_pipe *p;
+ nni_http_tran t;
+ nni_http * http;
+
+ nni_mtx_lock(&c->mtx);
+ rv = nni_aio_result(c->connaio);
+ p = rv == 0 ? nni_aio_get_pipe(c->connaio) : NULL;
+ if ((aio = nni_list_first(&c->aios)) == NULL) {
+ if (p != NULL) {
+ nni_plat_tcp_pipe_fini(p);
+ }
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+ nni_aio_list_remove(aio);
+
+ if (rv != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+
+ t.h_data = p;
+ t.h_write = (void *) nni_plat_tcp_pipe_send;
+ t.h_read = (void *) nni_plat_tcp_pipe_recv;
+ t.h_close = (void *) nni_plat_tcp_pipe_close;
+ t.h_fini = (void *) nni_plat_tcp_pipe_fini;
+
+ if ((rv = nni_http_init(&http, &t)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_plat_tcp_pipe_fini(p);
+ nni_mtx_unlock(&c->mtx);
+ return;
+ }
+
+ nni_aio_set_output(aio, 0, http);
+ nni_aio_finish(aio, 0, 0);
+
+ if (!nni_list_empty(&c->aios)) {
+ http_conn_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_http_client_fini(nni_http_client *c)
+{
+ nni_aio_fini(c->connaio);
+ nni_plat_tcp_ep_fini(c->tep);
+ nni_mtx_fini(&c->mtx);
+ NNI_FREE_STRUCT(c);
+}
+
+int
+nni_http_client_init(nni_http_client **cp, nng_sockaddr *sa)
+{
+ int rv;
+
+ nni_http_client *c;
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->addr = *sa;
+ rv = nni_plat_tcp_ep_init(&c->tep, NULL, &c->addr, NNI_EP_MODE_DIAL);
+ if (rv != 0) {
+ NNI_FREE_STRUCT(c);
+ return (rv);
+ }
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->aios);
+
+ if ((rv = nni_aio_init(&c->connaio, http_conn_done, c)) != 0) {
+ nni_http_client_fini(c);
+ return (rv);
+ }
+ *cp = c;
+ return (0);
+}
+
+static void
+http_connect_cancel(nni_aio *aio, int rv)
+{
+ nni_http_client *c = aio->a_prov_data;
+ nni_mtx_lock(&c->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ if (nni_list_empty(&c->aios)) {
+ nni_aio_cancel(c->connaio, rv);
+ }
+ nni_mtx_unlock(&c->mtx);
+}
+
+void
+nni_http_client_connect(nni_http_client *c, nni_aio *aio)
+{
+ if (nni_aio_start(aio, http_connect_cancel, aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&c->mtx);
+ nni_list_append(&c->aios, aio);
+ if (nni_list_first(&c->aios) == aio) {
+ http_conn_start(c);
+ }
+ nni_mtx_unlock(&c->mtx);
+} \ No newline at end of file
diff --git a/src/supplemental/http/http.c b/src/supplemental/http/http.c
new file mode 100644
index 00000000..3958a738
--- /dev/null
+++ b/src/supplemental/http/http.c
@@ -0,0 +1,604 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+// We insist that individual headers fit in 8K.
+// If you need more than that, you need something we can't do.
+#define HTTP_BUFSIZE 8192
+
+// types of reads
+enum read_flavor {
+ HTTP_RD_RAW,
+ HTTP_RD_FULL,
+ HTTP_RD_REQ,
+ HTTP_RD_RES,
+};
+
+enum write_flavor {
+ HTTP_WR_RAW,
+ HTTP_WR_FULL,
+ HTTP_WR_REQ,
+ HTTP_WR_RES,
+};
+
+#define SET_RD_FLAVOR(aio, f) (aio)->a_prov_extra[0] = ((void *) (intptr_t)(f))
+#define GET_RD_FLAVOR(aio) (int) ((intptr_t) aio->a_prov_extra[0])
+#define SET_WR_FLAVOR(aio, f) (aio)->a_prov_extra[0] = ((void *) (intptr_t)(f))
+#define GET_WR_FLAVOR(aio) (int) ((intptr_t) aio->a_prov_extra[0])
+
+struct nni_http {
+ void *sock;
+ void (*rd)(void *, nni_aio *);
+ void (*wr)(void *, nni_aio *);
+ void (*close)(void *);
+ void (*fini)(void *);
+
+ bool closed;
+
+ nni_list rdq; // high level http read requests
+ nni_list wrq; // high level http write requests
+
+ nni_aio *rd_aio; // bottom half read operations
+ nni_aio *wr_aio; // bottom half write operations
+
+ nni_mtx mtx;
+
+ uint8_t *rd_buf;
+ size_t rd_get;
+ size_t rd_put;
+ size_t rd_bufsz;
+};
+
+static void
+http_close(nni_http *http)
+{
+ // Call with lock held.
+ nni_aio *aio;
+
+ if (http->closed) {
+ return;
+ }
+
+ http->closed = true;
+ if (nni_list_first(&http->wrq)) {
+ nni_aio_cancel(http->wr_aio, NNG_ECLOSED);
+ while ((aio = nni_list_first(&http->wrq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+ if (nni_list_first(&http->rdq)) {
+ nni_aio_cancel(http->rd_aio, NNG_ECLOSED);
+ while ((aio = nni_list_first(&http->rdq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ }
+
+ if (http->sock != NULL) {
+ http->close(http->sock);
+ }
+}
+
+void
+nni_http_close(nni_http *http)
+{
+ nni_mtx_lock(&http->mtx);
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+}
+
+// http_rd_buf attempts to satisfy the read from data in the buffer.
+static int
+http_rd_buf(nni_http *http, nni_aio *aio)
+{
+ size_t cnt = http->rd_put - http->rd_get;
+ size_t n;
+ uint8_t *rbuf = http->rd_buf;
+ int i;
+ int rv;
+ bool raw = false;
+
+ rbuf += http->rd_get;
+
+ switch (GET_RD_FLAVOR(aio)) {
+ case HTTP_RD_RAW:
+ raw = true; // FALLTHROUGH
+ case HTTP_RD_FULL:
+ for (i = 0; (aio->a_niov != 0) && (cnt != 0); i++) {
+ // Pull up data from the buffer if possible.
+ n = aio->a_iov[0].iov_len;
+ if (n > cnt) {
+ n = cnt;
+ }
+ memcpy(aio->a_iov[0].iov_buf, rbuf, n);
+ aio->a_iov[0].iov_len -= n;
+ aio->a_iov[0].iov_buf += n;
+ http->rd_get += n;
+ rbuf += n;
+ aio->a_count += n;
+ cnt -= n;
+
+ if (aio->a_iov[0].iov_len == 0) {
+ aio->a_niov--;
+ for (i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
+ }
+ }
+
+ if ((aio->a_niov == 0) || (raw && (aio->a_count != 0))) {
+ // Finished the read. (We are finished if we either
+ // got *all* the data, or we got *some* data for
+ // a raw read.)
+ return (0);
+ }
+
+ // No more data left in the buffer, so use a physio.
+ // (Note that we get here if we either have not completed
+ // a full transaction on a FULL read, or were not even able
+ // to get *any* data for a partial RAW read.)
+ for (i = 0; i < aio->a_niov; i++) {
+ http->rd_aio->a_iov[i] = aio->a_iov[i];
+ }
+ nni_aio_set_data(http->rd_aio, 1, NULL);
+ http->rd_aio->a_niov = aio->a_niov;
+ http->rd(http->sock, http->rd_aio);
+ return (NNG_EAGAIN);
+
+ case HTTP_RD_REQ:
+ rv = nni_http_req_parse(aio->a_prov_extra[1], rbuf, cnt, &n);
+ http->rd_get += n;
+ if (http->rd_get == http->rd_put) {
+ http->rd_get = http->rd_put = 0;
+ }
+ if (rv == NNG_EAGAIN) {
+ http->rd_aio->a_niov = 1;
+ http->rd_aio->a_iov[0].iov_buf =
+ http->rd_buf + http->rd_put;
+ http->rd_aio->a_iov[0].iov_len =
+ http->rd_bufsz - http->rd_put;
+ nni_aio_set_data(http->rd_aio, 1, aio);
+ http->rd(http->sock, http->rd_aio);
+ }
+ return (rv);
+
+ case HTTP_RD_RES:
+ rv = nni_http_res_parse(aio->a_prov_extra[1], rbuf, cnt, &n);
+ http->rd_get += n;
+ if (http->rd_get == http->rd_put) {
+ http->rd_get = http->rd_put = 0;
+ }
+ if (rv == NNG_EAGAIN) {
+ http->rd_aio->a_niov = 1;
+ http->rd_aio->a_iov[0].iov_buf =
+ http->rd_buf + http->rd_put;
+ http->rd_aio->a_iov[0].iov_len =
+ http->rd_bufsz - http->rd_put;
+ nni_aio_set_data(http->rd_aio, 1, aio);
+ http->rd(http->sock, http->rd_aio);
+ }
+ return (rv);
+ }
+ return (NNG_EINVAL);
+}
+
+static void
+http_rd_start(nni_http *http)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&http->rdq)) != NULL) {
+ int rv;
+
+ if (http->closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = http_rd_buf(http, aio);
+ }
+ switch (rv) {
+ case NNG_EAGAIN:
+ return;
+ case 0:
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, 0, aio->a_count);
+ break;
+ default:
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ http_close(http);
+ break;
+ }
+ }
+}
+
+static void
+http_rd_cb(void *arg)
+{
+ nni_http *http = arg;
+ nni_aio * aio = http->rd_aio;
+ nni_aio * uaio;
+ size_t cnt;
+ int rv;
+
+ nni_mtx_lock(&http->mtx);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ if ((uaio = nni_list_first(&http->rdq)) != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ cnt = nni_aio_count(aio);
+
+ // If we were reading into the buffer, then advance location(s).
+ if ((uaio = nni_aio_get_data(aio, 1)) != NULL) {
+ http->rd_put += cnt;
+ NNI_ASSERT(http->rd_put <= http->rd_bufsz);
+ http_rd_start(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ // Otherwise we are completing a USER request, and there should
+ // be no data left in the user buffer.
+ NNI_ASSERT(http->rd_get == http->rd_put);
+
+ uaio = nni_list_first(&http->rdq);
+ NNI_ASSERT(uaio != NULL);
+
+ for (int i = 0; (uaio->a_niov != 0) && (cnt != 0); i++) {
+ // Pull up data from the buffer if possible.
+ size_t n = uaio->a_iov[0].iov_len;
+ if (n > cnt) {
+ n = cnt;
+ }
+ uaio->a_iov[0].iov_len -= n;
+ uaio->a_iov[0].iov_buf += n;
+ uaio->a_count += n;
+ cnt -= n;
+
+ if (uaio->a_iov[0].iov_len == 0) {
+ uaio->a_niov--;
+ for (i = 0; i < uaio->a_niov; i++) {
+ uaio->a_iov[i] = uaio->a_iov[i + 1];
+ }
+ }
+ }
+
+ // Resubmit the start. This will attempt to consume data
+ // from the read buffer (there won't be any), and then either
+ // complete the I/O (for HTTP_RD_RAW, or if there is nothing left),
+ // or submit another physio.
+ http_rd_start(http);
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_rd_cancel(nni_aio *aio, int rv)
+{
+ nni_http *http = aio->a_prov_data;
+
+ nni_mtx_lock(&http->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ if (aio == nni_list_first(&http->rdq)) {
+ http_close(http);
+ }
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_rd_submit(nni_http *http, nni_aio *aio)
+{
+ if (nni_aio_start(aio, http_rd_cancel, http) != 0) {
+ return;
+ }
+ if (http->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&http->rdq, aio);
+ if (nni_list_first(&http->rdq) == aio) {
+ http_rd_start(http);
+ }
+}
+
+static void
+http_wr_start(nni_http *http)
+{
+ nni_aio *aio;
+
+ if ((aio = nni_list_first(&http->wrq)) != NULL) {
+
+ for (int i = 0; i < aio->a_niov; i++) {
+ http->wr_aio->a_iov[i] = aio->a_iov[i];
+ }
+ http->wr_aio->a_niov = aio->a_niov;
+ http->wr(http->sock, http->wr_aio);
+ }
+}
+
+static void
+http_wr_cb(void *arg)
+{
+ nni_http *http = arg;
+ nni_aio * aio = http->wr_aio;
+ nni_aio * uaio;
+ int rv;
+ size_t n;
+
+ nni_mtx_lock(&http->mtx);
+
+ uaio = nni_list_first(&http->wrq);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ // We failed to complete the aio.
+ if (uaio != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ http_close(http);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ if (uaio == NULL) {
+ // write canceled?
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+ n = nni_aio_count(aio);
+ uaio->a_count += n;
+ if (GET_WR_FLAVOR(uaio) == HTTP_WR_RAW) {
+ // For raw data, we just send partial completion
+ // notices to the consumer.
+ goto done;
+ }
+ while (n) {
+ NNI_ASSERT(aio->a_niov != 0);
+
+ if (aio->a_iov[0].iov_len > n) {
+ aio->a_iov[0].iov_len -= n;
+ aio->a_iov[0].iov_buf += n;
+ break;
+ }
+ n -= aio->a_iov[0].iov_len;
+ for (int i = 0; i < aio->a_niov; i++) {
+ aio->a_iov[i] = aio->a_iov[i + 1];
+ }
+ aio->a_niov--;
+ }
+ if ((aio->a_niov != 0) && (aio->a_iov[0].iov_len != 0)) {
+ // We have more to transmit.
+ http->wr(http->sock, aio);
+ nni_mtx_unlock(&http->mtx);
+ return;
+ }
+
+done:
+ nni_aio_list_remove(uaio);
+ nni_aio_finish(uaio, 0, uaio->a_count);
+
+ // Start next write if another is ready.
+ http_wr_start(http);
+
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_wr_cancel(nni_aio *aio, int rv)
+{
+ nni_http *http = aio->a_prov_data;
+
+ nni_mtx_lock(&http->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ if (aio == nni_list_first(&http->wrq)) {
+ http_close(http);
+ }
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&http->mtx);
+}
+
+static void
+http_wr_submit(nni_http *http, nni_aio *aio)
+{
+ if (nni_aio_start(aio, http_wr_cancel, http) != 0) {
+ return;
+ }
+ if (http->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ nni_list_append(&http->wrq, aio);
+ if (nni_list_first(&http->wrq) == aio) {
+ http_wr_start(http);
+ }
+}
+
+void
+nni_http_read_req(nni_http *http, nni_http_req *req, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_REQ);
+ aio->a_prov_extra[1] = req;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read_res(nni_http *http, nni_http_res *res, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_RES);
+ aio->a_prov_extra[1] = res;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read_full(nni_http *http, nni_aio *aio)
+{
+ aio->a_count = 0;
+ SET_RD_FLAVOR(aio, HTTP_RD_FULL);
+ aio->a_prov_extra[1] = NULL;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_read(nni_http *http, nni_aio *aio)
+{
+ SET_RD_FLAVOR(aio, HTTP_RD_RAW);
+ aio->a_prov_extra[1] = NULL;
+
+ nni_mtx_lock(&http->mtx);
+ http_rd_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_req(nni_http *http, nni_http_req *req, nni_aio *aio)
+{
+ int rv;
+ void * buf;
+ size_t bufsz;
+
+ if ((rv = nni_http_req_get_buf(req, &buf, &bufsz)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = bufsz;
+ aio->a_iov[0].iov_buf = buf;
+ SET_WR_FLAVOR(aio, HTTP_WR_REQ);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_res(nni_http *http, nni_http_res *res, nni_aio *aio)
+{
+ int rv;
+ void * buf;
+ size_t bufsz;
+
+ if ((rv = nni_http_res_get_buf(res, &buf, &bufsz)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = bufsz;
+ aio->a_iov[0].iov_buf = buf;
+ SET_WR_FLAVOR(aio, HTTP_WR_RES);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+// Writer. As with nni_http_conn_write, this is used to write data on
+// a connection that has been "upgraded" (e.g. transformed to
+// websocket). It is an error to perform other HTTP exchanges on an
+// connection after this method is called. (This mostly exists to
+// support websocket.)
+void
+nni_http_write(nni_http *http, nni_aio *aio)
+{
+ SET_WR_FLAVOR(aio, HTTP_WR_RAW);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_write_full(nni_http *http, nni_aio *aio)
+{
+ SET_WR_FLAVOR(aio, HTTP_WR_FULL);
+
+ nni_mtx_lock(&http->mtx);
+ http_wr_submit(http, aio);
+ nni_mtx_unlock(&http->mtx);
+}
+
+void
+nni_http_fini(nni_http *http)
+{
+ nni_mtx_lock(&http->mtx);
+ http_close(http);
+ if ((http->sock != NULL) && (http->fini != NULL)) {
+ http->fini(http->sock);
+ http->sock = NULL;
+ }
+ nni_mtx_unlock(&http->mtx);
+ nni_aio_stop(http->wr_aio);
+ nni_aio_stop(http->rd_aio);
+ nni_aio_fini(http->wr_aio);
+ nni_aio_fini(http->rd_aio);
+ nni_free(http->rd_buf, http->rd_bufsz);
+ nni_mtx_fini(&http->mtx);
+ NNI_FREE_STRUCT(http);
+}
+
+int
+nni_http_init(nni_http **httpp, nni_http_tran *tran)
+{
+ nni_http *http;
+ int rv;
+
+ if ((http = NNI_ALLOC_STRUCT(http)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ http->rd_bufsz = HTTP_BUFSIZE;
+ if ((http->rd_buf = nni_alloc(http->rd_bufsz)) == NULL) {
+ NNI_FREE_STRUCT(http);
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&http->mtx);
+ nni_aio_list_init(&http->rdq);
+ nni_aio_list_init(&http->wrq);
+
+ if (((rv = nni_aio_init(&http->wr_aio, http_wr_cb, http)) != 0) ||
+ ((rv = nni_aio_init(&http->rd_aio, http_rd_cb, http)) != 0)) {
+ nni_http_fini(http);
+ return (rv);
+ }
+ http->rd_bufsz = HTTP_BUFSIZE;
+ http->rd = tran->h_read;
+ http->wr = tran->h_write;
+ http->close = tran->h_close;
+ http->fini = tran->h_fini;
+ http->sock = tran->h_data;
+
+ *httpp = http;
+
+ return (0);
+}
diff --git a/src/supplemental/http/http.h b/src/supplemental/http/http.h
new file mode 100644
index 00000000..09c72f8a
--- /dev/null
+++ b/src/supplemental/http/http.h
@@ -0,0 +1,290 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_SUPPLEMENTAL_HTTP_HTTP_H
+#define NNG_SUPPLEMENTAL_HTTP_HTTP_H
+
+#include <stdbool.h>
+
+// nni_http_msg represents an HTTP request or response message.
+typedef struct nni_http_msg nni_http_msg;
+typedef struct nni_http_res nni_http_res;
+typedef struct nni_http_entity nni_http_entity;
+
+typedef struct nni_http_tran {
+ void *h_data;
+ void (*h_read)(void *, nni_aio *);
+ void (*h_write)(void *, nni_aio *);
+ void (*h_close)(void *);
+ void (*h_fini)(void *);
+} nni_http_tran;
+
+typedef struct nni_http_req nni_http_req;
+
+extern int nni_http_req_init(nni_http_req **);
+extern void nni_http_req_fini(nni_http_req *);
+extern void nni_http_req_reset(nni_http_req *);
+extern int nni_http_req_set_header(nni_http_req *, const char *, const char *);
+extern int nni_http_req_add_header(nni_http_req *, const char *, const char *);
+extern int nni_http_req_del_header(nni_http_req *, const char *);
+extern int nni_http_req_get_buf(nni_http_req *, void **, size_t *);
+extern int nni_http_req_set_method(nni_http_req *, const char *);
+extern int nni_http_req_set_version(nni_http_req *, const char *);
+extern int nni_http_req_set_uri(nni_http_req *, const char *);
+extern const char *nni_http_req_get_header(nni_http_req *, const char *);
+extern const char *nni_http_req_get_header(nni_http_req *, const char *);
+extern const char *nni_http_req_get_version(nni_http_req *);
+extern const char *nni_http_req_get_uri(nni_http_req *);
+extern const char *nni_http_req_get_method(nni_http_req *);
+extern int nni_http_req_parse(nni_http_req *, void *, size_t, size_t *);
+
+extern int nni_http_res_init(nni_http_res **);
+extern void nni_http_res_fini(nni_http_res *);
+extern void nni_http_res_reset(nni_http_res *);
+extern int nni_http_res_get_buf(nni_http_res *, void **, size_t *);
+extern int nni_http_res_set_header(nni_http_res *, const char *, const char *);
+extern int nni_http_res_add_header(nni_http_res *, const char *, const char *);
+extern int nni_http_res_del_header(nni_http_res *, const char *);
+extern int nni_http_res_set_version(nni_http_res *, const char *);
+extern int nni_http_res_set_status(nni_http_res *, int, const char *);
+extern const char *nni_http_res_get_header(nni_http_res *, const char *);
+extern const char *nni_http_res_get_version(nni_http_res *);
+extern const char *nni_http_res_get_reason(nni_http_res *);
+extern int nni_http_res_get_status(nni_http_res *);
+extern int nni_http_res_parse(nni_http_res *, void *, size_t, size_t *);
+extern int nni_http_res_set_data(nni_http_res *, const void *, size_t);
+extern int nni_http_res_copy_data(nni_http_res *, const void *, size_t);
+extern int nni_http_res_alloc_data(nni_http_res *, size_t);
+extern void nni_http_res_get_data(nni_http_res *, void **, size_t *);
+extern int nni_http_res_init_error(nni_http_res **, uint16_t);
+
+// HTTP status codes. This list is not exhaustive.
+enum { NNI_HTTP_STATUS_CONTINUE = 100,
+ NNI_HTTP_STATUS_SWITCHING = 101,
+ NNI_HTTP_STATUS_PROCESSING = 102,
+ NNI_HTTP_STATUS_OK = 200,
+ NNI_HTTP_STATUS_CREATED = 201,
+ NNI_HTTP_STATUS_ACCEPTED = 202,
+ NNI_HTTP_STATUS_NOT_AUTHORITATIVE = 203,
+ NNI_HTTP_STATUS_NO_CONTENT = 204,
+ NNI_HTTP_STATUS_RESET_CONTENT = 205,
+ NNI_HTTP_STATUS_PARTIAL_CONTENT = 206,
+ NNI_HTTP_STATUS_MULTI_STATUS = 207,
+ NNI_HTTP_STATUS_ALREADY_REPORTED = 208,
+ NNI_HTTP_STATUS_IM_USED = 226,
+ NNI_HTTP_STATUS_MULTIPLE_CHOICES = 300,
+ NNI_HTTP_STATUS_STATUS_MOVED_PERMANENTLY = 301,
+ NNI_HTTP_STATUS_FOUND = 302,
+ NNI_HTTP_STATUS_SEE_OTHER = 303,
+ NNI_HTTP_STATUS_NOT_MODIFIED = 304,
+ NNI_HTTP_STATUS_USE_PROXY = 305,
+ NNI_HTTP_STATUS_TEMPORARY_REDIRECT = 307,
+ NNI_HTTP_STATUS_PERMANENT_REDIRECT = 308,
+ NNI_HTTP_STATUS_BAD_REQUEST = 400,
+ NNI_HTTP_STATUS_UNAUTHORIZED = 401,
+ NNI_HTTP_STATUS_PAYMENT_REQUIRED = 402,
+ NNI_HTTP_STATUS_FORBIDDEN = 403,
+ NNI_HTTP_STATUS_NOT_FOUND = 404,
+ NNI_HTTP_STATUS_METHOD_NOT_ALLOWED = 405,
+ NNI_HTTP_STATUS_NOT_ACCEPTABLE = 406,
+ NNI_HTTP_STATUS_PROXY_AUTH_REQUIRED = 407,
+ NNI_HTTP_STATUS_REQUEST_TIMEOUT = 408,
+ NNI_HTTP_STATUS_CONFLICT = 409,
+ NNI_HTTP_STATUS_GONE = 410,
+ NNI_HTTP_STATUS_LENGTH_REQUIRED = 411,
+ NNI_HTTP_STATUS_PRECONDITION_FAILED = 412,
+ NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE = 413,
+ NNI_HTTP_STATUS_URI_TOO_LONG = 414,
+ NNI_HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE = 415,
+ NNI_HTTP_STATUS_RANGE_NOT_SATISFIABLE = 416,
+ NNI_HTTP_STATUS_EXPECTATION_FAILED = 417,
+ NNI_HTTP_STATUS_TEAPOT = 418,
+ NNI_HTTP_STATUS_UNPROCESSABLE_ENTITY = 422,
+ NNI_HTTP_STATUS_LOCKED = 423,
+ NNI_HTTP_STATUS_FAILED_DEPENDENCY = 424,
+ NNI_HTTP_STATUS_UPGRADE_REQUIRED = 426,
+ NNI_HTTP_STATUS_PRECONDITION_REQUIRED = 428,
+ NNI_HTTP_STATUS_TOO_MANY_REQUESTS = 429,
+ NNI_HTTP_STATUS_HEADERS_TOO_LARGE = 431,
+ NNI_HTTP_STATUS_UNAVAIL_LEGAL_REASONS = 451,
+ NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR = 500,
+ NNI_HTTP_STATUS_NOT_IMPLEMENTED = 501,
+ NNI_HTTP_STATUS_BAD_GATEWAY = 502,
+ NNI_HTTP_STATUS_SERVICE_UNAVAILABLE = 503,
+ NNI_HTTP_STATUS_GATEWAY_TIMEOUT = 504,
+ NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP = 505,
+ NNI_HTTP_STATUS_VARIANT_ALSO_NEGOTIATES = 506,
+ NNI_HTTP_STATUS_INSUFFICIENT_STORAGE = 507,
+ NNI_HTTP_STATUS_LOOP_DETECTED = 508,
+ NNI_HTTP_STATUS_NOT_EXTENDED = 510,
+ NNI_HTTP_STATUS_NETWORK_AUTH_REQUIRED = 511,
+};
+
+// An HTTP connection is a connection over which messages are exchanged.
+// Generally, clients send request messages, and then read responses.
+// Servers, read requests, and write responses. However, we do not
+// require a 1:1 mapping between request and response here -- the application
+// is responsible for dealing with that.
+//
+// We only support HTTP/1.1, though using the nni_http_conn_read and
+// nni_http_conn_write low level methods, it is possible to write an upgrader
+// (such as websocket!) that might support e.g. HTTP/2 or reading data that
+// follows a legacy HTTP/1.0 message.
+//
+// Any error on the connection, including cancellation of a request, is fatal
+// the connection.
+typedef struct nni_http nni_http;
+
+extern int nni_http_init(nni_http **, nni_http_tran *);
+extern void nni_http_close(nni_http *);
+extern void nni_http_fini(nni_http *);
+
+// Reading messages -- the caller must supply a preinitialized (but otherwise
+// idle) message. We recommend the caller store this in the aio's user data.
+// Note that the iovs of the aio's are clobbered by these methods -- callers
+// must not use them for any other purpose.
+
+extern void nni_http_write_req(nni_http *, nni_http_req *, nni_aio *);
+extern void nni_http_write_res(nni_http *, nni_http_res *, nni_aio *);
+extern void nni_http_read_req(nni_http *, nni_http_req *, nni_aio *);
+extern void nni_http_read_res(nni_http *, nni_http_res *, nni_aio *);
+
+extern void nni_http_read(nni_http *, nni_aio *);
+extern void nni_http_read_full(nni_http *, nni_aio *);
+extern void nni_http_write(nni_http *, nni_aio *);
+extern void nni_http_write_full(nni_http *, nni_aio *);
+
+typedef struct nni_http_server nni_http_server;
+
+typedef struct {
+ // h_path is the relative URI that we are going to match against.
+ // Must not be NULL. Note that query parameters (things following
+ // a "?" at the end of the path) are ignored when matching. This
+ // field may not be NULL.
+ const char *h_path;
+
+ // h_method is the HTTP method to handle such as "GET" or "POST".
+ // Must not be empty or NULL. If the incoming method is HEAD, then
+ // the server will process HEAD the same as GET, but will not send
+ // any response body.
+ const char *h_method;
+
+ // h_host is used to match on a specific Host: entry. If left NULL,
+ // then this handler will match regardless of the Host: value.
+ const char *h_host;
+
+ // h_is_dir indicates that the path represents a directory, and
+ // any path which is a logically below it should also be matched.
+ // This means that "/phone" will match for "/phone/bob" but not
+ // "/phoneme/ma". Be advised that it is not possible to register
+ // a handler for a parent and a different handler for children.
+ // (This restriction may be lifted in the future.)
+ bool h_is_dir;
+
+ // h_is_upgrader is used for callbacks that "upgrade" (or steal)
+ // their connection. When this is true, the server framework
+ // assumes that the handler takes over *all* of the details of
+ // the connection. Consequently, the connection is disassociated
+ // from the framework, and no response is sent. (Upgraders are
+ // responsible for adopting the connection, including closing it
+ // when they are done, and for sending any HTTP response message.
+ // This is true even if an error occurs.)
+ bool h_is_upgrader;
+
+ // h_cb is a callback that handles the request. The conventions
+ // are as follows:
+ //
+ // inputs:
+ // 0 - nni_http * for the actual underlying HTTP channel
+ // 1 - nni_http_req * for the HTTP request object
+ // 2 - void * for the opaque pointer supplied at registration
+ //
+ // outputs:
+ // 0 - (optional) nni_http_res * for an HTTP response (see below)
+ //
+ // The callback may choose to return the a response object in output 0,
+ // in which case the framework will handle sending the reply.
+ // (Response entity content is also sent if the response data
+ // is not NULL.) The callback may instead do it's own replies, in
+ // which case the response output should be NULL.
+ //
+ // Note that any request entity data is *NOT* supplied automatically
+ // with the request object; the callback is expected to call the
+ // nni_http_read_data method to retrieve any message data based upon
+ // the presence of headers. (It may also call nni_http_read or
+ // nni_http_write on the channel as it sees fit.)
+ //
+ // Upgraders should call the completion routine immediately,
+ // once they have collected the request object and HTTP channel.
+ void (*h_cb)(nni_aio *);
+} nni_http_handler;
+
+// nni_http_server will look for an existing server with the same
+// socket address, or create one if one does not exist. The servers
+// are reference counted to permit sharing the server object across
+// multiple subsystems. The sockaddr matching is very limited though,
+// and the addresses must match *exactly*.
+extern int nni_http_server_init(nni_http_server **, nng_sockaddr *);
+
+// nni_http_server_fini drops the reference count on the server, and
+// if this was the last reference, closes down the server and frees
+// all related resources. It will not affect hijacked connections.
+extern void nni_http_server_fini(nni_http_server *);
+
+// nni_http_server_add_handler registers a new handler on the server.
+// This function will return NNG_EADDRINUSE if a conflicting handler
+// is already registered (i.e. a handler with the same value for Host,
+// Method, and URL.) The first parameter receives an opaque handle to
+// the handler, that can be used to unregister the handler later.
+extern int nni_http_server_add_handler(
+ void **, nni_http_server *, nni_http_handler *, void *);
+
+extern void nni_http_server_del_handler(nni_http_server *, void *);
+
+// nni_http_server_start starts listening on the supplied port.
+extern int nni_http_server_start(nni_http_server *);
+
+// nni_http_server_stop stops the server, closing the listening socket.
+// Connections that have been "upgraded" are unaffected. Connections
+// associated with a callback will complete their callback, and then close.
+extern void nni_http_server_stop(nni_http_server *);
+
+// nni_http_server_add_static is a short cut to add static
+// content handler to the server. The host may be NULL, and the
+// ctype (aka Content-Type) may be NULL. If the Content-Type is NULL,
+// then application/octet stream will be the (probably bad) default.
+// The actual data is copied, and so the caller may discard it once
+// this function returns.
+extern int nni_http_server_add_static(nni_http_server *, const char *host,
+ const char *ctype, const char *uri, const void *, size_t);
+
+// nni_http_server_add file is a short cut to add a file-backed static
+// content handler to the server. The host may be NULL, and the
+// ctype (aka Content-Type) may be NULL. If the Content-Type is NULL,
+// then the server will try to guess it based on the filename -- but only
+// a small number of file types are builtin. URI is the absolute URI
+// (sans hostname and scheme), and the path is the path on the local
+// filesystem where the file can be found.
+extern int nni_http_server_add_file(nni_http_server *, const char *host,
+ const char *ctype, const char *uri, const char *path);
+
+// TLS will use
+// extern int nni_http_server_start_tls(nni_http_server *, nng_sockaddr *,
+// nni_tls_config *);
+
+// Client stuff.
+
+typedef struct nni_http_client nni_http_client;
+
+extern int nni_http_client_init(nni_http_client **, nng_sockaddr *);
+extern void nni_http_client_fini(nni_http_client *);
+extern void nni_http_client_connect(nni_http_client *, nni_aio *);
+
+#endif // NNG_SUPPLEMENTAL_HTTP_HTTP_H
diff --git a/src/supplemental/http/http_msg.c b/src/supplemental/http/http_msg.c
new file mode 100644
index 00000000..2e245868
--- /dev/null
+++ b/src/supplemental/http/http_msg.c
@@ -0,0 +1,956 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+// Note that as we parse headers, the rule is that if a header is already
+// present, then we can append it to the existing header, separated by
+// a comma. From experience, for example, Firefox uses a Connection:
+// header with two values, "keepalive", and "upgrade".
+typedef struct http_header {
+ char * name;
+ char * value;
+ nni_list_node node;
+} http_header;
+
+struct nni_http_entity {
+ char * data;
+ size_t size; // allocated/expected size
+ size_t len; // current length
+ bool own; // if true, data is "ours", and should be freed
+};
+
+struct nni_http_req {
+ nni_list hdrs;
+ nni_http_entity data;
+ char * meth;
+ char * uri;
+ char * vers;
+ char * buf;
+ size_t bufsz;
+};
+
+struct nni_http_res {
+ nni_list hdrs;
+ nni_http_entity data;
+ int code;
+ char * rsn;
+ char * vers;
+ char * buf;
+ size_t bufsz;
+};
+
+static int
+http_set_string(char **strp, const char *val)
+{
+ char *news;
+ if ((news = nni_strdup(val)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_strfree(*strp);
+ *strp = news;
+ return (0);
+}
+
+static void
+http_headers_reset(nni_list *hdrs)
+{
+ http_header *h;
+ while ((h = nni_list_first(hdrs)) != NULL) {
+ nni_list_remove(hdrs, h);
+ if (h->name != NULL) {
+ nni_strfree(h->name);
+ }
+ if (h->value != NULL) {
+ nni_free(h->value, strlen(h->value) + 1);
+ }
+ NNI_FREE_STRUCT(h);
+ }
+}
+
+static void
+http_entity_reset(nni_http_entity *entity)
+{
+ if (entity->own && entity->size) {
+ nni_free(entity->data, entity->size);
+ }
+ entity->data = NULL;
+ entity->size = 0;
+ entity->own = false;
+}
+
+void
+nni_http_req_reset(nni_http_req *req)
+{
+ http_headers_reset(&req->hdrs);
+ http_entity_reset(&req->data);
+ nni_strfree(req->vers);
+ nni_strfree(req->meth);
+ nni_strfree(req->uri);
+ req->vers = req->meth = req->uri = NULL;
+ if (req->bufsz) {
+ req->buf[0] = '\0';
+ }
+}
+
+void
+nni_http_res_reset(nni_http_res *res)
+{
+ http_headers_reset(&res->hdrs);
+ http_entity_reset(&res->data);
+ nni_strfree(res->rsn);
+ nni_strfree(res->vers);
+ res->code = 0;
+ if (res->bufsz) {
+ res->buf[0] = '\0';
+ }
+}
+
+void
+nni_http_req_fini(nni_http_req *req)
+{
+ nni_http_req_reset(req);
+ if (req->bufsz) {
+ nni_free(req->buf, req->bufsz);
+ }
+ NNI_FREE_STRUCT(req);
+}
+
+void
+nni_http_res_fini(nni_http_res *res)
+{
+ nni_http_res_reset(res);
+ if (res->bufsz) {
+ nni_free(res->buf, res->bufsz);
+ }
+ NNI_FREE_STRUCT(res);
+}
+
+static int
+http_del_header(nni_list *hdrs, const char *key)
+{
+ http_header *h;
+ NNI_LIST_FOREACH (hdrs, h) {
+ if (strcasecmp(key, h->name) == 0) {
+ nni_list_remove(hdrs, h);
+ nni_strfree(h->name);
+ nni_free(h->value, strlen(h->value) + 1);
+ NNI_FREE_STRUCT(h);
+ return (0);
+ }
+ }
+ return (NNG_ENOENT);
+}
+
+int
+nni_req_del_header(nni_http_req *req, const char *key)
+{
+ return (http_del_header(&req->hdrs, key));
+}
+
+int
+nni_res_del_header(nni_http_res *res, const char *key)
+{
+ return (http_del_header(&res->hdrs, key));
+}
+
+static int
+http_set_header(nni_list *hdrs, const char *key, const char *val)
+{
+ http_header *h;
+ NNI_LIST_FOREACH (hdrs, h) {
+ if (strcasecmp(key, h->name) == 0) {
+ char * news;
+ size_t len = strlen(val) + 1;
+ if ((news = nni_alloc(len)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ snprintf(news, len, "%s", val);
+ nni_free(h->value, strlen(h->value) + 1);
+ h->value = news;
+ return (0);
+ }
+ }
+
+ if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((h->name = nni_strdup(key)) == NULL) {
+ NNI_FREE_STRUCT(h);
+ return (NNG_ENOMEM);
+ }
+ if ((h->value = nni_alloc(strlen(val) + 1)) == NULL) {
+ nni_strfree(h->name);
+ NNI_FREE_STRUCT(h);
+ return (NNG_ENOMEM);
+ }
+ strncpy(h->value, val, strlen(val) + 1);
+ nni_list_append(hdrs, h);
+ return (0);
+}
+
+int
+nni_http_req_set_header(nni_http_req *req, const char *key, const char *val)
+{
+ return (http_set_header(&req->hdrs, key, val));
+}
+
+int
+nni_http_res_set_header(nni_http_res *res, const char *key, const char *val)
+{
+ return (http_set_header(&res->hdrs, key, val));
+}
+
+static int
+http_add_header(nni_list *hdrs, const char *key, const char *val)
+{
+ http_header *h;
+ NNI_LIST_FOREACH (hdrs, h) {
+ if (strcasecmp(key, h->name) == 0) {
+ char * news;
+ size_t len = strlen(h->value) + strlen(val) + 3;
+ if ((news = nni_alloc(len)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ snprintf(news, len, "%s, %s", h->value, val);
+ nni_free(h->value, strlen(h->value) + 1);
+ h->value = news;
+ return (0);
+ }
+ }
+
+ if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((h->name = nni_strdup(key)) == NULL) {
+ NNI_FREE_STRUCT(h);
+ return (NNG_ENOMEM);
+ }
+ if ((h->value = nni_alloc(strlen(val) + 1)) == NULL) {
+ nni_strfree(h->name);
+ NNI_FREE_STRUCT(h);
+ return (NNG_ENOMEM);
+ }
+ strncpy(h->value, val, strlen(val) + 1);
+ nni_list_append(hdrs, h);
+ return (0);
+}
+
+int
+nni_http_req_add_header(nni_http_req *req, const char *key, const char *val)
+{
+ return (http_add_header(&req->hdrs, key, val));
+}
+
+int
+nni_http_res_add_header(nni_http_res *res, const char *key, const char *val)
+{
+ return (http_add_header(&res->hdrs, key, val));
+}
+
+static const char *
+http_get_header(nni_list *hdrs, const char *key)
+{
+ http_header *h;
+ NNI_LIST_FOREACH (hdrs, h) {
+ if (strcasecmp(h->name, key) == 0) {
+ return (h->value);
+ }
+ }
+ return (NULL);
+}
+
+const char *
+nni_http_req_get_header(nni_http_req *req, const char *key)
+{
+ return (http_get_header(&req->hdrs, key));
+}
+
+const char *
+nni_http_res_get_header(nni_http_res *res, const char *key)
+{
+ return (http_get_header(&res->hdrs, key));
+}
+
+// http_entity_set_data sets the entity, but does not update the
+// content-length header.
+static void
+http_entity_set_data(nni_http_entity *entity, const void *data, size_t size)
+{
+ if (entity->own) {
+ nni_free(entity->data, entity->size);
+ }
+ entity->data = (void *) data;
+ entity->size = size;
+ entity->own = false;
+}
+
+static int
+http_entity_alloc_data(nni_http_entity *entity, size_t size)
+{
+ void *newdata;
+ if ((newdata = nni_alloc(size)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ http_entity_set_data(entity, newdata, size);
+ entity->own = true;
+ return (0);
+}
+
+static int
+http_entity_copy_data(nni_http_entity *entity, const void *data, size_t size)
+{
+ int rv;
+ if ((rv = http_entity_alloc_data(entity, size)) == 0) {
+ memcpy(entity->data, data, size);
+ }
+ return (rv);
+}
+
+static int
+http_set_content_length(nni_http_entity *entity, nni_list *hdrs)
+{
+ char buf[16];
+ (void) snprintf(buf, sizeof(buf), "%u", (unsigned) entity->size);
+ return (http_set_header(hdrs, "Content-Length", buf));
+}
+
+static void
+http_entity_get_data(nni_http_entity *entity, void **datap, size_t *sizep)
+{
+ *datap = entity->data;
+ *sizep = entity->size;
+}
+
+void
+nni_http_req_get_data(nni_http_req *req, void **datap, size_t *sizep)
+{
+ http_entity_get_data(&req->data, datap, sizep);
+}
+
+void
+nni_http_res_get_data(nni_http_res *res, void **datap, size_t *sizep)
+{
+ http_entity_get_data(&res->data, datap, sizep);
+}
+
+int
+nni_http_req_set_data(nni_http_req *req, const void *data, size_t size)
+{
+ int rv;
+
+ http_entity_set_data(&req->data, data, size);
+ if ((rv = http_set_content_length(&req->data, &req->hdrs)) != 0) {
+ http_entity_set_data(&req->data, NULL, 0);
+ }
+ return (rv);
+}
+
+int
+nni_http_res_set_data(nni_http_res *res, const void *data, size_t size)
+{
+ int rv;
+
+ http_entity_set_data(&res->data, data, size);
+ if ((rv = http_set_content_length(&res->data, &res->hdrs)) != 0) {
+ http_entity_set_data(&res->data, NULL, 0);
+ }
+ return (rv);
+}
+
+int
+nni_http_req_copy_data(nni_http_req *req, const void *data, size_t size)
+{
+ int rv;
+
+ if (((rv = http_entity_copy_data(&req->data, data, size)) != 0) ||
+ ((rv = http_set_content_length(&req->data, &req->hdrs)) != 0)) {
+ http_entity_set_data(&req->data, NULL, 0);
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_http_res_copy_data(nni_http_res *res, const void *data, size_t size)
+{
+ int rv;
+
+ if (((rv = http_entity_copy_data(&res->data, data, size)) != 0) ||
+ ((rv = http_set_content_length(&res->data, &res->hdrs)) != 0)) {
+ http_entity_set_data(&res->data, NULL, 0);
+ return (rv);
+ }
+ return (0);
+}
+
+int
+nni_http_res_alloc_data(nni_http_res *res, size_t size)
+{
+ return (http_entity_alloc_data(&res->data, size));
+}
+
+static int
+http_parse_header(nni_list *hdrs, void *line)
+{
+ http_header *h;
+ char * key = line;
+ char * val;
+ char * end;
+
+ // Find separation between key and value
+ if ((val = strchr(key, ':')) == NULL) {
+ return (NNG_EPROTO);
+ }
+
+ // Trim leading and trailing whitespace from header
+ *val = '\0';
+ val++;
+ while (*val == ' ' || *val == '\t') {
+ val++;
+ }
+ end = val + strlen(val);
+ end--;
+ while ((end > val) && (*end == ' ' || *end == '\t')) {
+ *end = '\0';
+ end--;
+ }
+
+ return (http_add_header(hdrs, key, val));
+}
+
+// http_sprintf_headers makes headers for an HTTP request or an HTTP response
+// object. Each header is dumped from the list. If the buf is NULL,
+// or the sz is 0, then a dryrun is done, in order to allow the caller to
+// determine how much space is needed. Returns the size of the space needed,
+// not including the terminating NULL byte. Truncation occurs if the size
+// returned is >= the requested size.
+static size_t
+http_sprintf_headers(char *buf, size_t sz, nni_list *list)
+{
+ size_t rv = 0;
+ http_header *h;
+
+ if (buf == NULL) {
+ sz = 0;
+ }
+
+ NNI_LIST_FOREACH (list, h) {
+ size_t l;
+ l = snprintf(buf, sz, "%s: %s\r\n", h->name, h->value);
+ if (buf != NULL) {
+ buf += l;
+ }
+ sz = (sz > l) ? sz - l : 0;
+ rv += l;
+ }
+ return (rv);
+}
+
+static int
+http_asprintf(char **bufp, size_t *szp, nni_list *hdrs, const char *fmt, ...)
+{
+ va_list ap;
+ size_t len;
+ size_t n;
+ char * buf;
+
+ va_start(ap, fmt);
+ len = vsnprintf(NULL, 0, fmt, ap);
+ va_end(ap);
+
+ len += http_sprintf_headers(NULL, 0, hdrs);
+ len += 5; // \r\n\r\n\0
+
+ if (len <= *szp) {
+ buf = *bufp;
+ } else {
+ if ((buf = nni_alloc(len)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_free(*bufp, *szp);
+ *bufp = buf;
+ *szp = len;
+ }
+ va_start(ap, fmt);
+ n = vsnprintf(buf, len, fmt, ap);
+ va_end(ap);
+ buf += n;
+ len -= n;
+ n = http_sprintf_headers(buf, len, hdrs);
+ buf += n;
+ len -= n;
+ snprintf(buf, len, "\r\n");
+ return (0);
+}
+
+static int
+http_req_prepare(nni_http_req *req)
+{
+ int rv;
+ if ((req->uri == NULL) || (req->meth == NULL)) {
+ return (NNG_EINVAL);
+ }
+ rv = http_asprintf(&req->buf, &req->bufsz, &req->hdrs, "%s %s %s\r\n",
+ req->meth, req->uri, req->vers != NULL ? req->vers : "HTTP/1.1");
+ return (rv);
+}
+
+static int
+http_res_prepare(nni_http_res *res)
+{
+ int rv;
+ rv = http_asprintf(&res->buf, &res->bufsz, &res->hdrs, "%s %d %s\r\n",
+ res->vers != NULL ? res->vers : "HTTP/1.1", res->code,
+ res->rsn != NULL ? res->rsn : "Unknown Error");
+ return (rv);
+}
+
+int
+nni_http_req_get_buf(nni_http_req *req, void **data, size_t *szp)
+{
+ int rv;
+
+ if ((req->buf == NULL) && (rv = http_req_prepare(req)) != 0) {
+ return (rv);
+ }
+ *data = req->buf;
+ *szp = strlen(req->buf);
+ return (0);
+}
+
+int
+nni_http_res_get_buf(nni_http_res *res, void **data, size_t *szp)
+{
+ int rv;
+
+ if ((res->buf == NULL) && (rv = http_res_prepare(res)) != 0) {
+ return (rv);
+ }
+ *data = res->buf;
+ *szp = strlen(res->buf);
+ return (0);
+}
+
+int
+nni_http_req_init(nni_http_req **reqp)
+{
+ nni_http_req *req;
+ if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&req->hdrs, http_header, node);
+ req->buf = NULL;
+ req->bufsz = 0;
+ req->data.data = NULL;
+ req->data.size = 0;
+ req->data.own = false;
+ req->vers = NULL;
+ req->meth = NULL;
+ req->uri = NULL;
+ *reqp = req;
+ return (0);
+}
+
+int
+nni_http_res_init(nni_http_res **resp)
+{
+ nni_http_res *res;
+ if ((res = NNI_ALLOC_STRUCT(res)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&res->hdrs, http_header, node);
+ res->buf = NULL;
+ res->bufsz = 0;
+ res->data.data = NULL;
+ res->data.size = 0;
+ res->data.own = false;
+ res->vers = NULL;
+ res->rsn = NULL;
+ res->code = 0;
+ *resp = res;
+ return (0);
+}
+
+const char *
+nni_http_req_get_method(nni_http_req *req)
+{
+ return (req->meth);
+}
+
+const char *
+nni_http_req_get_uri(nni_http_req *req)
+{
+ return (req->uri);
+}
+
+const char *
+nni_http_req_get_version(nni_http_req *req)
+{
+ return (req->vers);
+}
+
+const char *
+nni_http_res_get_version(nni_http_res *res)
+{
+ return (res->vers);
+}
+
+int
+nni_http_req_set_version(nni_http_req *req, const char *vers)
+{
+ return (http_set_string(&req->vers, vers));
+}
+
+int
+nni_http_res_set_version(nni_http_res *res, const char *vers)
+{
+ return (http_set_string(&res->vers, vers));
+}
+
+int
+nni_http_req_set_uri(nni_http_req *req, const char *uri)
+{
+ return (http_set_string(&req->uri, uri));
+}
+
+int
+nni_http_req_set_method(nni_http_req *req, const char *meth)
+{
+ return (http_set_string(&req->meth, meth));
+}
+
+int
+nni_http_res_set_status(nni_http_res *res, int status, const char *reason)
+{
+ int rv;
+ if ((rv = http_set_string(&res->rsn, reason)) == 0) {
+ res->code = status;
+ }
+ return (rv);
+}
+
+int
+nni_http_res_get_status(nni_http_res *res)
+{
+ return (res->code);
+}
+
+const char *
+nni_http_res_get_reason(nni_http_res *res)
+{
+ return (res->rsn);
+}
+
+static int
+http_scan_line(void *vbuf, size_t n, size_t *lenp)
+{
+ size_t len;
+ char lc;
+ char * buf = vbuf;
+
+ lc = 0;
+ for (len = 0; len < n; len++) {
+ char c = buf[len];
+ if (c == '\n') {
+ // Technically we should be receiving CRLF, but
+ // debugging is easier with just LF, so we behave
+ // following Postel's Law.
+ if (lc != '\r') {
+ buf[len] = '\0';
+ } else {
+ buf[len - 1] = '\0';
+ }
+ *lenp = len + 1;
+ return (0);
+ }
+ // If we have a control character (other than CR), or a CR
+ // followed by anything other than LF, then its an error.
+ if (((c < ' ') && (c != '\r')) || (lc == '\r')) {
+ return (NNG_EPROTO);
+ }
+ lc = c;
+ }
+ // Scanned the entire content, but did not find a line.
+ return (NNG_EAGAIN);
+}
+
+static int
+http_req_parse_line(nni_http_req *req, void *line)
+{
+ int rv;
+ char *method;
+ char *uri;
+ char *version;
+
+ method = line;
+ if ((uri = strchr(method, ' ')) == NULL) {
+ return (NNG_EPROTO);
+ }
+ *uri = '\0';
+ uri++;
+
+ if ((version = strchr(uri, ' ')) == NULL) {
+ return (NNG_EPROTO);
+ }
+ *version = '\0';
+ version++;
+
+ if (((rv = nni_http_req_set_method(req, method)) != 0) ||
+ ((rv = nni_http_req_set_uri(req, uri)) != 0) ||
+ ((rv = nni_http_req_set_version(req, version)) != 0)) {
+ return (rv);
+ }
+ return (0);
+}
+
+static int
+http_res_parse_line(nni_http_res *res, uint8_t *line)
+{
+ int rv;
+ char *reason;
+ char *codestr;
+ char *version;
+ int status;
+
+ version = (char *) line;
+ if ((codestr = strchr(version, ' ')) == NULL) {
+ return (NNG_EPROTO);
+ }
+ *codestr = '\0';
+ codestr++;
+
+ if ((reason = strchr(codestr, ' ')) == NULL) {
+ return (NNG_EPROTO);
+ }
+ *reason = '\0';
+ reason++;
+
+ status = atoi(codestr);
+ if ((status < 100) || (status > 999)) {
+ return (NNG_EPROTO);
+ }
+
+ if (((rv = nni_http_res_set_status(res, status, reason)) != 0) ||
+ ((rv = nni_http_res_set_version(res, version)) != 0)) {
+ return (rv);
+ }
+ return (0);
+}
+
+// nni_http_req_parse parses a request (but not any attached entity data).
+// The amount of data consumed is returned in lenp. Returns zero on
+// success, NNG_EPROTO on parse failure, NNG_EAGAIN if more data is
+// required, or NNG_ENOMEM on memory exhaustion. Note that lenp may
+// be updated even in the face of errors (esp. NNG_EAGAIN, which is
+// not an error so much as a request for more data.)
+int
+nni_http_req_parse(nni_http_req *req, void *buf, size_t n, size_t *lenp)
+{
+
+ size_t len = 0;
+ size_t cnt;
+ int rv = 0;
+
+ for (;;) {
+ uint8_t *line;
+ if ((rv = http_scan_line(buf, n, &cnt)) != 0) {
+ break;
+ }
+
+ len += cnt;
+ line = buf;
+ buf = line + cnt;
+ n -= cnt;
+
+ if (*line == '\0') {
+ break;
+ }
+
+ if (req->vers != NULL) {
+ rv = http_parse_header(&req->hdrs, line);
+ } else {
+ rv = http_req_parse_line(req, line);
+ }
+
+ if (rv != 0) {
+ break;
+ }
+ }
+
+ *lenp = len;
+ return (rv);
+}
+
+int
+nni_http_res_parse(nni_http_res *res, void *buf, size_t n, size_t *lenp)
+{
+
+ size_t len = 0;
+ size_t cnt;
+ int rv = 0;
+ for (;;) {
+ uint8_t *line;
+ if ((rv = http_scan_line(buf, n, &cnt)) != 0) {
+ break;
+ }
+
+ len += cnt;
+ line = buf;
+ buf = line + cnt;
+ n -= cnt;
+
+ if (*line == '\0') {
+ break;
+ }
+
+ if (res->vers != NULL) {
+ rv = http_parse_header(&res->hdrs, line);
+ } else {
+ rv = http_res_parse_line(res, line);
+ }
+
+ if (rv != 0) {
+ break;
+ }
+ }
+
+ *lenp = len;
+ return (rv);
+}
+
+int
+nni_http_res_init_error(nni_http_res **resp, uint16_t err)
+{
+ char * rsn;
+ char rsnbuf[80];
+ char html[1024];
+ nni_http_res *res;
+
+ if ((nni_http_res_init(&res)) != 0) {
+ return (NNG_ENOMEM);
+ }
+
+ // Note that it is expected that redirect URIs will update the
+ // payload to reflect the target location.
+ switch (err) {
+ case NNI_HTTP_STATUS_STATUS_MOVED_PERMANENTLY:
+ rsn = "Moved Permanently";
+ break;
+ case NNI_HTTP_STATUS_MULTIPLE_CHOICES:
+ rsn = "Multiple Choices";
+ break;
+ case NNI_HTTP_STATUS_FOUND:
+ rsn = "Found";
+ break;
+ case NNI_HTTP_STATUS_SEE_OTHER:
+ rsn = "See Other";
+ break;
+ case NNI_HTTP_STATUS_TEMPORARY_REDIRECT:
+ rsn = "Temporary Redirect";
+ break;
+ case NNI_HTTP_STATUS_BAD_REQUEST:
+ rsn = "Bad Request";
+ break;
+ case NNI_HTTP_STATUS_UNAUTHORIZED:
+ rsn = "Unauthorized";
+ break;
+ case NNI_HTTP_STATUS_PAYMENT_REQUIRED:
+ rsn = "Payment Required";
+ break;
+ case NNI_HTTP_STATUS_NOT_FOUND:
+ rsn = "Not Found";
+ break;
+ case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED:
+ // Caller must also supply an Allow: header
+ rsn = "Method Not Allowed";
+ break;
+ case NNI_HTTP_STATUS_NOT_ACCEPTABLE:
+ rsn = "Not Acceptable";
+ break;
+ case NNI_HTTP_STATUS_REQUEST_TIMEOUT:
+ rsn = "Request Timeout";
+ break;
+ case NNI_HTTP_STATUS_CONFLICT:
+ rsn = "Conflict";
+ break;
+ case NNI_HTTP_STATUS_GONE:
+ rsn = "Gone";
+ break;
+ case NNI_HTTP_STATUS_LENGTH_REQUIRED:
+ rsn = "Length Required";
+ break;
+ case NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE:
+ rsn = "Payload Too Large";
+ break;
+ case NNI_HTTP_STATUS_FORBIDDEN:
+ rsn = "Forbidden";
+ break;
+ case NNI_HTTP_STATUS_URI_TOO_LONG:
+ rsn = "URI Too Long";
+ break;
+ case NNI_HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE:
+ rsn = "Unsupported Media Type";
+ break;
+ case NNI_HTTP_STATUS_EXPECTATION_FAILED:
+ rsn = "Expectation Failed";
+ break;
+ case NNI_HTTP_STATUS_UPGRADE_REQUIRED:
+ // Caller must add "Upgrade:" header.
+ rsn = "Upgrade Required";
+ break;
+ case NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR:
+ rsn = "Internal Server Error";
+ break;
+ case NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP:
+ rsn = "HTTP version not supported";
+ break;
+ case NNI_HTTP_STATUS_NOT_IMPLEMENTED:
+ rsn = "Not Implemented";
+ break;
+ case NNI_HTTP_STATUS_SERVICE_UNAVAILABLE:
+ rsn = "Service Unavailable";
+ break;
+ default:
+ snprintf(rsnbuf, sizeof(rsnbuf), "HTTP error code %d", err);
+ rsn = rsnbuf;
+ break;
+ }
+
+ // very simple builtin error page
+ snprintf(html, sizeof(html),
+ "<head><title>%d %s</title></head>"
+ "<body><p/><h1 align=\"center\">"
+ "<span style=\"font-size: 36px; border-radius: 5px; "
+ "background-color: black; color: white; padding: 7px; "
+ "font-family: Arial, sans serif;\">%d</span></h1>"
+ "<p align=\"center\">"
+ "<span style=\"font-size: 24px; font-family: Arial, sans serif;\">"
+ "%s</span></p></body>",
+ err, rsn, err, rsn);
+
+ nni_http_res_set_status(res, err, rsn);
+ nni_http_res_copy_data(res, html, strlen(html));
+ nni_http_res_set_version(res, "HTTP/1.1");
+ nni_http_res_set_header(
+ res, "Content-Type", "text/html; charset=UTF-8");
+ // We could set the date, but we don't necessarily have a portable
+ // way to get the time of day.
+
+ *resp = res;
+ return (0);
+}
diff --git a/src/supplemental/http/server.c b/src/supplemental/http/server.c
new file mode 100644
index 00000000..15b04f80
--- /dev/null
+++ b/src/supplemental/http/server.c
@@ -0,0 +1,1103 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <ctype.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "http.h"
+
+static int http_server_sys_init(void);
+static void http_server_sys_fini(void);
+
+static nni_initializer http_server_initializer = {
+ .i_init = http_server_sys_init,
+ .i_fini = http_server_sys_fini,
+ .i_once = 0,
+};
+
+typedef struct http_handler {
+ nni_list_node node;
+ void * h_arg;
+ char * h_path;
+ char * h_method;
+ char * h_host;
+ bool h_is_upgrader;
+ bool h_is_dir;
+ void (*h_cb)(nni_aio *);
+ void (*h_free)(void *);
+} http_handler;
+
+typedef struct http_sconn {
+ nni_list_node node;
+ nni_http * http;
+ nni_http_server *server;
+ nni_http_req * req;
+ nni_http_res * res;
+ bool close;
+ bool closed;
+ bool finished;
+ nni_aio * cbaio;
+ nni_aio * rxaio;
+ nni_aio * txaio;
+ nni_aio * txdataio;
+ nni_http_tran tran;
+} http_sconn;
+
+struct nni_http_server {
+ nng_sockaddr addr;
+ nni_list_node node;
+ int refcnt;
+ int starts;
+ nni_list handlers;
+ nni_list conns;
+ nni_list reaps;
+ nni_mtx mtx;
+ nni_cv cv;
+ bool closed;
+ bool tls;
+ nni_task cleanup;
+ nni_aio * accaio;
+ nni_plat_tcp_ep *tep;
+};
+
+static nni_list http_servers;
+static nni_mtx http_servers_lk;
+
+static void
+http_sconn_fini(void *arg)
+{
+ http_sconn *sc = arg;
+ NNI_ASSERT(!sc->finished);
+ sc->finished = true;
+ nni_aio_stop(sc->rxaio);
+ nni_aio_stop(sc->txaio);
+ nni_aio_stop(sc->txdataio);
+ nni_aio_stop(sc->cbaio);
+ if (sc->http != NULL) {
+ nni_http_fini(sc->http);
+ }
+ if (sc->req != NULL) {
+ nni_http_req_fini(sc->req);
+ }
+ if (sc->res != NULL) {
+ nni_http_res_fini(sc->res);
+ }
+ nni_aio_fini(sc->rxaio);
+ nni_aio_fini(sc->txaio);
+ nni_aio_fini(sc->txdataio);
+ nni_aio_fini(sc->cbaio);
+ NNI_FREE_STRUCT(sc);
+}
+
+static void
+http_sconn_close(http_sconn *sc)
+{
+ nni_http_server *s;
+ s = sc->server;
+
+ NNI_ASSERT(!sc->finished);
+ nni_mtx_lock(&s->mtx);
+ if (!sc->closed) {
+ nni_http *h;
+ sc->closed = true;
+ // Close the underlying transport.
+ if ((h = sc->http) != NULL) {
+ nni_http_close(h);
+ }
+ if (nni_list_node_active(&sc->node)) {
+ nni_list_remove(&s->conns, sc);
+ nni_list_append(&s->reaps, sc);
+ }
+ nni_task_dispatch(&s->cleanup);
+ } else {
+ nni_panic("DOUBLE CLOSE!\n");
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+http_sconn_txdatdone(void *arg)
+{
+ http_sconn *sc = arg;
+ nni_aio * aio = sc->txdataio;
+
+ if (nni_aio_result(aio) != 0) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ if (sc->res != NULL) {
+ nni_http_res_fini(sc->res);
+ sc->res = NULL;
+ }
+
+ if (sc->close) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ nni_http_req_reset(sc->req);
+ nni_http_read_req(sc->http, sc->req, sc->rxaio);
+}
+
+static void
+http_sconn_txdone(void *arg)
+{
+ http_sconn *sc = arg;
+ nni_aio * aio = sc->txaio;
+ int rv;
+ void * data;
+ size_t size;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ // For HEAD requests, we just treat like "GET" but don't send
+ // the data. (Required per HTTP.)
+ if (strcmp(nni_http_req_get_method(sc->req), "HEAD") == 0) {
+ size = 0;
+ } else {
+ nni_http_res_get_data(sc->res, &data, &size);
+ }
+ if (size) {
+ // Submit data.
+ sc->txdataio->a_niov = 1;
+ sc->txdataio->a_iov[0].iov_buf = data;
+ sc->txdataio->a_iov[0].iov_len = size;
+ nni_http_write_full(sc->http, sc->txdataio);
+ return;
+ }
+
+ if (sc->close) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ if (sc->res != NULL) {
+ nni_http_res_fini(sc->res);
+ sc->res = NULL;
+ }
+ nni_http_req_reset(sc->req);
+ nni_http_read_req(sc->http, sc->req, sc->rxaio);
+}
+
+static char
+http_hexval(char c)
+{
+ if ((c >= '0') && (c <= '9')) {
+ return (c - '0');
+ }
+ if ((c >= 'a') && (c <= 'f')) {
+ return ((c - 'a') + 10);
+ }
+ if ((c >= 'A') && (c <= 'F')) {
+ return ((c - 'A') + 10);
+ }
+ return (0);
+}
+
+static char *
+http_uri_canonify(char *path)
+{
+ char *tmp;
+ char *dst;
+
+ // Chomp off query string.
+ if ((tmp = strchr(path, '?')) != NULL) {
+ *tmp = '\0';
+ }
+ // If the URI was absolute, make it relative.
+ if ((strncasecmp(path, "http://", strlen("http://")) == 0) ||
+ (strncasecmp(path, "https://", strlen("https://")) == 0)) {
+ // Skip past the ://
+ path = strchr(path, ':');
+ path += 3;
+
+ // scan for the end of the host, distinguished by a /
+ // path delimiter. There might not be one, in which case
+ // the whole thing is the host and we assume the path is
+ // just /.
+ if ((path = strchr(path, '/')) == NULL) {
+ return ("/");
+ }
+ }
+
+ // Now we have to unescape things. Unescaping is a shrinking
+ // operation (strictly), so this is safe. This is just URL decode.
+ // Note that paths with an embedded NUL are going to be treated as
+ // though truncated. Don't be that guy that sends %00 in a URL.
+ tmp = path;
+ dst = path;
+ while (*tmp != '\0') {
+ char c;
+ if ((c = *tmp) != '%') {
+ *dst++ = c;
+ tmp++;
+ continue;
+ }
+ if (isxdigit(tmp[1]) && isxdigit(tmp[2])) {
+ c = http_hexval(tmp[1]);
+ c *= 16;
+ c += http_hexval(tmp[2]);
+ *dst++ = c;
+ tmp += 3;
+ }
+ // garbage in, garbage out
+ *dst++ = c;
+ tmp++;
+ }
+ *dst = '\0';
+ return (path);
+}
+
+static void
+http_sconn_error(http_sconn *sc, uint16_t err)
+{
+ nni_http_res *res;
+
+ if (nni_http_res_init_error(&res, err) != 0) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ sc->res = res;
+ nni_http_write_res(sc->http, res, sc->txaio);
+}
+
+static void
+http_sconn_rxdone(void *arg)
+{
+ http_sconn * sc = arg;
+ nni_http_server *s = sc->server;
+ nni_aio * aio = sc->rxaio;
+ int rv;
+ http_handler * h;
+ const char * val;
+ nni_http_req * req = sc->req;
+ char * uri;
+ size_t urisz;
+ char * path;
+ char * tmp;
+ bool badmeth = false;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ http_sconn_close(sc);
+ return;
+ }
+
+ // Validate the request -- it has to at least look like HTTP 1.x
+ // We flatly refuse to deal with HTTP 0.9, and we can't cope with
+ // HTTP/2.
+ if ((val = nni_http_req_get_version(req)) == NULL) {
+ sc->close = true;
+ http_sconn_error(sc, NNI_HTTP_STATUS_BAD_REQUEST);
+ return;
+ }
+ if (strncmp(val, "HTTP/1.", 7) != 0) {
+ sc->close = true;
+ http_sconn_error(sc, NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP);
+ return;
+ }
+ if (strcmp(val, "HTTP/1.1") != 0) {
+ // We treat HTTP/1.0 connections as non-persistent.
+ // No effort is made to handle "persistent" HTTP/1.0
+ // since that was not standard. (Everyone is at 1.1 now
+ // anyways.)
+ sc->close = true;
+ }
+
+ // If the connection was 1.0, or a connection: close was requested,
+ // then mark this close on our end.
+ if ((val = nni_http_req_get_header(req, "Connection")) != NULL) {
+ // HTTP 1.1 says these have to be case insensitive (7230)
+ if (nni_strcasestr(val, "close") != NULL) {
+ // In theory this could falsely match some other weird
+ // connection header that included the word close not
+ // as part of a whole token. No such legal definitions
+ // exist, and so anyone who does that gets what they
+ // deserve. (Fairly harmless actually, since it only
+ // prevents persistent connections.)
+ sc->close = true;
+ }
+ }
+
+ val = nni_http_req_get_uri(req);
+ urisz = strlen(val) + 1;
+ if ((uri = nni_alloc(urisz)) == NULL) {
+ http_sconn_close(sc); // out of memory
+ return;
+ }
+ strncpy(uri, val, urisz);
+ path = http_uri_canonify(uri);
+
+ NNI_LIST_FOREACH (&s->handlers, h) {
+ size_t len;
+ if (h->h_host != NULL) {
+ val = nni_http_req_get_header(req, "Host");
+ if (val == NULL) {
+ // We insist on a matching Host: line for
+ // virtual hosting. This leaves HTTP/1.0
+ // out in the cold basically.
+ continue;
+ }
+
+ // A few ways hosts can match. They might have
+ // a port attached -- we ignore that. (We don't
+ // run multiple ports, so if you got here, presumably
+ // the port at least is correct!) It might also have
+ // a lone trailing dot, so that is ok too.
+
+ // Ignore the trailing dot if the handler supplied it.
+ len = strlen(h->h_host);
+ if ((len > 0) && (h->h_host[len - 1] == '.')) {
+ len--;
+ }
+ if ((nni_strncasecmp(val, h->h_host, len) != 0)) {
+ continue;
+ }
+ if ((val[len] != '\0') && (val[len] != ':') &&
+ ((val[len] != '.') || (val[len + 1] != '\0'))) {
+ continue;
+ }
+ }
+
+ NNI_ASSERT(h->h_method != NULL);
+
+ len = strlen(h->h_path);
+ if (strncmp(path, h->h_path, len) != 0) {
+ continue;
+ }
+ switch (path[len]) {
+ case '\0':
+ break;
+ case '/':
+ if ((path[len + 1] != '\0') && (!h->h_is_dir)) {
+ // trailing component and not a directory.
+ // Note that this should force a failure.
+ continue;
+ }
+ break;
+ default:
+ continue; // some other substring, not matched.
+ }
+
+ // So, what about the method?
+ val = nni_http_req_get_method(req);
+ if (strcmp(val, h->h_method) == 0) {
+ break;
+ }
+ // HEAD is remapped to GET.
+ if ((strcmp(val, "HEAD") == 0) &&
+ (strcmp(h->h_method, "GET") == 0)) {
+ break;
+ }
+ badmeth = 1;
+ }
+
+ nni_free(uri, urisz);
+ if (h == NULL) {
+ if (badmeth) {
+ http_sconn_error(
+ sc, NNI_HTTP_STATUS_METHOD_NOT_ALLOWED);
+ } else {
+ http_sconn_error(sc, NNI_HTTP_STATUS_NOT_FOUND);
+ }
+ return;
+ }
+
+ nni_aio_set_input(sc->cbaio, 0, sc->http);
+ nni_aio_set_input(sc->cbaio, 1, sc->req);
+ nni_aio_set_input(sc->cbaio, 2, h->h_arg);
+ nni_aio_set_data(sc->cbaio, 1, h);
+
+ // Technically, probably callback should initialize this with
+ // start, but we do it instead.
+
+ if (nni_aio_start(sc->cbaio, NULL, NULL) == 0) {
+ h->h_cb(sc->cbaio);
+ }
+}
+
+static void
+http_sconn_cbdone(void *arg)
+{
+ http_sconn * sc = arg;
+ nni_aio * aio = sc->cbaio;
+ nni_http_res *res;
+ http_handler *h;
+
+ if (nni_aio_result(aio) != 0) {
+ // Hard close, no further feedback.
+ http_sconn_close(sc);
+ return;
+ }
+
+ h = nni_aio_get_data(aio, 1);
+ res = nni_aio_get_output(aio, 0);
+
+ // If its an upgrader, and they didn't give us back a response, it
+ // means that they took over, and we should just discard this session,
+ // without closing the underlying channel.
+ if ((h->h_is_upgrader) && (res == NULL)) {
+ sc->http = NULL; // the underlying HTTP is not closed
+ sc->req = NULL;
+ sc->res = NULL;
+ http_sconn_close(sc); // discard server session though
+ return;
+ }
+ if (res != NULL) {
+
+ const char *val;
+ val = nni_http_res_get_header(res, "Connection");
+ if ((val != NULL) && (strstr(val, "close") != NULL)) {
+ sc->close = true;
+ }
+ if (sc->close) {
+ nni_http_res_set_header(res, "Connection", "close");
+ }
+ sc->res = res;
+ nni_http_write_res(sc->http, res, sc->txaio);
+ } else if (sc->close) {
+ http_sconn_close(sc);
+ } else {
+ // Presumably client already sent a response.
+ // Wait for another request.
+ nni_http_req_reset(sc->req);
+ nni_http_read_req(sc->http, sc->req, sc->rxaio);
+ }
+}
+
+static int
+http_sconn_init(http_sconn **scp, nni_plat_tcp_pipe *tcp)
+{
+ http_sconn *sc;
+ int rv;
+
+ if ((sc = NNI_ALLOC_STRUCT(sc)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (((rv = nni_http_req_init(&sc->req)) != 0) ||
+ ((rv = nni_aio_init(&sc->rxaio, http_sconn_rxdone, sc)) != 0) ||
+ ((rv = nni_aio_init(&sc->txaio, http_sconn_txdone, sc)) != 0) ||
+ ((rv = nni_aio_init(&sc->txdataio, http_sconn_txdatdone, sc)) !=
+ 0) ||
+ ((rv = nni_aio_init(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) {
+ // Can't even accept the incoming request. Hard close.
+ http_sconn_close(sc);
+ return (rv);
+ }
+ // XXX: for HTTPS we would then try to do the TLS negotiation here.
+ // That would use a different set of tran values.
+
+ sc->tran.h_data = tcp;
+ sc->tran.h_read = (void *) nni_plat_tcp_pipe_recv;
+ sc->tran.h_write = (void *) nni_plat_tcp_pipe_send;
+ sc->tran.h_close = (void *) nni_plat_tcp_pipe_close; // close implied
+ sc->tran.h_fini = (void *) nni_plat_tcp_pipe_fini;
+
+ if ((rv = nni_http_init(&sc->http, &sc->tran)) != 0) {
+ http_sconn_close(sc);
+ return (rv);
+ }
+ *scp = sc;
+ return (0);
+}
+
+static void
+http_server_acccb(void *arg)
+{
+ nni_http_server * s = arg;
+ nni_aio * aio = s->accaio;
+ nni_plat_tcp_pipe *tcp;
+ http_sconn * sc;
+ int rv;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ if (rv == NNG_ECLOSED) {
+ return;
+ }
+ // try again?
+ nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ return;
+ }
+ tcp = nni_aio_get_pipe(aio);
+ if (http_sconn_init(&sc, tcp) != 0) {
+ nni_plat_tcp_pipe_close(tcp);
+ nni_plat_tcp_pipe_fini(tcp);
+
+ nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ return;
+ }
+ nni_mtx_lock(&s->mtx);
+ sc->server = s;
+ if (s->closed) {
+ nni_http_close(sc->http);
+ sc->closed = true;
+ nni_list_append(&s->reaps, sc);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_list_append(&s->conns, sc);
+ nni_mtx_unlock(&s->mtx);
+
+ nni_http_read_req(sc->http, sc->req, sc->rxaio);
+ nni_plat_tcp_ep_accept(s->tep, s->accaio);
+}
+
+static void
+http_server_cleanup(void *arg)
+{
+ nni_http_server *s = arg;
+ http_sconn * sc;
+ nni_mtx_lock(&s->mtx);
+ while ((sc = nni_list_first(&s->reaps)) != NULL) {
+ nni_list_remove(&s->reaps, sc);
+ nni_mtx_unlock(&s->mtx);
+ http_sconn_fini(sc);
+ nni_mtx_lock(&s->mtx);
+ }
+ if (nni_list_empty(&s->reaps) && nni_list_empty(&s->conns)) {
+ nni_cv_wake(&s->cv);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+http_handler_fini(http_handler *h)
+{
+ nni_strfree(h->h_path);
+ nni_strfree(h->h_host);
+ nni_strfree(h->h_method);
+ if (h->h_free != NULL) {
+ h->h_free(h->h_arg);
+ }
+ NNI_FREE_STRUCT(h);
+}
+
+static void
+http_server_fini(nni_http_server *s)
+{
+ http_handler *h;
+
+ nni_mtx_lock(&s->mtx);
+ while ((!nni_list_empty(&s->conns)) || (!nni_list_empty(&s->reaps))) {
+ nni_cv_wait(&s->cv);
+ }
+ if (s->tep != NULL) {
+ nni_plat_tcp_ep_fini(s->tep);
+ }
+ while ((h = nni_list_first(&s->handlers)) != NULL) {
+ nni_list_remove(&s->handlers, h);
+ http_handler_fini(h);
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_task_wait(&s->cleanup);
+ nni_aio_fini(s->accaio);
+ nni_cv_fini(&s->cv);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
+void
+nni_http_server_fini(nni_http_server *s)
+{
+ nni_mtx_lock(&http_servers_lk);
+ s->refcnt--;
+ if (s->refcnt == 0) {
+ nni_list_remove(&http_servers, s);
+ http_server_fini(s);
+ }
+ nni_mtx_unlock(&http_servers_lk);
+}
+
+static int
+http_server_init(nni_http_server **serverp, nng_sockaddr *sa)
+{
+ nni_http_server *s;
+ int rv;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&s->mtx);
+ nni_cv_init(&s->cv, &s->mtx);
+ nni_task_init(NULL, &s->cleanup, http_server_cleanup, s);
+ NNI_LIST_INIT(&s->handlers, http_handler, node);
+ NNI_LIST_INIT(&s->conns, http_sconn, node);
+ NNI_LIST_INIT(&s->reaps, http_sconn, node);
+ if ((rv = nni_aio_init(&s->accaio, http_server_acccb, s)) != 0) {
+ http_server_fini(s);
+ return (rv);
+ }
+ s->addr = *sa;
+ *serverp = s;
+ return (0);
+}
+
+int
+nni_http_server_init(nni_http_server **serverp, nng_sockaddr *sa)
+{
+ int rv;
+ nni_http_server *s;
+
+ nni_initialize(&http_server_initializer);
+
+ nni_mtx_lock(&http_servers_lk);
+ NNI_LIST_FOREACH (&http_servers, s) {
+ switch (sa->s_un.s_family) {
+ case NNG_AF_INET:
+ if (memcmp(&s->addr.s_un.s_in, &sa->s_un.s_in,
+ sizeof(sa->s_un.s_in)) == 0) {
+ *serverp = s;
+ s->refcnt++;
+ nni_mtx_unlock(&http_servers_lk);
+ return (0);
+ }
+ break;
+ case NNG_AF_INET6:
+ if (memcmp(&s->addr.s_un.s_in6, &sa->s_un.s_in6,
+ sizeof(sa->s_un.s_in6)) == 0) {
+ *serverp = s;
+ s->refcnt++;
+ nni_mtx_unlock(&http_servers_lk);
+ return (0);
+ }
+ break;
+ }
+ }
+
+ // We didn't find a server, try to make a new one.
+ if ((rv = http_server_init(&s, sa)) == 0) {
+ s->addr = *sa;
+ s->refcnt = 1;
+ nni_list_append(&http_servers, s);
+ *serverp = s;
+ }
+
+ nni_mtx_unlock(&http_servers_lk);
+ return (rv);
+}
+
+static int
+http_server_start(nni_http_server *s)
+{
+ int rv;
+
+ rv = nni_plat_tcp_ep_init(&s->tep, &s->addr, NULL, NNI_EP_MODE_LISTEN);
+ if (rv != 0) {
+ return (rv);
+ }
+ if ((rv = nni_plat_tcp_ep_listen(s->tep)) != 0) {
+ nni_plat_tcp_ep_fini(s->tep);
+ s->tep = NULL;
+ return (rv);
+ }
+ nni_plat_tcp_ep_accept(s->tep, s->accaio);
+ return (0);
+}
+
+int
+nni_http_server_start(nni_http_server *s)
+{
+ int rv = 0;
+
+ nni_mtx_lock(&s->mtx);
+ if (s->starts == 0) {
+ rv = http_server_start(s);
+ }
+ if (rv == 0) {
+ s->starts++;
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static void
+http_server_stop(nni_http_server *s)
+{
+ http_sconn *sc;
+
+ if (s->closed) {
+ return;
+ }
+
+ s->closed = true;
+ // Close the TCP endpoint that is listening.
+ if (s->tep) {
+ nni_plat_tcp_ep_close(s->tep);
+ }
+
+ // This marks the server as "shutting down" -- existing
+ // connections finish their activity and close.
+ //
+ // XXX: figure out how to shut down connections that are
+ // blocked waiting to receive a request. We won't do this for
+ // upgraded connections...
+ NNI_LIST_FOREACH (&s->conns, sc) {
+ sc->close = true;
+ }
+}
+
+void
+nni_http_server_stop(nni_http_server *s)
+{
+ nni_mtx_lock(&s->mtx);
+ s->starts--;
+ if (s->starts == 0) {
+ http_server_stop(s);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+int
+http_server_add_handler(void **hp, nni_http_server *s, nni_http_handler *hh,
+ void *arg, void (*freeit)(void *))
+{
+ http_handler *h, *h2;
+ size_t l1, l2;
+
+ // Must have a legal method (and not one that is HEAD), path,
+ // and handler. (The reason HEAD is verboten is that we supply
+ // it automatically as part of GET support.)
+ if ((hh->h_method == NULL) || (hh->h_path == NULL) ||
+ (hh->h_cb == NULL) || (strcmp(hh->h_method, "HEAD") == 0)) {
+ return (NNG_EINVAL);
+ }
+ if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ h->h_arg = arg;
+ h->h_cb = hh->h_cb;
+ h->h_is_dir = hh->h_is_dir;
+ h->h_is_upgrader = hh->h_is_upgrader;
+ h->h_free = freeit;
+
+ // Ignore the port part of the host.
+ if (hh->h_host != NULL) {
+ int rv;
+ rv = nni_tran_parse_host_port(hh->h_host, &h->h_host, NULL);
+ if (rv != 0) {
+ http_handler_fini(h);
+ return (rv);
+ }
+ }
+
+ if (((h->h_method = nni_strdup(hh->h_method)) == NULL) ||
+ ((h->h_path = nni_strdup(hh->h_path)) == NULL)) {
+ http_handler_fini(h);
+ return (NNG_ENOMEM);
+ }
+
+ l1 = strlen(h->h_path);
+ // Chop off trailing "/"
+ while (l1 > 0) {
+ if (h->h_path[l1 - 1] != '/') {
+ break;
+ }
+ l1--;
+ h->h_path[l1] = '\0';
+ }
+
+ nni_mtx_lock(&s->mtx);
+ // General rule for finding a conflict is that if either string
+ // is a strict substring of the other, then we have a
+ // collision. (But only if the methods match, and the host
+ // matches.) Note that a wild card host matches both.
+ NNI_LIST_FOREACH (&s->handlers, h2) {
+ if ((h2->h_host != NULL) && (h->h_host != NULL) &&
+ (strcasecmp(h2->h_host, h->h_host) != 0)) {
+ // Hosts don't match, so we are safe.
+ continue;
+ }
+ if (strcmp(h2->h_method, h->h_method) != 0) {
+ // Different methods, so again we are fine.
+ continue;
+ }
+ l2 = strlen(h2->h_path);
+ if (l1 < l2) {
+ l2 = l1;
+ }
+ if (strncmp(h2->h_path, h->h_path, l2) == 0) {
+ // Path collision. NNG_EADDRINUSE.
+ nni_mtx_unlock(&s->mtx);
+ http_handler_fini(h);
+ return (NNG_EADDRINUSE);
+ }
+ }
+ nni_list_append(&s->handlers, h);
+ nni_mtx_unlock(&s->mtx);
+ if (hp != NULL) {
+ *hp = h;
+ }
+ return (0);
+}
+
+int
+nni_http_server_add_handler(
+ void **hp, nni_http_server *s, nni_http_handler *hh, void *arg)
+{
+ return (http_server_add_handler(hp, s, hh, arg, NULL));
+}
+
+void
+nni_http_server_del_handler(nni_http_server *s, void *harg)
+{
+ http_handler *h = harg;
+
+ nni_mtx_lock(&s->mtx);
+ nni_list_node_remove(&h->node);
+ nni_mtx_unlock(&s->mtx);
+
+ http_handler_fini(h);
+}
+
+// Very limited MIME type map. Used only if the handler does not
+// supply it's own.
+static struct content_map {
+ const char *ext;
+ const char *typ;
+} content_map[] = {
+ // clang-format off
+ { ".ai", "application/postscript" },
+ { ".aif", "audio/aiff" },
+ { ".aiff", "audio/aiff" },
+ { ".avi", "video/avi" },
+ { ".au", "audio/basic" },
+ { ".bin", "application/octet-stream" },
+ { ".bmp", "image/bmp" },
+ { ".css", "text/css" },
+ { ".eps", "application/postscript" },
+ { ".gif", "image/gif" },
+ { ".htm", "text/html" },
+ { ".html", "text/html" },
+ { ".ico", "image/x-icon" },
+ { ".jpeg", "image/jpeg" },
+ { ".jpg", "image/jpeg" },
+ { ".js", "application/javascript" },
+ { ".md", "text/markdown" },
+ { ".mp2", "video/mpeg" },
+ { ".mp3", "audio/mpeg3" },
+ { ".mpeg", "video/mpeg" },
+ { ".mpg", "video/mpeg" },
+ { ".pdf", "application/pdf" },
+ { ".png", "image/png" },
+ { ".ps", "application/postscript" },
+ { ".rtf", "text/rtf" },
+ { ".text", "text/plain" },
+ { ".tif", "image/tiff" },
+ { ".tiff", "image/tiff" },
+ { ".txt", "text/plain" },
+ { ".wav", "audio/wav"},
+ { "README", "text/plain" },
+ { NULL, NULL },
+ // clang-format on
+};
+
+const char *
+http_lookup_type(const char *path)
+{
+ size_t l1 = strlen(path);
+ for (int i = 0; content_map[i].ext != NULL; i++) {
+ size_t l2 = strlen(content_map[i].ext);
+ if (l2 > l1) {
+ continue;
+ }
+ if (strcasecmp(&path[l1 - l2], content_map[i].ext) == 0) {
+ return (content_map[i].typ);
+ }
+ }
+ return (NULL);
+}
+
+typedef struct http_file {
+ char *typ;
+ char *pth;
+} http_file;
+
+static void
+http_handle_file(nni_aio *aio)
+{
+ http_file * f = nni_aio_get_input(aio, 2);
+ nni_http_res *res = NULL;
+ void * data;
+ size_t size;
+ int rv;
+
+ if ((rv = nni_plat_file_get(f->pth, &data, &size)) != 0) {
+ uint16_t status;
+ switch (rv) {
+ case NNG_ENOMEM:
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ break;
+ case NNG_ENOENT:
+ status = NNI_HTTP_STATUS_NOT_FOUND;
+ break;
+ case NNG_EPERM:
+ status = NNI_HTTP_STATUS_FORBIDDEN;
+ break;
+ default:
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ break;
+ }
+ if ((rv = nni_http_res_init_error(&res, status)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ } else {
+ if (((rv = nni_http_res_init(&res)) != 0) ||
+ ((rv = nni_http_res_set_status(
+ res, NNI_HTTP_STATUS_OK, "OK")) != 0) ||
+ ((rv = nni_http_res_set_header(
+ res, "Content-Type", f->typ)) != 0) ||
+ ((rv = nni_http_res_set_data(res, data, size)) != 0)) {
+ nni_free(data, size);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ }
+ nni_aio_set_output(aio, 0, res);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+http_free_file(void *arg)
+{
+ http_file *f = arg;
+ nni_strfree(f->pth);
+ nni_strfree(f->typ);
+ NNI_FREE_STRUCT(f);
+}
+
+int
+nni_http_server_add_file(nni_http_server *s, const char *host,
+ const char *ctype, const char *uri, const char *path)
+{
+ nni_http_handler h;
+ http_file * f;
+ int rv;
+
+ if ((f = NNI_ALLOC_STRUCT(f)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (ctype == NULL) {
+ ctype = http_lookup_type(path);
+ }
+
+ if (((f->pth = nni_strdup(path)) == NULL) ||
+ ((ctype != NULL) && ((f->typ = nni_strdup(ctype)) == NULL))) {
+ http_free_file(f);
+ return (NNG_ENOMEM);
+ }
+ h.h_method = "GET";
+ h.h_path = uri;
+ h.h_host = host;
+ h.h_cb = http_handle_file;
+ h.h_is_dir = false;
+ h.h_is_upgrader = false;
+
+ if ((rv = http_server_add_handler(NULL, s, &h, f, http_free_file)) !=
+ 0) {
+ http_free_file(f);
+ return (rv);
+ }
+ return (0);
+}
+
+typedef struct http_static {
+ char * typ;
+ void * data;
+ size_t size;
+} http_static;
+
+static void
+http_handle_static(nni_aio *aio)
+{
+ http_static * s = nni_aio_get_input(aio, 2);
+ nni_http_res *r = NULL;
+ int rv;
+
+ if (((rv = nni_http_res_init(&r)) != 0) ||
+ ((rv = nni_http_res_set_header(r, "Content-Type", s->typ)) != 0) ||
+ ((rv = nni_http_res_set_status(r, NNI_HTTP_STATUS_OK, "OK")) !=
+ 0) ||
+ ((rv = nni_http_res_set_data(r, s->data, s->size)) != 0)) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_aio_set_output(aio, 0, r);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+http_free_static(void *arg)
+{
+ http_static *s = arg;
+ nni_strfree(s->typ);
+ nni_free(s->data, s->size);
+ NNI_FREE_STRUCT(s);
+}
+
+int
+nni_http_server_add_static(nni_http_server *s, const char *host,
+ const char *ctype, const char *uri, const void *data, size_t size)
+{
+ nni_http_handler h;
+ http_static * f;
+ int rv;
+
+ if ((f = NNI_ALLOC_STRUCT(f)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (ctype == NULL) {
+ ctype = "application/octet-stream";
+ }
+ if (((f->data = nni_alloc(size)) == NULL) ||
+ ((f->typ = nni_strdup(ctype)) == NULL)) {
+ http_free_static(f);
+ return (NNG_ENOMEM);
+ }
+
+ f->size = size;
+ memcpy(f->data, data, size);
+
+ h.h_method = "GET";
+ h.h_path = uri;
+ h.h_host = host;
+ h.h_cb = http_handle_static;
+ h.h_is_dir = false;
+ h.h_is_upgrader = false;
+
+ if ((rv = http_server_add_handler(NULL, s, &h, f, http_free_static)) !=
+ 0) {
+ http_free_static(f);
+ return (rv);
+ }
+ return (0);
+}
+
+static int
+http_server_sys_init(void)
+{
+ NNI_LIST_INIT(&http_servers, nni_http_server, node);
+ nni_mtx_init(&http_servers_lk);
+ return (0);
+}
+
+static void
+http_server_sys_fini(void)
+{
+ nni_mtx_fini(&http_servers_lk);
+} \ No newline at end of file
diff --git a/src/supplemental/mbedtls/tls.c b/src/supplemental/mbedtls/tls.c
index a33c72b4..94503df5 100644
--- a/src/supplemental/mbedtls/tls.c
+++ b/src/supplemental/mbedtls/tls.c
@@ -417,7 +417,7 @@ nni_tls_send_cb(void *ctx)
aio->a_niov = 1;
aio->a_iov[0].iov_buf = tp->sendbuf + tp->sendoff;
aio->a_iov[0].iov_len = tp->sendlen;
- nni_aio_set_timeout(aio, -1); // No timeout.
+ nni_aio_set_timeout(aio, NNG_DURATION_INFINITE);
nni_plat_tcp_pipe_send(tp->tcp, aio);
nni_mtx_unlock(&tp->lk);
return;
@@ -455,7 +455,7 @@ nni_tls_recv_start(nni_tls *tp)
aio->a_niov = 1;
aio->a_iov[0].iov_buf = tp->recvbuf;
aio->a_iov[0].iov_len = NNG_TLS_MAX_RECV_SIZE;
- nni_aio_set_timeout(tp->tcp_recv, -1); // No timeout.
+ nni_aio_set_timeout(tp->tcp_recv, NNG_DURATION_INFINITE);
nni_plat_tcp_pipe_recv(tp->tcp, aio);
}
@@ -526,7 +526,7 @@ nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len)
tp->tcp_send->a_niov = 1;
tp->tcp_send->a_iov[0].iov_buf = tp->sendbuf;
tp->tcp_send->a_iov[0].iov_len = len;
- nni_aio_set_timeout(tp->tcp_send, -1); // No timeout.
+ nni_aio_set_timeout(tp->tcp_send, NNG_DURATION_INFINITE);
nni_plat_tcp_pipe_send(tp->tcp, tp->tcp_send);
return (len);
}
diff --git a/src/supplemental/websocket/CMakeLists.txt b/src/supplemental/websocket/CMakeLists.txt
new file mode 100644
index 00000000..f3283257
--- /dev/null
+++ b/src/supplemental/websocket/CMakeLists.txt
@@ -0,0 +1,14 @@
+#
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+set(WEBSOCKET_SOURCES
+ supplemental/websocket/websocket.c
+ supplemental/websocket/websocket.h)
+set(NNG_SOURCES ${NNG_SOURCES} ${WEBSOCKET_SOURCES} PARENT_SCOPE)
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
new file mode 100644
index 00000000..fe0a9bd9
--- /dev/null
+++ b/src/supplemental/websocket/websocket.c
@@ -0,0 +1,1935 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "supplemental/base64/base64.h"
+#include "supplemental/http/http.h"
+#include "supplemental/sha1/sha1.h"
+
+#include "websocket.h"
+
+// Pre-defined types for some prototypes. These are from other subsystems.
+typedef struct ws_frame ws_frame;
+typedef struct ws_msg ws_msg;
+
+struct nni_ws {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ nni_list_node node;
+ nni_reap_item reap;
+ bool closed;
+ bool ready;
+ bool wclose;
+ nni_mtx mtx;
+ nni_list txmsgs;
+ nni_list rxmsgs;
+ ws_frame * txframe;
+ ws_frame * rxframe;
+ nni_aio * txaio; // physical aios
+ nni_aio * rxaio;
+ nni_aio * closeaio;
+ nni_aio * httpaio; // server only, HTTP reply pending
+ nni_http * http;
+ nni_http_req *req;
+ nni_http_res *res;
+ size_t maxframe;
+ size_t fragsize;
+};
+
+struct nni_ws_listener {
+ nni_tls_config * tls;
+ nni_http_server * server;
+ char * proto;
+ char * url;
+ char * host;
+ char * serv;
+ char * path;
+ nni_mtx mtx;
+ nni_list pend;
+ nni_list reply;
+ nni_list aios;
+ bool started;
+ bool closed;
+ void * hp; // handler pointer
+ nni_http_handler handler;
+ nni_ws_listen_hook hookfn;
+ void * hookarg;
+};
+
+// The dialer tracks user aios in two lists. The first list is for aios
+// waiting for the http connection to be established, while the second
+// are waiting for the HTTP negotiation to complete. We keep two lists
+// so we know whether to initiate another outgoing connection after the
+// completion of an earlier connection. (We don't want to establish
+// requests when we already have connects negotiating.)
+struct nni_ws_dialer {
+ nni_tls_config * tls;
+ nni_http_req * req;
+ nni_http_res * res;
+ nni_http_client *client;
+ nni_mtx mtx;
+ nni_aio * conaio;
+ char * proto;
+ char * host;
+ char * serv;
+ char * path;
+ char * qinfo;
+ char * addr; // full address (a URL really)
+ char * uri; // path + query
+ nni_list conaios; // user aios waiting for connect.
+ nni_list httpaios; // user aios waiting for HTTP nego.
+ bool started;
+ bool closed;
+ nng_sockaddr sa;
+};
+
+typedef enum ws_type {
+ WS_CONT = 0x0,
+ WS_TEXT = 0x1,
+ WS_BINARY = 0x2,
+ WS_CLOSE = 0x8,
+ WS_PING = 0x9,
+ WS_PONG = 0xA,
+} ws_type;
+
+typedef enum ws_reason {
+ WS_CLOSE_NORMAL_CLOSE = 1000,
+ WS_CLOSE_GOING_AWAY = 1001,
+ WS_CLOSE_PROTOCOL_ERR = 1002,
+ WS_CLOSE_UNSUPP_FORMAT = 1003,
+ WS_CLOSE_INVALID_DATA = 1007,
+ WS_CLOSE_POLICY = 1008,
+ WS_CLOSE_TOO_BIG = 1009,
+ WS_CLOSE_NO_EXTENSION = 1010,
+ WS_CLOSE_INTERNAL = 1011,
+} ws_reason;
+
+struct ws_frame {
+ nni_list_node node;
+ uint8_t head[14]; // maximum header size
+ uint8_t mask[4]; // read by server, sent by client
+ uint8_t sdata[125]; // short data (for short frames only)
+ size_t hlen; // header length
+ size_t len; // payload length
+ enum ws_type op;
+ bool final;
+ bool masked;
+ size_t bufsz; // allocated size
+ uint8_t * buf;
+ ws_msg * wmsg;
+};
+
+struct ws_msg {
+ nni_list frames;
+ nni_list_node node;
+ nni_ws * ws;
+ nni_msg * msg;
+ nni_aio * aio;
+};
+
+static void ws_send_close(nni_ws *ws, uint16_t code);
+
+// This looks, case independently for a word in a list, which is either
+// space or comma separated.
+static bool
+ws_contains_word(const char *phrase, const char *word)
+{
+ size_t len = strlen(word);
+
+ while ((phrase != NULL) && (*phrase != '\0')) {
+ if ((nni_strncasecmp(phrase, word, len) == 0) &&
+ ((phrase[len] == 0) || (phrase[len] == ' ') ||
+ (phrase[len] == ','))) {
+ return (true);
+ }
+ // Skip to next word.
+ if ((phrase = strchr(phrase, ' ')) != NULL) {
+ while ((*phrase == ' ') || (*phrase == ',')) {
+ phrase++;
+ }
+ }
+ }
+ return (false);
+}
+
+// input is base64 challenge, output is the accepted. input should be
+// 24 character base 64, output is 28 character base64 reply. (output
+// must be large enough to hold 29 bytes to allow for termination.)
+// Returns 0 on success, NNG_EINVAL if the input is malformed somehow.
+static int
+ws_make_accept(const char *key, char *accept)
+{
+ uint8_t rawkey[16];
+ uint8_t digest[20];
+ nni_sha1_ctx ctx;
+
+#define WS_KEY_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+#define WS_KEY_GUIDLEN 36
+
+ if ((strlen(key) != 24) ||
+ (nni_base64_decode(key, 24, rawkey, 16) != 16)) {
+ return (NNG_EINVAL);
+ }
+
+ nni_sha1_init(&ctx);
+ nni_sha1_update(&ctx, rawkey, 16);
+ nni_sha1_update(&ctx, (uint8_t *) WS_KEY_GUID, WS_KEY_GUIDLEN);
+ nni_sha1_final(&ctx, digest);
+
+ nni_base64_encode(digest, 20, accept, 28);
+ accept[28] = '\0';
+ return (0);
+}
+
+static void
+ws_frame_fini(ws_frame *frame)
+{
+ if (frame->bufsz) {
+ nni_free(frame->buf, frame->bufsz);
+ }
+ NNI_FREE_STRUCT(frame);
+}
+
+static void
+ws_msg_fini(ws_msg *wm)
+{
+ ws_frame *frame;
+
+ NNI_ASSERT(!nni_list_node_active(&wm->node));
+ while ((frame = nni_list_first(&wm->frames)) != NULL) {
+ nni_list_remove(&wm->frames, frame);
+ ws_frame_fini(frame);
+ }
+
+ if (wm->msg != NULL) {
+ nni_msg_free(wm->msg);
+ }
+ NNI_FREE_STRUCT(wm);
+}
+
+static void
+ws_mask_frame(ws_frame *frame)
+{
+ uint32_t r;
+ // frames sent by client need mask.
+ if (frame->masked) {
+ return;
+ }
+ r = nni_random();
+ NNI_PUT32(frame->mask, r);
+ for (int i = 0; i < frame->len; i++) {
+ frame->buf[i] ^= frame->mask[i % 4];
+ }
+ memcpy(frame->head + frame->hlen, frame->mask, 4);
+ frame->hlen += 4;
+ frame->head[1] |= 0x80; // set masked bit
+ frame->masked = true;
+}
+
+static void
+ws_unmask_frame(ws_frame *frame)
+{
+ // frames sent by client need mask.
+ if (!frame->masked) {
+ return;
+ }
+ for (int i = 0; i < frame->len; i++) {
+ frame->buf[i] ^= frame->mask[i % 4];
+ }
+ frame->hlen -= 4;
+ frame->head[1] &= 0x7f; // clear masked bit
+ frame->masked = false;
+}
+
+static int
+ws_msg_init_control(
+ ws_msg **wmp, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len)
+{
+ ws_msg * wm;
+ ws_frame *frame;
+
+ if (len > 125) {
+ return (NNG_EINVAL);
+ }
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ ws_msg_fini(wm);
+ return (NNG_ENOMEM);
+ }
+
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+ memcpy(frame->sdata, buf, len);
+
+ nni_list_append(&wm->frames, frame);
+ frame->wmsg = wm;
+ frame->len = len;
+ frame->final = true;
+ frame->op = op;
+ frame->head[0] = op | 0x80; // final frame (control)
+ frame->head[1] = len & 0x7F;
+ frame->hlen = 2;
+ frame->buf = frame->sdata;
+ frame->bufsz = 0;
+
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_mask_frame(frame);
+ } else {
+ frame->masked = false;
+ }
+
+ wm->aio = NULL;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static int
+ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio)
+{
+ ws_msg * wm;
+ size_t len;
+ size_t maxfrag = ws->fragsize; // make this tunable. (1MB default)
+ uint8_t *buf;
+ uint8_t op;
+
+ // If the message has a header, move it to front of body. Most of
+ // the time this will not cause a reallocation (there should be
+ // headroom). Doing this simplifies our framing, and avoids sending
+ // tiny frames for headers.
+ if ((len = nni_msg_header_len(msg)) != 0) {
+ int rv;
+ buf = nni_msg_header(msg);
+ if ((rv = nni_msg_insert(msg, buf, len)) != 0) {
+ return (rv);
+ }
+ nni_msg_header_clear(msg);
+ }
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+
+ len = nni_msg_len(msg);
+ buf = nni_msg_body(msg);
+ op = WS_BINARY; // to start -- no support for sending TEXT frames
+
+ // do ... while because we want at least one frame (even for empty
+ // messages.) Headers get their own frame, if present. Best bet
+ // is to try not to have a header when coming here.
+ do {
+ ws_frame *frame;
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ ws_msg_fini(wm);
+ return (NNG_ENOMEM);
+ }
+ nni_list_append(&wm->frames, frame);
+ frame->wmsg = wm;
+ frame->len = len > maxfrag ? maxfrag : len;
+ frame->buf = buf;
+ frame->op = op;
+
+ buf += frame->len;
+ len -= frame->len;
+ op = WS_CONT;
+
+ if (len == 0) {
+ frame->final = true;
+ }
+ frame->head[0] = frame->op;
+ frame->hlen = 2;
+ if (frame->final) {
+ frame->head[0] |= 0x80; // final frame bit
+ }
+ if (frame->len < 126) {
+ frame->head[1] = frame->len & 0x7f;
+ } else if (frame->len < 65536) {
+ frame->head[1] = 126;
+ NNI_PUT16(frame->head + 2, (frame->len & 0xffff));
+ frame->hlen += 2;
+ } else {
+ frame->head[1] = 127;
+ NNI_PUT64(frame->head + 2, (uint64_t) frame->len);
+ frame->hlen += 8;
+ }
+
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_mask_frame(frame);
+ } else {
+ frame->masked = false;
+ }
+
+ } while (len);
+
+ wm->msg = msg;
+ wm->aio = aio;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static int
+ws_msg_init_rx(ws_msg **wmp, nni_ws *ws, nni_aio *aio)
+{
+ ws_msg *wm;
+
+ if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&wm->frames, ws_frame, node);
+ wm->aio = aio;
+ wm->ws = ws;
+ *wmp = wm;
+ return (0);
+}
+
+static void
+ws_close_cb(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_msg *wm;
+
+ // Either we sent a close frame, or we didn't. Either way,
+ // we are done, and its time to abort everything else.
+ nni_mtx_lock(&ws->mtx);
+
+ nni_http_close(ws->http);
+ nni_aio_cancel(ws->txaio, NNG_ECLOSED);
+ nni_aio_cancel(ws->rxaio, NNG_ECLOSED);
+
+ // This list (receive) should be empty.
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ if (ws->rxframe != NULL) {
+ ws_frame_fini(ws->rxframe);
+ ws->rxframe = NULL;
+ }
+
+ // Any txframe should have been killed with its wmsg.
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_close(nni_ws *ws, uint16_t code)
+{
+ ws_msg *wm;
+
+ // Receive stuff gets aborted always. No further receives
+ // once we get a close.
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ // If were closing "gracefully", then don't abort in-flight
+ // stuff yet. Note that reads should have stopped already.
+ if (!ws->closed) {
+ ws_send_close(ws, code);
+ return;
+ }
+}
+
+static void
+ws_start_write(nni_ws *ws)
+{
+ ws_frame *frame;
+ ws_msg * wm;
+
+ if ((ws->txframe != NULL) || (!ws->ready)) {
+ return; // busy
+ }
+
+ if ((wm = nni_list_first(&ws->txmsgs)) == NULL) {
+ // Nothing to send.
+ return;
+ }
+
+ frame = nni_list_first(&wm->frames);
+ NNI_ASSERT(frame != NULL);
+
+ // Push it out.
+ ws->txframe = frame;
+ ws->txaio->a_niov = frame->len > 0 ? 2 : 1;
+ ws->txaio->a_iov[0].iov_len = frame->hlen;
+ ws->txaio->a_iov[0].iov_buf = frame->head;
+ if (frame->len > 0) {
+ ws->txaio->a_iov[1].iov_len = frame->len;
+ ws->txaio->a_iov[1].iov_buf = frame->buf;
+ }
+ nni_http_write_full(ws->http, ws->txaio);
+}
+
+static void
+ws_cancel_close(nni_aio *aio, int rv)
+{
+ nni_ws *ws = aio->a_prov_data;
+ nni_mtx_lock(&ws->mtx);
+ if (ws->wclose) {
+ ws->wclose = false;
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_write_cb(void *arg)
+{
+ nni_ws * ws = arg;
+ ws_frame *frame;
+ ws_msg * wm;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&ws->mtx);
+
+ if (ws->txframe->op == WS_CLOSE) {
+ // If this was a close frame, we are done.
+ // No other messages may succeed..
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio != NULL) {
+ nni_aio_set_msg(wm->aio, NULL);
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+ if (ws->wclose) {
+ ws->wclose = false;
+ nni_aio_finish(ws->closeaio, 0, 0);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ frame = ws->txframe;
+ wm = frame->wmsg;
+ aio = wm->aio;
+
+ if ((rv = nni_aio_result(ws->txaio)) != 0) {
+
+ ws_msg_fini(wm);
+ if (aio != NULL) {
+ nni_aio_finish_error(aio, rv);
+ }
+
+ ws->closed = true;
+ nni_http_close(ws->http);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // good frame, was it the last
+ nni_list_remove(&wm->frames, frame);
+ ws_frame_fini(frame);
+ if (nni_list_empty(&wm->frames)) {
+ nni_list_remove(&ws->txmsgs, wm);
+ ws_msg_fini(wm);
+ if (aio != NULL) {
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
+
+ // Write the next frame.
+ ws_start_write(ws);
+
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_write_cancel(nni_aio *aio, int rv)
+{
+ nni_ws * ws;
+ ws_msg * wm;
+ ws_frame *frame;
+
+ // Is this aio active? We can tell by looking at the
+ // active tx frame.
+ wm = aio->a_prov_data;
+ ws = wm->ws;
+ nni_mtx_lock(&ws->mtx);
+ if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) {
+ nni_aio_cancel(ws->txaio, rv);
+ // We will wait for callback on the txaio to finish aio.
+ } else if (nni_list_active(&ws->txmsgs, wm)) {
+ // If scheduled, just need to remove node and complete it.
+ nni_list_remove(&ws->txmsgs, wm);
+ wm->aio = NULL;
+ nni_aio_finish_error(aio, rv);
+ ws_msg_fini(wm);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_send_close(nni_ws *ws, uint16_t code)
+{
+ ws_msg * wm;
+ uint8_t buf[sizeof(uint16_t)];
+ int rv;
+ nni_aio *aio;
+
+ NNI_PUT16(buf, code);
+
+ if (ws->closed) {
+ return;
+ }
+ ws->closed = true;
+ aio = ws->closeaio;
+
+ // We don't care about cancellation here. If this times out,
+ // we will still shut all the physical I/O down in the callback.
+ if (nni_aio_start(aio, ws_cancel_close, ws) != 0) {
+ return;
+ }
+ ws->wclose = true;
+ rv = ws_msg_init_control(&wm, ws, WS_CLOSE, buf, sizeof(buf));
+ if (rv != 0) {
+ ws->wclose = false;
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ // Close frames get priority!
+ nni_list_prepend(&ws->txmsgs, wm);
+ ws_start_write(ws);
+}
+
+static void
+ws_send_control(nni_ws *ws, uint8_t op, uint8_t *buf, size_t len)
+{
+ ws_msg *wm;
+
+ // Note that we do not care if this works or not. So no AIO needed.
+
+ nni_mtx_lock(&ws->mtx);
+ if ((ws->closed) ||
+ (ws_msg_init_control(&wm, ws, op, buf, sizeof(buf)) != 0)) {
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // Control frames at head of list. (Note that this may preempt
+ // the close frame or other ping/pong requests. Oh well.)
+ nni_list_prepend(&ws->txmsgs, wm);
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
+{
+ ws_msg * wm;
+ nni_msg *msg;
+ int rv;
+
+ msg = nni_aio_get_msg(aio);
+
+ if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) {
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish_error(aio, rv);
+ }
+ return;
+ }
+
+ nni_mtx_lock(&ws->mtx);
+ nni_aio_set_msg(aio, NULL);
+
+ if (ws->closed) {
+ ws_msg_fini(wm);
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ if (nni_aio_start(aio, ws_write_cancel, wm) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ ws_msg_fini(wm);
+ return;
+ }
+ nni_list_append(&ws->txmsgs, wm);
+ ws_start_write(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_start_read(nni_ws *ws)
+{
+ ws_frame *frame;
+ ws_msg * wm;
+ nni_aio * aio;
+
+ if ((ws->rxframe != NULL) || ws->closed) {
+ return; // already reading or closed
+ }
+
+ if ((wm = nni_list_first(&ws->rxmsgs)) == NULL) {
+ return; // no body expecting a message.
+ }
+
+ if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio != NULL) {
+ nni_aio_finish_error(wm->aio, NNG_ENOMEM);
+ }
+ ws_msg_fini(wm);
+ // XXX: NOW WHAT?
+ return;
+ }
+
+ // Note that the frame is *not* associated with the message
+ // as yet, because we don't know if that's right until we receive it.
+ frame->hlen = 0;
+ frame->len = 0;
+ ws->rxframe = frame;
+
+ aio = ws->rxaio;
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_len = 2; // We want the first two bytes.
+ aio->a_iov[0].iov_buf = frame->head;
+ nni_http_read_full(ws->http, aio);
+}
+
+static void
+ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
+{
+ ws_msg *wm = nni_list_first(&ws->rxmsgs);
+
+ switch (frame->op) {
+ case WS_CONT:
+ if (wm == NULL) {
+ ws_close(ws, WS_CLOSE_GOING_AWAY);
+ return;
+ }
+ if (nni_list_empty(&wm->frames)) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ nni_list_append(&wm->frames, frame);
+ break;
+ case WS_BINARY:
+ if (wm == NULL) {
+ ws_close(ws, WS_CLOSE_GOING_AWAY);
+ return;
+ }
+ if (!nni_list_empty(&wm->frames)) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ nni_list_append(&wm->frames, frame);
+ break;
+ case WS_TEXT:
+ // No support for text mode at present.
+ ws_close(ws, WS_CLOSE_UNSUPP_FORMAT);
+ return;
+
+ case WS_PING:
+ if (frame->len > 125) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws_send_control(ws, WS_PONG, frame->buf, frame->len);
+ ws->rxframe = NULL;
+ ws_frame_fini(frame);
+ break;
+ case WS_PONG:
+ if (frame->len > 125) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+ ws->rxframe = NULL;
+ ws_frame_fini(frame);
+ break;
+ case WS_CLOSE:
+ ws->closed = true; // no need to send close reply
+ ws_close(ws, 0);
+ return;
+ default:
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ return;
+ }
+
+ // If this was the last (final) frame, then complete it. But
+ // we have to look at the msg, since we might have got a
+ // control frame.
+ if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) {
+ size_t len = 0;
+ nni_msg *msg;
+ uint8_t *body;
+ int rv;
+
+ nni_list_remove(&ws->rxmsgs, wm);
+ NNI_LIST_FOREACH (&wm->frames, frame) {
+ len += frame->len;
+ }
+ if ((rv = nni_msg_alloc(&msg, len)) != 0) {
+ nni_aio_finish_error(wm->aio, rv);
+ ws_msg_fini(wm);
+ ws_close(ws, WS_CLOSE_INTERNAL);
+ return;
+ }
+ body = nni_msg_body(msg);
+ NNI_LIST_FOREACH (&wm->frames, frame) {
+ memcpy(body, frame->buf, frame->len);
+ body += frame->len;
+ }
+ nni_aio_finish_msg(wm->aio, msg);
+ wm->aio = NULL;
+ ws_msg_fini(wm);
+ }
+}
+
+static void
+ws_read_cb(void *arg)
+{
+ nni_ws * ws = arg;
+ nni_aio * aio = ws->rxaio;
+ ws_frame *frame;
+ int rv;
+
+ nni_mtx_lock(&ws->mtx);
+ if ((frame = ws->rxframe) == NULL) {
+ nni_mtx_unlock(&ws->mtx); // canceled during close
+ return;
+ }
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ ws->closed = true; // do not send a close frame
+ ws_close(ws, 0);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ if (frame->hlen == 0) {
+ frame->hlen = 2;
+ frame->op = frame->head[0] & 0x7f;
+ frame->final = (frame->head[0] & 0x80) ? 1 : 0;
+ frame->masked = (frame->head[1] & 0x80) ? 1 : 0;
+ if (frame->masked) {
+ frame->hlen += 4;
+ }
+ if ((frame->head[1] & 0x7F) == 127) {
+ frame->hlen += 8;
+ } else if ((frame->head[1] & 0x7F) == 126) {
+ frame->hlen += 2;
+ }
+
+ // If we didn't read the full header yet, then read
+ // the rest of it.
+ if (frame->hlen != 2) {
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_buf = frame->head + 2;
+ aio->a_iov[0].iov_len = frame->hlen - 2;
+ nni_http_read_full(ws->http, aio);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ }
+
+ // If we are returning from a read of additional data, then
+ // the buf will be set. Otherwise we need to determine
+ // how much data to read. As our headers are complete, we take
+ // this time to do some protocol checks -- no point in waiting
+ // to read data. (Frame size check needs to be done first
+ // anyway to prevent DoS.)
+
+ if (frame->buf == NULL) {
+
+ // Determine expected frame size.
+ switch ((frame->len = (frame->head[1] & 0x7F))) {
+ case 127:
+ NNI_GET64(frame->head + 2, frame->len);
+ if (frame->len < 65536) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ break;
+ case 126:
+ NNI_GET16(frame->head + 2, frame->len);
+ if (frame->len < 126) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ break;
+ }
+
+ if (frame->len > ws->maxframe) {
+ ws_close(ws, WS_CLOSE_TOO_BIG);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // Check for masking. (We don't actually do the unmask
+ // here, because we don't have data yet.)
+ if (frame->masked) {
+ memcpy(frame->mask, frame->head + frame->hlen - 4, 4);
+ if (ws->mode == NNI_EP_MODE_DIAL) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ } else if (ws->mode == NNI_EP_MODE_LISTEN) {
+ ws_close(ws, WS_CLOSE_PROTOCOL_ERR);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+
+ // If we expected data, then ask for it.
+ if (frame->len != 0) {
+
+ // Short frames can avoid an alloc
+ if (frame->len < 126) {
+ frame->buf = frame->sdata;
+ frame->bufsz = 0;
+ } else {
+ frame->buf = nni_alloc(frame->len);
+ if (frame->buf == NULL) {
+ ws_close(ws, WS_CLOSE_INTERNAL);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ frame->bufsz = frame->len;
+ }
+
+ aio->a_niov = 1;
+ aio->a_iov[0].iov_buf = frame->buf;
+ aio->a_iov[0].iov_len = frame->len;
+ nni_http_read_full(ws->http, aio);
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ }
+
+ // At this point, we have a complete frame.
+ ws_unmask_frame(frame); // idempotent
+
+ ws_read_frame_cb(ws, frame);
+ ws_start_read(ws);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+static void
+ws_read_cancel(nni_aio *aio, int rv)
+{
+ ws_msg *wm = aio->a_prov_data;
+ nni_ws *ws = wm->ws;
+
+ nni_mtx_lock(&ws->mtx);
+ if (wm == nni_list_first(&ws->rxmsgs)) {
+ // Cancellation will percolate back up.
+ nni_aio_cancel(ws->rxaio, rv);
+ } else if (nni_list_active(&ws->rxmsgs, wm)) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ ws_msg_fini(wm);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
+{
+ ws_msg *wm;
+ int rv;
+ nni_mtx_lock(&ws->mtx);
+ if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
+ if (nni_aio_start(aio, NULL, NULL)) {
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ws->mtx);
+ return;
+ }
+ if (nni_aio_start(aio, ws_read_cancel, wm) == 0) {
+ nni_list_append(&ws->rxmsgs, wm);
+ ws_start_read(ws);
+ }
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_close_error(nni_ws *ws, uint16_t code)
+{
+ nni_mtx_lock(&ws->mtx);
+ ws_close(ws, code);
+ nni_mtx_unlock(&ws->mtx);
+}
+
+void
+nni_ws_close(nni_ws *ws)
+{
+ nni_ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
+}
+
+nni_http_res *
+nni_ws_response(nni_ws *ws)
+{
+ return (ws->res);
+}
+
+nni_http_req *
+nni_ws_request(nni_ws *ws)
+{
+ return (ws->req);
+}
+
+static void
+ws_fini(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_msg *wm;
+
+ nni_ws_close(ws);
+
+ // Give a chance for the close frame to drain.
+ if (ws->closeaio) {
+ nni_aio_wait(ws->closeaio);
+ }
+
+ nni_aio_stop(ws->rxaio);
+ nni_aio_stop(ws->txaio);
+ nni_aio_stop(ws->closeaio);
+ nni_aio_stop(ws->httpaio);
+
+ nni_mtx_lock(&ws->mtx);
+ while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
+ nni_list_remove(&ws->rxmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ while ((wm = nni_list_first(&ws->txmsgs)) != NULL) {
+ nni_list_remove(&ws->txmsgs, wm);
+ if (wm->aio) {
+ nni_aio_finish_error(wm->aio, NNG_ECLOSED);
+ }
+ ws_msg_fini(wm);
+ }
+
+ if (ws->rxframe) {
+ ws_frame_fini(ws->rxframe);
+ }
+ nni_mtx_unlock(&ws->mtx);
+
+ if (ws->req) {
+ nni_http_req_fini(ws->req);
+ }
+ if (ws->res) {
+ nni_http_res_fini(ws->res);
+ }
+
+ nni_http_fini(ws->http);
+ nni_aio_fini(ws->rxaio);
+ nni_aio_fini(ws->txaio);
+ nni_aio_fini(ws->closeaio);
+ nni_aio_fini(ws->httpaio);
+ nni_mtx_fini(&ws->mtx);
+ NNI_FREE_STRUCT(ws);
+}
+
+void
+nni_ws_fini(nni_ws *ws)
+{
+ nni_reap(&ws->reap, ws_fini, ws);
+}
+
+static void
+ws_http_cb_listener(nni_ws *ws, nni_aio *aio)
+{
+ // This is only
+ nni_ws_listener *l;
+ l = nni_aio_get_data(aio, 0);
+
+ nni_mtx_lock(&l->mtx);
+ nni_list_remove(&l->reply, ws);
+ if (nni_aio_result(aio) != 0) {
+ nni_ws_fini(ws);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ ws->ready = true;
+ if ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_list_remove(&l->aios, aio);
+ nni_aio_finish_pipe(aio, ws);
+ } else {
+ nni_list_append(&l->pend, ws);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
+{
+ nni_ws_dialer *d;
+ nni_aio * uaio;
+ int rv;
+ uint16_t status;
+ char wskey[29];
+ const char * ptr;
+
+ d = nni_aio_get_data(aio, 0);
+
+ nni_mtx_lock(&d->mtx);
+ uaio = nni_list_first(&d->httpaios);
+ NNI_ASSERT(uaio != NULL);
+ // We have two steps. In step 1, we just sent the request,
+ // and need to retrieve the reply. In step two we have
+ // received the reply, and need to validate it.
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto err;
+ }
+
+ // If we have no response structure, then this was completion of the
+ // send of the request. Prepare an empty response, and read it.
+ if (ws->res == NULL) {
+ if ((rv = nni_http_res_init(&ws->res)) != 0) {
+ goto err;
+ }
+ nni_http_read_res(ws->http, ws->res, ws->httpaio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ status = nni_http_res_get_status(ws->res);
+ switch (status) {
+ case NNI_HTTP_STATUS_SWITCHING:
+ break;
+ case NNI_HTTP_STATUS_FORBIDDEN:
+ case NNI_HTTP_STATUS_UNAUTHORIZED:
+ rv = NNG_EPERM;
+ goto err;
+ case NNI_HTTP_STATUS_NOT_FOUND:
+ case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED:
+ rv = NNG_ECONNREFUSED; // Treat these as refusals.
+ goto err;
+ case NNI_HTTP_STATUS_BAD_REQUEST:
+ default:
+ // Perhaps we should use NNG_ETRANERR...
+ rv = NNG_EPROTO;
+ goto err;
+ }
+
+ // Check that the server gave us back the right key.
+ rv = ws_make_accept(
+ nni_http_req_get_header(ws->req, "Sec-WebSocket-Key"), wskey);
+ if (rv != 0) {
+ goto err;
+ }
+
+#define GETH(h) nni_http_res_get_header(ws->res, h)
+
+ if (((ptr = GETH("Sec-WebSocket-Accept")) == NULL) ||
+ (strcmp(ptr, wskey) != 0) ||
+ ((ptr = GETH("Connection")) == NULL) ||
+ (!ws_contains_word(ptr, "upgrade")) ||
+ ((ptr = GETH("Upgrade")) == NULL) ||
+ (strcmp(ptr, "websocket") != 0)) {
+ nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ rv = NNG_EPROTO;
+ goto err;
+ }
+ if (d->proto != NULL) {
+ if (((ptr = GETH("Sec-WebSocket-Protocol")) == NULL) ||
+ (!ws_contains_word(d->proto, ptr))) {
+ nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR);
+ rv = NNG_EPROTO;
+ goto err;
+ }
+ }
+#undef GETH
+
+ // At this point, we are in business!
+ ws->ready = true;
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_pipe(uaio, ws);
+ nni_mtx_unlock(&d->mtx);
+ return;
+err:
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ nni_ws_fini(ws);
+ nni_mtx_unlock(&d->mtx);
+}
+
+static void
+ws_http_cb(void *arg)
+{
+ // This is only done on the server/listener side.
+ nni_ws * ws = arg;
+ nni_aio *aio = ws->httpaio;
+
+ switch (ws->mode) {
+ case NNI_EP_MODE_LISTEN:
+ ws_http_cb_listener(ws, aio);
+ break;
+ case NNI_EP_MODE_DIAL:
+ ws_http_cb_dialer(ws, aio);
+ break;
+ }
+}
+
+static int
+ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res)
+{
+ nni_ws *ws;
+ int rv;
+
+ if ((ws = NNI_ALLOC_STRUCT(ws)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&ws->mtx);
+ NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node);
+ NNI_LIST_INIT(&ws->txmsgs, ws_msg, node);
+
+ if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) ||
+ ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0)) {
+ nni_ws_fini(ws);
+ return (rv);
+ }
+
+ nni_aio_set_timeout(ws->closeaio, 100);
+ nni_aio_set_timeout(ws->httpaio, 1000);
+
+ ws->fragsize = 1 << 20; // we won't send a frame larger than this
+ ws->maxframe = (1 << 20) * 10; // default limit on incoming frame size
+ ws->http = http;
+ ws->req = req;
+ ws->res = res;
+ *wsp = ws;
+ return (0);
+}
+
+void
+nni_ws_listener_fini(nni_ws_listener *l)
+{
+ nni_mtx_fini(&l->mtx);
+ nni_strfree(l->url);
+ nni_strfree(l->proto);
+ nni_strfree(l->host);
+ nni_strfree(l->serv);
+ nni_strfree(l->path);
+ NNI_FREE_STRUCT(l);
+}
+
+static void
+ws_handler(nni_aio *aio)
+{
+ nni_ws_listener *l;
+ nni_ws * ws;
+ nni_http * http;
+ nni_http_req * req;
+ nni_http_res * res;
+ const char * ptr;
+ const char * proto;
+ uint16_t status;
+ int rv;
+ char key[29];
+
+ http = nni_aio_get_input(aio, 0);
+ req = nni_aio_get_input(aio, 1);
+ l = nni_aio_get_input(aio, 2);
+
+ // Now check the headers, etc.
+ if (strcmp(nni_http_req_get_version(req), "HTTP/1.1") != 0) {
+ status = NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP;
+ goto err;
+ }
+
+ if (strcmp(nni_http_req_get_method(req), "GET") != 0) {
+ // HEAD request. We can't really deal with it.
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+#define GETH(h) nni_http_req_get_header(req, h)
+#define SETH(h, v) nni_http_res_set_header(res, h, v)
+
+ if ((((ptr = GETH("Content-Length")) != NULL) && (atoi(ptr) > 0)) ||
+ (((ptr = GETH("Transfer-Encoding")) != NULL) &&
+ (nni_strcasestr(ptr, "chunked") != NULL))) {
+ status = NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE;
+ goto err;
+ }
+
+ // These headers have to be present.
+ if (((ptr = GETH("Upgrade")) == NULL) ||
+ (!ws_contains_word(ptr, "websocket")) ||
+ ((ptr = GETH("Connection")) == NULL) ||
+ (!ws_contains_word(ptr, "upgrade")) ||
+ ((ptr = GETH("Sec-WebSocket-Version")) == NULL) ||
+ (strcmp(ptr, "13") != 0)) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ if (((ptr = GETH("Sec-WebSocket-Key")) == NULL) ||
+ (ws_make_accept(ptr, key) != 0)) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ // If the client has requested a specific subprotocol, then
+ // we need to try to match it to what the handler says we support.
+ // (If no suitable option is found in the handler, we fail the
+ // request.)
+ proto = GETH("Sec-WebSocket-Protocol");
+ if (proto == NULL) {
+ if (l->proto != NULL) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+ } else if ((l->proto == NULL) ||
+ (!ws_contains_word(l->proto, proto))) {
+ status = NNI_HTTP_STATUS_BAD_REQUEST;
+ goto err;
+ }
+
+ if ((rv = nni_http_res_init(&res)) != 0) {
+ // Give a chance to reply to client.
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+
+ if (nni_http_res_set_status(
+ res, NNI_HTTP_STATUS_SWITCHING, "Switching Protocols") != 0) {
+ nni_http_res_fini(res);
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+
+ if ((SETH("Connection", "Upgrade") != 0) ||
+ (SETH("Upgrade", "websocket") != 0) ||
+ (SETH("Sec-WebSocket-Accept", key) != 0)) {
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_fini(res);
+ goto err;
+ }
+ if ((proto != NULL) && (SETH("Sec-WebSocket-Protocol", proto) != 0)) {
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ nni_http_res_fini(res);
+ goto err;
+ }
+
+ if (l->hookfn != NULL) {
+ rv = l->hookfn(l->hookarg, req, res);
+ if (rv != 0) {
+ nni_http_res_fini(res);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if (nni_http_res_get_status(res) !=
+ NNI_HTTP_STATUS_SWITCHING) {
+ // The hook has decided to give back a different
+ // reply and we are not upgrading anymore. For
+ // example the Origin might not be permitted, or
+ // another level of authentication may be required.
+ // (Note that the hook can also give back various
+ // other headers, but it would be bad for it to
+ // alter the websocket mandated headers.)
+ nni_http_req_fini(req);
+ nni_aio_set_output(aio, 0, res);
+ nni_aio_finish(aio, 0, 0);
+ return;
+ }
+ }
+
+#undef GETH
+#undef SETH
+
+ // We are good to go, provided we can get the websocket struct,
+ // and send the reply.
+ if ((rv = ws_init(&ws, http, req, res)) != 0) {
+ nni_http_res_fini(res);
+ status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR;
+ goto err;
+ }
+ ws->mode = NNI_EP_MODE_LISTEN;
+
+ // XXX: Inherit fragmentation and message size limits!
+
+ nni_list_append(&l->reply, ws);
+ nni_aio_set_data(ws->httpaio, 0, l);
+ nni_http_write_res(http, res, ws->httpaio);
+ nni_aio_set_output(aio, 0, NULL);
+ nni_aio_set_input(aio, 1, NULL);
+ nni_aio_finish(aio, 0, 0);
+ return;
+
+err:
+ nni_http_req_fini(req);
+ if ((rv = nni_http_res_init_error(&res, status)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nni_aio_set_output(aio, 0, res);
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+static int
+ws_parse_url(const char *url, char **schemep, char **hostp, char **servp,
+ char **pathp, char **queryp)
+{
+ size_t scrlen;
+ char * scr;
+ char * pair;
+ char * scheme = NULL;
+ char * path = NULL;
+ char * query = NULL;
+ char * host = NULL;
+ char * serv = NULL;
+ int rv;
+
+ // We need a scratch copy of the url to parse.
+ scrlen = strlen(url) + 1;
+ if ((scr = nni_alloc(scrlen)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_strlcpy(scr, url, scrlen);
+ scheme = scr;
+ pair = strchr(scr, ':');
+ if ((pair == NULL) || (pair[1] != '/') || (pair[2] != '/')) {
+ nni_free(scr, scrlen);
+ return (NNG_EADDRINVAL);
+ }
+
+ *pair = '\0';
+ pair += 3;
+
+ path = strchr(pair, '/');
+ if (path != NULL) {
+ *path = '\0'; // We will restore it shortly.
+ }
+ if ((rv = nni_tran_parse_host_port(pair, hostp, servp)) != 0) {
+ nni_free(scr, scrlen);
+ return (rv);
+ }
+
+ // If service was missing, assume normal defaults.
+ if (*servp == NULL) {
+ if (strcmp(scheme, "wss")) {
+ *servp = nni_strdup("443");
+ } else {
+ *servp = nni_strdup("80");
+ }
+ }
+
+ if (path) {
+ // Restore the path, and trim off the query parameter.
+ *path = '/';
+ if ((query = strchr(path, '?')) != NULL) {
+ *query = '\0';
+ query++;
+ } else {
+ query = "";
+ }
+ } else {
+ path = "/";
+ query = "";
+ }
+
+ if (schemep) {
+ *schemep = nni_strdup(scheme);
+ }
+ if (pathp) {
+ *pathp = nni_strdup(path);
+ }
+ if (queryp) {
+ *queryp = nni_strdup(query);
+ }
+ nni_free(scr, scrlen);
+
+ if ((schemep && (*schemep == NULL)) || (*pathp == NULL) ||
+ (*servp == NULL) || (queryp && (*queryp == NULL))) {
+ nni_strfree(*hostp);
+ nni_strfree(*servp);
+ nni_strfree(*pathp);
+ if (schemep) {
+ nni_strfree(*schemep);
+ }
+ if (queryp) {
+ nni_strfree(*queryp);
+ }
+ return (NNG_ENOMEM);
+ }
+
+ return (0);
+}
+
+int
+nni_ws_listener_init(nni_ws_listener **wslp, const char *url)
+{
+ nni_ws_listener *l;
+ int rv;
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&l->mtx);
+ nni_aio_list_init(&l->aios);
+
+ NNI_LIST_INIT(&l->pend, nni_ws, node);
+ NNI_LIST_INIT(&l->reply, nni_ws, node);
+
+ rv = ws_parse_url(url, NULL, &l->host, &l->serv, &l->path, NULL);
+ if (rv != 0) {
+ nni_ws_listener_fini(l);
+ return (rv);
+ }
+ l->handler.h_is_dir = false;
+ l->handler.h_is_upgrader = true;
+ l->handler.h_method = "GET";
+ l->handler.h_path = l->path;
+ l->handler.h_host = l->host;
+ l->handler.h_cb = ws_handler;
+
+ *wslp = l;
+ return (0);
+}
+
+int
+nni_ws_listener_proto(nni_ws_listener *l, const char *proto)
+{
+ int rv = 0;
+ char *ns;
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
+ rv = NNG_EBUSY;
+ } else if ((ns = nni_strdup(proto)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ if (l->proto != NULL) {
+ nni_strfree(l->proto);
+ }
+ l->proto = ns;
+ }
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+}
+
+static void
+ws_accept_cancel(nni_aio *aio, int rv)
+{
+ nni_ws_listener *l = aio->a_prov_data;
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
+{
+ nni_ws *ws;
+
+ nni_mtx_lock(&l->mtx);
+ if (nni_aio_start(aio, ws_accept_cancel, l) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (l->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if (!l->started) {
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ if ((ws = nni_list_first(&l->pend)) != NULL) {
+ nni_list_remove(&l->pend, ws);
+ nni_aio_finish_pipe(aio, ws);
+ } else {
+ nni_list_append(&l->aios, aio);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_close(nni_ws_listener *l)
+{
+ nni_aio *aio;
+ nni_ws * ws;
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
+ l->closed = true;
+ if (l->server != NULL) {
+ nni_http_server_del_handler(l->server, l->hp);
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ }
+ NNI_LIST_FOREACH (&l->pend, ws) {
+ nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ }
+ NNI_LIST_FOREACH (&l->reply, ws) {
+ nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY);
+ }
+ nni_mtx_unlock(&l->mtx);
+}
+
+int
+nni_ws_listener_listen(nni_ws_listener *l)
+{
+ nng_sockaddr sa;
+ nni_aio * aio;
+ int rv;
+
+ nni_mtx_lock(&l->mtx);
+ if (l->closed) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ECLOSED);
+ }
+ if (l->started) {
+ nni_mtx_unlock(&l->mtx);
+ return (NNG_ESTATE);
+ }
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv(l->host, l->serv, NNG_AF_UNSPEC, true, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
+ if (rv != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ if ((rv = nni_http_server_init(&l->server, &sa)) != 0) {
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ rv = nni_http_server_add_handler(&l->hp, l->server, &l->handler, l);
+ if (rv != 0) {
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ nni_mtx_unlock(&l->mtx);
+ return (rv);
+ }
+
+ // XXX: DEAL WITH HTTPS here.
+
+ if ((rv = nni_http_server_start(l->server)) != 0) {
+ nni_http_server_del_handler(l->server, l->hp);
+ nni_http_server_fini(l->server);
+ l->server = NULL;
+ }
+
+ l->started = true;
+
+ nni_mtx_unlock(&l->mtx);
+ return (0);
+}
+
+void
+nni_ws_listener_hook(
+ nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg)
+{
+ nni_mtx_lock(&l->mtx);
+ l->hookfn = hookfn;
+ l->hookarg = hookarg;
+ nni_mtx_unlock(&l->mtx);
+}
+
+void
+nni_ws_listener_tls(nni_ws_listener *l, nni_tls_config *tls)
+{
+ // We need to add this later.
+}
+
+void
+ws_conn_cb(void *arg)
+{
+ nni_ws_dialer *d = arg;
+ nni_aio * aio = d->conaio;
+ nni_aio * uaio;
+ nni_http * http;
+ nni_http_req * req = NULL;
+ int rv;
+ uint8_t raw[16];
+ char wskey[25];
+ nni_ws * ws;
+
+ nni_mtx_lock(&d->mtx);
+ uaio = nni_list_first(&d->conaios);
+ rv = nni_aio_result(aio);
+ http = rv == 0 ? nni_aio_get_output(aio, 0) : NULL;
+
+ if (uaio == NULL) {
+ if (http) {
+ // Nobody listening anymore - hard abort.
+ nni_http_fini(http);
+ }
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+
+ nni_aio_list_remove(uaio);
+ nni_aio_set_output(aio, 0, NULL);
+
+ // We are done with this aio, start another connection request while
+ // we finish up, if we have more clients waiting.
+ if (!nni_list_empty(&d->conaios)) {
+ nni_http_client_connect(d->client, aio);
+ }
+
+ if (rv != 0) {
+ goto err;
+ }
+
+ for (int i = 0; i < 16; i++) {
+ raw[i] = nni_random();
+ }
+ nni_base64_encode(raw, 16, wskey, 24);
+ wskey[24] = '\0';
+
+ if (d->qinfo && d->qinfo[0] != '\0') {
+ rv = nni_asprintf(&d->uri, "%s?%s", d->path, d->qinfo);
+ } else if ((d->uri = nni_strdup(d->path)) == NULL) {
+ rv = NNG_ENOMEM;
+ }
+
+#define SETH(h, v) nni_http_req_set_header(req, h, v)
+ if ((rv != 0) || ((rv = nni_http_req_init(&req)) != 0) ||
+ ((rv = nni_http_req_set_uri(req, d->uri)) != 0) ||
+ ((rv = nni_http_req_set_version(req, "HTTP/1.1")) != 0) ||
+ ((rv = nni_http_req_set_method(req, "GET")) != 0) ||
+ ((rv = SETH("Host", d->host)) != 0) ||
+ ((rv = SETH("Upgrade", "websocket")) != 0) ||
+ ((rv = SETH("Connection", "Upgrade")) != 0) ||
+ ((rv = SETH("Sec-WebSocket-Key", wskey)) != 0) ||
+ ((rv = SETH("Sec-WebSocket-Version", "13")) != 0)) {
+ goto err;
+ }
+
+ // If consumer asked for protocol, pass it on.
+ if ((d->proto != NULL) &&
+ ((rv = SETH("Sec-WebSocket-Protocol", d->proto)) != 0)) {
+ goto err;
+ }
+#undef SETH
+
+ if ((rv = ws_init(&ws, http, req, NULL)) != 0) {
+ goto err;
+ }
+ ws->mode = NNI_EP_MODE_DIAL;
+
+ // Move this uaio to the http wait list. Note that it is not
+ // required that the uaio will be completed by this connection.
+ // If another connection attempt completes first, then the first
+ // aio queued will get the result.
+ nni_list_append(&d->httpaios, uaio);
+ nni_aio_set_data(ws->httpaio, 0, d);
+ nni_http_write_req(http, req, ws->httpaio);
+ nni_mtx_unlock(&d->mtx);
+ return;
+
+err:
+ nni_aio_finish_error(uaio, rv);
+ if (http != NULL) {
+ nni_http_fini(http);
+ }
+ if (req != NULL) {
+ nni_http_req_fini(req);
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ws_dialer_fini(nni_ws_dialer *d)
+{
+ nni_aio_fini(d->conaio);
+ nni_strfree(d->proto);
+ nni_strfree(d->addr);
+ nni_strfree(d->uri);
+ nni_strfree(d->host);
+ nni_strfree(d->serv);
+ nni_strfree(d->path);
+ nni_strfree(d->qinfo);
+ if (d->client) {
+ nni_http_client_fini(d->client);
+ }
+ nni_mtx_fini(&d->mtx);
+ NNI_FREE_STRUCT(d);
+}
+
+int
+nni_ws_dialer_init(nni_ws_dialer **dp, const char *url)
+{
+ nni_ws_dialer *d;
+ int rv;
+ nni_aio * aio;
+
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ nni_aio_list_init(&d->conaios);
+ nni_aio_list_init(&d->httpaios);
+
+ if ((d->addr = nni_strdup(url)) == NULL) {
+ nni_ws_dialer_fini(d);
+ return (NNG_ENOMEM);
+ }
+ if ((rv = ws_parse_url(
+ url, NULL, &d->host, &d->serv, &d->path, &d->qinfo)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+ if ((rv = nni_aio_init(&d->conaio, ws_conn_cb, d)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+ // XXX: this is synchronous. We should fix this in the HTTP layer.
+ aio->a_addr = &d->sa;
+ nni_plat_tcp_resolv(d->host, d->serv, NNG_AF_UNSPEC, false, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
+ if (rv != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ if ((rv = nni_http_client_init(&d->client, &d->sa)) != 0) {
+ nni_ws_dialer_fini(d);
+ return (rv);
+ }
+
+ *dp = d;
+ return (0);
+}
+
+void
+nni_ws_dialer_close(nni_ws_dialer *d)
+{
+ // XXX: what to do here?
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ d->closed = true;
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_cancel(d->conaio, NNG_ECLOSED);
+}
+
+int
+nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto)
+{
+ int rv = 0;
+ char *ns;
+ nni_mtx_lock(&d->mtx);
+ if ((ns = nni_strdup(proto)) == NULL) {
+ rv = NNG_ENOMEM;
+ } else {
+ if (d->proto != NULL) {
+ nni_strfree(d->proto);
+ }
+ d->proto = ns;
+ }
+ nni_mtx_unlock(&d->mtx);
+ return (rv);
+}
+
+static void
+ws_dial_cancel(nni_aio *aio, int rv)
+{
+ nni_ws_dialer *d = aio->a_prov_data;
+ nni_mtx_lock(&d->mtx);
+ // If we are waiting, then we can cancel. Otherwise we need
+ // to abort.
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ // This does not cancel in-flight client negotiations with HTTP.
+ nni_mtx_unlock(&d->mtx);
+}
+
+void
+nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
+{
+ nni_mtx_lock(&d->mtx);
+ // First look up the host.
+ if (nni_aio_start(aio, ws_dial_cancel, d) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ if (d->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ nni_list_append(&d->conaios, aio);
+
+ if (!d->started) {
+ d->started = true;
+ nni_http_client_connect(d->client, d->conaio);
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+
+extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *);
+
+// Dialer does not get a hook chance, as it can examine the request
+// and reply after dial is done; this is not a 3-way handshake, so
+// the dialer does not confirm the server's response at the HTTP
+// level. (It can still issue a websocket close).
+
+// The implementation will send periodic PINGs, and respond with
+// PONGs.
diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h
new file mode 100644
index 00000000..25add55e
--- /dev/null
+++ b/src/supplemental/websocket/websocket.h
@@ -0,0 +1,63 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H
+#define NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H
+
+// Pre-defined types for some prototypes. These are from other subsystems.
+typedef struct nni_tls_config nni_tls_config;
+typedef struct nni_http_req nni_http_req;
+typedef struct nni_http_res nni_http_res;
+
+typedef struct nni_ws nni_ws;
+typedef struct nni_ws_listener nni_ws_listener;
+typedef struct nni_ws_dialer nni_ws_dialer;
+
+typedef int (*nni_ws_listen_hook)(void *, nni_http_req *, nni_http_res *);
+
+// Specify URL as ws://[<host>][:port][/path]
+// If host is missing, INADDR_ANY is assumed. If port is missing,
+// then either 80 or 443 are assumed. Note that ws:// means listen
+// on INADDR_ANY port 80, with path "/". For connect side, INADDR_ANY
+// makes no sense. (TBD: return NNG_EADDRINVAL, or try loopback?)
+
+extern int nni_ws_listener_init(nni_ws_listener **, const char *);
+extern void nni_ws_listener_fini(nni_ws_listener *);
+extern void nni_ws_listener_close(nni_ws_listener *);
+extern int nni_ws_listener_proto(nni_ws_listener *, const char *);
+extern int nni_ws_listener_listen(nni_ws_listener *);
+extern void nni_ws_listener_accept(nni_ws_listener *, nni_aio *);
+extern void nni_ws_listener_hook(
+ nni_ws_listener *, nni_ws_listen_hook, void *);
+extern void nni_ws_listener_tls(nni_ws_listener *, nni_tls_config *);
+
+extern int nni_ws_dialer_init(nni_ws_dialer **, const char *);
+extern void nni_ws_dialer_fini(nni_ws_dialer *);
+extern void nni_ws_dialer_close(nni_ws_dialer *);
+extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *);
+extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *);
+extern void nni_ws_dialer_dial(nni_ws_dialer *, nni_aio *);
+
+// Dialer does not get a hook chance, as it can examine the request and reply
+// after dial is done; this is not a 3-way handshake, so the dialer does
+// not confirm the server's response at the HTTP level. (It can still issue
+// a websocket close).
+
+extern void nni_ws_send_msg(nni_ws *, nni_aio *);
+extern void nni_ws_recv_msg(nni_ws *, nni_aio *);
+extern nni_http_res *nni_ws_response(nni_ws *);
+extern nni_http_req *nni_ws_request(nni_ws *);
+extern void nni_ws_close(nni_ws *);
+extern void nni_ws_close_error(nni_ws *, uint16_t);
+extern void nni_ws_fini(nni_ws *);
+
+// The implementation will send periodic PINGs, and respond with PONGs.
+
+#endif // NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H \ No newline at end of file
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 8dcb3f60..9c794e64 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -10,7 +10,6 @@
#include <stdbool.h>
#include <stdio.h>
-#include <stdlib.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -498,46 +497,6 @@ nni_tls_pipe_getopt_remaddr(void *arg, void *v, size_t *szp)
return (rv);
}
-static int
-nni_tls_parse_pair(char *pair, char **hostp, char **servp)
-{
- char *host, *serv, *end;
-
- if (pair[0] == '[') {
- host = pair + 1;
- // IP address enclosed ... for IPv6 usually.
- if ((end = strchr(host, ']')) == NULL) {
- return (NNG_EADDRINVAL);
- }
- *end = '\0';
- serv = end + 1;
- if (*serv == ':') {
- serv++;
- } else if (*serv != '\0') {
- return (NNG_EADDRINVAL);
- }
- } else {
- host = pair;
- serv = strchr(host, ':');
- if (serv != NULL) {
- *serv = '\0';
- serv++;
- }
- }
- if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) {
- *hostp = NULL;
- } else {
- *hostp = host;
- }
- if ((serv == NULL) || (strlen(serv) == 0)) {
- *servp = NULL;
- } else {
- *servp = serv;
- }
- // Stash the port in big endian (network) byte order.
- return (0);
-}
-
// Note that the url *must* be in a modifiable buffer.
int
nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
@@ -555,8 +514,9 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
// is the second part.
*h1 = '\0';
h1++;
- if (((rv = nni_tls_parse_pair(h1, rhost, rserv)) != 0) ||
- ((rv = nni_tls_parse_pair(url, lhost, lserv)) != 0)) {
+ if (((rv = nni_tran_parse_host_port(h1, rhost, rserv)) != 0) ||
+ ((rv = nni_tran_parse_host_port(url, lhost, lserv)) !=
+ 0)) {
return (rv);
}
if ((*rserv == NULL) || (*rhost == NULL)) {
@@ -566,7 +526,7 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
} else if (mode == NNI_EP_MODE_DIAL) {
*lhost = NULL;
*lserv = NULL;
- if ((rv = nni_tls_parse_pair(url, rhost, rserv)) != 0) {
+ if ((rv = nni_tran_parse_host_port(url, rhost, rserv)) != 0) {
return (rv);
}
if ((*rserv == NULL) || (*rhost == NULL)) {
@@ -577,7 +537,7 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost,
NNI_ASSERT(mode == NNI_EP_MODE_LISTEN);
*rhost = NULL;
*rserv = NULL;
- if ((rv = nni_tls_parse_pair(url, lhost, lserv)) != 0) {
+ if ((rv = nni_tran_parse_host_port(url, lhost, lserv)) != 0) {
return (rv);
}
// We have to have a port to listen on!
@@ -657,9 +617,13 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
return (NNG_EADDRINVAL);
}
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
// Parse the URLs first.
rv = nni_tls_parse_url(buf, &lhost, &lserv, &rhost, &rserv, mode);
if (rv != 0) {
+ nni_aio_fini(aio);
return (rv);
}
if (mode == NNI_EP_MODE_DIAL) {
@@ -672,10 +636,6 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
authmode = NNI_TLS_CONFIG_AUTH_MODE_NONE;
}
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
-
// XXX: arguably we could defer this part to the point we do a bind
// or connect!
@@ -683,7 +643,11 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
aio->a_addr = &rsa;
nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio);
nni_aio_wait(aio);
+ nni_strfree(rserv);
if ((rv = nni_aio_result(aio)) != 0) {
+ nni_strfree(rhost);
+ nni_strfree(lhost);
+ nni_strfree(lserv);
nni_aio_fini(aio);
return (rv);
}
@@ -695,7 +659,10 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
aio->a_addr = &lsa;
nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio);
nni_aio_wait(aio);
+ nni_strfree(lhost);
+ nni_strfree(lserv);
if ((rv = nni_aio_result(aio)) != 0) {
+ nni_strfree(rhost);
nni_aio_fini(aio);
return (rv);
}
@@ -705,12 +672,14 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
nni_aio_fini(aio);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ nni_strfree(rhost);
return (NNG_ENOMEM);
}
nni_mtx_init(&ep->mtx);
if (nni_strlcpy(ep->addr, url, sizeof(ep->addr)) >= sizeof(ep->addr)) {
NNI_FREE_STRUCT(ep);
+ nni_strfree(rhost);
return (NNG_EADDRINVAL);
}
@@ -718,15 +687,18 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
((rv = nni_tls_config_init(&ep->cfg, tlsmode)) != 0) ||
((rv = nni_tls_config_auth_mode(ep->cfg, authmode)) != 0) ||
((rv = nni_aio_init(&ep->aio, nni_tls_ep_cb, ep)) != 0)) {
+ nni_strfree(rhost);
nni_tls_ep_fini(ep);
return (rv);
}
if ((tlsmode == NNI_TLS_CONFIG_CLIENT) && (rhost != NULL)) {
if ((rv = nni_tls_config_server_name(ep->cfg, rhost)) != 0) {
+ nni_strfree(rhost);
nni_tls_ep_fini(ep);
return (rv);
}
}
+ nni_strfree(rhost);
ep->proto = nni_sock_proto(sock);
ep->authmode = authmode;
diff --git a/src/transport/ws/CMakeLists.txt b/src/transport/ws/CMakeLists.txt
new file mode 100644
index 00000000..18842df9
--- /dev/null
+++ b/src/transport/ws/CMakeLists.txt
@@ -0,0 +1,19 @@
+#
+# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# WebSocket transport
+
+if (NNG_TRANSPORT_WS)
+ set(WS_SOURCES transport/ws/websocket.c transport/ws/websocket.h)
+ install(FILES websocket.h DESTINATION include/nng/transport/ws)
+
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${WS_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/ws/README.adoc b/src/transport/ws/README.adoc
new file mode 100644
index 00000000..e3101297
--- /dev/null
+++ b/src/transport/ws/README.adoc
@@ -0,0 +1,38 @@
+= websocket transport
+
+This transport provides support for SP over websocket using TCP or TLS.
+When using TCP, it is compatible with the libnanomsg legacy transport.
+It also is compatible with mangos (both TCP and TLS).
+
+TLS support requires the mbedTLS library.
+
+We set the "protocol" such as "pair.sp.nanomsg.org" in the
+Sec-WebSocket-Protocol field -- the client sets to the the server's
+protocol - i.e. the protocol that the server speaks. For example,
+if the the server is a REP, then a REQ client would send "rep.sp.nanomsg.org".
+
+The server sends the same value (it's own), per the WebSocket specs. (Note
+that the client's protocol is never sent, but assumed to be complementary
+to the protocol in the Sec-WebSocket-Protocol field.)
+
+Each SP message is a WebSocket message.
+
+WebSocket is defined in RFC 6455.
+
+== Design
+
+We unfortunately need to implement our own design for this -- the only
+reasonable client library would be libcurl, and there is a dearth of
+suitable server libraries. Since we don't have to support full HTTP, but
+just the initial handshake, this isn't too tragic.
+
+== Multiple Server Sockets
+
+In order to support Multiple Server sockets listening on the same port,
+the application must be long lived. We will set up a listener on the
+configured TCP (or TLS) port, and examine the PATH supplied in the GET.
+This will be used to match against the URL requested, and if the URL
+matches we will create the appropriate pipe.
+
+If no server endpoint at that address can be found, we return an
+HTTP error, and close the socket.
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
new file mode 100644
index 00000000..8a73bcfb
--- /dev/null
+++ b/src/transport/ws/websocket.c
@@ -0,0 +1,533 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "supplemental/websocket/websocket.h"
+
+typedef struct ws_ep ws_ep;
+typedef struct ws_pipe ws_pipe;
+
+struct ws_ep {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ char * addr;
+ uint16_t lproto; // local protocol
+ uint16_t rproto; // remote protocol
+ size_t rcvmax;
+ char * protoname;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_aio * connaio;
+ nni_aio * accaio;
+ nni_ws_listener *listener;
+ nni_ws_dialer * dialer;
+};
+
+struct ws_pipe {
+ int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN
+ nni_mtx mtx;
+ size_t rcvmax; // inherited from EP
+ bool closed;
+ uint16_t rproto;
+ uint16_t lproto;
+ nni_aio *user_txaio;
+ nni_aio *user_rxaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_ws * ws;
+};
+
+static void
+ws_pipe_send_cb(void *arg)
+{
+ ws_pipe *p = arg;
+ nni_aio *taio;
+ nni_aio *uaio;
+
+ nni_mtx_lock(&p->mtx);
+ taio = p->txaio;
+ uaio = p->user_txaio;
+ p->user_txaio = NULL;
+
+ if (uaio != NULL) {
+ int rv;
+ if ((rv = nni_aio_result(taio)) != 0) {
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish(uaio, 0, 0);
+ }
+ }
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv_cb(void *arg)
+{
+ ws_pipe *p = arg;
+ nni_aio *raio = p->rxaio;
+ nni_aio *uaio;
+ int rv;
+
+ nni_mtx_lock(&p->mtx);
+ uaio = p->user_rxaio;
+ p->user_rxaio = NULL;
+ if ((rv = nni_aio_result(raio)) != 0) {
+ if (uaio != NULL) {
+ nni_aio_finish_error(uaio, rv);
+ }
+ } else {
+ nni_msg *msg = nni_aio_get_msg(raio);
+ if (uaio != NULL) {
+ nni_aio_finish_msg(uaio, msg);
+ } else {
+ nni_msg_free(msg);
+ }
+ }
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv_cancel(nni_aio *aio, int rv)
+{
+ ws_pipe *p = aio->a_prov_data;
+ nni_mtx_lock(&p->mtx);
+ if (p->user_rxaio != aio) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ nni_aio_cancel(p->rxaio, rv);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_recv(void *arg, nni_aio *aio)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ p->user_rxaio = aio;
+
+ nni_ws_recv_msg(p->ws, p->rxaio);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_send_cancel(nni_aio *aio, int rv)
+{
+ ws_pipe *p = aio->a_prov_data;
+ nni_mtx_lock(&p->mtx);
+ if (p->user_txaio != aio) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ // This aborts the upper send, which will call back with an error
+ // when it is done.
+ nni_aio_cancel(p->txaio, rv);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_send(void *arg, nni_aio *aio)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ return;
+ }
+ p->user_txaio = aio;
+ nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
+
+ nni_ws_send_msg(p->ws, p->txaio);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static void
+ws_pipe_fini(void *arg)
+{
+ ws_pipe *p = arg;
+
+ nni_aio_stop(p->rxaio);
+ nni_aio_stop(p->txaio);
+
+ nni_aio_fini(p->rxaio);
+ nni_aio_fini(p->txaio);
+
+ if (p->ws) {
+ nni_ws_fini(p->ws);
+ }
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
+}
+
+static void
+ws_pipe_close(void *arg)
+{
+ ws_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ nni_ws_close(p->ws);
+ nni_mtx_unlock(&p->mtx);
+}
+
+static int
+ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws)
+{
+ ws_pipe *p;
+ int rv;
+ nni_aio *aio;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&p->mtx);
+
+ // Initialize AIOs.
+ if (((rv = nni_aio_init(&p->txaio, ws_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, ws_pipe_recv_cb, p)) != 0)) {
+ ws_pipe_fini(p);
+ return (rv);
+ }
+
+ p->mode = ep->mode;
+ p->rcvmax = ep->rcvmax;
+ // p->addr = ep->addr;
+ p->rproto = ep->rproto;
+ p->lproto = ep->lproto;
+ p->ws = ws;
+
+ if ((aio = nni_list_first(&ep->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_pipe(aio, p);
+ }
+
+ *pipep = p;
+ return (0);
+}
+
+static uint16_t
+ws_pipe_peer(void *arg)
+{
+ ws_pipe *p = arg;
+
+ return (p->rproto);
+}
+
+static void
+ws_pipe_start(void *arg, nni_aio *aio)
+{
+ if (nni_aio_start(aio, NULL, NULL) == 0) {
+ nni_aio_finish(aio, 0, 0);
+ }
+}
+
+// We have very different approaches for server and client.
+// Servers use the HTTP server framework, and a request methodology.
+
+static int
+ws_ep_bind(void *arg)
+{
+ ws_ep *ep = arg;
+ return (nni_ws_listener_listen(ep->listener));
+}
+
+static void
+ws_ep_cancel(nni_aio *aio, int rv)
+{
+ ws_ep *ep = aio->a_prov_data;
+
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_accept(void *arg, nni_aio *aio)
+{
+ ws_ep *ep = arg;
+
+ // We already bound, so we just need to look for an available
+ // pipe (created by the handler), and match it.
+ // Otherwise we stick the AIO in the accept list.
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_list_append(&ep->aios, aio);
+ if (aio == nni_list_first(&ep->aios)) {
+ nni_ws_listener_accept(ep->listener, ep->accaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_connect(void *arg, nni_aio *aio)
+{
+ ws_ep *ep = arg;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(nni_list_empty(&ep->aios));
+
+ // If we can't start, then its dying and we can't report
+ // either.
+ if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ nni_list_append(&ep->aios, aio);
+ nni_ws_dialer_dial(ep->dialer, ep->connaio);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+ws_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz)
+{
+ ws_ep *ep = arg;
+ if (ep == NULL) {
+ return (nni_chkopt_size(v, sz, 0, NNI_MAXSZ));
+ }
+ return (nni_setopt_size(&ep->rcvmax, v, sz, 0, NNI_MAXSZ));
+}
+
+static int
+ws_ep_getopt_recvmaxsz(void *arg, void *v, size_t *szp)
+{
+ ws_ep *ep = arg;
+ return (nni_getopt_size(ep->rcvmax, v, szp));
+}
+
+static nni_tran_pipe_option ws_pipe_options[] = {
+#if 0
+ // clang-format off
+ { NNG_OPT_LOCADDR, ws_pipe_getopt_locaddr },
+ { NNG_OPT_REMADDR, ws_pipe_getopt_remaddr },
+ // clang-format on
+#endif
+ // terminate list
+ { NULL, NULL }
+};
+
+static nni_tran_pipe ws_pipe_ops = {
+ .p_fini = ws_pipe_fini,
+ .p_start = ws_pipe_start,
+ .p_send = ws_pipe_send,
+ .p_recv = ws_pipe_recv,
+ .p_close = ws_pipe_close,
+ .p_peer = ws_pipe_peer,
+ .p_options = ws_pipe_options,
+};
+
+static nni_tran_ep_option ws_ep_options[] = {
+ {
+ .eo_name = NNG_OPT_RECVMAXSZ,
+ .eo_getopt = ws_ep_getopt_recvmaxsz,
+ .eo_setopt = ws_ep_setopt_recvmaxsz,
+ },
+ // terminate list
+ { NULL, NULL, NULL },
+};
+
+static void
+ws_ep_fini(void *arg)
+{
+ ws_ep *ep = arg;
+
+ nni_aio_stop(ep->accaio);
+ nni_aio_stop(ep->connaio);
+ nni_aio_fini(ep->accaio);
+ nni_aio_fini(ep->connaio);
+ if (ep->listener != NULL) {
+ nni_ws_listener_fini(ep->listener);
+ }
+ if (ep->dialer != NULL) {
+ nni_ws_dialer_fini(ep->dialer);
+ }
+ nni_strfree(ep->addr);
+ nni_strfree(ep->protoname);
+ nni_mtx_fini(&ep->mtx);
+ NNI_FREE_STRUCT(ep);
+}
+
+static void
+ws_ep_conn_cb(void *arg)
+{
+ ws_ep * ep = arg;
+ ws_pipe *p;
+ nni_aio *caio = ep->connaio;
+ nni_aio *uaio;
+ int rv;
+ nni_ws * ws = NULL;
+
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_result(caio) == 0) {
+ ws = nni_aio_get_pipe(caio);
+ }
+ if ((uaio = nni_list_first(&ep->aios)) == NULL) {
+ // The client stopped caring about this!
+ if (ws != NULL) {
+ nni_ws_fini(ws);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ nni_aio_list_remove(uaio);
+ NNI_ASSERT(nni_list_empty(&ep->aios));
+ if ((rv = nni_aio_result(caio)) != 0) {
+ nni_aio_finish_error(uaio, rv);
+ } else if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ nni_ws_fini(ws);
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish_pipe(uaio, p);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ws_ep_close(void *arg)
+{
+ ws_ep *ep = arg;
+
+ if (ep->mode == NNI_EP_MODE_LISTEN) {
+ nni_ws_listener_close(ep->listener);
+ } else {
+ nni_ws_dialer_close(ep->dialer);
+ }
+}
+
+static void
+ws_ep_acc_cb(void *arg)
+{
+ ws_ep * ep = arg;
+ nni_aio *aaio = ep->accaio;
+ nni_aio *uaio;
+ int rv;
+
+ nni_mtx_lock(&ep->mtx);
+ uaio = nni_list_first(&ep->aios);
+ if ((rv = nni_aio_result(aaio)) != 0) {
+ if (uaio != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ } else {
+ nni_ws *ws = nni_aio_get_pipe(aaio);
+ if (uaio != NULL) {
+ ws_pipe *p;
+ // Make a pipe
+ nni_aio_list_remove(uaio);
+ if ((rv = ws_pipe_init(&p, ep, ws)) != 0) {
+ nni_ws_close(ws);
+ nni_aio_finish_error(uaio, rv);
+ } else {
+ nni_aio_finish_pipe(uaio, p);
+ }
+ }
+ }
+ if (!nni_list_empty(&ep->aios)) {
+ nni_ws_listener_accept(ep->listener, aaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static int
+ws_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
+{
+ ws_ep * ep;
+ const char *pname;
+ int rv;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&ep->mtx);
+
+ // List of pipes (server only).
+ nni_aio_list_init(&ep->aios);
+
+ ep->mode = mode;
+ ep->lproto = nni_sock_proto(sock);
+ ep->rproto = nni_sock_peer(sock);
+
+ if (mode == NNI_EP_MODE_DIAL) {
+ pname = nni_sock_peer_name(sock);
+ rv = nni_ws_dialer_init(&ep->dialer, url);
+ } else {
+ pname = nni_sock_proto_name(sock);
+ rv = nni_ws_listener_init(&ep->listener, url);
+ }
+
+ if ((rv == 0) && ((ep->addr = nni_strdup(url)) == NULL)) {
+ rv = NNG_ENOMEM;
+ }
+ if ((rv != 0) ||
+ ((rv = nni_aio_init(&ep->connaio, ws_ep_conn_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->accaio, ws_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_asprintf(&ep->protoname, "%s.sp.nanomsg.org", pname)) !=
+ 0)) {
+ ws_ep_fini(ep);
+ return (rv);
+ }
+
+ *epp = ep;
+ return (0);
+}
+static int
+ws_tran_init(void)
+{
+ return (0);
+}
+
+static void
+ws_tran_fini(void)
+{
+}
+
+static nni_tran_ep ws_ep_ops = {
+ .ep_init = ws_ep_init,
+ .ep_fini = ws_ep_fini,
+ .ep_connect = ws_ep_connect,
+ .ep_bind = ws_ep_bind,
+ .ep_accept = ws_ep_accept,
+ .ep_close = ws_ep_close,
+ .ep_options = ws_ep_options,
+};
+
+static nni_tran ws_tran = {
+ .tran_version = NNI_TRANSPORT_VERSION,
+ .tran_scheme = "ws",
+ .tran_ep = &ws_ep_ops,
+ .tran_pipe = &ws_pipe_ops,
+ .tran_init = ws_tran_init,
+ .tran_fini = ws_tran_fini,
+};
+
+int
+nng_ws_register(void)
+{
+ return (nni_tran_register(&ws_tran));
+}
diff --git a/src/transport/ws/websocket.h b/src/transport/ws/websocket.h
new file mode 100644
index 00000000..1beb6156
--- /dev/null
+++ b/src/transport/ws/websocket.h
@@ -0,0 +1,62 @@
+//
+// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_TRANSPORT_WS_WEBSOCKET_H
+#define NNG_TRANSPORT_WS_WEBSOCKET_H
+
+// TLS transport. This is used for communication via TLS v1.2 over TCP/IP.
+
+NNG_DECL int nng_ws_register(void);
+
+// TLS options. Note that these can only be set *before* the endpoint is
+// started. Once started, it is no longer possible to alter the TLS
+// configuration.
+
+// NNG_OPT_TLS_CA_CERT is a string with one or more X.509 certificates,
+// representing the entire CA chain. The content may be either PEM or DER
+// encoded.
+#define NNG_OPT_TLS_CA_CERT "tls:ca-cert"
+
+// NNG_OPT_TLS_CRL is a PEM encoded CRL (revocation list). Multiple lists
+// may be loaded by using this option multiple times.
+#define NNG_OPT_TLS_CRL "tls:crl"
+
+// NNG_OPT_TLS_CERT is used to specify our own certificate. At present
+// only one certificate may be supplied. (In the future it may be
+// possible to call this multiple times, for servers that select different
+// certificates depending upon client capabilities.)
+#define NNG_OPT_TLS_CERT "tls:cert"
+
+// NNG_OPT_TLS_PRIVATE_KEY is used to specify the private key used
+// with the given certificate. This should be called after setting
+// the certificate. The private key may be in PEM or DER format.
+// If in PEM encoded, a terminating ZERO byte should be included.
+#define NNG_OPT_TLS_PRIVATE_KEY "tls:private-key"
+
+// NNG_OPT_TLS_PRIVATE_KEY_PASSWORD is used to specify a password
+// used for the private key. The value is an ASCIIZ string.
+#define NNG_OPT_TLS_PRIVATE_KEY_PASSWORD "tls:private-key-password"
+
+// NNG_OPT_TLS_AUTH_MODE is an integer indicating whether our
+// peer should be verified or not. It is required on clients/dialers,
+// and off on servers/listeners, by default.
+#define NNG_OPT_TLS_AUTH_MODE "tls:auth-mode"
+
+extern int nng_tls_auth_mode_required;
+extern int nng_tls_auth_mode_none;
+extern int nng_tls_auth_mode_optional;
+
+// NNG_OPT_TLS_AUTH_VERIFIED is a boolean that can be read on pipes,
+// indicating whether the peer certificate is verified.
+#define NNG_OPT_TLS_AUTH_VERIFIED "tls:auth-verified"
+
+// XXX: TBD: Ciphersuite selection and reporting. Session reuse?
+
+#endif // NNG_TRANSPORT_WS_WEBSOCKET_H
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 607c353c..16a29da1 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1292,7 +1292,7 @@ zt_wire_packet_send_cb(void *arg)
nni_aio * aio = arg;
zt_send_hdr *hdr;
- hdr = nni_aio_get_data(aio);
+ hdr = nni_aio_get_data(aio, 0);
nni_free(hdr, hdr->len + sizeof(*hdr));
nni_aio_fini_cb(aio);
}
@@ -1355,7 +1355,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
buf += sizeof(*hdr);
memcpy(buf, data, len);
- nni_aio_set_data(aio, hdr);
+ nni_aio_set_data(aio, hdr, 0);
hdr->sa = addr;
hdr->len = len;
@@ -2001,7 +2001,7 @@ zt_pipe_ping_cb(void *arg)
}
if (p->zp_ping_try < p->zp_ping_count) {
nni_time now = nni_clock();
- nni_aio_set_timeout(aio, now + p->zp_ping_time);
+ nni_aio_set_timeout(aio, p->zp_ping_time);
// We want pings. We only send one if needed, but we
// use the the timer to wake us up even if we aren't
// going to send a ping. (We don't increment the try count
@@ -2034,7 +2034,7 @@ zt_pipe_start(void *arg, nni_aio *aio)
if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNI_TIME_ZERO) &&
(p->zp_ping_time != NNI_TIME_NEVER) && (p->zp_ping_aio != NULL)) {
p->zp_ping_try = 0;
- nni_aio_set_timeout(aio, nni_clock() + p->zp_ping_time);
+ nni_aio_set_timeout(aio, p->zp_ping_time);
if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) ==
0) {
p->zp_ping_active = 1;
@@ -2456,7 +2456,7 @@ zt_ep_conn_req_cb(void *arg)
}
if (nni_list_first(&ep->ze_aios) != NULL) {
- nni_aio_set_timeout(aio, nni_clock() + zt_conn_interval);
+ nni_aio_set_timeout(aio, zt_conn_interval);
if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) {
ep->ze_creq_active = 1;
ep->ze_creq_try++;
@@ -2479,8 +2479,7 @@ zt_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&zt_lk);
if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- nni_time now = nni_clock();
- int rv;
+ int rv;
// Clear the port so we get an ephemeral port.
ep->ze_laddr &= ~((uint64_t) zt_port_mask);
@@ -2498,7 +2497,7 @@ zt_ep_connect(void *arg, nni_aio *aio)
ep->ze_running = 1;
- nni_aio_set_timeout(ep->ze_creq_aio, now + zt_conn_interval);
+ nni_aio_set_timeout(ep->ze_creq_aio, zt_conn_interval);
if (nni_aio_start(
ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) {