diff options
50 files changed, 6709 insertions, 146 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 27f0321e..e123c3fd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -168,6 +168,11 @@ if (NNG_TRANSPORT_TCP) add_definitions (-DNNG_HAVE_TCP) endif () +option (NNG_TRANSPORT_WS "Enable WebSocket transport." ON) +if (NNG_TRANSPORT_WS) + add_definitions (-DNNG_HAVE_WEBSOCKET) +endif () + option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) if (NNG_TRANSPORT_ZEROTIER) add_definitions (-DNNG_HAVE_ZEROTIER) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 93fa390d..6fae89e0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -62,6 +62,8 @@ set (NNG_SOURCES core/protocol.h core/random.c core/random.h + core/reap.c + core/reap.h core/socket.c core/socket.h core/strs.c @@ -120,8 +122,10 @@ if (NNG_PLATFORM_WINDOWS) endif() add_subdirectory(supplemental/base64) +add_subdirectory(supplemental/http) add_subdirectory(supplemental/mbedtls) add_subdirectory(supplemental/sha1) +add_subdirectory(supplemental/websocket) add_subdirectory(protocol/bus0) add_subdirectory(protocol/pair0) @@ -135,6 +139,7 @@ add_subdirectory(transport/inproc) add_subdirectory(transport/ipc) add_subdirectory(transport/tcp) add_subdirectory(transport/tls) +add_subdirectory(transport/ws) add_subdirectory(transport/zerotier) include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src diff --git a/src/core/aio.c b/src/core/aio.c index cec2ff7c..6ce5641d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -1,6 +1,7 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> // Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -66,7 +67,8 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) } memset(aio, 0, sizeof(*aio)); nni_cv_init(&aio->a_cv, &nni_aio_lk); - aio->a_expire = NNI_TIME_NEVER; + aio->a_expire = NNI_TIME_NEVER; + aio->a_timeout = NNG_DURATION_INFINITE; if (arg == NULL) { arg = aio; } @@ -116,9 +118,9 @@ nni_aio_stop(nni_aio *aio) } void -nni_aio_set_timeout(nni_aio *aio, nni_time when) +nni_aio_set_timeout(nni_aio *aio, nni_duration when) { - aio->a_expire = when; + aio->a_timeout = when; } void @@ -158,15 +160,54 @@ nni_aio_get_ep(nni_aio *aio) } void -nni_aio_set_data(nni_aio *aio, void *data) +nni_aio_set_data(nni_aio *aio, int index, void *data) { - aio->a_data = data; + if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_user_data))) { + aio->a_user_data[index] = data; + } } void * -nni_aio_get_data(nni_aio *aio) +nni_aio_get_data(nni_aio *aio, int index) +{ + if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_user_data))) { + return (aio->a_user_data[index]); + } + return (NULL); +} + +void +nni_aio_set_input(nni_aio *aio, int index, void *data) { - return (aio->a_data); + if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_inputs))) { + aio->a_inputs[index] = data; + } +} + +void * +nni_aio_get_input(nni_aio *aio, int index) +{ + if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_inputs))) { + return (aio->a_inputs[index]); + } + return (NULL); +} + +void +nni_aio_set_output(nni_aio *aio, int index, void *data) +{ + if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_outputs))) { + aio->a_outputs[index] = data; + } +} + +void * +nni_aio_get_output(nni_aio *aio, int index) +{ + if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_outputs))) { + return (aio->a_outputs[index]); + } + return (NULL); } int @@ -219,8 +260,21 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; aio->a_active = 1; - if (aio->a_expire != NNI_TIME_NEVER) { + + // Convert the relative timeout to an absolute timeout. + switch (aio->a_timeout) { + case NNG_DURATION_ZERO: + aio->a_expire = NNI_TIME_ZERO; + nni_aio_expire_add(aio); + break; + case NNG_DURATION_INFINITE: + case NNG_DURATION_DEFAULT: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_timeout; nni_aio_expire_add(aio); + break; } nni_mtx_unlock(&nni_aio_lk); return (0); diff --git a/src/core/aio.h b/src/core/aio.h index 3bdcf433..c4c09421 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -1,6 +1,7 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> // Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -22,9 +23,10 @@ typedef void (*nni_aio_cancelfn)(nni_aio *, int); // An nni_aio is an async I/O handle. struct nni_aio { - int a_result; // Result code (nng_errno) - size_t a_count; // Bytes transferred (I/O only) - nni_time a_expire; + int a_result; // Result code (nng_errno) + size_t a_count; // Bytes transferred (I/O only) + nni_time a_expire; // Absolute timeout + nni_duration a_timeout; // Relative timeout // These fields are private to the aio framework. nni_cv a_cv; @@ -35,8 +37,6 @@ struct nni_aio { unsigned a_expiring : 1; // expiration callback in progress unsigned a_waiting : 1; // a thread is waiting for this to finish unsigned a_synch : 1; // run completion synchronously - unsigned a_reltime : 1; // expiration time is relative - unsigned a_pad : 25; // ensure 32-bit alignment nni_task a_task; // Read/write operations. @@ -53,13 +53,21 @@ struct nni_aio { // Resolver operations. nni_sockaddr *a_addr; - // Extra user data. - void *a_data; + // User scratch data. Consumers may store values here, which + // must be preserved by providers and the framework. + void *a_user_data[4]; + + // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be + // specified. The semantics of these will vary, and depend on the + // specific operation. + void *a_inputs[4]; + void *a_outputs[4]; // Provider-use fields. nni_aio_cancelfn a_prov_cancel; void * a_prov_data; nni_list_node a_prov_node; + void * a_prov_extra[4]; // Extra data used by provider // Expire node. nni_list_node a_expire_node; @@ -96,12 +104,32 @@ extern void nni_aio_stop(nni_aio *); // nni_aio_set_data sets user data. This should only be done by the // consumer, initiating the I/O. The intention is to be able to store // additional data for use when the operation callback is executed. -extern void nni_aio_set_data(nni_aio *, void *); +// The index represents the "index" at which to store the data. A maximum +// of 4 elements can be stored with the (index >= 0 && index < 4). +extern void nni_aio_set_data(nni_aio *, int, void *); // nni_aio_get_data returns the user data that was previously stored // with nni_aio_set_data. -extern void *nni_aio_get_data(nni_aio *); +extern void *nni_aio_get_data(nni_aio *, int); + +// nni_set_input sets input parameters on the AIO. The semantic details +// of this will be determined by the specific AIO operation. AIOs can +// carry up to 4 input parameters. +extern void nni_aio_set_input(nni_aio *, int, void *); + +// nni_get_input returns the input value stored by nni_aio_set_input. +extern void *nni_aio_get_input(nni_aio *, int); + +// nni_set_output sets output results on the AIO, allowing providers to +// return results to consumers. The semantic details are determined by +// the AIO operation. Up to 4 outputs can be carried on an AIO. +extern void nni_aio_set_output(nni_aio *, int, void *); + +// nni_get_output returns an output previously stored on the AIO. +extern void *nni_aio_get_output(nni_aio *, int); +// XXX: These should be refactored in terms of the generic inputs and +// outputs. extern void nni_aio_set_msg(nni_aio *, nni_msg *); extern nni_msg *nni_aio_get_msg(nni_aio *); extern void nni_aio_set_pipe(nni_aio *, void *); @@ -122,10 +150,10 @@ extern void * nni_aio_get_ep(nni_aio *); // completion callback. void nni_aio_set_synch(nni_aio *); -// nni_aio_set_timeout sets the timeout (absolute) when the AIO will +// nni_aio_set_timeout sets the timeout (relative) when the AIO will // be canceled. The cancelation does not happen until after nni_aio_start // is called. -extern void nni_aio_set_timeout(nni_aio *, nni_time); +extern void nni_aio_set_timeout(nni_aio *, nni_duration); // nni_aio_result returns the result code (0 on success, or an NNG errno) // for the operation. It is only valid to call this when the operation is diff --git a/src/core/defs.h b/src/core/defs.h index 5a9ded92..3a714f85 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -25,6 +25,9 @@ #define NNI_ASSERT(x) #endif +// Returns the size of an array in elements. (Convenience.) +#define NNI_NUM_ELEMENTS(x) (sizeof(x) / sizeof((x)[0])) + // These types are common but have names shared with user space. typedef struct nng_msg nni_msg; typedef struct nng_sockaddr nni_sockaddr; diff --git a/src/core/device.c b/src/core/device.c index 2e000d7e..e6b75897 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -159,7 +159,7 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) return (rv); } - nni_aio_set_timeout(p->aio, NNI_TIME_NEVER); + nni_aio_set_timeout(p->aio, NNG_DURATION_INFINITE); } dd->npath = npath; *dp = dd; diff --git a/src/core/endpt.c b/src/core/endpt.c index fa30bf77..3058f5c0 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -333,8 +333,8 @@ nni_ep_tmo_start(nni_ep *ep) // have a statistically perfect distribution with the modulo of // the random number, but this really doesn't matter. - nni_aio_set_timeout(ep->ep_tmo_aio, - nni_clock() + (backoff ? nni_random() % backoff : 0)); + nni_aio_set_timeout( + ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0)); ep->ep_tmo_run = 1; if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) { diff --git a/src/core/init.c b/src/core/init.c index 4a7fd974..cb6dbeee 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -1,6 +1,7 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> // Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -10,15 +11,25 @@ #include "core/nng_impl.h" +#include <stdbool.h> #include <stdio.h> #include <stdlib.h> +static nni_mtx nni_init_mtx; +static nni_list nni_init_list; +static bool nni_inited = false; + static int nni_init_helper(void) { int rv; + nni_mtx_init(&nni_init_mtx); + NNI_LIST_INIT(&nni_init_list, nni_initializer, i_node); + nni_inited = true; + if (((rv = nni_taskq_sys_init()) != 0) || + ((rv = nni_reap_sys_init()) != 0) || ((rv = nni_timer_sys_init()) != 0) || ((rv = nni_aio_sys_init()) != 0) || ((rv = nni_random_sys_init()) != 0) || @@ -29,6 +40,7 @@ nni_init_helper(void) ((rv = nni_tran_sys_init()) != 0)) { nni_fini(); } + return (rv); } @@ -41,14 +53,54 @@ nni_init(void) void nni_fini(void) { + if (!nni_inited) { + return; + } + if (!nni_list_empty(&nni_init_list)) { + nni_initializer *init; + + nni_mtx_lock(&nni_init_mtx); + while ((init = nni_list_first(&nni_init_list)) != NULL) { + if (init->i_fini != NULL) { + init->i_fini(); + } + init->i_once = 0; + nni_list_remove(&nni_init_list, init); + } + nni_mtx_unlock(&nni_init_mtx); + } nni_tran_sys_fini(); nni_proto_sys_fini(); nni_pipe_sys_fini(); nni_ep_sys_fini(); nni_sock_sys_fini(); nni_random_sys_fini(); + nni_reap_sys_fini(); // must be before timer and aio (expire) nni_aio_sys_fini(); nni_timer_sys_fini(); nni_taskq_sys_fini(); + + nni_mtx_fini(&nni_init_mtx); nni_plat_fini(); + nni_inited = false; +} + +int +nni_initialize(nni_initializer *init) +{ + int rv; + if (init->i_once) { + return (0); + } + nni_mtx_lock(&nni_init_mtx); + if (init->i_once) { + nni_mtx_unlock(&nni_init_mtx); + return (0); + } + if ((rv = init->i_init()) == 0) { + init->i_once = 1; + nni_list_append(&nni_init_list, init); + } + nni_mtx_unlock(&nni_init_mtx); + return (rv); } diff --git a/src/core/init.h b/src/core/init.h index ffcebf64..d21bb4c5 100644 --- a/src/core/init.h +++ b/src/core/init.h @@ -1,5 +1,7 @@ // -// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -21,4 +23,23 @@ int nni_init(void); // that all resources used by the library are released back to the system. void nni_fini(void); +typedef struct nni_initializer { + int (*i_init)(void); // i_init is called exactly once + void (*i_fini)(void); // i_fini is called on shutdown + int i_once; // private -- initialize to zero + nni_list_node i_node; // private -- initialize to zero +} nni_initializer; + +// nni_initialize will call the initialization routine exactly once. This is +// done efficiently, so that if the caller has initialized already, then +// subsequent calls are "cheap" (no synchronization cost). The initialization +// function must not itself cause any further calls to nni_initialize; the +// function should limit itself to initialization of locks and static data +// structures. When shutting down, the finalizer will be called. The +// order in which finalizers are called is unspecified. +// +// An initializer may fail (due to resource exhaustion), in which case the +// return value of nni_initialize will be non-zero. +int nni_initialize(nni_initializer *); + #endif // CORE_INIT_H diff --git a/src/core/list.c b/src/core/list.c index 8b26b64a..f489a705 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -1,5 +1,7 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -151,7 +153,10 @@ nni_list_active(nni_list *list, void *item) int nni_list_empty(nni_list *list) { - return (list->ll_head.ln_next == &list->ll_head); + // The first check ensures that we treat an uninitialized list + // as empty. This use useful for statically initialized lists. + return ((list->ll_head.ln_next == NULL) || + (list->ll_head.ln_next == &list->ll_head)); } int diff --git a/src/core/message.c b/src/core/message.c index b44dfdf6..35153f01 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -9,7 +9,6 @@ // #include <stdio.h> -#include <stdlib.h> #include <string.h> #include "core/nng_impl.h" diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 87dd2de1..1f2297c1 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -38,6 +38,7 @@ #include "core/panic.h" #include "core/protocol.h" #include "core/random.h" +#include "core/reap.h" #include "core/strs.h" #include "core/taskq.h" #include "core/thread.h" diff --git a/src/core/reap.c b/src/core/reap.c new file mode 100644 index 00000000..8191dba3 --- /dev/null +++ b/src/core/reap.c @@ -0,0 +1,89 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include "reap.h" + +#include <stdbool.h> + +static nni_list nni_reap_list; +static nni_mtx nni_reap_mtx; +static nni_cv nni_reap_cv; +static bool nni_reap_exit = false; +static nni_thr nni_reap_thr; + +static void +nni_reap_stuff(void *notused) +{ + NNI_ARG_UNUSED(notused); + + nni_mtx_lock(&nni_reap_mtx); + for (;;) { + nni_reap_item *item; + if ((item = nni_list_first(&nni_reap_list)) != NULL) { + nni_list_remove(&nni_reap_list, item); + nni_mtx_unlock(&nni_reap_mtx); + + item->r_func(item->r_ptr); + nni_mtx_lock(&nni_reap_mtx); + continue; + } + + if (nni_reap_exit) { + break; + } + + nni_cv_wait(&nni_reap_cv); + } + nni_mtx_unlock(&nni_reap_mtx); +} + +void +nni_reap(nni_reap_item *item, nni_cb func, void *ptr) +{ + nni_mtx_lock(&nni_reap_mtx); + item->r_func = func; + item->r_ptr = ptr; + nni_list_append(&nni_reap_list, item); + nni_cv_wake(&nni_reap_cv); + nni_mtx_unlock(&nni_reap_mtx); +} + +int +nni_reap_sys_init(void) +{ + int rv; + + NNI_LIST_INIT(&nni_reap_list, nni_reap_item, r_link); + nni_mtx_init(&nni_reap_mtx); + nni_cv_init(&nni_reap_cv, &nni_reap_mtx); + nni_reap_exit = false; + + // If this fails, we don't fail init, instead we will try to + // start up at reap time. + if ((rv = nni_thr_init(&nni_reap_thr, nni_reap_stuff, NULL)) != 0) { + nni_cv_fini(&nni_reap_cv); + nni_mtx_fini(&nni_reap_mtx); + return (rv); + } + nni_thr_run(&nni_reap_thr); + return (0); +} + +void +nni_reap_sys_fini(void) +{ + nni_mtx_lock(&nni_reap_mtx); + nni_reap_exit = true; + nni_cv_wake(&nni_reap_cv); + nni_mtx_unlock(&nni_reap_mtx); + nni_thr_fini(&nni_reap_thr); +} diff --git a/src/core/reap.h b/src/core/reap.h new file mode 100644 index 00000000..fbc008b2 --- /dev/null +++ b/src/core/reap.h @@ -0,0 +1,36 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_REAP_H +#define CORE_REAP_H + +#include "core/defs.h" +#include "core/list.h" + +// nni_reap_item is defined here so that it can be inlined into +// structures. Callers must access its members directly. +typedef struct nni_reap_item { + nni_list_node r_link; + void * r_ptr; + nni_cb r_func; +} nni_reap_item; + +// nni_reap performs an asynchronous reap of an item. This allows functions +// it calls to acquire locks or resources without worrying about deadlocks +// (such as from a completion callback.) The called function should avoid +// blocking for too long if possible, since only one reap thread is present +// in the system. The intended usage is for an nni_reap_item to be a member +// of the structure to be reaped, and and then this function is called to +// finalize it. +extern void nni_reap(nni_reap_item *, nni_cb, void *); +extern int nni_reap_sys_init(void); +extern void nni_reap_sys_fini(void); + +#endif // CORE_REAP_H diff --git a/src/core/socket.c b/src/core/socket.c index 54e1bd6a..409e4f66 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -775,22 +775,8 @@ nni_sock_closeall(void) static void nni_sock_normalize_expiration(nni_aio *aio, nni_duration def) { - if (aio->a_reltime) { - if (aio->a_expire == (nni_time) -2) { - aio->a_expire = def; - } - switch (aio->a_expire) { - case (nni_time) 0: - aio->a_expire = NNI_TIME_ZERO; - break; - case (nni_time) -1: - aio->a_expire = NNI_TIME_NEVER; - break; - default: - aio->a_expire = nni_clock() + aio->a_expire; - break; - } - aio->a_reltime = 0; + if (aio->a_timeout == (nni_duration) -2) { + aio->a_timeout = def; } } @@ -821,6 +807,18 @@ nni_sock_peer(nni_sock *sock) return (sock->s_peer_id.p_id); } +const char * +nni_sock_proto_name(nni_sock *sock) +{ + return (sock->s_self_id.p_name); +} + +const char * +nni_sock_peer_name(nni_sock *sock) +{ + return (sock->s_peer_id.p_name); +} + void nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) { diff --git a/src/core/socket.h b/src/core/socket.h index 52310ef3..37c67436 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -14,14 +14,16 @@ extern int nni_sock_sys_init(void); extern void nni_sock_sys_fini(void); -extern int nni_sock_find(nni_sock **, uint32_t); -extern void nni_sock_rele(nni_sock *); -extern int nni_sock_open(nni_sock **, const nni_proto *); -extern void nni_sock_close(nni_sock *); -extern void nni_sock_closeall(void); -extern int nni_sock_shutdown(nni_sock *); -extern uint16_t nni_sock_proto(nni_sock *); -extern uint16_t nni_sock_peer(nni_sock *); +extern int nni_sock_find(nni_sock **, uint32_t); +extern void nni_sock_rele(nni_sock *); +extern int nni_sock_open(nni_sock **, const nni_proto *); +extern void nni_sock_close(nni_sock *); +extern void nni_sock_closeall(void); +extern int nni_sock_shutdown(nni_sock *); +extern uint16_t nni_sock_proto(nni_sock *); +extern uint16_t nni_sock_peer(nni_sock *); +extern const char *nni_sock_proto_name(nni_sock *); +extern const char *nni_sock_peer_name(nni_sock *); extern int nni_sock_setopt(nni_sock *, const char *, const void *, size_t); extern int nni_sock_getopt(nni_sock *, const char *, void *, size_t *); extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int); diff --git a/src/core/strs.c b/src/core/strs.c index 6cee605b..a03c0bb5 100644 --- a/src/core/strs.c +++ b/src/core/strs.c @@ -8,6 +8,9 @@ // found online at https://opensource.org/licenses/MIT. // +#include <ctype.h> +#include <stdarg.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> @@ -17,35 +20,28 @@ // part of standard C99. (C11 has added some things here, but we cannot // count on them.) +// Note that we supply our own version of strdup and strfree unconditionally, +// so that these can be freed with nni_free(strlen(s)+1) if desired. (Likewise +// a string buffer allocated with nni_alloc can be freed with nni_strfree +// provided the length is correct.) + char * nni_strdup(const char *src) { -#ifdef NNG_HAVE_STRDUP -#ifdef _WIN32 - return (_strdup(src)); -#else - return (strdup(src)); -#endif -#else char * dst; - size_t len = strlen(src); + size_t len = strlen(src) + 1; if ((dst = nni_alloc(len)) != NULL) { memcpy(dst, src, len); } return (dst); -#endif } void nni_strfree(char *s) { if (s != NULL) { -#ifdef NNG_HAVE_STRDUP - free(s); -#else nni_free(s, strlen(s) + 1); -#endif } } @@ -114,3 +110,72 @@ nni_strnlen(const char *s, size_t len) return (n); #endif } + +char * +nni_strcasestr(const char *s1, const char *s2) +{ +#ifdef NNG_HAVE_STRCASESTR + return (strcasestr(s1, s2)); +#else + const char *t1, *t2; + while (*s1) { + for (t1 = s1, t2 = s2; *t1 && *t2; t2++, t1++) { + if (tolower(*t1) != tolower(*t2)) { + break; + } + } + if (*t2 == 0) { + return ((char *) s1); + } + s1++; + } + return (NULL); +#endif +} + +int +nni_strncasecmp(const char *s1, const char *s2, size_t n) +{ +#ifdef NNG_HAVE_STRNCASECMP +#ifdef _WIN32 + return (_strnicmp(s1, s2, n)); +#else + return (strncasecmp(s1, s2, n)); +#endif +#else + for (int i = 0; i < n; i++) { + uint8_t c1 = (uint8_t) tolower(*s1++); + uint8_t c2 = (uint8_t) tolower(*s2++); + if (c1 == c2) { + if (c1 == 0) { + return (0); + } + continue; + } + return (c1 < c2 ? -1 : 1); + } + return (0); +#endif +} + +int +nni_asprintf(char **sp, const char *fmt, ...) +{ + va_list ap; + size_t len; + char * s; + + va_start(ap, fmt); + len = vsnprintf(NULL, 0, fmt, ap); + va_end(ap); + len++; + + if ((s = nni_alloc(len)) == NULL) { + return (NNG_ENOMEM); + } + va_start(ap, fmt); + (void) vsnprintf(s, len, fmt, ap); + va_end(ap); + *sp = s; + return (0); +}
\ No newline at end of file diff --git a/src/core/strs.h b/src/core/strs.h index 42ec4997..3b369fe4 100644 --- a/src/core/strs.h +++ b/src/core/strs.h @@ -18,5 +18,8 @@ extern void nni_strfree(char *); extern size_t nni_strlcpy(char *, const char *, size_t); extern size_t nni_strlcat(char *, const char *, size_t); extern size_t nni_strnlen(const char *, size_t); +extern char * nni_strcasestr(const char *, const char *); +extern int nni_strncasecmp(const char *, const char *, size_t); +extern int nni_asprintf(char **, const char *, ...); #endif // CORE_STRS_H diff --git a/src/core/transport.c b/src/core/transport.c index 359b03fd..31da773f 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -13,6 +13,7 @@ #include "transport/ipc/ipc.h" #include "transport/tcp/tcp.h" #include "transport/tls/tls.h" +#include "transport/ws/websocket.h" #include "transport/zerotier/zerotier.h" #include <stdio.h> @@ -105,6 +106,72 @@ nni_tran_find(const char *addr) return (NULL); } +// nni_tran_parse_host_port is a convenience routine to parse the host portion +// of a URL (which includes a DNS name or IP address and an optional service +// name or port, separated by a colon) into its host and port name parts. It +// understands IPv6 address literals when surrounded by brackets ([]). +// If either component is empty, then NULL is passed back for the value, +// otherwise a string suitable for freeing with nni_strfree is supplied. +int +nni_tran_parse_host_port(const char *pair, char **hostp, char **portp) +{ + const char *hstart; + const char *pstart; + char * host; + char * port; + size_t hlen, plen; + + if (pair[0] == '[') { + hstart = pair + 1; + hlen = 0; + while (hstart[hlen] != ']') { + if (hstart[hlen] == '\0') { + return (NNG_EADDRINVAL); + } + hlen++; + } + pstart = hstart + hlen + 1; // skip over the trailing ']' + } else { + // Normal thing. + hstart = pair; + hlen = 0; + while ((hstart[hlen] != ':') && (hstart[hlen] != '\0')) { + hlen++; + } + pstart = hstart + hlen; + } + if (pstart[0] == ':') { + pstart++; + } + plen = strlen(pstart); + + host = NULL; + if (hostp) { + if ((hlen > 1) || ((hlen == 1) && (*hstart != '*'))) { + if ((host = nni_alloc(hlen + 1)) == NULL) { + return (NNG_ENOMEM); + } + memcpy(host, hstart, hlen); + host[hlen] = '\0'; + } + } + + port = NULL; + if ((plen != 0) && (portp)) { + if ((port = nni_strdup(pstart)) == NULL) { + nni_strfree(host); + return (NNG_ENOMEM); + } + } + if (hostp) { + *hostp = host; + } + if (portp) { + *portp = port; + } + return (0); +} + int nni_tran_chkopt(const char *name, const void *v, size_t sz) { @@ -154,9 +221,12 @@ static nni_tran_ctor nni_tran_ctors[] = { #ifdef NNG_HAVE_TLS nng_tls_register, #endif -#ifdef NNI_HAVE_ZEROTIER +#ifdef NNG_HAVE_ZEROTIER nng_zt_register, #endif +#ifdef NNG_HAVE_WEBSOCKET + nng_ws_register, +#endif NULL, }; diff --git a/src/core/transport.h b/src/core/transport.h index b82e2c92..e8a1f620 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -163,6 +163,9 @@ struct nni_tran_pipe { nni_tran_pipe_option *p_options; }; +// Utility for transports. +extern int nni_tran_parse_host_port(const char *, char **, char **); + // These APIs are used by the framework internally, and not for use by // transport implementations. extern nni_tran *nni_tran_find(const char *); @@ -969,8 +969,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) { *app = (nng_aio *) aio; } - aio->a_expire = (nni_time) NNG_DURATION_DEFAULT; - aio->a_reltime = 1; + aio->a_timeout = NNG_DURATION_DEFAULT; return (rv); } @@ -1020,11 +1019,8 @@ void nng_aio_set_timeout(nng_aio *ap, nng_duration dur) { // Durations here are relative, since we have no notion of a - // common clock. But underlying aio uses absolute times normally. - // Fortunately the absolute times are big enough; we just have to - // make sure that we "convert" the timeout from relative time to - // absolute time when submitting operations. - nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur); + // common clock.. + nni_aio_set_timeout((nni_aio *) ap, dur); } #if 0 diff --git a/src/platform/posix/posix_tcp.c b/src/platform/posix/posix_tcp.c index e03f8056..69d7e7ce 100644 --- a/src/platform/posix/posix_tcp.c +++ b/src/platform/posix/posix_tcp.c @@ -38,11 +38,11 @@ nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const nni_sockaddr *lsa, return (rv); } - if (rsa->s_un.s_family != NNG_AF_UNSPEC) { + if ((rsa != NULL) && (rsa->s_un.s_family != NNG_AF_UNSPEC)) { len = nni_posix_nn2sockaddr((void *) &ss, rsa); nni_posix_epdesc_set_remote(ed, &ss, len); } - if (lsa->s_un.s_family != NNG_AF_UNSPEC) { + if ((lsa != NULL) && (lsa->s_un.s_family != NNG_AF_UNSPEC)) { len = nni_posix_nn2sockaddr((void *) &ss, lsa); nni_posix_epdesc_set_local(ed, &ss, len); } diff --git a/src/supplemental/base64/base64.c b/src/supplemental/base64/base64.c index 95a4a491..2f6b4da2 100644 --- a/src/supplemental/base64/base64.c +++ b/src/supplemental/base64/base64.c @@ -1,6 +1,6 @@ // // Copyright (c) 2014 Wirebird Labs LLC. All rights reserved. -// Copyright 2017 Staysail Systems, Inc. +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), diff --git a/src/supplemental/base64/base64.h b/src/supplemental/base64/base64.h index eb186ae0..68346435 100644 --- a/src/supplemental/base64/base64.h +++ b/src/supplemental/base64/base64.h @@ -1,6 +1,6 @@ // // Copyright (c) 2014 Wirebird Labs LLC. All rights reserved. -// Copyright 2017 Staysail Systems, Inc. +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), diff --git a/src/supplemental/http/CMakeLists.txt b/src/supplemental/http/CMakeLists.txt new file mode 100644 index 00000000..dff06d70 --- /dev/null +++ b/src/supplemental/http/CMakeLists.txt @@ -0,0 +1,17 @@ +# +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +set(HTTP_SOURCES + supplemental/http/http.c + supplemental/http/http_msg.c + supplemental/http/server.c + supplemental/http/client.c + supplemental/http/http.h) +set(NNG_SOURCES ${NNG_SOURCES} ${HTTP_SOURCES} PARENT_SCOPE) diff --git a/src/supplemental/http/client.c b/src/supplemental/http/client.c new file mode 100644 index 00000000..374ab5bb --- /dev/null +++ b/src/supplemental/http/client.c @@ -0,0 +1,147 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <stdbool.h> +#include <stdio.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "http.h" + +struct nni_http_client { + nng_sockaddr addr; + nni_list aios; + nni_mtx mtx; + bool closed; + bool tls; + nni_aio * connaio; + nni_plat_tcp_ep *tep; +}; + +static void +http_conn_start(nni_http_client *c) +{ + nni_plat_tcp_ep_connect(c->tep, c->connaio); +} + +static void +http_conn_done(void *arg) +{ + nni_http_client * c = arg; + nni_aio * aio; + int rv; + nni_plat_tcp_pipe *p; + nni_http_tran t; + nni_http * http; + + nni_mtx_lock(&c->mtx); + rv = nni_aio_result(c->connaio); + p = rv == 0 ? nni_aio_get_pipe(c->connaio) : NULL; + if ((aio = nni_list_first(&c->aios)) == NULL) { + if (p != NULL) { + nni_plat_tcp_pipe_fini(p); + } + nni_mtx_unlock(&c->mtx); + return; + } + nni_aio_list_remove(aio); + + if (rv != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&c->mtx); + return; + } + + t.h_data = p; + t.h_write = (void *) nni_plat_tcp_pipe_send; + t.h_read = (void *) nni_plat_tcp_pipe_recv; + t.h_close = (void *) nni_plat_tcp_pipe_close; + t.h_fini = (void *) nni_plat_tcp_pipe_fini; + + if ((rv = nni_http_init(&http, &t)) != 0) { + nni_aio_finish_error(aio, rv); + nni_plat_tcp_pipe_fini(p); + nni_mtx_unlock(&c->mtx); + return; + } + + nni_aio_set_output(aio, 0, http); + nni_aio_finish(aio, 0, 0); + + if (!nni_list_empty(&c->aios)) { + http_conn_start(c); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_http_client_fini(nni_http_client *c) +{ + nni_aio_fini(c->connaio); + nni_plat_tcp_ep_fini(c->tep); + nni_mtx_fini(&c->mtx); + NNI_FREE_STRUCT(c); +} + +int +nni_http_client_init(nni_http_client **cp, nng_sockaddr *sa) +{ + int rv; + + nni_http_client *c; + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + c->addr = *sa; + rv = nni_plat_tcp_ep_init(&c->tep, NULL, &c->addr, NNI_EP_MODE_DIAL); + if (rv != 0) { + NNI_FREE_STRUCT(c); + return (rv); + } + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->aios); + + if ((rv = nni_aio_init(&c->connaio, http_conn_done, c)) != 0) { + nni_http_client_fini(c); + return (rv); + } + *cp = c; + return (0); +} + +static void +http_connect_cancel(nni_aio *aio, int rv) +{ + nni_http_client *c = aio->a_prov_data; + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + if (nni_list_empty(&c->aios)) { + nni_aio_cancel(c->connaio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +void +nni_http_client_connect(nni_http_client *c, nni_aio *aio) +{ + if (nni_aio_start(aio, http_connect_cancel, aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + nni_list_append(&c->aios, aio); + if (nni_list_first(&c->aios) == aio) { + http_conn_start(c); + } + nni_mtx_unlock(&c->mtx); +}
\ No newline at end of file diff --git a/src/supplemental/http/http.c b/src/supplemental/http/http.c new file mode 100644 index 00000000..3958a738 --- /dev/null +++ b/src/supplemental/http/http.c @@ -0,0 +1,604 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdbool.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "http.h" + +// We insist that individual headers fit in 8K. +// If you need more than that, you need something we can't do. +#define HTTP_BUFSIZE 8192 + +// types of reads +enum read_flavor { + HTTP_RD_RAW, + HTTP_RD_FULL, + HTTP_RD_REQ, + HTTP_RD_RES, +}; + +enum write_flavor { + HTTP_WR_RAW, + HTTP_WR_FULL, + HTTP_WR_REQ, + HTTP_WR_RES, +}; + +#define SET_RD_FLAVOR(aio, f) (aio)->a_prov_extra[0] = ((void *) (intptr_t)(f)) +#define GET_RD_FLAVOR(aio) (int) ((intptr_t) aio->a_prov_extra[0]) +#define SET_WR_FLAVOR(aio, f) (aio)->a_prov_extra[0] = ((void *) (intptr_t)(f)) +#define GET_WR_FLAVOR(aio) (int) ((intptr_t) aio->a_prov_extra[0]) + +struct nni_http { + void *sock; + void (*rd)(void *, nni_aio *); + void (*wr)(void *, nni_aio *); + void (*close)(void *); + void (*fini)(void *); + + bool closed; + + nni_list rdq; // high level http read requests + nni_list wrq; // high level http write requests + + nni_aio *rd_aio; // bottom half read operations + nni_aio *wr_aio; // bottom half write operations + + nni_mtx mtx; + + uint8_t *rd_buf; + size_t rd_get; + size_t rd_put; + size_t rd_bufsz; +}; + +static void +http_close(nni_http *http) +{ + // Call with lock held. + nni_aio *aio; + + if (http->closed) { + return; + } + + http->closed = true; + if (nni_list_first(&http->wrq)) { + nni_aio_cancel(http->wr_aio, NNG_ECLOSED); + while ((aio = nni_list_first(&http->wrq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + if (nni_list_first(&http->rdq)) { + nni_aio_cancel(http->rd_aio, NNG_ECLOSED); + while ((aio = nni_list_first(&http->rdq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + + if (http->sock != NULL) { + http->close(http->sock); + } +} + +void +nni_http_close(nni_http *http) +{ + nni_mtx_lock(&http->mtx); + http_close(http); + nni_mtx_unlock(&http->mtx); +} + +// http_rd_buf attempts to satisfy the read from data in the buffer. +static int +http_rd_buf(nni_http *http, nni_aio *aio) +{ + size_t cnt = http->rd_put - http->rd_get; + size_t n; + uint8_t *rbuf = http->rd_buf; + int i; + int rv; + bool raw = false; + + rbuf += http->rd_get; + + switch (GET_RD_FLAVOR(aio)) { + case HTTP_RD_RAW: + raw = true; // FALLTHROUGH + case HTTP_RD_FULL: + for (i = 0; (aio->a_niov != 0) && (cnt != 0); i++) { + // Pull up data from the buffer if possible. + n = aio->a_iov[0].iov_len; + if (n > cnt) { + n = cnt; + } + memcpy(aio->a_iov[0].iov_buf, rbuf, n); + aio->a_iov[0].iov_len -= n; + aio->a_iov[0].iov_buf += n; + http->rd_get += n; + rbuf += n; + aio->a_count += n; + cnt -= n; + + if (aio->a_iov[0].iov_len == 0) { + aio->a_niov--; + for (i = 0; i < aio->a_niov; i++) { + aio->a_iov[i] = aio->a_iov[i + 1]; + } + } + } + + if ((aio->a_niov == 0) || (raw && (aio->a_count != 0))) { + // Finished the read. (We are finished if we either + // got *all* the data, or we got *some* data for + // a raw read.) + return (0); + } + + // No more data left in the buffer, so use a physio. + // (Note that we get here if we either have not completed + // a full transaction on a FULL read, or were not even able + // to get *any* data for a partial RAW read.) + for (i = 0; i < aio->a_niov; i++) { + http->rd_aio->a_iov[i] = aio->a_iov[i]; + } + nni_aio_set_data(http->rd_aio, 1, NULL); + http->rd_aio->a_niov = aio->a_niov; + http->rd(http->sock, http->rd_aio); + return (NNG_EAGAIN); + + case HTTP_RD_REQ: + rv = nni_http_req_parse(aio->a_prov_extra[1], rbuf, cnt, &n); + http->rd_get += n; + if (http->rd_get == http->rd_put) { + http->rd_get = http->rd_put = 0; + } + if (rv == NNG_EAGAIN) { + http->rd_aio->a_niov = 1; + http->rd_aio->a_iov[0].iov_buf = + http->rd_buf + http->rd_put; + http->rd_aio->a_iov[0].iov_len = + http->rd_bufsz - http->rd_put; + nni_aio_set_data(http->rd_aio, 1, aio); + http->rd(http->sock, http->rd_aio); + } + return (rv); + + case HTTP_RD_RES: + rv = nni_http_res_parse(aio->a_prov_extra[1], rbuf, cnt, &n); + http->rd_get += n; + if (http->rd_get == http->rd_put) { + http->rd_get = http->rd_put = 0; + } + if (rv == NNG_EAGAIN) { + http->rd_aio->a_niov = 1; + http->rd_aio->a_iov[0].iov_buf = + http->rd_buf + http->rd_put; + http->rd_aio->a_iov[0].iov_len = + http->rd_bufsz - http->rd_put; + nni_aio_set_data(http->rd_aio, 1, aio); + http->rd(http->sock, http->rd_aio); + } + return (rv); + } + return (NNG_EINVAL); +} + +static void +http_rd_start(nni_http *http) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&http->rdq)) != NULL) { + int rv; + + if (http->closed) { + rv = NNG_ECLOSED; + } else { + rv = http_rd_buf(http, aio); + } + switch (rv) { + case NNG_EAGAIN: + return; + case 0: + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, aio->a_count); + break; + default: + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + http_close(http); + break; + } + } +} + +static void +http_rd_cb(void *arg) +{ + nni_http *http = arg; + nni_aio * aio = http->rd_aio; + nni_aio * uaio; + size_t cnt; + int rv; + + nni_mtx_lock(&http->mtx); + + if ((rv = nni_aio_result(aio)) != 0) { + if ((uaio = nni_list_first(&http->rdq)) != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + } + http_close(http); + nni_mtx_unlock(&http->mtx); + return; + } + + cnt = nni_aio_count(aio); + + // If we were reading into the buffer, then advance location(s). + if ((uaio = nni_aio_get_data(aio, 1)) != NULL) { + http->rd_put += cnt; + NNI_ASSERT(http->rd_put <= http->rd_bufsz); + http_rd_start(http); + nni_mtx_unlock(&http->mtx); + return; + } + + // Otherwise we are completing a USER request, and there should + // be no data left in the user buffer. + NNI_ASSERT(http->rd_get == http->rd_put); + + uaio = nni_list_first(&http->rdq); + NNI_ASSERT(uaio != NULL); + + for (int i = 0; (uaio->a_niov != 0) && (cnt != 0); i++) { + // Pull up data from the buffer if possible. + size_t n = uaio->a_iov[0].iov_len; + if (n > cnt) { + n = cnt; + } + uaio->a_iov[0].iov_len -= n; + uaio->a_iov[0].iov_buf += n; + uaio->a_count += n; + cnt -= n; + + if (uaio->a_iov[0].iov_len == 0) { + uaio->a_niov--; + for (i = 0; i < uaio->a_niov; i++) { + uaio->a_iov[i] = uaio->a_iov[i + 1]; + } + } + } + + // Resubmit the start. This will attempt to consume data + // from the read buffer (there won't be any), and then either + // complete the I/O (for HTTP_RD_RAW, or if there is nothing left), + // or submit another physio. + http_rd_start(http); + nni_mtx_unlock(&http->mtx); +} + +static void +http_rd_cancel(nni_aio *aio, int rv) +{ + nni_http *http = aio->a_prov_data; + + nni_mtx_lock(&http->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + if (aio == nni_list_first(&http->rdq)) { + http_close(http); + } + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&http->mtx); +} + +static void +http_rd_submit(nni_http *http, nni_aio *aio) +{ + if (nni_aio_start(aio, http_rd_cancel, http) != 0) { + return; + } + if (http->closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + nni_list_append(&http->rdq, aio); + if (nni_list_first(&http->rdq) == aio) { + http_rd_start(http); + } +} + +static void +http_wr_start(nni_http *http) +{ + nni_aio *aio; + + if ((aio = nni_list_first(&http->wrq)) != NULL) { + + for (int i = 0; i < aio->a_niov; i++) { + http->wr_aio->a_iov[i] = aio->a_iov[i]; + } + http->wr_aio->a_niov = aio->a_niov; + http->wr(http->sock, http->wr_aio); + } +} + +static void +http_wr_cb(void *arg) +{ + nni_http *http = arg; + nni_aio * aio = http->wr_aio; + nni_aio * uaio; + int rv; + size_t n; + + nni_mtx_lock(&http->mtx); + + uaio = nni_list_first(&http->wrq); + + if ((rv = nni_aio_result(aio)) != 0) { + // We failed to complete the aio. + if (uaio != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + } + http_close(http); + nni_mtx_unlock(&http->mtx); + return; + } + + if (uaio == NULL) { + // write canceled? + nni_mtx_unlock(&http->mtx); + return; + } + + n = nni_aio_count(aio); + uaio->a_count += n; + if (GET_WR_FLAVOR(uaio) == HTTP_WR_RAW) { + // For raw data, we just send partial completion + // notices to the consumer. + goto done; + } + while (n) { + NNI_ASSERT(aio->a_niov != 0); + + if (aio->a_iov[0].iov_len > n) { + aio->a_iov[0].iov_len -= n; + aio->a_iov[0].iov_buf += n; + break; + } + n -= aio->a_iov[0].iov_len; + for (int i = 0; i < aio->a_niov; i++) { + aio->a_iov[i] = aio->a_iov[i + 1]; + } + aio->a_niov--; + } + if ((aio->a_niov != 0) && (aio->a_iov[0].iov_len != 0)) { + // We have more to transmit. + http->wr(http->sock, aio); + nni_mtx_unlock(&http->mtx); + return; + } + +done: + nni_aio_list_remove(uaio); + nni_aio_finish(uaio, 0, uaio->a_count); + + // Start next write if another is ready. + http_wr_start(http); + + nni_mtx_unlock(&http->mtx); +} + +static void +http_wr_cancel(nni_aio *aio, int rv) +{ + nni_http *http = aio->a_prov_data; + + nni_mtx_lock(&http->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + if (aio == nni_list_first(&http->wrq)) { + http_close(http); + } + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&http->mtx); +} + +static void +http_wr_submit(nni_http *http, nni_aio *aio) +{ + if (nni_aio_start(aio, http_wr_cancel, http) != 0) { + return; + } + if (http->closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + nni_list_append(&http->wrq, aio); + if (nni_list_first(&http->wrq) == aio) { + http_wr_start(http); + } +} + +void +nni_http_read_req(nni_http *http, nni_http_req *req, nni_aio *aio) +{ + SET_RD_FLAVOR(aio, HTTP_RD_REQ); + aio->a_prov_extra[1] = req; + + nni_mtx_lock(&http->mtx); + http_rd_submit(http, aio); + nni_mtx_unlock(&http->mtx); +} + +void +nni_http_read_res(nni_http *http, nni_http_res *res, nni_aio *aio) +{ + SET_RD_FLAVOR(aio, HTTP_RD_RES); + aio->a_prov_extra[1] = res; + + nni_mtx_lock(&http->mtx); + http_rd_submit(http, aio); + nni_mtx_unlock(&http->mtx); +} + +void +nni_http_read_full(nni_http *http, nni_aio *aio) +{ + aio->a_count = 0; + SET_RD_FLAVOR(aio, HTTP_RD_FULL); + aio->a_prov_extra[1] = NULL; + + nni_mtx_lock(&http->mtx); + http_rd_submit(http, aio); + nni_mtx_unlock(&http->mtx); +} + +void +nni_http_read(nni_http *http, nni_aio *aio) +{ + SET_RD_FLAVOR(aio, HTTP_RD_RAW); + aio->a_prov_extra[1] = NULL; + + nni_mtx_lock(&http->mtx); + http_rd_submit(http, aio); + nni_mtx_unlock(&http->mtx); +} + +void +nni_http_write_req(nni_http *http, nni_http_req *req, nni_aio *aio) +{ + int rv; + void * buf; + size_t bufsz; + + if ((rv = nni_http_req_get_buf(req, &buf, &bufsz)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + aio->a_niov = 1; + aio->a_iov[0].iov_len = bufsz; + aio->a_iov[0].iov_buf = buf; + SET_WR_FLAVOR(aio, HTTP_WR_REQ); + + nni_mtx_lock(&http->mtx); + http_wr_submit(http, aio); + nni_mtx_unlock(&http->mtx); +} + +void +nni_http_write_res(nni_http *http, nni_http_res *res, nni_aio *aio) +{ + int rv; + void * buf; + size_t bufsz; + + if ((rv = nni_http_res_get_buf(res, &buf, &bufsz)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + aio->a_niov = 1; + aio->a_iov[0].iov_len = bufsz; + aio->a_iov[0].iov_buf = buf; + SET_WR_FLAVOR(aio, HTTP_WR_RES); + + nni_mtx_lock(&http->mtx); + http_wr_submit(http, aio); + nni_mtx_unlock(&http->mtx); +} + +// Writer. As with nni_http_conn_write, this is used to write data on +// a connection that has been "upgraded" (e.g. transformed to +// websocket). It is an error to perform other HTTP exchanges on an +// connection after this method is called. (This mostly exists to +// support websocket.) +void +nni_http_write(nni_http *http, nni_aio *aio) +{ + SET_WR_FLAVOR(aio, HTTP_WR_RAW); + + nni_mtx_lock(&http->mtx); + http_wr_submit(http, aio); + nni_mtx_unlock(&http->mtx); +} + +void +nni_http_write_full(nni_http *http, nni_aio *aio) +{ + SET_WR_FLAVOR(aio, HTTP_WR_FULL); + + nni_mtx_lock(&http->mtx); + http_wr_submit(http, aio); + nni_mtx_unlock(&http->mtx); +} + +void +nni_http_fini(nni_http *http) +{ + nni_mtx_lock(&http->mtx); + http_close(http); + if ((http->sock != NULL) && (http->fini != NULL)) { + http->fini(http->sock); + http->sock = NULL; + } + nni_mtx_unlock(&http->mtx); + nni_aio_stop(http->wr_aio); + nni_aio_stop(http->rd_aio); + nni_aio_fini(http->wr_aio); + nni_aio_fini(http->rd_aio); + nni_free(http->rd_buf, http->rd_bufsz); + nni_mtx_fini(&http->mtx); + NNI_FREE_STRUCT(http); +} + +int +nni_http_init(nni_http **httpp, nni_http_tran *tran) +{ + nni_http *http; + int rv; + + if ((http = NNI_ALLOC_STRUCT(http)) == NULL) { + return (NNG_ENOMEM); + } + http->rd_bufsz = HTTP_BUFSIZE; + if ((http->rd_buf = nni_alloc(http->rd_bufsz)) == NULL) { + NNI_FREE_STRUCT(http); + return (NNG_ENOMEM); + } + nni_mtx_init(&http->mtx); + nni_aio_list_init(&http->rdq); + nni_aio_list_init(&http->wrq); + + if (((rv = nni_aio_init(&http->wr_aio, http_wr_cb, http)) != 0) || + ((rv = nni_aio_init(&http->rd_aio, http_rd_cb, http)) != 0)) { + nni_http_fini(http); + return (rv); + } + http->rd_bufsz = HTTP_BUFSIZE; + http->rd = tran->h_read; + http->wr = tran->h_write; + http->close = tran->h_close; + http->fini = tran->h_fini; + http->sock = tran->h_data; + + *httpp = http; + + return (0); +} diff --git a/src/supplemental/http/http.h b/src/supplemental/http/http.h new file mode 100644 index 00000000..09c72f8a --- /dev/null +++ b/src/supplemental/http/http.h @@ -0,0 +1,290 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_SUPPLEMENTAL_HTTP_HTTP_H +#define NNG_SUPPLEMENTAL_HTTP_HTTP_H + +#include <stdbool.h> + +// nni_http_msg represents an HTTP request or response message. +typedef struct nni_http_msg nni_http_msg; +typedef struct nni_http_res nni_http_res; +typedef struct nni_http_entity nni_http_entity; + +typedef struct nni_http_tran { + void *h_data; + void (*h_read)(void *, nni_aio *); + void (*h_write)(void *, nni_aio *); + void (*h_close)(void *); + void (*h_fini)(void *); +} nni_http_tran; + +typedef struct nni_http_req nni_http_req; + +extern int nni_http_req_init(nni_http_req **); +extern void nni_http_req_fini(nni_http_req *); +extern void nni_http_req_reset(nni_http_req *); +extern int nni_http_req_set_header(nni_http_req *, const char *, const char *); +extern int nni_http_req_add_header(nni_http_req *, const char *, const char *); +extern int nni_http_req_del_header(nni_http_req *, const char *); +extern int nni_http_req_get_buf(nni_http_req *, void **, size_t *); +extern int nni_http_req_set_method(nni_http_req *, const char *); +extern int nni_http_req_set_version(nni_http_req *, const char *); +extern int nni_http_req_set_uri(nni_http_req *, const char *); +extern const char *nni_http_req_get_header(nni_http_req *, const char *); +extern const char *nni_http_req_get_header(nni_http_req *, const char *); +extern const char *nni_http_req_get_version(nni_http_req *); +extern const char *nni_http_req_get_uri(nni_http_req *); +extern const char *nni_http_req_get_method(nni_http_req *); +extern int nni_http_req_parse(nni_http_req *, void *, size_t, size_t *); + +extern int nni_http_res_init(nni_http_res **); +extern void nni_http_res_fini(nni_http_res *); +extern void nni_http_res_reset(nni_http_res *); +extern int nni_http_res_get_buf(nni_http_res *, void **, size_t *); +extern int nni_http_res_set_header(nni_http_res *, const char *, const char *); +extern int nni_http_res_add_header(nni_http_res *, const char *, const char *); +extern int nni_http_res_del_header(nni_http_res *, const char *); +extern int nni_http_res_set_version(nni_http_res *, const char *); +extern int nni_http_res_set_status(nni_http_res *, int, const char *); +extern const char *nni_http_res_get_header(nni_http_res *, const char *); +extern const char *nni_http_res_get_version(nni_http_res *); +extern const char *nni_http_res_get_reason(nni_http_res *); +extern int nni_http_res_get_status(nni_http_res *); +extern int nni_http_res_parse(nni_http_res *, void *, size_t, size_t *); +extern int nni_http_res_set_data(nni_http_res *, const void *, size_t); +extern int nni_http_res_copy_data(nni_http_res *, const void *, size_t); +extern int nni_http_res_alloc_data(nni_http_res *, size_t); +extern void nni_http_res_get_data(nni_http_res *, void **, size_t *); +extern int nni_http_res_init_error(nni_http_res **, uint16_t); + +// HTTP status codes. This list is not exhaustive. +enum { NNI_HTTP_STATUS_CONTINUE = 100, + NNI_HTTP_STATUS_SWITCHING = 101, + NNI_HTTP_STATUS_PROCESSING = 102, + NNI_HTTP_STATUS_OK = 200, + NNI_HTTP_STATUS_CREATED = 201, + NNI_HTTP_STATUS_ACCEPTED = 202, + NNI_HTTP_STATUS_NOT_AUTHORITATIVE = 203, + NNI_HTTP_STATUS_NO_CONTENT = 204, + NNI_HTTP_STATUS_RESET_CONTENT = 205, + NNI_HTTP_STATUS_PARTIAL_CONTENT = 206, + NNI_HTTP_STATUS_MULTI_STATUS = 207, + NNI_HTTP_STATUS_ALREADY_REPORTED = 208, + NNI_HTTP_STATUS_IM_USED = 226, + NNI_HTTP_STATUS_MULTIPLE_CHOICES = 300, + NNI_HTTP_STATUS_STATUS_MOVED_PERMANENTLY = 301, + NNI_HTTP_STATUS_FOUND = 302, + NNI_HTTP_STATUS_SEE_OTHER = 303, + NNI_HTTP_STATUS_NOT_MODIFIED = 304, + NNI_HTTP_STATUS_USE_PROXY = 305, + NNI_HTTP_STATUS_TEMPORARY_REDIRECT = 307, + NNI_HTTP_STATUS_PERMANENT_REDIRECT = 308, + NNI_HTTP_STATUS_BAD_REQUEST = 400, + NNI_HTTP_STATUS_UNAUTHORIZED = 401, + NNI_HTTP_STATUS_PAYMENT_REQUIRED = 402, + NNI_HTTP_STATUS_FORBIDDEN = 403, + NNI_HTTP_STATUS_NOT_FOUND = 404, + NNI_HTTP_STATUS_METHOD_NOT_ALLOWED = 405, + NNI_HTTP_STATUS_NOT_ACCEPTABLE = 406, + NNI_HTTP_STATUS_PROXY_AUTH_REQUIRED = 407, + NNI_HTTP_STATUS_REQUEST_TIMEOUT = 408, + NNI_HTTP_STATUS_CONFLICT = 409, + NNI_HTTP_STATUS_GONE = 410, + NNI_HTTP_STATUS_LENGTH_REQUIRED = 411, + NNI_HTTP_STATUS_PRECONDITION_FAILED = 412, + NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE = 413, + NNI_HTTP_STATUS_URI_TOO_LONG = 414, + NNI_HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE = 415, + NNI_HTTP_STATUS_RANGE_NOT_SATISFIABLE = 416, + NNI_HTTP_STATUS_EXPECTATION_FAILED = 417, + NNI_HTTP_STATUS_TEAPOT = 418, + NNI_HTTP_STATUS_UNPROCESSABLE_ENTITY = 422, + NNI_HTTP_STATUS_LOCKED = 423, + NNI_HTTP_STATUS_FAILED_DEPENDENCY = 424, + NNI_HTTP_STATUS_UPGRADE_REQUIRED = 426, + NNI_HTTP_STATUS_PRECONDITION_REQUIRED = 428, + NNI_HTTP_STATUS_TOO_MANY_REQUESTS = 429, + NNI_HTTP_STATUS_HEADERS_TOO_LARGE = 431, + NNI_HTTP_STATUS_UNAVAIL_LEGAL_REASONS = 451, + NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR = 500, + NNI_HTTP_STATUS_NOT_IMPLEMENTED = 501, + NNI_HTTP_STATUS_BAD_GATEWAY = 502, + NNI_HTTP_STATUS_SERVICE_UNAVAILABLE = 503, + NNI_HTTP_STATUS_GATEWAY_TIMEOUT = 504, + NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP = 505, + NNI_HTTP_STATUS_VARIANT_ALSO_NEGOTIATES = 506, + NNI_HTTP_STATUS_INSUFFICIENT_STORAGE = 507, + NNI_HTTP_STATUS_LOOP_DETECTED = 508, + NNI_HTTP_STATUS_NOT_EXTENDED = 510, + NNI_HTTP_STATUS_NETWORK_AUTH_REQUIRED = 511, +}; + +// An HTTP connection is a connection over which messages are exchanged. +// Generally, clients send request messages, and then read responses. +// Servers, read requests, and write responses. However, we do not +// require a 1:1 mapping between request and response here -- the application +// is responsible for dealing with that. +// +// We only support HTTP/1.1, though using the nni_http_conn_read and +// nni_http_conn_write low level methods, it is possible to write an upgrader +// (such as websocket!) that might support e.g. HTTP/2 or reading data that +// follows a legacy HTTP/1.0 message. +// +// Any error on the connection, including cancellation of a request, is fatal +// the connection. +typedef struct nni_http nni_http; + +extern int nni_http_init(nni_http **, nni_http_tran *); +extern void nni_http_close(nni_http *); +extern void nni_http_fini(nni_http *); + +// Reading messages -- the caller must supply a preinitialized (but otherwise +// idle) message. We recommend the caller store this in the aio's user data. +// Note that the iovs of the aio's are clobbered by these methods -- callers +// must not use them for any other purpose. + +extern void nni_http_write_req(nni_http *, nni_http_req *, nni_aio *); +extern void nni_http_write_res(nni_http *, nni_http_res *, nni_aio *); +extern void nni_http_read_req(nni_http *, nni_http_req *, nni_aio *); +extern void nni_http_read_res(nni_http *, nni_http_res *, nni_aio *); + +extern void nni_http_read(nni_http *, nni_aio *); +extern void nni_http_read_full(nni_http *, nni_aio *); +extern void nni_http_write(nni_http *, nni_aio *); +extern void nni_http_write_full(nni_http *, nni_aio *); + +typedef struct nni_http_server nni_http_server; + +typedef struct { + // h_path is the relative URI that we are going to match against. + // Must not be NULL. Note that query parameters (things following + // a "?" at the end of the path) are ignored when matching. This + // field may not be NULL. + const char *h_path; + + // h_method is the HTTP method to handle such as "GET" or "POST". + // Must not be empty or NULL. If the incoming method is HEAD, then + // the server will process HEAD the same as GET, but will not send + // any response body. + const char *h_method; + + // h_host is used to match on a specific Host: entry. If left NULL, + // then this handler will match regardless of the Host: value. + const char *h_host; + + // h_is_dir indicates that the path represents a directory, and + // any path which is a logically below it should also be matched. + // This means that "/phone" will match for "/phone/bob" but not + // "/phoneme/ma". Be advised that it is not possible to register + // a handler for a parent and a different handler for children. + // (This restriction may be lifted in the future.) + bool h_is_dir; + + // h_is_upgrader is used for callbacks that "upgrade" (or steal) + // their connection. When this is true, the server framework + // assumes that the handler takes over *all* of the details of + // the connection. Consequently, the connection is disassociated + // from the framework, and no response is sent. (Upgraders are + // responsible for adopting the connection, including closing it + // when they are done, and for sending any HTTP response message. + // This is true even if an error occurs.) + bool h_is_upgrader; + + // h_cb is a callback that handles the request. The conventions + // are as follows: + // + // inputs: + // 0 - nni_http * for the actual underlying HTTP channel + // 1 - nni_http_req * for the HTTP request object + // 2 - void * for the opaque pointer supplied at registration + // + // outputs: + // 0 - (optional) nni_http_res * for an HTTP response (see below) + // + // The callback may choose to return the a response object in output 0, + // in which case the framework will handle sending the reply. + // (Response entity content is also sent if the response data + // is not NULL.) The callback may instead do it's own replies, in + // which case the response output should be NULL. + // + // Note that any request entity data is *NOT* supplied automatically + // with the request object; the callback is expected to call the + // nni_http_read_data method to retrieve any message data based upon + // the presence of headers. (It may also call nni_http_read or + // nni_http_write on the channel as it sees fit.) + // + // Upgraders should call the completion routine immediately, + // once they have collected the request object and HTTP channel. + void (*h_cb)(nni_aio *); +} nni_http_handler; + +// nni_http_server will look for an existing server with the same +// socket address, or create one if one does not exist. The servers +// are reference counted to permit sharing the server object across +// multiple subsystems. The sockaddr matching is very limited though, +// and the addresses must match *exactly*. +extern int nni_http_server_init(nni_http_server **, nng_sockaddr *); + +// nni_http_server_fini drops the reference count on the server, and +// if this was the last reference, closes down the server and frees +// all related resources. It will not affect hijacked connections. +extern void nni_http_server_fini(nni_http_server *); + +// nni_http_server_add_handler registers a new handler on the server. +// This function will return NNG_EADDRINUSE if a conflicting handler +// is already registered (i.e. a handler with the same value for Host, +// Method, and URL.) The first parameter receives an opaque handle to +// the handler, that can be used to unregister the handler later. +extern int nni_http_server_add_handler( + void **, nni_http_server *, nni_http_handler *, void *); + +extern void nni_http_server_del_handler(nni_http_server *, void *); + +// nni_http_server_start starts listening on the supplied port. +extern int nni_http_server_start(nni_http_server *); + +// nni_http_server_stop stops the server, closing the listening socket. +// Connections that have been "upgraded" are unaffected. Connections +// associated with a callback will complete their callback, and then close. +extern void nni_http_server_stop(nni_http_server *); + +// nni_http_server_add_static is a short cut to add static +// content handler to the server. The host may be NULL, and the +// ctype (aka Content-Type) may be NULL. If the Content-Type is NULL, +// then application/octet stream will be the (probably bad) default. +// The actual data is copied, and so the caller may discard it once +// this function returns. +extern int nni_http_server_add_static(nni_http_server *, const char *host, + const char *ctype, const char *uri, const void *, size_t); + +// nni_http_server_add file is a short cut to add a file-backed static +// content handler to the server. The host may be NULL, and the +// ctype (aka Content-Type) may be NULL. If the Content-Type is NULL, +// then the server will try to guess it based on the filename -- but only +// a small number of file types are builtin. URI is the absolute URI +// (sans hostname and scheme), and the path is the path on the local +// filesystem where the file can be found. +extern int nni_http_server_add_file(nni_http_server *, const char *host, + const char *ctype, const char *uri, const char *path); + +// TLS will use +// extern int nni_http_server_start_tls(nni_http_server *, nng_sockaddr *, +// nni_tls_config *); + +// Client stuff. + +typedef struct nni_http_client nni_http_client; + +extern int nni_http_client_init(nni_http_client **, nng_sockaddr *); +extern void nni_http_client_fini(nni_http_client *); +extern void nni_http_client_connect(nni_http_client *, nni_aio *); + +#endif // NNG_SUPPLEMENTAL_HTTP_HTTP_H diff --git a/src/supplemental/http/http_msg.c b/src/supplemental/http/http_msg.c new file mode 100644 index 00000000..2e245868 --- /dev/null +++ b/src/supplemental/http/http_msg.c @@ -0,0 +1,956 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdarg.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "http.h" + +// Note that as we parse headers, the rule is that if a header is already +// present, then we can append it to the existing header, separated by +// a comma. From experience, for example, Firefox uses a Connection: +// header with two values, "keepalive", and "upgrade". +typedef struct http_header { + char * name; + char * value; + nni_list_node node; +} http_header; + +struct nni_http_entity { + char * data; + size_t size; // allocated/expected size + size_t len; // current length + bool own; // if true, data is "ours", and should be freed +}; + +struct nni_http_req { + nni_list hdrs; + nni_http_entity data; + char * meth; + char * uri; + char * vers; + char * buf; + size_t bufsz; +}; + +struct nni_http_res { + nni_list hdrs; + nni_http_entity data; + int code; + char * rsn; + char * vers; + char * buf; + size_t bufsz; +}; + +static int +http_set_string(char **strp, const char *val) +{ + char *news; + if ((news = nni_strdup(val)) == NULL) { + return (NNG_ENOMEM); + } + nni_strfree(*strp); + *strp = news; + return (0); +} + +static void +http_headers_reset(nni_list *hdrs) +{ + http_header *h; + while ((h = nni_list_first(hdrs)) != NULL) { + nni_list_remove(hdrs, h); + if (h->name != NULL) { + nni_strfree(h->name); + } + if (h->value != NULL) { + nni_free(h->value, strlen(h->value) + 1); + } + NNI_FREE_STRUCT(h); + } +} + +static void +http_entity_reset(nni_http_entity *entity) +{ + if (entity->own && entity->size) { + nni_free(entity->data, entity->size); + } + entity->data = NULL; + entity->size = 0; + entity->own = false; +} + +void +nni_http_req_reset(nni_http_req *req) +{ + http_headers_reset(&req->hdrs); + http_entity_reset(&req->data); + nni_strfree(req->vers); + nni_strfree(req->meth); + nni_strfree(req->uri); + req->vers = req->meth = req->uri = NULL; + if (req->bufsz) { + req->buf[0] = '\0'; + } +} + +void +nni_http_res_reset(nni_http_res *res) +{ + http_headers_reset(&res->hdrs); + http_entity_reset(&res->data); + nni_strfree(res->rsn); + nni_strfree(res->vers); + res->code = 0; + if (res->bufsz) { + res->buf[0] = '\0'; + } +} + +void +nni_http_req_fini(nni_http_req *req) +{ + nni_http_req_reset(req); + if (req->bufsz) { + nni_free(req->buf, req->bufsz); + } + NNI_FREE_STRUCT(req); +} + +void +nni_http_res_fini(nni_http_res *res) +{ + nni_http_res_reset(res); + if (res->bufsz) { + nni_free(res->buf, res->bufsz); + } + NNI_FREE_STRUCT(res); +} + +static int +http_del_header(nni_list *hdrs, const char *key) +{ + http_header *h; + NNI_LIST_FOREACH (hdrs, h) { + if (strcasecmp(key, h->name) == 0) { + nni_list_remove(hdrs, h); + nni_strfree(h->name); + nni_free(h->value, strlen(h->value) + 1); + NNI_FREE_STRUCT(h); + return (0); + } + } + return (NNG_ENOENT); +} + +int +nni_req_del_header(nni_http_req *req, const char *key) +{ + return (http_del_header(&req->hdrs, key)); +} + +int +nni_res_del_header(nni_http_res *res, const char *key) +{ + return (http_del_header(&res->hdrs, key)); +} + +static int +http_set_header(nni_list *hdrs, const char *key, const char *val) +{ + http_header *h; + NNI_LIST_FOREACH (hdrs, h) { + if (strcasecmp(key, h->name) == 0) { + char * news; + size_t len = strlen(val) + 1; + if ((news = nni_alloc(len)) == NULL) { + return (NNG_ENOMEM); + } + snprintf(news, len, "%s", val); + nni_free(h->value, strlen(h->value) + 1); + h->value = news; + return (0); + } + } + + if ((h = NNI_ALLOC_STRUCT(h)) == NULL) { + return (NNG_ENOMEM); + } + if ((h->name = nni_strdup(key)) == NULL) { + NNI_FREE_STRUCT(h); + return (NNG_ENOMEM); + } + if ((h->value = nni_alloc(strlen(val) + 1)) == NULL) { + nni_strfree(h->name); + NNI_FREE_STRUCT(h); + return (NNG_ENOMEM); + } + strncpy(h->value, val, strlen(val) + 1); + nni_list_append(hdrs, h); + return (0); +} + +int +nni_http_req_set_header(nni_http_req *req, const char *key, const char *val) +{ + return (http_set_header(&req->hdrs, key, val)); +} + +int +nni_http_res_set_header(nni_http_res *res, const char *key, const char *val) +{ + return (http_set_header(&res->hdrs, key, val)); +} + +static int +http_add_header(nni_list *hdrs, const char *key, const char *val) +{ + http_header *h; + NNI_LIST_FOREACH (hdrs, h) { + if (strcasecmp(key, h->name) == 0) { + char * news; + size_t len = strlen(h->value) + strlen(val) + 3; + if ((news = nni_alloc(len)) == NULL) { + return (NNG_ENOMEM); + } + snprintf(news, len, "%s, %s", h->value, val); + nni_free(h->value, strlen(h->value) + 1); + h->value = news; + return (0); + } + } + + if ((h = NNI_ALLOC_STRUCT(h)) == NULL) { + return (NNG_ENOMEM); + } + if ((h->name = nni_strdup(key)) == NULL) { + NNI_FREE_STRUCT(h); + return (NNG_ENOMEM); + } + if ((h->value = nni_alloc(strlen(val) + 1)) == NULL) { + nni_strfree(h->name); + NNI_FREE_STRUCT(h); + return (NNG_ENOMEM); + } + strncpy(h->value, val, strlen(val) + 1); + nni_list_append(hdrs, h); + return (0); +} + +int +nni_http_req_add_header(nni_http_req *req, const char *key, const char *val) +{ + return (http_add_header(&req->hdrs, key, val)); +} + +int +nni_http_res_add_header(nni_http_res *res, const char *key, const char *val) +{ + return (http_add_header(&res->hdrs, key, val)); +} + +static const char * +http_get_header(nni_list *hdrs, const char *key) +{ + http_header *h; + NNI_LIST_FOREACH (hdrs, h) { + if (strcasecmp(h->name, key) == 0) { + return (h->value); + } + } + return (NULL); +} + +const char * +nni_http_req_get_header(nni_http_req *req, const char *key) +{ + return (http_get_header(&req->hdrs, key)); +} + +const char * +nni_http_res_get_header(nni_http_res *res, const char *key) +{ + return (http_get_header(&res->hdrs, key)); +} + +// http_entity_set_data sets the entity, but does not update the +// content-length header. +static void +http_entity_set_data(nni_http_entity *entity, const void *data, size_t size) +{ + if (entity->own) { + nni_free(entity->data, entity->size); + } + entity->data = (void *) data; + entity->size = size; + entity->own = false; +} + +static int +http_entity_alloc_data(nni_http_entity *entity, size_t size) +{ + void *newdata; + if ((newdata = nni_alloc(size)) == NULL) { + return (NNG_ENOMEM); + } + http_entity_set_data(entity, newdata, size); + entity->own = true; + return (0); +} + +static int +http_entity_copy_data(nni_http_entity *entity, const void *data, size_t size) +{ + int rv; + if ((rv = http_entity_alloc_data(entity, size)) == 0) { + memcpy(entity->data, data, size); + } + return (rv); +} + +static int +http_set_content_length(nni_http_entity *entity, nni_list *hdrs) +{ + char buf[16]; + (void) snprintf(buf, sizeof(buf), "%u", (unsigned) entity->size); + return (http_set_header(hdrs, "Content-Length", buf)); +} + +static void +http_entity_get_data(nni_http_entity *entity, void **datap, size_t *sizep) +{ + *datap = entity->data; + *sizep = entity->size; +} + +void +nni_http_req_get_data(nni_http_req *req, void **datap, size_t *sizep) +{ + http_entity_get_data(&req->data, datap, sizep); +} + +void +nni_http_res_get_data(nni_http_res *res, void **datap, size_t *sizep) +{ + http_entity_get_data(&res->data, datap, sizep); +} + +int +nni_http_req_set_data(nni_http_req *req, const void *data, size_t size) +{ + int rv; + + http_entity_set_data(&req->data, data, size); + if ((rv = http_set_content_length(&req->data, &req->hdrs)) != 0) { + http_entity_set_data(&req->data, NULL, 0); + } + return (rv); +} + +int +nni_http_res_set_data(nni_http_res *res, const void *data, size_t size) +{ + int rv; + + http_entity_set_data(&res->data, data, size); + if ((rv = http_set_content_length(&res->data, &res->hdrs)) != 0) { + http_entity_set_data(&res->data, NULL, 0); + } + return (rv); +} + +int +nni_http_req_copy_data(nni_http_req *req, const void *data, size_t size) +{ + int rv; + + if (((rv = http_entity_copy_data(&req->data, data, size)) != 0) || + ((rv = http_set_content_length(&req->data, &req->hdrs)) != 0)) { + http_entity_set_data(&req->data, NULL, 0); + return (rv); + } + return (0); +} + +int +nni_http_res_copy_data(nni_http_res *res, const void *data, size_t size) +{ + int rv; + + if (((rv = http_entity_copy_data(&res->data, data, size)) != 0) || + ((rv = http_set_content_length(&res->data, &res->hdrs)) != 0)) { + http_entity_set_data(&res->data, NULL, 0); + return (rv); + } + return (0); +} + +int +nni_http_res_alloc_data(nni_http_res *res, size_t size) +{ + return (http_entity_alloc_data(&res->data, size)); +} + +static int +http_parse_header(nni_list *hdrs, void *line) +{ + http_header *h; + char * key = line; + char * val; + char * end; + + // Find separation between key and value + if ((val = strchr(key, ':')) == NULL) { + return (NNG_EPROTO); + } + + // Trim leading and trailing whitespace from header + *val = '\0'; + val++; + while (*val == ' ' || *val == '\t') { + val++; + } + end = val + strlen(val); + end--; + while ((end > val) && (*end == ' ' || *end == '\t')) { + *end = '\0'; + end--; + } + + return (http_add_header(hdrs, key, val)); +} + +// http_sprintf_headers makes headers for an HTTP request or an HTTP response +// object. Each header is dumped from the list. If the buf is NULL, +// or the sz is 0, then a dryrun is done, in order to allow the caller to +// determine how much space is needed. Returns the size of the space needed, +// not including the terminating NULL byte. Truncation occurs if the size +// returned is >= the requested size. +static size_t +http_sprintf_headers(char *buf, size_t sz, nni_list *list) +{ + size_t rv = 0; + http_header *h; + + if (buf == NULL) { + sz = 0; + } + + NNI_LIST_FOREACH (list, h) { + size_t l; + l = snprintf(buf, sz, "%s: %s\r\n", h->name, h->value); + if (buf != NULL) { + buf += l; + } + sz = (sz > l) ? sz - l : 0; + rv += l; + } + return (rv); +} + +static int +http_asprintf(char **bufp, size_t *szp, nni_list *hdrs, const char *fmt, ...) +{ + va_list ap; + size_t len; + size_t n; + char * buf; + + va_start(ap, fmt); + len = vsnprintf(NULL, 0, fmt, ap); + va_end(ap); + + len += http_sprintf_headers(NULL, 0, hdrs); + len += 5; // \r\n\r\n\0 + + if (len <= *szp) { + buf = *bufp; + } else { + if ((buf = nni_alloc(len)) == NULL) { + return (NNG_ENOMEM); + } + nni_free(*bufp, *szp); + *bufp = buf; + *szp = len; + } + va_start(ap, fmt); + n = vsnprintf(buf, len, fmt, ap); + va_end(ap); + buf += n; + len -= n; + n = http_sprintf_headers(buf, len, hdrs); + buf += n; + len -= n; + snprintf(buf, len, "\r\n"); + return (0); +} + +static int +http_req_prepare(nni_http_req *req) +{ + int rv; + if ((req->uri == NULL) || (req->meth == NULL)) { + return (NNG_EINVAL); + } + rv = http_asprintf(&req->buf, &req->bufsz, &req->hdrs, "%s %s %s\r\n", + req->meth, req->uri, req->vers != NULL ? req->vers : "HTTP/1.1"); + return (rv); +} + +static int +http_res_prepare(nni_http_res *res) +{ + int rv; + rv = http_asprintf(&res->buf, &res->bufsz, &res->hdrs, "%s %d %s\r\n", + res->vers != NULL ? res->vers : "HTTP/1.1", res->code, + res->rsn != NULL ? res->rsn : "Unknown Error"); + return (rv); +} + +int +nni_http_req_get_buf(nni_http_req *req, void **data, size_t *szp) +{ + int rv; + + if ((req->buf == NULL) && (rv = http_req_prepare(req)) != 0) { + return (rv); + } + *data = req->buf; + *szp = strlen(req->buf); + return (0); +} + +int +nni_http_res_get_buf(nni_http_res *res, void **data, size_t *szp) +{ + int rv; + + if ((res->buf == NULL) && (rv = http_res_prepare(res)) != 0) { + return (rv); + } + *data = res->buf; + *szp = strlen(res->buf); + return (0); +} + +int +nni_http_req_init(nni_http_req **reqp) +{ + nni_http_req *req; + if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&req->hdrs, http_header, node); + req->buf = NULL; + req->bufsz = 0; + req->data.data = NULL; + req->data.size = 0; + req->data.own = false; + req->vers = NULL; + req->meth = NULL; + req->uri = NULL; + *reqp = req; + return (0); +} + +int +nni_http_res_init(nni_http_res **resp) +{ + nni_http_res *res; + if ((res = NNI_ALLOC_STRUCT(res)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&res->hdrs, http_header, node); + res->buf = NULL; + res->bufsz = 0; + res->data.data = NULL; + res->data.size = 0; + res->data.own = false; + res->vers = NULL; + res->rsn = NULL; + res->code = 0; + *resp = res; + return (0); +} + +const char * +nni_http_req_get_method(nni_http_req *req) +{ + return (req->meth); +} + +const char * +nni_http_req_get_uri(nni_http_req *req) +{ + return (req->uri); +} + +const char * +nni_http_req_get_version(nni_http_req *req) +{ + return (req->vers); +} + +const char * +nni_http_res_get_version(nni_http_res *res) +{ + return (res->vers); +} + +int +nni_http_req_set_version(nni_http_req *req, const char *vers) +{ + return (http_set_string(&req->vers, vers)); +} + +int +nni_http_res_set_version(nni_http_res *res, const char *vers) +{ + return (http_set_string(&res->vers, vers)); +} + +int +nni_http_req_set_uri(nni_http_req *req, const char *uri) +{ + return (http_set_string(&req->uri, uri)); +} + +int +nni_http_req_set_method(nni_http_req *req, const char *meth) +{ + return (http_set_string(&req->meth, meth)); +} + +int +nni_http_res_set_status(nni_http_res *res, int status, const char *reason) +{ + int rv; + if ((rv = http_set_string(&res->rsn, reason)) == 0) { + res->code = status; + } + return (rv); +} + +int +nni_http_res_get_status(nni_http_res *res) +{ + return (res->code); +} + +const char * +nni_http_res_get_reason(nni_http_res *res) +{ + return (res->rsn); +} + +static int +http_scan_line(void *vbuf, size_t n, size_t *lenp) +{ + size_t len; + char lc; + char * buf = vbuf; + + lc = 0; + for (len = 0; len < n; len++) { + char c = buf[len]; + if (c == '\n') { + // Technically we should be receiving CRLF, but + // debugging is easier with just LF, so we behave + // following Postel's Law. + if (lc != '\r') { + buf[len] = '\0'; + } else { + buf[len - 1] = '\0'; + } + *lenp = len + 1; + return (0); + } + // If we have a control character (other than CR), or a CR + // followed by anything other than LF, then its an error. + if (((c < ' ') && (c != '\r')) || (lc == '\r')) { + return (NNG_EPROTO); + } + lc = c; + } + // Scanned the entire content, but did not find a line. + return (NNG_EAGAIN); +} + +static int +http_req_parse_line(nni_http_req *req, void *line) +{ + int rv; + char *method; + char *uri; + char *version; + + method = line; + if ((uri = strchr(method, ' ')) == NULL) { + return (NNG_EPROTO); + } + *uri = '\0'; + uri++; + + if ((version = strchr(uri, ' ')) == NULL) { + return (NNG_EPROTO); + } + *version = '\0'; + version++; + + if (((rv = nni_http_req_set_method(req, method)) != 0) || + ((rv = nni_http_req_set_uri(req, uri)) != 0) || + ((rv = nni_http_req_set_version(req, version)) != 0)) { + return (rv); + } + return (0); +} + +static int +http_res_parse_line(nni_http_res *res, uint8_t *line) +{ + int rv; + char *reason; + char *codestr; + char *version; + int status; + + version = (char *) line; + if ((codestr = strchr(version, ' ')) == NULL) { + return (NNG_EPROTO); + } + *codestr = '\0'; + codestr++; + + if ((reason = strchr(codestr, ' ')) == NULL) { + return (NNG_EPROTO); + } + *reason = '\0'; + reason++; + + status = atoi(codestr); + if ((status < 100) || (status > 999)) { + return (NNG_EPROTO); + } + + if (((rv = nni_http_res_set_status(res, status, reason)) != 0) || + ((rv = nni_http_res_set_version(res, version)) != 0)) { + return (rv); + } + return (0); +} + +// nni_http_req_parse parses a request (but not any attached entity data). +// The amount of data consumed is returned in lenp. Returns zero on +// success, NNG_EPROTO on parse failure, NNG_EAGAIN if more data is +// required, or NNG_ENOMEM on memory exhaustion. Note that lenp may +// be updated even in the face of errors (esp. NNG_EAGAIN, which is +// not an error so much as a request for more data.) +int +nni_http_req_parse(nni_http_req *req, void *buf, size_t n, size_t *lenp) +{ + + size_t len = 0; + size_t cnt; + int rv = 0; + + for (;;) { + uint8_t *line; + if ((rv = http_scan_line(buf, n, &cnt)) != 0) { + break; + } + + len += cnt; + line = buf; + buf = line + cnt; + n -= cnt; + + if (*line == '\0') { + break; + } + + if (req->vers != NULL) { + rv = http_parse_header(&req->hdrs, line); + } else { + rv = http_req_parse_line(req, line); + } + + if (rv != 0) { + break; + } + } + + *lenp = len; + return (rv); +} + +int +nni_http_res_parse(nni_http_res *res, void *buf, size_t n, size_t *lenp) +{ + + size_t len = 0; + size_t cnt; + int rv = 0; + for (;;) { + uint8_t *line; + if ((rv = http_scan_line(buf, n, &cnt)) != 0) { + break; + } + + len += cnt; + line = buf; + buf = line + cnt; + n -= cnt; + + if (*line == '\0') { + break; + } + + if (res->vers != NULL) { + rv = http_parse_header(&res->hdrs, line); + } else { + rv = http_res_parse_line(res, line); + } + + if (rv != 0) { + break; + } + } + + *lenp = len; + return (rv); +} + +int +nni_http_res_init_error(nni_http_res **resp, uint16_t err) +{ + char * rsn; + char rsnbuf[80]; + char html[1024]; + nni_http_res *res; + + if ((nni_http_res_init(&res)) != 0) { + return (NNG_ENOMEM); + } + + // Note that it is expected that redirect URIs will update the + // payload to reflect the target location. + switch (err) { + case NNI_HTTP_STATUS_STATUS_MOVED_PERMANENTLY: + rsn = "Moved Permanently"; + break; + case NNI_HTTP_STATUS_MULTIPLE_CHOICES: + rsn = "Multiple Choices"; + break; + case NNI_HTTP_STATUS_FOUND: + rsn = "Found"; + break; + case NNI_HTTP_STATUS_SEE_OTHER: + rsn = "See Other"; + break; + case NNI_HTTP_STATUS_TEMPORARY_REDIRECT: + rsn = "Temporary Redirect"; + break; + case NNI_HTTP_STATUS_BAD_REQUEST: + rsn = "Bad Request"; + break; + case NNI_HTTP_STATUS_UNAUTHORIZED: + rsn = "Unauthorized"; + break; + case NNI_HTTP_STATUS_PAYMENT_REQUIRED: + rsn = "Payment Required"; + break; + case NNI_HTTP_STATUS_NOT_FOUND: + rsn = "Not Found"; + break; + case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED: + // Caller must also supply an Allow: header + rsn = "Method Not Allowed"; + break; + case NNI_HTTP_STATUS_NOT_ACCEPTABLE: + rsn = "Not Acceptable"; + break; + case NNI_HTTP_STATUS_REQUEST_TIMEOUT: + rsn = "Request Timeout"; + break; + case NNI_HTTP_STATUS_CONFLICT: + rsn = "Conflict"; + break; + case NNI_HTTP_STATUS_GONE: + rsn = "Gone"; + break; + case NNI_HTTP_STATUS_LENGTH_REQUIRED: + rsn = "Length Required"; + break; + case NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE: + rsn = "Payload Too Large"; + break; + case NNI_HTTP_STATUS_FORBIDDEN: + rsn = "Forbidden"; + break; + case NNI_HTTP_STATUS_URI_TOO_LONG: + rsn = "URI Too Long"; + break; + case NNI_HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE: + rsn = "Unsupported Media Type"; + break; + case NNI_HTTP_STATUS_EXPECTATION_FAILED: + rsn = "Expectation Failed"; + break; + case NNI_HTTP_STATUS_UPGRADE_REQUIRED: + // Caller must add "Upgrade:" header. + rsn = "Upgrade Required"; + break; + case NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR: + rsn = "Internal Server Error"; + break; + case NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP: + rsn = "HTTP version not supported"; + break; + case NNI_HTTP_STATUS_NOT_IMPLEMENTED: + rsn = "Not Implemented"; + break; + case NNI_HTTP_STATUS_SERVICE_UNAVAILABLE: + rsn = "Service Unavailable"; + break; + default: + snprintf(rsnbuf, sizeof(rsnbuf), "HTTP error code %d", err); + rsn = rsnbuf; + break; + } + + // very simple builtin error page + snprintf(html, sizeof(html), + "<head><title>%d %s</title></head>" + "<body><p/><h1 align=\"center\">" + "<span style=\"font-size: 36px; border-radius: 5px; " + "background-color: black; color: white; padding: 7px; " + "font-family: Arial, sans serif;\">%d</span></h1>" + "<p align=\"center\">" + "<span style=\"font-size: 24px; font-family: Arial, sans serif;\">" + "%s</span></p></body>", + err, rsn, err, rsn); + + nni_http_res_set_status(res, err, rsn); + nni_http_res_copy_data(res, html, strlen(html)); + nni_http_res_set_version(res, "HTTP/1.1"); + nni_http_res_set_header( + res, "Content-Type", "text/html; charset=UTF-8"); + // We could set the date, but we don't necessarily have a portable + // way to get the time of day. + + *resp = res; + return (0); +} diff --git a/src/supplemental/http/server.c b/src/supplemental/http/server.c new file mode 100644 index 00000000..15b04f80 --- /dev/null +++ b/src/supplemental/http/server.c @@ -0,0 +1,1103 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <ctype.h> +#include <stdbool.h> +#include <stdio.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "http.h" + +static int http_server_sys_init(void); +static void http_server_sys_fini(void); + +static nni_initializer http_server_initializer = { + .i_init = http_server_sys_init, + .i_fini = http_server_sys_fini, + .i_once = 0, +}; + +typedef struct http_handler { + nni_list_node node; + void * h_arg; + char * h_path; + char * h_method; + char * h_host; + bool h_is_upgrader; + bool h_is_dir; + void (*h_cb)(nni_aio *); + void (*h_free)(void *); +} http_handler; + +typedef struct http_sconn { + nni_list_node node; + nni_http * http; + nni_http_server *server; + nni_http_req * req; + nni_http_res * res; + bool close; + bool closed; + bool finished; + nni_aio * cbaio; + nni_aio * rxaio; + nni_aio * txaio; + nni_aio * txdataio; + nni_http_tran tran; +} http_sconn; + +struct nni_http_server { + nng_sockaddr addr; + nni_list_node node; + int refcnt; + int starts; + nni_list handlers; + nni_list conns; + nni_list reaps; + nni_mtx mtx; + nni_cv cv; + bool closed; + bool tls; + nni_task cleanup; + nni_aio * accaio; + nni_plat_tcp_ep *tep; +}; + +static nni_list http_servers; +static nni_mtx http_servers_lk; + +static void +http_sconn_fini(void *arg) +{ + http_sconn *sc = arg; + NNI_ASSERT(!sc->finished); + sc->finished = true; + nni_aio_stop(sc->rxaio); + nni_aio_stop(sc->txaio); + nni_aio_stop(sc->txdataio); + nni_aio_stop(sc->cbaio); + if (sc->http != NULL) { + nni_http_fini(sc->http); + } + if (sc->req != NULL) { + nni_http_req_fini(sc->req); + } + if (sc->res != NULL) { + nni_http_res_fini(sc->res); + } + nni_aio_fini(sc->rxaio); + nni_aio_fini(sc->txaio); + nni_aio_fini(sc->txdataio); + nni_aio_fini(sc->cbaio); + NNI_FREE_STRUCT(sc); +} + +static void +http_sconn_close(http_sconn *sc) +{ + nni_http_server *s; + s = sc->server; + + NNI_ASSERT(!sc->finished); + nni_mtx_lock(&s->mtx); + if (!sc->closed) { + nni_http *h; + sc->closed = true; + // Close the underlying transport. + if ((h = sc->http) != NULL) { + nni_http_close(h); + } + if (nni_list_node_active(&sc->node)) { + nni_list_remove(&s->conns, sc); + nni_list_append(&s->reaps, sc); + } + nni_task_dispatch(&s->cleanup); + } else { + nni_panic("DOUBLE CLOSE!\n"); + } + nni_mtx_unlock(&s->mtx); +} + +static void +http_sconn_txdatdone(void *arg) +{ + http_sconn *sc = arg; + nni_aio * aio = sc->txdataio; + + if (nni_aio_result(aio) != 0) { + http_sconn_close(sc); + return; + } + + if (sc->res != NULL) { + nni_http_res_fini(sc->res); + sc->res = NULL; + } + + if (sc->close) { + http_sconn_close(sc); + return; + } + + nni_http_req_reset(sc->req); + nni_http_read_req(sc->http, sc->req, sc->rxaio); +} + +static void +http_sconn_txdone(void *arg) +{ + http_sconn *sc = arg; + nni_aio * aio = sc->txaio; + int rv; + void * data; + size_t size; + + if ((rv = nni_aio_result(aio)) != 0) { + http_sconn_close(sc); + return; + } + + // For HEAD requests, we just treat like "GET" but don't send + // the data. (Required per HTTP.) + if (strcmp(nni_http_req_get_method(sc->req), "HEAD") == 0) { + size = 0; + } else { + nni_http_res_get_data(sc->res, &data, &size); + } + if (size) { + // Submit data. + sc->txdataio->a_niov = 1; + sc->txdataio->a_iov[0].iov_buf = data; + sc->txdataio->a_iov[0].iov_len = size; + nni_http_write_full(sc->http, sc->txdataio); + return; + } + + if (sc->close) { + http_sconn_close(sc); + return; + } + + if (sc->res != NULL) { + nni_http_res_fini(sc->res); + sc->res = NULL; + } + nni_http_req_reset(sc->req); + nni_http_read_req(sc->http, sc->req, sc->rxaio); +} + +static char +http_hexval(char c) +{ + if ((c >= '0') && (c <= '9')) { + return (c - '0'); + } + if ((c >= 'a') && (c <= 'f')) { + return ((c - 'a') + 10); + } + if ((c >= 'A') && (c <= 'F')) { + return ((c - 'A') + 10); + } + return (0); +} + +static char * +http_uri_canonify(char *path) +{ + char *tmp; + char *dst; + + // Chomp off query string. + if ((tmp = strchr(path, '?')) != NULL) { + *tmp = '\0'; + } + // If the URI was absolute, make it relative. + if ((strncasecmp(path, "http://", strlen("http://")) == 0) || + (strncasecmp(path, "https://", strlen("https://")) == 0)) { + // Skip past the :// + path = strchr(path, ':'); + path += 3; + + // scan for the end of the host, distinguished by a / + // path delimiter. There might not be one, in which case + // the whole thing is the host and we assume the path is + // just /. + if ((path = strchr(path, '/')) == NULL) { + return ("/"); + } + } + + // Now we have to unescape things. Unescaping is a shrinking + // operation (strictly), so this is safe. This is just URL decode. + // Note that paths with an embedded NUL are going to be treated as + // though truncated. Don't be that guy that sends %00 in a URL. + tmp = path; + dst = path; + while (*tmp != '\0') { + char c; + if ((c = *tmp) != '%') { + *dst++ = c; + tmp++; + continue; + } + if (isxdigit(tmp[1]) && isxdigit(tmp[2])) { + c = http_hexval(tmp[1]); + c *= 16; + c += http_hexval(tmp[2]); + *dst++ = c; + tmp += 3; + } + // garbage in, garbage out + *dst++ = c; + tmp++; + } + *dst = '\0'; + return (path); +} + +static void +http_sconn_error(http_sconn *sc, uint16_t err) +{ + nni_http_res *res; + + if (nni_http_res_init_error(&res, err) != 0) { + http_sconn_close(sc); + return; + } + + sc->res = res; + nni_http_write_res(sc->http, res, sc->txaio); +} + +static void +http_sconn_rxdone(void *arg) +{ + http_sconn * sc = arg; + nni_http_server *s = sc->server; + nni_aio * aio = sc->rxaio; + int rv; + http_handler * h; + const char * val; + nni_http_req * req = sc->req; + char * uri; + size_t urisz; + char * path; + char * tmp; + bool badmeth = false; + + if ((rv = nni_aio_result(aio)) != 0) { + http_sconn_close(sc); + return; + } + + // Validate the request -- it has to at least look like HTTP 1.x + // We flatly refuse to deal with HTTP 0.9, and we can't cope with + // HTTP/2. + if ((val = nni_http_req_get_version(req)) == NULL) { + sc->close = true; + http_sconn_error(sc, NNI_HTTP_STATUS_BAD_REQUEST); + return; + } + if (strncmp(val, "HTTP/1.", 7) != 0) { + sc->close = true; + http_sconn_error(sc, NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP); + return; + } + if (strcmp(val, "HTTP/1.1") != 0) { + // We treat HTTP/1.0 connections as non-persistent. + // No effort is made to handle "persistent" HTTP/1.0 + // since that was not standard. (Everyone is at 1.1 now + // anyways.) + sc->close = true; + } + + // If the connection was 1.0, or a connection: close was requested, + // then mark this close on our end. + if ((val = nni_http_req_get_header(req, "Connection")) != NULL) { + // HTTP 1.1 says these have to be case insensitive (7230) + if (nni_strcasestr(val, "close") != NULL) { + // In theory this could falsely match some other weird + // connection header that included the word close not + // as part of a whole token. No such legal definitions + // exist, and so anyone who does that gets what they + // deserve. (Fairly harmless actually, since it only + // prevents persistent connections.) + sc->close = true; + } + } + + val = nni_http_req_get_uri(req); + urisz = strlen(val) + 1; + if ((uri = nni_alloc(urisz)) == NULL) { + http_sconn_close(sc); // out of memory + return; + } + strncpy(uri, val, urisz); + path = http_uri_canonify(uri); + + NNI_LIST_FOREACH (&s->handlers, h) { + size_t len; + if (h->h_host != NULL) { + val = nni_http_req_get_header(req, "Host"); + if (val == NULL) { + // We insist on a matching Host: line for + // virtual hosting. This leaves HTTP/1.0 + // out in the cold basically. + continue; + } + + // A few ways hosts can match. They might have + // a port attached -- we ignore that. (We don't + // run multiple ports, so if you got here, presumably + // the port at least is correct!) It might also have + // a lone trailing dot, so that is ok too. + + // Ignore the trailing dot if the handler supplied it. + len = strlen(h->h_host); + if ((len > 0) && (h->h_host[len - 1] == '.')) { + len--; + } + if ((nni_strncasecmp(val, h->h_host, len) != 0)) { + continue; + } + if ((val[len] != '\0') && (val[len] != ':') && + ((val[len] != '.') || (val[len + 1] != '\0'))) { + continue; + } + } + + NNI_ASSERT(h->h_method != NULL); + + len = strlen(h->h_path); + if (strncmp(path, h->h_path, len) != 0) { + continue; + } + switch (path[len]) { + case '\0': + break; + case '/': + if ((path[len + 1] != '\0') && (!h->h_is_dir)) { + // trailing component and not a directory. + // Note that this should force a failure. + continue; + } + break; + default: + continue; // some other substring, not matched. + } + + // So, what about the method? + val = nni_http_req_get_method(req); + if (strcmp(val, h->h_method) == 0) { + break; + } + // HEAD is remapped to GET. + if ((strcmp(val, "HEAD") == 0) && + (strcmp(h->h_method, "GET") == 0)) { + break; + } + badmeth = 1; + } + + nni_free(uri, urisz); + if (h == NULL) { + if (badmeth) { + http_sconn_error( + sc, NNI_HTTP_STATUS_METHOD_NOT_ALLOWED); + } else { + http_sconn_error(sc, NNI_HTTP_STATUS_NOT_FOUND); + } + return; + } + + nni_aio_set_input(sc->cbaio, 0, sc->http); + nni_aio_set_input(sc->cbaio, 1, sc->req); + nni_aio_set_input(sc->cbaio, 2, h->h_arg); + nni_aio_set_data(sc->cbaio, 1, h); + + // Technically, probably callback should initialize this with + // start, but we do it instead. + + if (nni_aio_start(sc->cbaio, NULL, NULL) == 0) { + h->h_cb(sc->cbaio); + } +} + +static void +http_sconn_cbdone(void *arg) +{ + http_sconn * sc = arg; + nni_aio * aio = sc->cbaio; + nni_http_res *res; + http_handler *h; + + if (nni_aio_result(aio) != 0) { + // Hard close, no further feedback. + http_sconn_close(sc); + return; + } + + h = nni_aio_get_data(aio, 1); + res = nni_aio_get_output(aio, 0); + + // If its an upgrader, and they didn't give us back a response, it + // means that they took over, and we should just discard this session, + // without closing the underlying channel. + if ((h->h_is_upgrader) && (res == NULL)) { + sc->http = NULL; // the underlying HTTP is not closed + sc->req = NULL; + sc->res = NULL; + http_sconn_close(sc); // discard server session though + return; + } + if (res != NULL) { + + const char *val; + val = nni_http_res_get_header(res, "Connection"); + if ((val != NULL) && (strstr(val, "close") != NULL)) { + sc->close = true; + } + if (sc->close) { + nni_http_res_set_header(res, "Connection", "close"); + } + sc->res = res; + nni_http_write_res(sc->http, res, sc->txaio); + } else if (sc->close) { + http_sconn_close(sc); + } else { + // Presumably client already sent a response. + // Wait for another request. + nni_http_req_reset(sc->req); + nni_http_read_req(sc->http, sc->req, sc->rxaio); + } +} + +static int +http_sconn_init(http_sconn **scp, nni_plat_tcp_pipe *tcp) +{ + http_sconn *sc; + int rv; + + if ((sc = NNI_ALLOC_STRUCT(sc)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_http_req_init(&sc->req)) != 0) || + ((rv = nni_aio_init(&sc->rxaio, http_sconn_rxdone, sc)) != 0) || + ((rv = nni_aio_init(&sc->txaio, http_sconn_txdone, sc)) != 0) || + ((rv = nni_aio_init(&sc->txdataio, http_sconn_txdatdone, sc)) != + 0) || + ((rv = nni_aio_init(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) { + // Can't even accept the incoming request. Hard close. + http_sconn_close(sc); + return (rv); + } + // XXX: for HTTPS we would then try to do the TLS negotiation here. + // That would use a different set of tran values. + + sc->tran.h_data = tcp; + sc->tran.h_read = (void *) nni_plat_tcp_pipe_recv; + sc->tran.h_write = (void *) nni_plat_tcp_pipe_send; + sc->tran.h_close = (void *) nni_plat_tcp_pipe_close; // close implied + sc->tran.h_fini = (void *) nni_plat_tcp_pipe_fini; + + if ((rv = nni_http_init(&sc->http, &sc->tran)) != 0) { + http_sconn_close(sc); + return (rv); + } + *scp = sc; + return (0); +} + +static void +http_server_acccb(void *arg) +{ + nni_http_server * s = arg; + nni_aio * aio = s->accaio; + nni_plat_tcp_pipe *tcp; + http_sconn * sc; + int rv; + + if ((rv = nni_aio_result(aio)) != 0) { + if (rv == NNG_ECLOSED) { + return; + } + // try again? + nni_plat_tcp_ep_accept(s->tep, s->accaio); + return; + } + tcp = nni_aio_get_pipe(aio); + if (http_sconn_init(&sc, tcp) != 0) { + nni_plat_tcp_pipe_close(tcp); + nni_plat_tcp_pipe_fini(tcp); + + nni_plat_tcp_ep_accept(s->tep, s->accaio); + return; + } + nni_mtx_lock(&s->mtx); + sc->server = s; + if (s->closed) { + nni_http_close(sc->http); + sc->closed = true; + nni_list_append(&s->reaps, sc); + nni_mtx_unlock(&s->mtx); + return; + } + nni_list_append(&s->conns, sc); + nni_mtx_unlock(&s->mtx); + + nni_http_read_req(sc->http, sc->req, sc->rxaio); + nni_plat_tcp_ep_accept(s->tep, s->accaio); +} + +static void +http_server_cleanup(void *arg) +{ + nni_http_server *s = arg; + http_sconn * sc; + nni_mtx_lock(&s->mtx); + while ((sc = nni_list_first(&s->reaps)) != NULL) { + nni_list_remove(&s->reaps, sc); + nni_mtx_unlock(&s->mtx); + http_sconn_fini(sc); + nni_mtx_lock(&s->mtx); + } + if (nni_list_empty(&s->reaps) && nni_list_empty(&s->conns)) { + nni_cv_wake(&s->cv); + } + nni_mtx_unlock(&s->mtx); +} + +static void +http_handler_fini(http_handler *h) +{ + nni_strfree(h->h_path); + nni_strfree(h->h_host); + nni_strfree(h->h_method); + if (h->h_free != NULL) { + h->h_free(h->h_arg); + } + NNI_FREE_STRUCT(h); +} + +static void +http_server_fini(nni_http_server *s) +{ + http_handler *h; + + nni_mtx_lock(&s->mtx); + while ((!nni_list_empty(&s->conns)) || (!nni_list_empty(&s->reaps))) { + nni_cv_wait(&s->cv); + } + if (s->tep != NULL) { + nni_plat_tcp_ep_fini(s->tep); + } + while ((h = nni_list_first(&s->handlers)) != NULL) { + nni_list_remove(&s->handlers, h); + http_handler_fini(h); + } + nni_mtx_unlock(&s->mtx); + nni_task_wait(&s->cleanup); + nni_aio_fini(s->accaio); + nni_cv_fini(&s->cv); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +void +nni_http_server_fini(nni_http_server *s) +{ + nni_mtx_lock(&http_servers_lk); + s->refcnt--; + if (s->refcnt == 0) { + nni_list_remove(&http_servers, s); + http_server_fini(s); + } + nni_mtx_unlock(&http_servers_lk); +} + +static int +http_server_init(nni_http_server **serverp, nng_sockaddr *sa) +{ + nni_http_server *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->mtx); + nni_cv_init(&s->cv, &s->mtx); + nni_task_init(NULL, &s->cleanup, http_server_cleanup, s); + NNI_LIST_INIT(&s->handlers, http_handler, node); + NNI_LIST_INIT(&s->conns, http_sconn, node); + NNI_LIST_INIT(&s->reaps, http_sconn, node); + if ((rv = nni_aio_init(&s->accaio, http_server_acccb, s)) != 0) { + http_server_fini(s); + return (rv); + } + s->addr = *sa; + *serverp = s; + return (0); +} + +int +nni_http_server_init(nni_http_server **serverp, nng_sockaddr *sa) +{ + int rv; + nni_http_server *s; + + nni_initialize(&http_server_initializer); + + nni_mtx_lock(&http_servers_lk); + NNI_LIST_FOREACH (&http_servers, s) { + switch (sa->s_un.s_family) { + case NNG_AF_INET: + if (memcmp(&s->addr.s_un.s_in, &sa->s_un.s_in, + sizeof(sa->s_un.s_in)) == 0) { + *serverp = s; + s->refcnt++; + nni_mtx_unlock(&http_servers_lk); + return (0); + } + break; + case NNG_AF_INET6: + if (memcmp(&s->addr.s_un.s_in6, &sa->s_un.s_in6, + sizeof(sa->s_un.s_in6)) == 0) { + *serverp = s; + s->refcnt++; + nni_mtx_unlock(&http_servers_lk); + return (0); + } + break; + } + } + + // We didn't find a server, try to make a new one. + if ((rv = http_server_init(&s, sa)) == 0) { + s->addr = *sa; + s->refcnt = 1; + nni_list_append(&http_servers, s); + *serverp = s; + } + + nni_mtx_unlock(&http_servers_lk); + return (rv); +} + +static int +http_server_start(nni_http_server *s) +{ + int rv; + + rv = nni_plat_tcp_ep_init(&s->tep, &s->addr, NULL, NNI_EP_MODE_LISTEN); + if (rv != 0) { + return (rv); + } + if ((rv = nni_plat_tcp_ep_listen(s->tep)) != 0) { + nni_plat_tcp_ep_fini(s->tep); + s->tep = NULL; + return (rv); + } + nni_plat_tcp_ep_accept(s->tep, s->accaio); + return (0); +} + +int +nni_http_server_start(nni_http_server *s) +{ + int rv = 0; + + nni_mtx_lock(&s->mtx); + if (s->starts == 0) { + rv = http_server_start(s); + } + if (rv == 0) { + s->starts++; + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static void +http_server_stop(nni_http_server *s) +{ + http_sconn *sc; + + if (s->closed) { + return; + } + + s->closed = true; + // Close the TCP endpoint that is listening. + if (s->tep) { + nni_plat_tcp_ep_close(s->tep); + } + + // This marks the server as "shutting down" -- existing + // connections finish their activity and close. + // + // XXX: figure out how to shut down connections that are + // blocked waiting to receive a request. We won't do this for + // upgraded connections... + NNI_LIST_FOREACH (&s->conns, sc) { + sc->close = true; + } +} + +void +nni_http_server_stop(nni_http_server *s) +{ + nni_mtx_lock(&s->mtx); + s->starts--; + if (s->starts == 0) { + http_server_stop(s); + } + nni_mtx_unlock(&s->mtx); +} + +int +http_server_add_handler(void **hp, nni_http_server *s, nni_http_handler *hh, + void *arg, void (*freeit)(void *)) +{ + http_handler *h, *h2; + size_t l1, l2; + + // Must have a legal method (and not one that is HEAD), path, + // and handler. (The reason HEAD is verboten is that we supply + // it automatically as part of GET support.) + if ((hh->h_method == NULL) || (hh->h_path == NULL) || + (hh->h_cb == NULL) || (strcmp(hh->h_method, "HEAD") == 0)) { + return (NNG_EINVAL); + } + if ((h = NNI_ALLOC_STRUCT(h)) == NULL) { + return (NNG_ENOMEM); + } + h->h_arg = arg; + h->h_cb = hh->h_cb; + h->h_is_dir = hh->h_is_dir; + h->h_is_upgrader = hh->h_is_upgrader; + h->h_free = freeit; + + // Ignore the port part of the host. + if (hh->h_host != NULL) { + int rv; + rv = nni_tran_parse_host_port(hh->h_host, &h->h_host, NULL); + if (rv != 0) { + http_handler_fini(h); + return (rv); + } + } + + if (((h->h_method = nni_strdup(hh->h_method)) == NULL) || + ((h->h_path = nni_strdup(hh->h_path)) == NULL)) { + http_handler_fini(h); + return (NNG_ENOMEM); + } + + l1 = strlen(h->h_path); + // Chop off trailing "/" + while (l1 > 0) { + if (h->h_path[l1 - 1] != '/') { + break; + } + l1--; + h->h_path[l1] = '\0'; + } + + nni_mtx_lock(&s->mtx); + // General rule for finding a conflict is that if either string + // is a strict substring of the other, then we have a + // collision. (But only if the methods match, and the host + // matches.) Note that a wild card host matches both. + NNI_LIST_FOREACH (&s->handlers, h2) { + if ((h2->h_host != NULL) && (h->h_host != NULL) && + (strcasecmp(h2->h_host, h->h_host) != 0)) { + // Hosts don't match, so we are safe. + continue; + } + if (strcmp(h2->h_method, h->h_method) != 0) { + // Different methods, so again we are fine. + continue; + } + l2 = strlen(h2->h_path); + if (l1 < l2) { + l2 = l1; + } + if (strncmp(h2->h_path, h->h_path, l2) == 0) { + // Path collision. NNG_EADDRINUSE. + nni_mtx_unlock(&s->mtx); + http_handler_fini(h); + return (NNG_EADDRINUSE); + } + } + nni_list_append(&s->handlers, h); + nni_mtx_unlock(&s->mtx); + if (hp != NULL) { + *hp = h; + } + return (0); +} + +int +nni_http_server_add_handler( + void **hp, nni_http_server *s, nni_http_handler *hh, void *arg) +{ + return (http_server_add_handler(hp, s, hh, arg, NULL)); +} + +void +nni_http_server_del_handler(nni_http_server *s, void *harg) +{ + http_handler *h = harg; + + nni_mtx_lock(&s->mtx); + nni_list_node_remove(&h->node); + nni_mtx_unlock(&s->mtx); + + http_handler_fini(h); +} + +// Very limited MIME type map. Used only if the handler does not +// supply it's own. +static struct content_map { + const char *ext; + const char *typ; +} content_map[] = { + // clang-format off + { ".ai", "application/postscript" }, + { ".aif", "audio/aiff" }, + { ".aiff", "audio/aiff" }, + { ".avi", "video/avi" }, + { ".au", "audio/basic" }, + { ".bin", "application/octet-stream" }, + { ".bmp", "image/bmp" }, + { ".css", "text/css" }, + { ".eps", "application/postscript" }, + { ".gif", "image/gif" }, + { ".htm", "text/html" }, + { ".html", "text/html" }, + { ".ico", "image/x-icon" }, + { ".jpeg", "image/jpeg" }, + { ".jpg", "image/jpeg" }, + { ".js", "application/javascript" }, + { ".md", "text/markdown" }, + { ".mp2", "video/mpeg" }, + { ".mp3", "audio/mpeg3" }, + { ".mpeg", "video/mpeg" }, + { ".mpg", "video/mpeg" }, + { ".pdf", "application/pdf" }, + { ".png", "image/png" }, + { ".ps", "application/postscript" }, + { ".rtf", "text/rtf" }, + { ".text", "text/plain" }, + { ".tif", "image/tiff" }, + { ".tiff", "image/tiff" }, + { ".txt", "text/plain" }, + { ".wav", "audio/wav"}, + { "README", "text/plain" }, + { NULL, NULL }, + // clang-format on +}; + +const char * +http_lookup_type(const char *path) +{ + size_t l1 = strlen(path); + for (int i = 0; content_map[i].ext != NULL; i++) { + size_t l2 = strlen(content_map[i].ext); + if (l2 > l1) { + continue; + } + if (strcasecmp(&path[l1 - l2], content_map[i].ext) == 0) { + return (content_map[i].typ); + } + } + return (NULL); +} + +typedef struct http_file { + char *typ; + char *pth; +} http_file; + +static void +http_handle_file(nni_aio *aio) +{ + http_file * f = nni_aio_get_input(aio, 2); + nni_http_res *res = NULL; + void * data; + size_t size; + int rv; + + if ((rv = nni_plat_file_get(f->pth, &data, &size)) != 0) { + uint16_t status; + switch (rv) { + case NNG_ENOMEM: + status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + break; + case NNG_ENOENT: + status = NNI_HTTP_STATUS_NOT_FOUND; + break; + case NNG_EPERM: + status = NNI_HTTP_STATUS_FORBIDDEN; + break; + default: + status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + break; + } + if ((rv = nni_http_res_init_error(&res, status)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + } else { + if (((rv = nni_http_res_init(&res)) != 0) || + ((rv = nni_http_res_set_status( + res, NNI_HTTP_STATUS_OK, "OK")) != 0) || + ((rv = nni_http_res_set_header( + res, "Content-Type", f->typ)) != 0) || + ((rv = nni_http_res_set_data(res, data, size)) != 0)) { + nni_free(data, size); + nni_aio_finish_error(aio, rv); + return; + } + } + nni_aio_set_output(aio, 0, res); + nni_aio_finish(aio, 0, 0); +} + +static void +http_free_file(void *arg) +{ + http_file *f = arg; + nni_strfree(f->pth); + nni_strfree(f->typ); + NNI_FREE_STRUCT(f); +} + +int +nni_http_server_add_file(nni_http_server *s, const char *host, + const char *ctype, const char *uri, const char *path) +{ + nni_http_handler h; + http_file * f; + int rv; + + if ((f = NNI_ALLOC_STRUCT(f)) == NULL) { + return (NNG_ENOMEM); + } + if (ctype == NULL) { + ctype = http_lookup_type(path); + } + + if (((f->pth = nni_strdup(path)) == NULL) || + ((ctype != NULL) && ((f->typ = nni_strdup(ctype)) == NULL))) { + http_free_file(f); + return (NNG_ENOMEM); + } + h.h_method = "GET"; + h.h_path = uri; + h.h_host = host; + h.h_cb = http_handle_file; + h.h_is_dir = false; + h.h_is_upgrader = false; + + if ((rv = http_server_add_handler(NULL, s, &h, f, http_free_file)) != + 0) { + http_free_file(f); + return (rv); + } + return (0); +} + +typedef struct http_static { + char * typ; + void * data; + size_t size; +} http_static; + +static void +http_handle_static(nni_aio *aio) +{ + http_static * s = nni_aio_get_input(aio, 2); + nni_http_res *r = NULL; + int rv; + + if (((rv = nni_http_res_init(&r)) != 0) || + ((rv = nni_http_res_set_header(r, "Content-Type", s->typ)) != 0) || + ((rv = nni_http_res_set_status(r, NNI_HTTP_STATUS_OK, "OK")) != + 0) || + ((rv = nni_http_res_set_data(r, s->data, s->size)) != 0)) { + nni_aio_finish_error(aio, rv); + return; + } + + nni_aio_set_output(aio, 0, r); + nni_aio_finish(aio, 0, 0); +} + +static void +http_free_static(void *arg) +{ + http_static *s = arg; + nni_strfree(s->typ); + nni_free(s->data, s->size); + NNI_FREE_STRUCT(s); +} + +int +nni_http_server_add_static(nni_http_server *s, const char *host, + const char *ctype, const char *uri, const void *data, size_t size) +{ + nni_http_handler h; + http_static * f; + int rv; + + if ((f = NNI_ALLOC_STRUCT(f)) == NULL) { + return (NNG_ENOMEM); + } + if (ctype == NULL) { + ctype = "application/octet-stream"; + } + if (((f->data = nni_alloc(size)) == NULL) || + ((f->typ = nni_strdup(ctype)) == NULL)) { + http_free_static(f); + return (NNG_ENOMEM); + } + + f->size = size; + memcpy(f->data, data, size); + + h.h_method = "GET"; + h.h_path = uri; + h.h_host = host; + h.h_cb = http_handle_static; + h.h_is_dir = false; + h.h_is_upgrader = false; + + if ((rv = http_server_add_handler(NULL, s, &h, f, http_free_static)) != + 0) { + http_free_static(f); + return (rv); + } + return (0); +} + +static int +http_server_sys_init(void) +{ + NNI_LIST_INIT(&http_servers, nni_http_server, node); + nni_mtx_init(&http_servers_lk); + return (0); +} + +static void +http_server_sys_fini(void) +{ + nni_mtx_fini(&http_servers_lk); +}
\ No newline at end of file diff --git a/src/supplemental/mbedtls/tls.c b/src/supplemental/mbedtls/tls.c index a33c72b4..94503df5 100644 --- a/src/supplemental/mbedtls/tls.c +++ b/src/supplemental/mbedtls/tls.c @@ -417,7 +417,7 @@ nni_tls_send_cb(void *ctx) aio->a_niov = 1; aio->a_iov[0].iov_buf = tp->sendbuf + tp->sendoff; aio->a_iov[0].iov_len = tp->sendlen; - nni_aio_set_timeout(aio, -1); // No timeout. + nni_aio_set_timeout(aio, NNG_DURATION_INFINITE); nni_plat_tcp_pipe_send(tp->tcp, aio); nni_mtx_unlock(&tp->lk); return; @@ -455,7 +455,7 @@ nni_tls_recv_start(nni_tls *tp) aio->a_niov = 1; aio->a_iov[0].iov_buf = tp->recvbuf; aio->a_iov[0].iov_len = NNG_TLS_MAX_RECV_SIZE; - nni_aio_set_timeout(tp->tcp_recv, -1); // No timeout. + nni_aio_set_timeout(tp->tcp_recv, NNG_DURATION_INFINITE); nni_plat_tcp_pipe_recv(tp->tcp, aio); } @@ -526,7 +526,7 @@ nni_tls_net_send(void *ctx, const unsigned char *buf, size_t len) tp->tcp_send->a_niov = 1; tp->tcp_send->a_iov[0].iov_buf = tp->sendbuf; tp->tcp_send->a_iov[0].iov_len = len; - nni_aio_set_timeout(tp->tcp_send, -1); // No timeout. + nni_aio_set_timeout(tp->tcp_send, NNG_DURATION_INFINITE); nni_plat_tcp_pipe_send(tp->tcp, tp->tcp_send); return (len); } diff --git a/src/supplemental/websocket/CMakeLists.txt b/src/supplemental/websocket/CMakeLists.txt new file mode 100644 index 00000000..f3283257 --- /dev/null +++ b/src/supplemental/websocket/CMakeLists.txt @@ -0,0 +1,14 @@ +# +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +set(WEBSOCKET_SOURCES + supplemental/websocket/websocket.c + supplemental/websocket/websocket.h) +set(NNG_SOURCES ${NNG_SOURCES} ${WEBSOCKET_SOURCES} PARENT_SCOPE) diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c new file mode 100644 index 00000000..fe0a9bd9 --- /dev/null +++ b/src/supplemental/websocket/websocket.c @@ -0,0 +1,1935 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "supplemental/base64/base64.h" +#include "supplemental/http/http.h" +#include "supplemental/sha1/sha1.h" + +#include "websocket.h" + +// Pre-defined types for some prototypes. These are from other subsystems. +typedef struct ws_frame ws_frame; +typedef struct ws_msg ws_msg; + +struct nni_ws { + int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN + nni_list_node node; + nni_reap_item reap; + bool closed; + bool ready; + bool wclose; + nni_mtx mtx; + nni_list txmsgs; + nni_list rxmsgs; + ws_frame * txframe; + ws_frame * rxframe; + nni_aio * txaio; // physical aios + nni_aio * rxaio; + nni_aio * closeaio; + nni_aio * httpaio; // server only, HTTP reply pending + nni_http * http; + nni_http_req *req; + nni_http_res *res; + size_t maxframe; + size_t fragsize; +}; + +struct nni_ws_listener { + nni_tls_config * tls; + nni_http_server * server; + char * proto; + char * url; + char * host; + char * serv; + char * path; + nni_mtx mtx; + nni_list pend; + nni_list reply; + nni_list aios; + bool started; + bool closed; + void * hp; // handler pointer + nni_http_handler handler; + nni_ws_listen_hook hookfn; + void * hookarg; +}; + +// The dialer tracks user aios in two lists. The first list is for aios +// waiting for the http connection to be established, while the second +// are waiting for the HTTP negotiation to complete. We keep two lists +// so we know whether to initiate another outgoing connection after the +// completion of an earlier connection. (We don't want to establish +// requests when we already have connects negotiating.) +struct nni_ws_dialer { + nni_tls_config * tls; + nni_http_req * req; + nni_http_res * res; + nni_http_client *client; + nni_mtx mtx; + nni_aio * conaio; + char * proto; + char * host; + char * serv; + char * path; + char * qinfo; + char * addr; // full address (a URL really) + char * uri; // path + query + nni_list conaios; // user aios waiting for connect. + nni_list httpaios; // user aios waiting for HTTP nego. + bool started; + bool closed; + nng_sockaddr sa; +}; + +typedef enum ws_type { + WS_CONT = 0x0, + WS_TEXT = 0x1, + WS_BINARY = 0x2, + WS_CLOSE = 0x8, + WS_PING = 0x9, + WS_PONG = 0xA, +} ws_type; + +typedef enum ws_reason { + WS_CLOSE_NORMAL_CLOSE = 1000, + WS_CLOSE_GOING_AWAY = 1001, + WS_CLOSE_PROTOCOL_ERR = 1002, + WS_CLOSE_UNSUPP_FORMAT = 1003, + WS_CLOSE_INVALID_DATA = 1007, + WS_CLOSE_POLICY = 1008, + WS_CLOSE_TOO_BIG = 1009, + WS_CLOSE_NO_EXTENSION = 1010, + WS_CLOSE_INTERNAL = 1011, +} ws_reason; + +struct ws_frame { + nni_list_node node; + uint8_t head[14]; // maximum header size + uint8_t mask[4]; // read by server, sent by client + uint8_t sdata[125]; // short data (for short frames only) + size_t hlen; // header length + size_t len; // payload length + enum ws_type op; + bool final; + bool masked; + size_t bufsz; // allocated size + uint8_t * buf; + ws_msg * wmsg; +}; + +struct ws_msg { + nni_list frames; + nni_list_node node; + nni_ws * ws; + nni_msg * msg; + nni_aio * aio; +}; + +static void ws_send_close(nni_ws *ws, uint16_t code); + +// This looks, case independently for a word in a list, which is either +// space or comma separated. +static bool +ws_contains_word(const char *phrase, const char *word) +{ + size_t len = strlen(word); + + while ((phrase != NULL) && (*phrase != '\0')) { + if ((nni_strncasecmp(phrase, word, len) == 0) && + ((phrase[len] == 0) || (phrase[len] == ' ') || + (phrase[len] == ','))) { + return (true); + } + // Skip to next word. + if ((phrase = strchr(phrase, ' ')) != NULL) { + while ((*phrase == ' ') || (*phrase == ',')) { + phrase++; + } + } + } + return (false); +} + +// input is base64 challenge, output is the accepted. input should be +// 24 character base 64, output is 28 character base64 reply. (output +// must be large enough to hold 29 bytes to allow for termination.) +// Returns 0 on success, NNG_EINVAL if the input is malformed somehow. +static int +ws_make_accept(const char *key, char *accept) +{ + uint8_t rawkey[16]; + uint8_t digest[20]; + nni_sha1_ctx ctx; + +#define WS_KEY_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" +#define WS_KEY_GUIDLEN 36 + + if ((strlen(key) != 24) || + (nni_base64_decode(key, 24, rawkey, 16) != 16)) { + return (NNG_EINVAL); + } + + nni_sha1_init(&ctx); + nni_sha1_update(&ctx, rawkey, 16); + nni_sha1_update(&ctx, (uint8_t *) WS_KEY_GUID, WS_KEY_GUIDLEN); + nni_sha1_final(&ctx, digest); + + nni_base64_encode(digest, 20, accept, 28); + accept[28] = '\0'; + return (0); +} + +static void +ws_frame_fini(ws_frame *frame) +{ + if (frame->bufsz) { + nni_free(frame->buf, frame->bufsz); + } + NNI_FREE_STRUCT(frame); +} + +static void +ws_msg_fini(ws_msg *wm) +{ + ws_frame *frame; + + NNI_ASSERT(!nni_list_node_active(&wm->node)); + while ((frame = nni_list_first(&wm->frames)) != NULL) { + nni_list_remove(&wm->frames, frame); + ws_frame_fini(frame); + } + + if (wm->msg != NULL) { + nni_msg_free(wm->msg); + } + NNI_FREE_STRUCT(wm); +} + +static void +ws_mask_frame(ws_frame *frame) +{ + uint32_t r; + // frames sent by client need mask. + if (frame->masked) { + return; + } + r = nni_random(); + NNI_PUT32(frame->mask, r); + for (int i = 0; i < frame->len; i++) { + frame->buf[i] ^= frame->mask[i % 4]; + } + memcpy(frame->head + frame->hlen, frame->mask, 4); + frame->hlen += 4; + frame->head[1] |= 0x80; // set masked bit + frame->masked = true; +} + +static void +ws_unmask_frame(ws_frame *frame) +{ + // frames sent by client need mask. + if (!frame->masked) { + return; + } + for (int i = 0; i < frame->len; i++) { + frame->buf[i] ^= frame->mask[i % 4]; + } + frame->hlen -= 4; + frame->head[1] &= 0x7f; // clear masked bit + frame->masked = false; +} + +static int +ws_msg_init_control( + ws_msg **wmp, nni_ws *ws, uint8_t op, const uint8_t *buf, size_t len) +{ + ws_msg * wm; + ws_frame *frame; + + if (len > 125) { + return (NNG_EINVAL); + } + + if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { + return (NNG_ENOMEM); + } + + if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { + ws_msg_fini(wm); + return (NNG_ENOMEM); + } + + NNI_LIST_INIT(&wm->frames, ws_frame, node); + memcpy(frame->sdata, buf, len); + + nni_list_append(&wm->frames, frame); + frame->wmsg = wm; + frame->len = len; + frame->final = true; + frame->op = op; + frame->head[0] = op | 0x80; // final frame (control) + frame->head[1] = len & 0x7F; + frame->hlen = 2; + frame->buf = frame->sdata; + frame->bufsz = 0; + + if (ws->mode == NNI_EP_MODE_DIAL) { + ws_mask_frame(frame); + } else { + frame->masked = false; + } + + wm->aio = NULL; + wm->ws = ws; + *wmp = wm; + return (0); +} + +static int +ws_msg_init_tx(ws_msg **wmp, nni_ws *ws, nni_msg *msg, nni_aio *aio) +{ + ws_msg * wm; + size_t len; + size_t maxfrag = ws->fragsize; // make this tunable. (1MB default) + uint8_t *buf; + uint8_t op; + + // If the message has a header, move it to front of body. Most of + // the time this will not cause a reallocation (there should be + // headroom). Doing this simplifies our framing, and avoids sending + // tiny frames for headers. + if ((len = nni_msg_header_len(msg)) != 0) { + int rv; + buf = nni_msg_header(msg); + if ((rv = nni_msg_insert(msg, buf, len)) != 0) { + return (rv); + } + nni_msg_header_clear(msg); + } + + if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&wm->frames, ws_frame, node); + + len = nni_msg_len(msg); + buf = nni_msg_body(msg); + op = WS_BINARY; // to start -- no support for sending TEXT frames + + // do ... while because we want at least one frame (even for empty + // messages.) Headers get their own frame, if present. Best bet + // is to try not to have a header when coming here. + do { + ws_frame *frame; + + if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { + ws_msg_fini(wm); + return (NNG_ENOMEM); + } + nni_list_append(&wm->frames, frame); + frame->wmsg = wm; + frame->len = len > maxfrag ? maxfrag : len; + frame->buf = buf; + frame->op = op; + + buf += frame->len; + len -= frame->len; + op = WS_CONT; + + if (len == 0) { + frame->final = true; + } + frame->head[0] = frame->op; + frame->hlen = 2; + if (frame->final) { + frame->head[0] |= 0x80; // final frame bit + } + if (frame->len < 126) { + frame->head[1] = frame->len & 0x7f; + } else if (frame->len < 65536) { + frame->head[1] = 126; + NNI_PUT16(frame->head + 2, (frame->len & 0xffff)); + frame->hlen += 2; + } else { + frame->head[1] = 127; + NNI_PUT64(frame->head + 2, (uint64_t) frame->len); + frame->hlen += 8; + } + + if (ws->mode == NNI_EP_MODE_DIAL) { + ws_mask_frame(frame); + } else { + frame->masked = false; + } + + } while (len); + + wm->msg = msg; + wm->aio = aio; + wm->ws = ws; + *wmp = wm; + return (0); +} + +static int +ws_msg_init_rx(ws_msg **wmp, nni_ws *ws, nni_aio *aio) +{ + ws_msg *wm; + + if ((wm = NNI_ALLOC_STRUCT(wm)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&wm->frames, ws_frame, node); + wm->aio = aio; + wm->ws = ws; + *wmp = wm; + return (0); +} + +static void +ws_close_cb(void *arg) +{ + nni_ws *ws = arg; + ws_msg *wm; + + // Either we sent a close frame, or we didn't. Either way, + // we are done, and its time to abort everything else. + nni_mtx_lock(&ws->mtx); + + nni_http_close(ws->http); + nni_aio_cancel(ws->txaio, NNG_ECLOSED); + nni_aio_cancel(ws->rxaio, NNG_ECLOSED); + + // This list (receive) should be empty. + while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { + nni_list_remove(&ws->rxmsgs, wm); + if (wm->aio) { + nni_aio_finish_error(wm->aio, NNG_ECLOSED); + } + ws_msg_fini(wm); + } + + while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { + nni_list_remove(&ws->txmsgs, wm); + if (wm->aio) { + nni_aio_finish_error(wm->aio, NNG_ECLOSED); + } + ws_msg_fini(wm); + } + + if (ws->rxframe != NULL) { + ws_frame_fini(ws->rxframe); + ws->rxframe = NULL; + } + + // Any txframe should have been killed with its wmsg. + nni_mtx_unlock(&ws->mtx); +} + +static void +ws_close(nni_ws *ws, uint16_t code) +{ + ws_msg *wm; + + // Receive stuff gets aborted always. No further receives + // once we get a close. + while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { + nni_list_remove(&ws->rxmsgs, wm); + if (wm->aio) { + nni_aio_finish_error(wm->aio, NNG_ECLOSED); + } + ws_msg_fini(wm); + } + + // If were closing "gracefully", then don't abort in-flight + // stuff yet. Note that reads should have stopped already. + if (!ws->closed) { + ws_send_close(ws, code); + return; + } +} + +static void +ws_start_write(nni_ws *ws) +{ + ws_frame *frame; + ws_msg * wm; + + if ((ws->txframe != NULL) || (!ws->ready)) { + return; // busy + } + + if ((wm = nni_list_first(&ws->txmsgs)) == NULL) { + // Nothing to send. + return; + } + + frame = nni_list_first(&wm->frames); + NNI_ASSERT(frame != NULL); + + // Push it out. + ws->txframe = frame; + ws->txaio->a_niov = frame->len > 0 ? 2 : 1; + ws->txaio->a_iov[0].iov_len = frame->hlen; + ws->txaio->a_iov[0].iov_buf = frame->head; + if (frame->len > 0) { + ws->txaio->a_iov[1].iov_len = frame->len; + ws->txaio->a_iov[1].iov_buf = frame->buf; + } + nni_http_write_full(ws->http, ws->txaio); +} + +static void +ws_cancel_close(nni_aio *aio, int rv) +{ + nni_ws *ws = aio->a_prov_data; + nni_mtx_lock(&ws->mtx); + if (ws->wclose) { + ws->wclose = false; + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&ws->mtx); +} + +static void +ws_write_cb(void *arg) +{ + nni_ws * ws = arg; + ws_frame *frame; + ws_msg * wm; + nni_aio * aio; + int rv; + + nni_mtx_lock(&ws->mtx); + + if (ws->txframe->op == WS_CLOSE) { + // If this was a close frame, we are done. + // No other messages may succeed.. + while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { + nni_list_remove(&ws->txmsgs, wm); + if (wm->aio != NULL) { + nni_aio_set_msg(wm->aio, NULL); + nni_aio_finish_error(wm->aio, NNG_ECLOSED); + } + ws_msg_fini(wm); + } + if (ws->wclose) { + ws->wclose = false; + nni_aio_finish(ws->closeaio, 0, 0); + } + nni_mtx_unlock(&ws->mtx); + return; + } + + frame = ws->txframe; + wm = frame->wmsg; + aio = wm->aio; + + if ((rv = nni_aio_result(ws->txaio)) != 0) { + + ws_msg_fini(wm); + if (aio != NULL) { + nni_aio_finish_error(aio, rv); + } + + ws->closed = true; + nni_http_close(ws->http); + nni_mtx_unlock(&ws->mtx); + return; + } + + // good frame, was it the last + nni_list_remove(&wm->frames, frame); + ws_frame_fini(frame); + if (nni_list_empty(&wm->frames)) { + nni_list_remove(&ws->txmsgs, wm); + ws_msg_fini(wm); + if (aio != NULL) { + nni_aio_finish(aio, 0, 0); + } + } + + // Write the next frame. + ws_start_write(ws); + + nni_mtx_unlock(&ws->mtx); +} + +static void +ws_write_cancel(nni_aio *aio, int rv) +{ + nni_ws * ws; + ws_msg * wm; + ws_frame *frame; + + // Is this aio active? We can tell by looking at the + // active tx frame. + wm = aio->a_prov_data; + ws = wm->ws; + nni_mtx_lock(&ws->mtx); + if (((frame = ws->txframe) != NULL) && (frame->wmsg == wm)) { + nni_aio_cancel(ws->txaio, rv); + // We will wait for callback on the txaio to finish aio. + } else if (nni_list_active(&ws->txmsgs, wm)) { + // If scheduled, just need to remove node and complete it. + nni_list_remove(&ws->txmsgs, wm); + wm->aio = NULL; + nni_aio_finish_error(aio, rv); + ws_msg_fini(wm); + } + nni_mtx_unlock(&ws->mtx); +} + +static void +ws_send_close(nni_ws *ws, uint16_t code) +{ + ws_msg * wm; + uint8_t buf[sizeof(uint16_t)]; + int rv; + nni_aio *aio; + + NNI_PUT16(buf, code); + + if (ws->closed) { + return; + } + ws->closed = true; + aio = ws->closeaio; + + // We don't care about cancellation here. If this times out, + // we will still shut all the physical I/O down in the callback. + if (nni_aio_start(aio, ws_cancel_close, ws) != 0) { + return; + } + ws->wclose = true; + rv = ws_msg_init_control(&wm, ws, WS_CLOSE, buf, sizeof(buf)); + if (rv != 0) { + ws->wclose = false; + nni_aio_finish_error(aio, rv); + return; + } + // Close frames get priority! + nni_list_prepend(&ws->txmsgs, wm); + ws_start_write(ws); +} + +static void +ws_send_control(nni_ws *ws, uint8_t op, uint8_t *buf, size_t len) +{ + ws_msg *wm; + + // Note that we do not care if this works or not. So no AIO needed. + + nni_mtx_lock(&ws->mtx); + if ((ws->closed) || + (ws_msg_init_control(&wm, ws, op, buf, sizeof(buf)) != 0)) { + nni_mtx_unlock(&ws->mtx); + return; + } + + // Control frames at head of list. (Note that this may preempt + // the close frame or other ping/pong requests. Oh well.) + nni_list_prepend(&ws->txmsgs, wm); + ws_start_write(ws); + nni_mtx_unlock(&ws->mtx); +} + +void +nni_ws_send_msg(nni_ws *ws, nni_aio *aio) +{ + ws_msg * wm; + nni_msg *msg; + int rv; + + msg = nni_aio_get_msg(aio); + + if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) { + if (nni_aio_start(aio, NULL, NULL) == 0) { + nni_aio_finish_error(aio, rv); + } + return; + } + + nni_mtx_lock(&ws->mtx); + nni_aio_set_msg(aio, NULL); + + if (ws->closed) { + ws_msg_fini(wm); + if (nni_aio_start(aio, NULL, NULL) == 0) { + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&ws->mtx); + return; + } + if (nni_aio_start(aio, ws_write_cancel, wm) != 0) { + nni_mtx_unlock(&ws->mtx); + ws_msg_fini(wm); + return; + } + nni_list_append(&ws->txmsgs, wm); + ws_start_write(ws); + nni_mtx_unlock(&ws->mtx); +} + +static void +ws_start_read(nni_ws *ws) +{ + ws_frame *frame; + ws_msg * wm; + nni_aio * aio; + + if ((ws->rxframe != NULL) || ws->closed) { + return; // already reading or closed + } + + if ((wm = nni_list_first(&ws->rxmsgs)) == NULL) { + return; // no body expecting a message. + } + + if ((frame = NNI_ALLOC_STRUCT(frame)) == NULL) { + nni_list_remove(&ws->rxmsgs, wm); + if (wm->aio != NULL) { + nni_aio_finish_error(wm->aio, NNG_ENOMEM); + } + ws_msg_fini(wm); + // XXX: NOW WHAT? + return; + } + + // Note that the frame is *not* associated with the message + // as yet, because we don't know if that's right until we receive it. + frame->hlen = 0; + frame->len = 0; + ws->rxframe = frame; + + aio = ws->rxaio; + aio->a_niov = 1; + aio->a_iov[0].iov_len = 2; // We want the first two bytes. + aio->a_iov[0].iov_buf = frame->head; + nni_http_read_full(ws->http, aio); +} + +static void +ws_read_frame_cb(nni_ws *ws, ws_frame *frame) +{ + ws_msg *wm = nni_list_first(&ws->rxmsgs); + + switch (frame->op) { + case WS_CONT: + if (wm == NULL) { + ws_close(ws, WS_CLOSE_GOING_AWAY); + return; + } + if (nni_list_empty(&wm->frames)) { + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + return; + } + ws->rxframe = NULL; + nni_list_append(&wm->frames, frame); + break; + case WS_BINARY: + if (wm == NULL) { + ws_close(ws, WS_CLOSE_GOING_AWAY); + return; + } + if (!nni_list_empty(&wm->frames)) { + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + return; + } + ws->rxframe = NULL; + nni_list_append(&wm->frames, frame); + break; + case WS_TEXT: + // No support for text mode at present. + ws_close(ws, WS_CLOSE_UNSUPP_FORMAT); + return; + + case WS_PING: + if (frame->len > 125) { + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + return; + } + ws_send_control(ws, WS_PONG, frame->buf, frame->len); + ws->rxframe = NULL; + ws_frame_fini(frame); + break; + case WS_PONG: + if (frame->len > 125) { + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + return; + } + ws->rxframe = NULL; + ws_frame_fini(frame); + break; + case WS_CLOSE: + ws->closed = true; // no need to send close reply + ws_close(ws, 0); + return; + default: + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + return; + } + + // If this was the last (final) frame, then complete it. But + // we have to look at the msg, since we might have got a + // control frame. + if (((frame = nni_list_last(&wm->frames)) != NULL) && frame->final) { + size_t len = 0; + nni_msg *msg; + uint8_t *body; + int rv; + + nni_list_remove(&ws->rxmsgs, wm); + NNI_LIST_FOREACH (&wm->frames, frame) { + len += frame->len; + } + if ((rv = nni_msg_alloc(&msg, len)) != 0) { + nni_aio_finish_error(wm->aio, rv); + ws_msg_fini(wm); + ws_close(ws, WS_CLOSE_INTERNAL); + return; + } + body = nni_msg_body(msg); + NNI_LIST_FOREACH (&wm->frames, frame) { + memcpy(body, frame->buf, frame->len); + body += frame->len; + } + nni_aio_finish_msg(wm->aio, msg); + wm->aio = NULL; + ws_msg_fini(wm); + } +} + +static void +ws_read_cb(void *arg) +{ + nni_ws * ws = arg; + nni_aio * aio = ws->rxaio; + ws_frame *frame; + int rv; + + nni_mtx_lock(&ws->mtx); + if ((frame = ws->rxframe) == NULL) { + nni_mtx_unlock(&ws->mtx); // canceled during close + return; + } + + if ((rv = nni_aio_result(aio)) != 0) { + ws->closed = true; // do not send a close frame + ws_close(ws, 0); + nni_mtx_unlock(&ws->mtx); + return; + } + + if (frame->hlen == 0) { + frame->hlen = 2; + frame->op = frame->head[0] & 0x7f; + frame->final = (frame->head[0] & 0x80) ? 1 : 0; + frame->masked = (frame->head[1] & 0x80) ? 1 : 0; + if (frame->masked) { + frame->hlen += 4; + } + if ((frame->head[1] & 0x7F) == 127) { + frame->hlen += 8; + } else if ((frame->head[1] & 0x7F) == 126) { + frame->hlen += 2; + } + + // If we didn't read the full header yet, then read + // the rest of it. + if (frame->hlen != 2) { + aio->a_niov = 1; + aio->a_iov[0].iov_buf = frame->head + 2; + aio->a_iov[0].iov_len = frame->hlen - 2; + nni_http_read_full(ws->http, aio); + nni_mtx_unlock(&ws->mtx); + return; + } + } + + // If we are returning from a read of additional data, then + // the buf will be set. Otherwise we need to determine + // how much data to read. As our headers are complete, we take + // this time to do some protocol checks -- no point in waiting + // to read data. (Frame size check needs to be done first + // anyway to prevent DoS.) + + if (frame->buf == NULL) { + + // Determine expected frame size. + switch ((frame->len = (frame->head[1] & 0x7F))) { + case 127: + NNI_GET64(frame->head + 2, frame->len); + if (frame->len < 65536) { + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + nni_mtx_unlock(&ws->mtx); + return; + } + break; + case 126: + NNI_GET16(frame->head + 2, frame->len); + if (frame->len < 126) { + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + nni_mtx_unlock(&ws->mtx); + return; + } + + break; + } + + if (frame->len > ws->maxframe) { + ws_close(ws, WS_CLOSE_TOO_BIG); + nni_mtx_unlock(&ws->mtx); + return; + } + + // Check for masking. (We don't actually do the unmask + // here, because we don't have data yet.) + if (frame->masked) { + memcpy(frame->mask, frame->head + frame->hlen - 4, 4); + if (ws->mode == NNI_EP_MODE_DIAL) { + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + nni_mtx_unlock(&ws->mtx); + return; + } + } else if (ws->mode == NNI_EP_MODE_LISTEN) { + ws_close(ws, WS_CLOSE_PROTOCOL_ERR); + nni_mtx_unlock(&ws->mtx); + return; + } + + // If we expected data, then ask for it. + if (frame->len != 0) { + + // Short frames can avoid an alloc + if (frame->len < 126) { + frame->buf = frame->sdata; + frame->bufsz = 0; + } else { + frame->buf = nni_alloc(frame->len); + if (frame->buf == NULL) { + ws_close(ws, WS_CLOSE_INTERNAL); + nni_mtx_unlock(&ws->mtx); + return; + } + frame->bufsz = frame->len; + } + + aio->a_niov = 1; + aio->a_iov[0].iov_buf = frame->buf; + aio->a_iov[0].iov_len = frame->len; + nni_http_read_full(ws->http, aio); + nni_mtx_unlock(&ws->mtx); + return; + } + } + + // At this point, we have a complete frame. + ws_unmask_frame(frame); // idempotent + + ws_read_frame_cb(ws, frame); + ws_start_read(ws); + nni_mtx_unlock(&ws->mtx); +} + +static void +ws_read_cancel(nni_aio *aio, int rv) +{ + ws_msg *wm = aio->a_prov_data; + nni_ws *ws = wm->ws; + + nni_mtx_lock(&ws->mtx); + if (wm == nni_list_first(&ws->rxmsgs)) { + // Cancellation will percolate back up. + nni_aio_cancel(ws->rxaio, rv); + } else if (nni_list_active(&ws->rxmsgs, wm)) { + nni_list_remove(&ws->rxmsgs, wm); + ws_msg_fini(wm); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&ws->mtx); +} + +void +nni_ws_recv_msg(nni_ws *ws, nni_aio *aio) +{ + ws_msg *wm; + int rv; + nni_mtx_lock(&ws->mtx); + if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) { + if (nni_aio_start(aio, NULL, NULL)) { + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&ws->mtx); + return; + } + if (nni_aio_start(aio, ws_read_cancel, wm) == 0) { + nni_list_append(&ws->rxmsgs, wm); + ws_start_read(ws); + } + nni_mtx_unlock(&ws->mtx); +} + +void +nni_ws_close_error(nni_ws *ws, uint16_t code) +{ + nni_mtx_lock(&ws->mtx); + ws_close(ws, code); + nni_mtx_unlock(&ws->mtx); +} + +void +nni_ws_close(nni_ws *ws) +{ + nni_ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); +} + +nni_http_res * +nni_ws_response(nni_ws *ws) +{ + return (ws->res); +} + +nni_http_req * +nni_ws_request(nni_ws *ws) +{ + return (ws->req); +} + +static void +ws_fini(void *arg) +{ + nni_ws *ws = arg; + ws_msg *wm; + + nni_ws_close(ws); + + // Give a chance for the close frame to drain. + if (ws->closeaio) { + nni_aio_wait(ws->closeaio); + } + + nni_aio_stop(ws->rxaio); + nni_aio_stop(ws->txaio); + nni_aio_stop(ws->closeaio); + nni_aio_stop(ws->httpaio); + + nni_mtx_lock(&ws->mtx); + while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { + nni_list_remove(&ws->rxmsgs, wm); + if (wm->aio) { + nni_aio_finish_error(wm->aio, NNG_ECLOSED); + } + ws_msg_fini(wm); + } + + while ((wm = nni_list_first(&ws->txmsgs)) != NULL) { + nni_list_remove(&ws->txmsgs, wm); + if (wm->aio) { + nni_aio_finish_error(wm->aio, NNG_ECLOSED); + } + ws_msg_fini(wm); + } + + if (ws->rxframe) { + ws_frame_fini(ws->rxframe); + } + nni_mtx_unlock(&ws->mtx); + + if (ws->req) { + nni_http_req_fini(ws->req); + } + if (ws->res) { + nni_http_res_fini(ws->res); + } + + nni_http_fini(ws->http); + nni_aio_fini(ws->rxaio); + nni_aio_fini(ws->txaio); + nni_aio_fini(ws->closeaio); + nni_aio_fini(ws->httpaio); + nni_mtx_fini(&ws->mtx); + NNI_FREE_STRUCT(ws); +} + +void +nni_ws_fini(nni_ws *ws) +{ + nni_reap(&ws->reap, ws_fini, ws); +} + +static void +ws_http_cb_listener(nni_ws *ws, nni_aio *aio) +{ + // This is only + nni_ws_listener *l; + l = nni_aio_get_data(aio, 0); + + nni_mtx_lock(&l->mtx); + nni_list_remove(&l->reply, ws); + if (nni_aio_result(aio) != 0) { + nni_ws_fini(ws); + nni_mtx_unlock(&l->mtx); + return; + } + ws->ready = true; + if ((aio = nni_list_first(&l->aios)) != NULL) { + nni_list_remove(&l->aios, aio); + nni_aio_finish_pipe(aio, ws); + } else { + nni_list_append(&l->pend, ws); + } + nni_mtx_unlock(&l->mtx); +} + +static void +ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) +{ + nni_ws_dialer *d; + nni_aio * uaio; + int rv; + uint16_t status; + char wskey[29]; + const char * ptr; + + d = nni_aio_get_data(aio, 0); + + nni_mtx_lock(&d->mtx); + uaio = nni_list_first(&d->httpaios); + NNI_ASSERT(uaio != NULL); + // We have two steps. In step 1, we just sent the request, + // and need to retrieve the reply. In step two we have + // received the reply, and need to validate it. + if ((rv = nni_aio_result(aio)) != 0) { + goto err; + } + + // If we have no response structure, then this was completion of the + // send of the request. Prepare an empty response, and read it. + if (ws->res == NULL) { + if ((rv = nni_http_res_init(&ws->res)) != 0) { + goto err; + } + nni_http_read_res(ws->http, ws->res, ws->httpaio); + nni_mtx_unlock(&d->mtx); + return; + } + + status = nni_http_res_get_status(ws->res); + switch (status) { + case NNI_HTTP_STATUS_SWITCHING: + break; + case NNI_HTTP_STATUS_FORBIDDEN: + case NNI_HTTP_STATUS_UNAUTHORIZED: + rv = NNG_EPERM; + goto err; + case NNI_HTTP_STATUS_NOT_FOUND: + case NNI_HTTP_STATUS_METHOD_NOT_ALLOWED: + rv = NNG_ECONNREFUSED; // Treat these as refusals. + goto err; + case NNI_HTTP_STATUS_BAD_REQUEST: + default: + // Perhaps we should use NNG_ETRANERR... + rv = NNG_EPROTO; + goto err; + } + + // Check that the server gave us back the right key. + rv = ws_make_accept( + nni_http_req_get_header(ws->req, "Sec-WebSocket-Key"), wskey); + if (rv != 0) { + goto err; + } + +#define GETH(h) nni_http_res_get_header(ws->res, h) + + if (((ptr = GETH("Sec-WebSocket-Accept")) == NULL) || + (strcmp(ptr, wskey) != 0) || + ((ptr = GETH("Connection")) == NULL) || + (!ws_contains_word(ptr, "upgrade")) || + ((ptr = GETH("Upgrade")) == NULL) || + (strcmp(ptr, "websocket") != 0)) { + nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); + rv = NNG_EPROTO; + goto err; + } + if (d->proto != NULL) { + if (((ptr = GETH("Sec-WebSocket-Protocol")) == NULL) || + (!ws_contains_word(d->proto, ptr))) { + nni_ws_close_error(ws, WS_CLOSE_PROTOCOL_ERR); + rv = NNG_EPROTO; + goto err; + } + } +#undef GETH + + // At this point, we are in business! + ws->ready = true; + nni_aio_list_remove(uaio); + nni_aio_finish_pipe(uaio, ws); + nni_mtx_unlock(&d->mtx); + return; +err: + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + nni_ws_fini(ws); + nni_mtx_unlock(&d->mtx); +} + +static void +ws_http_cb(void *arg) +{ + // This is only done on the server/listener side. + nni_ws * ws = arg; + nni_aio *aio = ws->httpaio; + + switch (ws->mode) { + case NNI_EP_MODE_LISTEN: + ws_http_cb_listener(ws, aio); + break; + case NNI_EP_MODE_DIAL: + ws_http_cb_dialer(ws, aio); + break; + } +} + +static int +ws_init(nni_ws **wsp, nni_http *http, nni_http_req *req, nni_http_res *res) +{ + nni_ws *ws; + int rv; + + if ((ws = NNI_ALLOC_STRUCT(ws)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&ws->mtx); + NNI_LIST_INIT(&ws->rxmsgs, ws_msg, node); + NNI_LIST_INIT(&ws->txmsgs, ws_msg, node); + + if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) || + ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) || + ((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) || + ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0)) { + nni_ws_fini(ws); + return (rv); + } + + nni_aio_set_timeout(ws->closeaio, 100); + nni_aio_set_timeout(ws->httpaio, 1000); + + ws->fragsize = 1 << 20; // we won't send a frame larger than this + ws->maxframe = (1 << 20) * 10; // default limit on incoming frame size + ws->http = http; + ws->req = req; + ws->res = res; + *wsp = ws; + return (0); +} + +void +nni_ws_listener_fini(nni_ws_listener *l) +{ + nni_mtx_fini(&l->mtx); + nni_strfree(l->url); + nni_strfree(l->proto); + nni_strfree(l->host); + nni_strfree(l->serv); + nni_strfree(l->path); + NNI_FREE_STRUCT(l); +} + +static void +ws_handler(nni_aio *aio) +{ + nni_ws_listener *l; + nni_ws * ws; + nni_http * http; + nni_http_req * req; + nni_http_res * res; + const char * ptr; + const char * proto; + uint16_t status; + int rv; + char key[29]; + + http = nni_aio_get_input(aio, 0); + req = nni_aio_get_input(aio, 1); + l = nni_aio_get_input(aio, 2); + + // Now check the headers, etc. + if (strcmp(nni_http_req_get_version(req), "HTTP/1.1") != 0) { + status = NNI_HTTP_STATUS_HTTP_VERSION_NOT_SUPP; + goto err; + } + + if (strcmp(nni_http_req_get_method(req), "GET") != 0) { + // HEAD request. We can't really deal with it. + status = NNI_HTTP_STATUS_BAD_REQUEST; + goto err; + } + +#define GETH(h) nni_http_req_get_header(req, h) +#define SETH(h, v) nni_http_res_set_header(res, h, v) + + if ((((ptr = GETH("Content-Length")) != NULL) && (atoi(ptr) > 0)) || + (((ptr = GETH("Transfer-Encoding")) != NULL) && + (nni_strcasestr(ptr, "chunked") != NULL))) { + status = NNI_HTTP_STATUS_PAYLOAD_TOO_LARGE; + goto err; + } + + // These headers have to be present. + if (((ptr = GETH("Upgrade")) == NULL) || + (!ws_contains_word(ptr, "websocket")) || + ((ptr = GETH("Connection")) == NULL) || + (!ws_contains_word(ptr, "upgrade")) || + ((ptr = GETH("Sec-WebSocket-Version")) == NULL) || + (strcmp(ptr, "13") != 0)) { + status = NNI_HTTP_STATUS_BAD_REQUEST; + goto err; + } + + if (((ptr = GETH("Sec-WebSocket-Key")) == NULL) || + (ws_make_accept(ptr, key) != 0)) { + status = NNI_HTTP_STATUS_BAD_REQUEST; + goto err; + } + + // If the client has requested a specific subprotocol, then + // we need to try to match it to what the handler says we support. + // (If no suitable option is found in the handler, we fail the + // request.) + proto = GETH("Sec-WebSocket-Protocol"); + if (proto == NULL) { + if (l->proto != NULL) { + status = NNI_HTTP_STATUS_BAD_REQUEST; + goto err; + } + } else if ((l->proto == NULL) || + (!ws_contains_word(l->proto, proto))) { + status = NNI_HTTP_STATUS_BAD_REQUEST; + goto err; + } + + if ((rv = nni_http_res_init(&res)) != 0) { + // Give a chance to reply to client. + status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + goto err; + } + + if (nni_http_res_set_status( + res, NNI_HTTP_STATUS_SWITCHING, "Switching Protocols") != 0) { + nni_http_res_fini(res); + status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + goto err; + } + + if ((SETH("Connection", "Upgrade") != 0) || + (SETH("Upgrade", "websocket") != 0) || + (SETH("Sec-WebSocket-Accept", key) != 0)) { + status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + nni_http_res_fini(res); + goto err; + } + if ((proto != NULL) && (SETH("Sec-WebSocket-Protocol", proto) != 0)) { + status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + nni_http_res_fini(res); + goto err; + } + + if (l->hookfn != NULL) { + rv = l->hookfn(l->hookarg, req, res); + if (rv != 0) { + nni_http_res_fini(res); + nni_aio_finish_error(aio, rv); + return; + } + + if (nni_http_res_get_status(res) != + NNI_HTTP_STATUS_SWITCHING) { + // The hook has decided to give back a different + // reply and we are not upgrading anymore. For + // example the Origin might not be permitted, or + // another level of authentication may be required. + // (Note that the hook can also give back various + // other headers, but it would be bad for it to + // alter the websocket mandated headers.) + nni_http_req_fini(req); + nni_aio_set_output(aio, 0, res); + nni_aio_finish(aio, 0, 0); + return; + } + } + +#undef GETH +#undef SETH + + // We are good to go, provided we can get the websocket struct, + // and send the reply. + if ((rv = ws_init(&ws, http, req, res)) != 0) { + nni_http_res_fini(res); + status = NNI_HTTP_STATUS_INTERNAL_SERVER_ERROR; + goto err; + } + ws->mode = NNI_EP_MODE_LISTEN; + + // XXX: Inherit fragmentation and message size limits! + + nni_list_append(&l->reply, ws); + nni_aio_set_data(ws->httpaio, 0, l); + nni_http_write_res(http, res, ws->httpaio); + nni_aio_set_output(aio, 0, NULL); + nni_aio_set_input(aio, 1, NULL); + nni_aio_finish(aio, 0, 0); + return; + +err: + nni_http_req_fini(req); + if ((rv = nni_http_res_init_error(&res, status)) != 0) { + nni_aio_finish_error(aio, rv); + } else { + nni_aio_set_output(aio, 0, res); + nni_aio_finish(aio, 0, 0); + } +} +static int +ws_parse_url(const char *url, char **schemep, char **hostp, char **servp, + char **pathp, char **queryp) +{ + size_t scrlen; + char * scr; + char * pair; + char * scheme = NULL; + char * path = NULL; + char * query = NULL; + char * host = NULL; + char * serv = NULL; + int rv; + + // We need a scratch copy of the url to parse. + scrlen = strlen(url) + 1; + if ((scr = nni_alloc(scrlen)) == NULL) { + return (NNG_ENOMEM); + } + nni_strlcpy(scr, url, scrlen); + scheme = scr; + pair = strchr(scr, ':'); + if ((pair == NULL) || (pair[1] != '/') || (pair[2] != '/')) { + nni_free(scr, scrlen); + return (NNG_EADDRINVAL); + } + + *pair = '\0'; + pair += 3; + + path = strchr(pair, '/'); + if (path != NULL) { + *path = '\0'; // We will restore it shortly. + } + if ((rv = nni_tran_parse_host_port(pair, hostp, servp)) != 0) { + nni_free(scr, scrlen); + return (rv); + } + + // If service was missing, assume normal defaults. + if (*servp == NULL) { + if (strcmp(scheme, "wss")) { + *servp = nni_strdup("443"); + } else { + *servp = nni_strdup("80"); + } + } + + if (path) { + // Restore the path, and trim off the query parameter. + *path = '/'; + if ((query = strchr(path, '?')) != NULL) { + *query = '\0'; + query++; + } else { + query = ""; + } + } else { + path = "/"; + query = ""; + } + + if (schemep) { + *schemep = nni_strdup(scheme); + } + if (pathp) { + *pathp = nni_strdup(path); + } + if (queryp) { + *queryp = nni_strdup(query); + } + nni_free(scr, scrlen); + + if ((schemep && (*schemep == NULL)) || (*pathp == NULL) || + (*servp == NULL) || (queryp && (*queryp == NULL))) { + nni_strfree(*hostp); + nni_strfree(*servp); + nni_strfree(*pathp); + if (schemep) { + nni_strfree(*schemep); + } + if (queryp) { + nni_strfree(*queryp); + } + return (NNG_ENOMEM); + } + + return (0); +} + +int +nni_ws_listener_init(nni_ws_listener **wslp, const char *url) +{ + nni_ws_listener *l; + int rv; + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&l->mtx); + nni_aio_list_init(&l->aios); + + NNI_LIST_INIT(&l->pend, nni_ws, node); + NNI_LIST_INIT(&l->reply, nni_ws, node); + + rv = ws_parse_url(url, NULL, &l->host, &l->serv, &l->path, NULL); + if (rv != 0) { + nni_ws_listener_fini(l); + return (rv); + } + l->handler.h_is_dir = false; + l->handler.h_is_upgrader = true; + l->handler.h_method = "GET"; + l->handler.h_path = l->path; + l->handler.h_host = l->host; + l->handler.h_cb = ws_handler; + + *wslp = l; + return (0); +} + +int +nni_ws_listener_proto(nni_ws_listener *l, const char *proto) +{ + int rv = 0; + char *ns; + nni_mtx_lock(&l->mtx); + if (l->started) { + rv = NNG_EBUSY; + } else if ((ns = nni_strdup(proto)) == NULL) { + rv = NNG_ENOMEM; + } else { + if (l->proto != NULL) { + nni_strfree(l->proto); + } + l->proto = ns; + } + nni_mtx_unlock(&l->mtx); + return (rv); +} + +static void +ws_accept_cancel(nni_aio *aio, int rv) +{ + nni_ws_listener *l = aio->a_prov_data; + + nni_mtx_lock(&l->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) +{ + nni_ws *ws; + + nni_mtx_lock(&l->mtx); + if (nni_aio_start(aio, ws_accept_cancel, l) != 0) { + nni_mtx_unlock(&l->mtx); + return; + } + if (l->closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_unlock(&l->mtx); + return; + } + if (!l->started) { + nni_aio_finish_error(aio, NNG_ESTATE); + nni_mtx_unlock(&l->mtx); + return; + } + if ((ws = nni_list_first(&l->pend)) != NULL) { + nni_list_remove(&l->pend, ws); + nni_aio_finish_pipe(aio, ws); + } else { + nni_list_append(&l->aios, aio); + } + nni_mtx_unlock(&l->mtx); +} + +void +nni_ws_listener_close(nni_ws_listener *l) +{ + nni_aio *aio; + nni_ws * ws; + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return; + } + l->closed = true; + if (l->server != NULL) { + nni_http_server_del_handler(l->server, l->hp); + nni_http_server_fini(l->server); + l->server = NULL; + } + NNI_LIST_FOREACH (&l->pend, ws) { + nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY); + } + NNI_LIST_FOREACH (&l->reply, ws) { + nni_ws_close_error(ws, WS_CLOSE_GOING_AWAY); + } + nni_mtx_unlock(&l->mtx); +} + +int +nni_ws_listener_listen(nni_ws_listener *l) +{ + nng_sockaddr sa; + nni_aio * aio; + int rv; + + nni_mtx_lock(&l->mtx); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return (NNG_ECLOSED); + } + if (l->started) { + nni_mtx_unlock(&l->mtx); + return (NNG_ESTATE); + } + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + nni_mtx_unlock(&l->mtx); + return (rv); + } + aio->a_addr = &sa; + nni_plat_tcp_resolv(l->host, l->serv, NNG_AF_UNSPEC, true, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + return (rv); + } + + if ((rv = nni_http_server_init(&l->server, &sa)) != 0) { + nni_mtx_unlock(&l->mtx); + return (rv); + } + + rv = nni_http_server_add_handler(&l->hp, l->server, &l->handler, l); + if (rv != 0) { + nni_http_server_fini(l->server); + l->server = NULL; + nni_mtx_unlock(&l->mtx); + return (rv); + } + + // XXX: DEAL WITH HTTPS here. + + if ((rv = nni_http_server_start(l->server)) != 0) { + nni_http_server_del_handler(l->server, l->hp); + nni_http_server_fini(l->server); + l->server = NULL; + } + + l->started = true; + + nni_mtx_unlock(&l->mtx); + return (0); +} + +void +nni_ws_listener_hook( + nni_ws_listener *l, nni_ws_listen_hook hookfn, void *hookarg) +{ + nni_mtx_lock(&l->mtx); + l->hookfn = hookfn; + l->hookarg = hookarg; + nni_mtx_unlock(&l->mtx); +} + +void +nni_ws_listener_tls(nni_ws_listener *l, nni_tls_config *tls) +{ + // We need to add this later. +} + +void +ws_conn_cb(void *arg) +{ + nni_ws_dialer *d = arg; + nni_aio * aio = d->conaio; + nni_aio * uaio; + nni_http * http; + nni_http_req * req = NULL; + int rv; + uint8_t raw[16]; + char wskey[25]; + nni_ws * ws; + + nni_mtx_lock(&d->mtx); + uaio = nni_list_first(&d->conaios); + rv = nni_aio_result(aio); + http = rv == 0 ? nni_aio_get_output(aio, 0) : NULL; + + if (uaio == NULL) { + if (http) { + // Nobody listening anymore - hard abort. + nni_http_fini(http); + } + nni_mtx_unlock(&d->mtx); + return; + } + + nni_aio_list_remove(uaio); + nni_aio_set_output(aio, 0, NULL); + + // We are done with this aio, start another connection request while + // we finish up, if we have more clients waiting. + if (!nni_list_empty(&d->conaios)) { + nni_http_client_connect(d->client, aio); + } + + if (rv != 0) { + goto err; + } + + for (int i = 0; i < 16; i++) { + raw[i] = nni_random(); + } + nni_base64_encode(raw, 16, wskey, 24); + wskey[24] = '\0'; + + if (d->qinfo && d->qinfo[0] != '\0') { + rv = nni_asprintf(&d->uri, "%s?%s", d->path, d->qinfo); + } else if ((d->uri = nni_strdup(d->path)) == NULL) { + rv = NNG_ENOMEM; + } + +#define SETH(h, v) nni_http_req_set_header(req, h, v) + if ((rv != 0) || ((rv = nni_http_req_init(&req)) != 0) || + ((rv = nni_http_req_set_uri(req, d->uri)) != 0) || + ((rv = nni_http_req_set_version(req, "HTTP/1.1")) != 0) || + ((rv = nni_http_req_set_method(req, "GET")) != 0) || + ((rv = SETH("Host", d->host)) != 0) || + ((rv = SETH("Upgrade", "websocket")) != 0) || + ((rv = SETH("Connection", "Upgrade")) != 0) || + ((rv = SETH("Sec-WebSocket-Key", wskey)) != 0) || + ((rv = SETH("Sec-WebSocket-Version", "13")) != 0)) { + goto err; + } + + // If consumer asked for protocol, pass it on. + if ((d->proto != NULL) && + ((rv = SETH("Sec-WebSocket-Protocol", d->proto)) != 0)) { + goto err; + } +#undef SETH + + if ((rv = ws_init(&ws, http, req, NULL)) != 0) { + goto err; + } + ws->mode = NNI_EP_MODE_DIAL; + + // Move this uaio to the http wait list. Note that it is not + // required that the uaio will be completed by this connection. + // If another connection attempt completes first, then the first + // aio queued will get the result. + nni_list_append(&d->httpaios, uaio); + nni_aio_set_data(ws->httpaio, 0, d); + nni_http_write_req(http, req, ws->httpaio); + nni_mtx_unlock(&d->mtx); + return; + +err: + nni_aio_finish_error(uaio, rv); + if (http != NULL) { + nni_http_fini(http); + } + if (req != NULL) { + nni_http_req_fini(req); + } + nni_mtx_unlock(&d->mtx); +} + +void +nni_ws_dialer_fini(nni_ws_dialer *d) +{ + nni_aio_fini(d->conaio); + nni_strfree(d->proto); + nni_strfree(d->addr); + nni_strfree(d->uri); + nni_strfree(d->host); + nni_strfree(d->serv); + nni_strfree(d->path); + nni_strfree(d->qinfo); + if (d->client) { + nni_http_client_fini(d->client); + } + nni_mtx_fini(&d->mtx); + NNI_FREE_STRUCT(d); +} + +int +nni_ws_dialer_init(nni_ws_dialer **dp, const char *url) +{ + nni_ws_dialer *d; + int rv; + nni_aio * aio; + + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + nni_aio_list_init(&d->conaios); + nni_aio_list_init(&d->httpaios); + + if ((d->addr = nni_strdup(url)) == NULL) { + nni_ws_dialer_fini(d); + return (NNG_ENOMEM); + } + if ((rv = ws_parse_url( + url, NULL, &d->host, &d->serv, &d->path, &d->qinfo)) != 0) { + nni_ws_dialer_fini(d); + return (rv); + } + if ((rv = nni_aio_init(&d->conaio, ws_conn_cb, d)) != 0) { + nni_ws_dialer_fini(d); + return (rv); + } + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + nni_ws_dialer_fini(d); + return (rv); + } + // XXX: this is synchronous. We should fix this in the HTTP layer. + aio->a_addr = &d->sa; + nni_plat_tcp_resolv(d->host, d->serv, NNG_AF_UNSPEC, false, aio); + nni_aio_wait(aio); + rv = nni_aio_result(aio); + nni_aio_fini(aio); + if (rv != 0) { + nni_ws_dialer_fini(d); + return (rv); + } + + if ((rv = nni_http_client_init(&d->client, &d->sa)) != 0) { + nni_ws_dialer_fini(d); + return (rv); + } + + *dp = d; + return (0); +} + +void +nni_ws_dialer_close(nni_ws_dialer *d) +{ + // XXX: what to do here? + nni_mtx_lock(&d->mtx); + if (d->closed) { + nni_mtx_unlock(&d->mtx); + return; + } + d->closed = true; + nni_mtx_unlock(&d->mtx); + nni_aio_cancel(d->conaio, NNG_ECLOSED); +} + +int +nni_ws_dialer_proto(nni_ws_dialer *d, const char *proto) +{ + int rv = 0; + char *ns; + nni_mtx_lock(&d->mtx); + if ((ns = nni_strdup(proto)) == NULL) { + rv = NNG_ENOMEM; + } else { + if (d->proto != NULL) { + nni_strfree(d->proto); + } + d->proto = ns; + } + nni_mtx_unlock(&d->mtx); + return (rv); +} + +static void +ws_dial_cancel(nni_aio *aio, int rv) +{ + nni_ws_dialer *d = aio->a_prov_data; + nni_mtx_lock(&d->mtx); + // If we are waiting, then we can cancel. Otherwise we need + // to abort. + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + // This does not cancel in-flight client negotiations with HTTP. + nni_mtx_unlock(&d->mtx); +} + +void +nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) +{ + nni_mtx_lock(&d->mtx); + // First look up the host. + if (nni_aio_start(aio, ws_dial_cancel, d) != 0) { + nni_mtx_unlock(&d->mtx); + return; + } + if (d->closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + nni_mtx_unlock(&d->mtx); + return; + } + nni_list_append(&d->conaios, aio); + + if (!d->started) { + d->started = true; + nni_http_client_connect(d->client, d->conaio); + } + nni_mtx_unlock(&d->mtx); +} + +extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *); + +// Dialer does not get a hook chance, as it can examine the request +// and reply after dial is done; this is not a 3-way handshake, so +// the dialer does not confirm the server's response at the HTTP +// level. (It can still issue a websocket close). + +// The implementation will send periodic PINGs, and respond with +// PONGs. diff --git a/src/supplemental/websocket/websocket.h b/src/supplemental/websocket/websocket.h new file mode 100644 index 00000000..25add55e --- /dev/null +++ b/src/supplemental/websocket/websocket.h @@ -0,0 +1,63 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H +#define NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H + +// Pre-defined types for some prototypes. These are from other subsystems. +typedef struct nni_tls_config nni_tls_config; +typedef struct nni_http_req nni_http_req; +typedef struct nni_http_res nni_http_res; + +typedef struct nni_ws nni_ws; +typedef struct nni_ws_listener nni_ws_listener; +typedef struct nni_ws_dialer nni_ws_dialer; + +typedef int (*nni_ws_listen_hook)(void *, nni_http_req *, nni_http_res *); + +// Specify URL as ws://[<host>][:port][/path] +// If host is missing, INADDR_ANY is assumed. If port is missing, +// then either 80 or 443 are assumed. Note that ws:// means listen +// on INADDR_ANY port 80, with path "/". For connect side, INADDR_ANY +// makes no sense. (TBD: return NNG_EADDRINVAL, or try loopback?) + +extern int nni_ws_listener_init(nni_ws_listener **, const char *); +extern void nni_ws_listener_fini(nni_ws_listener *); +extern void nni_ws_listener_close(nni_ws_listener *); +extern int nni_ws_listener_proto(nni_ws_listener *, const char *); +extern int nni_ws_listener_listen(nni_ws_listener *); +extern void nni_ws_listener_accept(nni_ws_listener *, nni_aio *); +extern void nni_ws_listener_hook( + nni_ws_listener *, nni_ws_listen_hook, void *); +extern void nni_ws_listener_tls(nni_ws_listener *, nni_tls_config *); + +extern int nni_ws_dialer_init(nni_ws_dialer **, const char *); +extern void nni_ws_dialer_fini(nni_ws_dialer *); +extern void nni_ws_dialer_close(nni_ws_dialer *); +extern int nni_ws_dialer_proto(nni_ws_dialer *, const char *); +extern int nni_ws_dialer_header(nni_ws_dialer *, const char *, const char *); +extern void nni_ws_dialer_dial(nni_ws_dialer *, nni_aio *); + +// Dialer does not get a hook chance, as it can examine the request and reply +// after dial is done; this is not a 3-way handshake, so the dialer does +// not confirm the server's response at the HTTP level. (It can still issue +// a websocket close). + +extern void nni_ws_send_msg(nni_ws *, nni_aio *); +extern void nni_ws_recv_msg(nni_ws *, nni_aio *); +extern nni_http_res *nni_ws_response(nni_ws *); +extern nni_http_req *nni_ws_request(nni_ws *); +extern void nni_ws_close(nni_ws *); +extern void nni_ws_close_error(nni_ws *, uint16_t); +extern void nni_ws_fini(nni_ws *); + +// The implementation will send periodic PINGs, and respond with PONGs. + +#endif // NNG_SUPPLEMENTAL_WEBSOCKET_WEBSOCKET_H
\ No newline at end of file diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 8dcb3f60..9c794e64 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -10,7 +10,6 @@ #include <stdbool.h> #include <stdio.h> -#include <stdlib.h> #include <string.h> #include "core/nng_impl.h" @@ -498,46 +497,6 @@ nni_tls_pipe_getopt_remaddr(void *arg, void *v, size_t *szp) return (rv); } -static int -nni_tls_parse_pair(char *pair, char **hostp, char **servp) -{ - char *host, *serv, *end; - - if (pair[0] == '[') { - host = pair + 1; - // IP address enclosed ... for IPv6 usually. - if ((end = strchr(host, ']')) == NULL) { - return (NNG_EADDRINVAL); - } - *end = '\0'; - serv = end + 1; - if (*serv == ':') { - serv++; - } else if (*serv != '\0') { - return (NNG_EADDRINVAL); - } - } else { - host = pair; - serv = strchr(host, ':'); - if (serv != NULL) { - *serv = '\0'; - serv++; - } - } - if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { - *hostp = NULL; - } else { - *hostp = host; - } - if ((serv == NULL) || (strlen(serv) == 0)) { - *servp = NULL; - } else { - *servp = serv; - } - // Stash the port in big endian (network) byte order. - return (0); -} - // Note that the url *must* be in a modifiable buffer. int nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost, @@ -555,8 +514,9 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost, // is the second part. *h1 = '\0'; h1++; - if (((rv = nni_tls_parse_pair(h1, rhost, rserv)) != 0) || - ((rv = nni_tls_parse_pair(url, lhost, lserv)) != 0)) { + if (((rv = nni_tran_parse_host_port(h1, rhost, rserv)) != 0) || + ((rv = nni_tran_parse_host_port(url, lhost, lserv)) != + 0)) { return (rv); } if ((*rserv == NULL) || (*rhost == NULL)) { @@ -566,7 +526,7 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost, } else if (mode == NNI_EP_MODE_DIAL) { *lhost = NULL; *lserv = NULL; - if ((rv = nni_tls_parse_pair(url, rhost, rserv)) != 0) { + if ((rv = nni_tran_parse_host_port(url, rhost, rserv)) != 0) { return (rv); } if ((*rserv == NULL) || (*rhost == NULL)) { @@ -577,7 +537,7 @@ nni_tls_parse_url(char *url, char **lhost, char **lserv, char **rhost, NNI_ASSERT(mode == NNI_EP_MODE_LISTEN); *rhost = NULL; *rserv = NULL; - if ((rv = nni_tls_parse_pair(url, lhost, lserv)) != 0) { + if ((rv = nni_tran_parse_host_port(url, lhost, lserv)) != 0) { return (rv); } // We have to have a port to listen on! @@ -657,9 +617,13 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode) return (NNG_EADDRINVAL); } + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } // Parse the URLs first. rv = nni_tls_parse_url(buf, &lhost, &lserv, &rhost, &rserv, mode); if (rv != 0) { + nni_aio_fini(aio); return (rv); } if (mode == NNI_EP_MODE_DIAL) { @@ -672,10 +636,6 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode) authmode = NNI_TLS_CONFIG_AUTH_MODE_NONE; } - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - // XXX: arguably we could defer this part to the point we do a bind // or connect! @@ -683,7 +643,11 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode) aio->a_addr = &rsa; nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio); nni_aio_wait(aio); + nni_strfree(rserv); if ((rv = nni_aio_result(aio)) != 0) { + nni_strfree(rhost); + nni_strfree(lhost); + nni_strfree(lserv); nni_aio_fini(aio); return (rv); } @@ -695,7 +659,10 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode) aio->a_addr = &lsa; nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio); nni_aio_wait(aio); + nni_strfree(lhost); + nni_strfree(lserv); if ((rv = nni_aio_result(aio)) != 0) { + nni_strfree(rhost); nni_aio_fini(aio); return (rv); } @@ -705,12 +672,14 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode) nni_aio_fini(aio); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + nni_strfree(rhost); return (NNG_ENOMEM); } nni_mtx_init(&ep->mtx); if (nni_strlcpy(ep->addr, url, sizeof(ep->addr)) >= sizeof(ep->addr)) { NNI_FREE_STRUCT(ep); + nni_strfree(rhost); return (NNG_EADDRINVAL); } @@ -718,15 +687,18 @@ nni_tls_ep_init(void **epp, const char *url, nni_sock *sock, int mode) ((rv = nni_tls_config_init(&ep->cfg, tlsmode)) != 0) || ((rv = nni_tls_config_auth_mode(ep->cfg, authmode)) != 0) || ((rv = nni_aio_init(&ep->aio, nni_tls_ep_cb, ep)) != 0)) { + nni_strfree(rhost); nni_tls_ep_fini(ep); return (rv); } if ((tlsmode == NNI_TLS_CONFIG_CLIENT) && (rhost != NULL)) { if ((rv = nni_tls_config_server_name(ep->cfg, rhost)) != 0) { + nni_strfree(rhost); nni_tls_ep_fini(ep); return (rv); } } + nni_strfree(rhost); ep->proto = nni_sock_proto(sock); ep->authmode = authmode; diff --git a/src/transport/ws/CMakeLists.txt b/src/transport/ws/CMakeLists.txt new file mode 100644 index 00000000..18842df9 --- /dev/null +++ b/src/transport/ws/CMakeLists.txt @@ -0,0 +1,19 @@ +# +# Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# WebSocket transport + +if (NNG_TRANSPORT_WS) + set(WS_SOURCES transport/ws/websocket.c transport/ws/websocket.h) + install(FILES websocket.h DESTINATION include/nng/transport/ws) + +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${WS_SOURCES} PARENT_SCOPE) diff --git a/src/transport/ws/README.adoc b/src/transport/ws/README.adoc new file mode 100644 index 00000000..e3101297 --- /dev/null +++ b/src/transport/ws/README.adoc @@ -0,0 +1,38 @@ += websocket transport + +This transport provides support for SP over websocket using TCP or TLS. +When using TCP, it is compatible with the libnanomsg legacy transport. +It also is compatible with mangos (both TCP and TLS). + +TLS support requires the mbedTLS library. + +We set the "protocol" such as "pair.sp.nanomsg.org" in the +Sec-WebSocket-Protocol field -- the client sets to the the server's +protocol - i.e. the protocol that the server speaks. For example, +if the the server is a REP, then a REQ client would send "rep.sp.nanomsg.org". + +The server sends the same value (it's own), per the WebSocket specs. (Note +that the client's protocol is never sent, but assumed to be complementary +to the protocol in the Sec-WebSocket-Protocol field.) + +Each SP message is a WebSocket message. + +WebSocket is defined in RFC 6455. + +== Design + +We unfortunately need to implement our own design for this -- the only +reasonable client library would be libcurl, and there is a dearth of +suitable server libraries. Since we don't have to support full HTTP, but +just the initial handshake, this isn't too tragic. + +== Multiple Server Sockets + +In order to support Multiple Server sockets listening on the same port, +the application must be long lived. We will set up a listener on the +configured TCP (or TLS) port, and examine the PATH supplied in the GET. +This will be used to match against the URL requested, and if the URL +matches we will create the appropriate pipe. + +If no server endpoint at that address can be found, we return an +HTTP error, and close the socket. diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c new file mode 100644 index 00000000..8a73bcfb --- /dev/null +++ b/src/transport/ws/websocket.c @@ -0,0 +1,533 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" +#include "supplemental/websocket/websocket.h" + +typedef struct ws_ep ws_ep; +typedef struct ws_pipe ws_pipe; + +struct ws_ep { + int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN + char * addr; + uint16_t lproto; // local protocol + uint16_t rproto; // remote protocol + size_t rcvmax; + char * protoname; + nni_list aios; + nni_mtx mtx; + nni_aio * connaio; + nni_aio * accaio; + nni_ws_listener *listener; + nni_ws_dialer * dialer; +}; + +struct ws_pipe { + int mode; // NNI_EP_MODE_DIAL or NNI_EP_MODE_LISTEN + nni_mtx mtx; + size_t rcvmax; // inherited from EP + bool closed; + uint16_t rproto; + uint16_t lproto; + nni_aio *user_txaio; + nni_aio *user_rxaio; + nni_aio *txaio; + nni_aio *rxaio; + nni_ws * ws; +}; + +static void +ws_pipe_send_cb(void *arg) +{ + ws_pipe *p = arg; + nni_aio *taio; + nni_aio *uaio; + + nni_mtx_lock(&p->mtx); + taio = p->txaio; + uaio = p->user_txaio; + p->user_txaio = NULL; + + if (uaio != NULL) { + int rv; + if ((rv = nni_aio_result(taio)) != 0) { + nni_aio_finish_error(uaio, rv); + } else { + nni_aio_finish(uaio, 0, 0); + } + } + nni_mtx_unlock(&p->mtx); +} + +static void +ws_pipe_recv_cb(void *arg) +{ + ws_pipe *p = arg; + nni_aio *raio = p->rxaio; + nni_aio *uaio; + int rv; + + nni_mtx_lock(&p->mtx); + uaio = p->user_rxaio; + p->user_rxaio = NULL; + if ((rv = nni_aio_result(raio)) != 0) { + if (uaio != NULL) { + nni_aio_finish_error(uaio, rv); + } + } else { + nni_msg *msg = nni_aio_get_msg(raio); + if (uaio != NULL) { + nni_aio_finish_msg(uaio, msg); + } else { + nni_msg_free(msg); + } + } + nni_mtx_unlock(&p->mtx); +} + +static void +ws_pipe_recv_cancel(nni_aio *aio, int rv) +{ + ws_pipe *p = aio->a_prov_data; + nni_mtx_lock(&p->mtx); + if (p->user_rxaio != aio) { + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_cancel(p->rxaio, rv); + nni_mtx_unlock(&p->mtx); +} + +static void +ws_pipe_recv(void *arg, nni_aio *aio) +{ + ws_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) { + nni_mtx_unlock(&p->mtx); + return; + } + p->user_rxaio = aio; + + nni_ws_recv_msg(p->ws, p->rxaio); + nni_mtx_unlock(&p->mtx); +} + +static void +ws_pipe_send_cancel(nni_aio *aio, int rv) +{ + ws_pipe *p = aio->a_prov_data; + nni_mtx_lock(&p->mtx); + if (p->user_txaio != aio) { + nni_mtx_unlock(&p->mtx); + return; + } + // This aborts the upper send, which will call back with an error + // when it is done. + nni_aio_cancel(p->txaio, rv); + nni_mtx_unlock(&p->mtx); +} + +static void +ws_pipe_send(void *arg, nni_aio *aio) +{ + ws_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) { + nni_mtx_unlock(&p->mtx); + return; + } + p->user_txaio = aio; + nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + + nni_ws_send_msg(p->ws, p->txaio); + nni_mtx_unlock(&p->mtx); +} + +static void +ws_pipe_fini(void *arg) +{ + ws_pipe *p = arg; + + nni_aio_stop(p->rxaio); + nni_aio_stop(p->txaio); + + nni_aio_fini(p->rxaio); + nni_aio_fini(p->txaio); + + if (p->ws) { + nni_ws_fini(p->ws); + } + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); +} + +static void +ws_pipe_close(void *arg) +{ + ws_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + nni_ws_close(p->ws); + nni_mtx_unlock(&p->mtx); +} + +static int +ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) +{ + ws_pipe *p; + int rv; + nni_aio *aio; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&p->mtx); + + // Initialize AIOs. + if (((rv = nni_aio_init(&p->txaio, ws_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->rxaio, ws_pipe_recv_cb, p)) != 0)) { + ws_pipe_fini(p); + return (rv); + } + + p->mode = ep->mode; + p->rcvmax = ep->rcvmax; + // p->addr = ep->addr; + p->rproto = ep->rproto; + p->lproto = ep->lproto; + p->ws = ws; + + if ((aio = nni_list_first(&ep->aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_pipe(aio, p); + } + + *pipep = p; + return (0); +} + +static uint16_t +ws_pipe_peer(void *arg) +{ + ws_pipe *p = arg; + + return (p->rproto); +} + +static void +ws_pipe_start(void *arg, nni_aio *aio) +{ + if (nni_aio_start(aio, NULL, NULL) == 0) { + nni_aio_finish(aio, 0, 0); + } +} + +// We have very different approaches for server and client. +// Servers use the HTTP server framework, and a request methodology. + +static int +ws_ep_bind(void *arg) +{ + ws_ep *ep = arg; + return (nni_ws_listener_listen(ep->listener)); +} + +static void +ws_ep_cancel(nni_aio *aio, int rv) +{ + ws_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +ws_ep_accept(void *arg, nni_aio *aio) +{ + ws_ep *ep = arg; + + // We already bound, so we just need to look for an available + // pipe (created by the handler), and match it. + // Otherwise we stick the AIO in the accept list. + nni_mtx_lock(&ep->mtx); + if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + nni_list_append(&ep->aios, aio); + if (aio == nni_list_first(&ep->aios)) { + nni_ws_listener_accept(ep->listener, ep->accaio); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +ws_ep_connect(void *arg, nni_aio *aio) +{ + ws_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(nni_list_empty(&ep->aios)); + + // If we can't start, then its dying and we can't report + // either. + if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_list_append(&ep->aios, aio); + nni_ws_dialer_dial(ep->dialer, ep->connaio); + nni_mtx_unlock(&ep->mtx); +} + +static int +ws_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz) +{ + ws_ep *ep = arg; + if (ep == NULL) { + return (nni_chkopt_size(v, sz, 0, NNI_MAXSZ)); + } + return (nni_setopt_size(&ep->rcvmax, v, sz, 0, NNI_MAXSZ)); +} + +static int +ws_ep_getopt_recvmaxsz(void *arg, void *v, size_t *szp) +{ + ws_ep *ep = arg; + return (nni_getopt_size(ep->rcvmax, v, szp)); +} + +static nni_tran_pipe_option ws_pipe_options[] = { +#if 0 + // clang-format off + { NNG_OPT_LOCADDR, ws_pipe_getopt_locaddr }, + { NNG_OPT_REMADDR, ws_pipe_getopt_remaddr }, + // clang-format on +#endif + // terminate list + { NULL, NULL } +}; + +static nni_tran_pipe ws_pipe_ops = { + .p_fini = ws_pipe_fini, + .p_start = ws_pipe_start, + .p_send = ws_pipe_send, + .p_recv = ws_pipe_recv, + .p_close = ws_pipe_close, + .p_peer = ws_pipe_peer, + .p_options = ws_pipe_options, +}; + +static nni_tran_ep_option ws_ep_options[] = { + { + .eo_name = NNG_OPT_RECVMAXSZ, + .eo_getopt = ws_ep_getopt_recvmaxsz, + .eo_setopt = ws_ep_setopt_recvmaxsz, + }, + // terminate list + { NULL, NULL, NULL }, +}; + +static void +ws_ep_fini(void *arg) +{ + ws_ep *ep = arg; + + nni_aio_stop(ep->accaio); + nni_aio_stop(ep->connaio); + nni_aio_fini(ep->accaio); + nni_aio_fini(ep->connaio); + if (ep->listener != NULL) { + nni_ws_listener_fini(ep->listener); + } + if (ep->dialer != NULL) { + nni_ws_dialer_fini(ep->dialer); + } + nni_strfree(ep->addr); + nni_strfree(ep->protoname); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + +static void +ws_ep_conn_cb(void *arg) +{ + ws_ep * ep = arg; + ws_pipe *p; + nni_aio *caio = ep->connaio; + nni_aio *uaio; + int rv; + nni_ws * ws = NULL; + + nni_mtx_lock(&ep->mtx); + if (nni_aio_result(caio) == 0) { + ws = nni_aio_get_pipe(caio); + } + if ((uaio = nni_list_first(&ep->aios)) == NULL) { + // The client stopped caring about this! + if (ws != NULL) { + nni_ws_fini(ws); + } + nni_mtx_unlock(&ep->mtx); + return; + } + nni_aio_list_remove(uaio); + NNI_ASSERT(nni_list_empty(&ep->aios)); + if ((rv = nni_aio_result(caio)) != 0) { + nni_aio_finish_error(uaio, rv); + } else if ((rv = ws_pipe_init(&p, ep, ws)) != 0) { + nni_ws_fini(ws); + nni_aio_finish_error(uaio, rv); + } else { + nni_aio_finish_pipe(uaio, p); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +ws_ep_close(void *arg) +{ + ws_ep *ep = arg; + + if (ep->mode == NNI_EP_MODE_LISTEN) { + nni_ws_listener_close(ep->listener); + } else { + nni_ws_dialer_close(ep->dialer); + } +} + +static void +ws_ep_acc_cb(void *arg) +{ + ws_ep * ep = arg; + nni_aio *aaio = ep->accaio; + nni_aio *uaio; + int rv; + + nni_mtx_lock(&ep->mtx); + uaio = nni_list_first(&ep->aios); + if ((rv = nni_aio_result(aaio)) != 0) { + if (uaio != NULL) { + nni_aio_list_remove(uaio); + nni_aio_finish_error(uaio, rv); + } + } else { + nni_ws *ws = nni_aio_get_pipe(aaio); + if (uaio != NULL) { + ws_pipe *p; + // Make a pipe + nni_aio_list_remove(uaio); + if ((rv = ws_pipe_init(&p, ep, ws)) != 0) { + nni_ws_close(ws); + nni_aio_finish_error(uaio, rv); + } else { + nni_aio_finish_pipe(uaio, p); + } + } + } + if (!nni_list_empty(&ep->aios)) { + nni_ws_listener_accept(ep->listener, aaio); + } + nni_mtx_unlock(&ep->mtx); +} + +static int +ws_ep_init(void **epp, const char *url, nni_sock *sock, int mode) +{ + ws_ep * ep; + const char *pname; + int rv; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&ep->mtx); + + // List of pipes (server only). + nni_aio_list_init(&ep->aios); + + ep->mode = mode; + ep->lproto = nni_sock_proto(sock); + ep->rproto = nni_sock_peer(sock); + + if (mode == NNI_EP_MODE_DIAL) { + pname = nni_sock_peer_name(sock); + rv = nni_ws_dialer_init(&ep->dialer, url); + } else { + pname = nni_sock_proto_name(sock); + rv = nni_ws_listener_init(&ep->listener, url); + } + + if ((rv == 0) && ((ep->addr = nni_strdup(url)) == NULL)) { + rv = NNG_ENOMEM; + } + if ((rv != 0) || + ((rv = nni_aio_init(&ep->connaio, ws_ep_conn_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->accaio, ws_ep_acc_cb, ep)) != 0) || + ((rv = nni_asprintf(&ep->protoname, "%s.sp.nanomsg.org", pname)) != + 0)) { + ws_ep_fini(ep); + return (rv); + } + + *epp = ep; + return (0); +} +static int +ws_tran_init(void) +{ + return (0); +} + +static void +ws_tran_fini(void) +{ +} + +static nni_tran_ep ws_ep_ops = { + .ep_init = ws_ep_init, + .ep_fini = ws_ep_fini, + .ep_connect = ws_ep_connect, + .ep_bind = ws_ep_bind, + .ep_accept = ws_ep_accept, + .ep_close = ws_ep_close, + .ep_options = ws_ep_options, +}; + +static nni_tran ws_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "ws", + .tran_ep = &ws_ep_ops, + .tran_pipe = &ws_pipe_ops, + .tran_init = ws_tran_init, + .tran_fini = ws_tran_fini, +}; + +int +nng_ws_register(void) +{ + return (nni_tran_register(&ws_tran)); +} diff --git a/src/transport/ws/websocket.h b/src/transport/ws/websocket.h new file mode 100644 index 00000000..1beb6156 --- /dev/null +++ b/src/transport/ws/websocket.h @@ -0,0 +1,62 @@ +// +// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_TRANSPORT_WS_WEBSOCKET_H +#define NNG_TRANSPORT_WS_WEBSOCKET_H + +// TLS transport. This is used for communication via TLS v1.2 over TCP/IP. + +NNG_DECL int nng_ws_register(void); + +// TLS options. Note that these can only be set *before* the endpoint is +// started. Once started, it is no longer possible to alter the TLS +// configuration. + +// NNG_OPT_TLS_CA_CERT is a string with one or more X.509 certificates, +// representing the entire CA chain. The content may be either PEM or DER +// encoded. +#define NNG_OPT_TLS_CA_CERT "tls:ca-cert" + +// NNG_OPT_TLS_CRL is a PEM encoded CRL (revocation list). Multiple lists +// may be loaded by using this option multiple times. +#define NNG_OPT_TLS_CRL "tls:crl" + +// NNG_OPT_TLS_CERT is used to specify our own certificate. At present +// only one certificate may be supplied. (In the future it may be +// possible to call this multiple times, for servers that select different +// certificates depending upon client capabilities.) +#define NNG_OPT_TLS_CERT "tls:cert" + +// NNG_OPT_TLS_PRIVATE_KEY is used to specify the private key used +// with the given certificate. This should be called after setting +// the certificate. The private key may be in PEM or DER format. +// If in PEM encoded, a terminating ZERO byte should be included. +#define NNG_OPT_TLS_PRIVATE_KEY "tls:private-key" + +// NNG_OPT_TLS_PRIVATE_KEY_PASSWORD is used to specify a password +// used for the private key. The value is an ASCIIZ string. +#define NNG_OPT_TLS_PRIVATE_KEY_PASSWORD "tls:private-key-password" + +// NNG_OPT_TLS_AUTH_MODE is an integer indicating whether our +// peer should be verified or not. It is required on clients/dialers, +// and off on servers/listeners, by default. +#define NNG_OPT_TLS_AUTH_MODE "tls:auth-mode" + +extern int nng_tls_auth_mode_required; +extern int nng_tls_auth_mode_none; +extern int nng_tls_auth_mode_optional; + +// NNG_OPT_TLS_AUTH_VERIFIED is a boolean that can be read on pipes, +// indicating whether the peer certificate is verified. +#define NNG_OPT_TLS_AUTH_VERIFIED "tls:auth-verified" + +// XXX: TBD: Ciphersuite selection and reporting. Session reuse? + +#endif // NNG_TRANSPORT_WS_WEBSOCKET_H diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 607c353c..16a29da1 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1292,7 +1292,7 @@ zt_wire_packet_send_cb(void *arg) nni_aio * aio = arg; zt_send_hdr *hdr; - hdr = nni_aio_get_data(aio); + hdr = nni_aio_get_data(aio, 0); nni_free(hdr, hdr->len + sizeof(*hdr)); nni_aio_fini_cb(aio); } @@ -1355,7 +1355,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket, buf += sizeof(*hdr); memcpy(buf, data, len); - nni_aio_set_data(aio, hdr); + nni_aio_set_data(aio, hdr, 0); hdr->sa = addr; hdr->len = len; @@ -2001,7 +2001,7 @@ zt_pipe_ping_cb(void *arg) } if (p->zp_ping_try < p->zp_ping_count) { nni_time now = nni_clock(); - nni_aio_set_timeout(aio, now + p->zp_ping_time); + nni_aio_set_timeout(aio, p->zp_ping_time); // We want pings. We only send one if needed, but we // use the the timer to wake us up even if we aren't // going to send a ping. (We don't increment the try count @@ -2034,7 +2034,7 @@ zt_pipe_start(void *arg, nni_aio *aio) if ((p->zp_ping_count > 0) && (p->zp_ping_time != NNI_TIME_ZERO) && (p->zp_ping_time != NNI_TIME_NEVER) && (p->zp_ping_aio != NULL)) { p->zp_ping_try = 0; - nni_aio_set_timeout(aio, nni_clock() + p->zp_ping_time); + nni_aio_set_timeout(aio, p->zp_ping_time); if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) == 0) { p->zp_ping_active = 1; @@ -2456,7 +2456,7 @@ zt_ep_conn_req_cb(void *arg) } if (nni_list_first(&ep->ze_aios) != NULL) { - nni_aio_set_timeout(aio, nni_clock() + zt_conn_interval); + nni_aio_set_timeout(aio, zt_conn_interval); if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) { ep->ze_creq_active = 1; ep->ze_creq_try++; @@ -2479,8 +2479,7 @@ zt_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&zt_lk); if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) { - nni_time now = nni_clock(); - int rv; + int rv; // Clear the port so we get an ephemeral port. ep->ze_laddr &= ~((uint64_t) zt_port_mask); @@ -2498,7 +2497,7 @@ zt_ep_connect(void *arg, nni_aio *aio) ep->ze_running = 1; - nni_aio_set_timeout(ep->ze_creq_aio, now + zt_conn_interval); + nni_aio_set_timeout(ep->ze_creq_aio, zt_conn_interval); if (nni_aio_start( ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 77eb0bc5..921b5925 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -152,6 +152,9 @@ add_nng_test(pair1 5) add_nng_test(udp 5) add_nng_test(zt 60) add_nng_test(multistress 60) +add_nng_test(httpclient 30) +add_nng_test(httpserver 30) +add_nng_test(ws 30) # compatbility tests # We only support these if ALL the legacy protocols are supported. This diff --git a/tests/base64.c b/tests/base64.c index 9682475c..714bd4b2 100644 --- a/tests/base64.c +++ b/tests/base64.c @@ -8,7 +8,7 @@ // found online at https://opensource.org/licenses/MIT. // -#include <strings.h> +#include <string.h> #include "convey.h" diff --git a/tests/httpclient.c b/tests/httpclient.c new file mode 100644 index 00000000..ab4f46a2 --- /dev/null +++ b/tests/httpclient.c @@ -0,0 +1,120 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "trantest.h" + +#ifndef _WIN32 +#include <arpa/inet.h> +#endif + +// Basic HTTP client tests. +#include "core/nng_impl.h" +#include "supplemental/http/http.h" +#include "supplemental/sha1/sha1.h" + +const uint8_t utf8_sha1sum[20] = { 0x54, 0xf3, 0xb8, 0xbb, 0xfe, 0xda, 0x6f, + 0xb4, 0x96, 0xdd, 0xc9, 0x8b, 0x8c, 0x41, 0xf4, 0xfe, 0xe5, 0xa9, 0x7d, + 0xa9 }; + +TestMain("HTTP Client", { + + nni_init(); + atexit(nng_fini); + + Convey("Given a TCP connection to httpbin.org", { + nni_plat_tcp_ep * ep; + nni_plat_tcp_pipe *p; + nng_aio * aio; + nni_aio * iaio; + nng_sockaddr rsa; + nni_http_client * cli; + nni_http * http; + + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + iaio = (nni_aio *) aio; + iaio->a_addr = &rsa; + + nng_aio_set_timeout(aio, 1000); + nni_plat_tcp_resolv("httpbin.org", "80", NNG_AF_INET, 0, iaio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + So(rsa.s_un.s_in.sa_port == htons(80)); + + So(nni_http_client_init(&cli, &rsa) == 0); + nni_http_client_connect(cli, iaio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + http = nni_aio_get_output(iaio, 0); + Reset({ + nni_http_client_fini(cli); + nni_http_fini(http); + nng_aio_free(aio); + }); + + Convey("We can initiate a message", { + nni_http_req *req; + nni_http_res *res; + So(http != NULL); + + So(nni_http_req_init(&req) == 0); + So(nni_http_res_init(&res) == 0); + Reset({ + nni_http_close(http); + nni_http_req_fini(req); + nni_http_res_fini(res); + }); + So(nni_http_req_set_method(req, "GET") == 0); + So(nni_http_req_set_version(req, "HTTP/1.1") == 0); + So(nni_http_req_set_uri(req, "/encoding/utf8") == 0); + So(nni_http_req_set_header( + req, "Host", "httpbin.org") == 0); + nni_http_write_req(http, req, iaio); + + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + nni_http_read_res(http, res, iaio); + nng_aio_wait(aio); + So(nng_aio_result(aio) == 0); + So(nni_http_res_get_status(res) == 200); + + Convey("The message contents are correct", { + uint8_t digest[20]; + void * data; + const char *cstr; + size_t sz; + + cstr = nni_http_res_get_header( + res, "Content-Length"); + So(cstr != NULL); + sz = atoi(cstr); + So(sz > 0); + + data = nni_alloc(sz); + So(data != NULL); + Reset({ nni_free(data, sz); }); + + iaio->a_niov = 1; + iaio->a_iov[0].iov_len = sz; + iaio->a_iov[0].iov_buf = data; + + nni_aio_wait(iaio); + So(nng_aio_result(aio) == 0); + + nni_http_read_full(http, iaio); + nni_aio_wait(iaio); + So(nni_aio_result(iaio) == 0); + + nni_sha1(data, sz, digest); + So(memcmp(digest, utf8_sha1sum, 20) == 0); + }); + }); + }); +}); diff --git a/tests/httpserver.c b/tests/httpserver.c new file mode 100644 index 00000000..fa2753ea --- /dev/null +++ b/tests/httpserver.c @@ -0,0 +1,146 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "trantest.h" + +#ifndef _WIN32 +#include <arpa/inet.h> +#endif + +// Basic HTTP server tests. +#include "core/nng_impl.h" +#include "supplemental/http/http.h" +#include "supplemental/sha1/sha1.h" + +const uint8_t utf8_sha1sum[20] = { 0x54, 0xf3, 0xb8, 0xbb, 0xfe, 0xda, 0x6f, + 0xb4, 0x96, 0xdd, 0xc9, 0x8b, 0x8c, 0x41, 0xf4, 0xfe, 0xe5, 0xa9, 0x7d, + 0xa9 }; + +void +cleanup(void) +{ + nng_fini(); +} + +TestMain("HTTP Client", { + + nni_http_server *s; + + nni_init(); + atexit(cleanup); + + Convey("We can start an HTTP server", { + nng_sockaddr sa; + nni_aio * aio; + char portbuf[16]; + char *doc = "<html><body>Someone <b>is</b> home!</body</html>"; + + trantest_next_address(portbuf, "%u"); + + So(nni_aio_init(&aio, NULL, NULL) == 0); + aio->a_addr = &sa; + nni_plat_tcp_resolv("127.0.0.1", portbuf, NNG_AF_INET, 0, aio); + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + + So(nni_http_server_init(&s, &sa) == 0); + + Reset({ + nni_aio_fini(aio); + nni_http_server_fini(s); + }); + + So(nni_http_server_add_static(s, NULL, "text/html", + "/home.html", doc, strlen(doc)) == 0); + So(nni_http_server_start(s) == 0); + + Convey("We can connect a client to it", { + nni_http_client *cli; + nni_http * h; + nni_http_req * req; + nni_http_res * res; + + So(nni_http_client_init(&cli, &sa) == 0); + nni_http_client_connect(cli, aio); + nni_aio_wait(aio); + + So(nni_aio_result(aio) == 0); + h = nni_aio_get_output(aio, 0); + So(h != NULL); + So(nni_http_req_init(&req) == 0); + So(nni_http_res_init(&res) == 0); + + Reset({ + nni_http_client_fini(cli); + nni_http_fini(h); + nni_http_req_fini(req); + nni_http_res_fini(res); + }); + + Convey("404 works", { + So(nni_http_req_set_method(req, "GET") == 0); + So(nni_http_req_set_version(req, "HTTP/1.1") == + 0); + So(nni_http_req_set_uri(req, "/bogus") == 0); + So(nni_http_req_set_header( + req, "Host", "localhost") == 0); + nni_http_write_req(h, req, aio); + + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + + nni_http_read_res(h, res, aio); + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + + So(nni_http_res_get_status(res) == 404); + }); + + Convey("Valid data works", { + char chunk[256]; + const void *ptr; + + So(nni_http_req_set_method(req, "GET") == 0); + So(nni_http_req_set_version(req, "HTTP/1.1") == + 0); + So(nni_http_req_set_uri(req, "/home.html") == + 0); + So(nni_http_req_set_header( + req, "Host", "localhost") == 0); + nni_http_write_req(h, req, aio); + + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + + nni_http_read_res(h, res, aio); + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + + So(nni_http_res_get_status(res) == 200); + + ptr = nni_http_res_get_header( + res, "Content-Length"); + So(ptr != NULL); + So(atoi(ptr) == strlen(doc)); + + aio->a_niov = 1; + aio->a_iov[0].iov_len = strlen(doc); + aio->a_iov[0].iov_buf = (void *) chunk; + nni_http_read_full(h, aio); + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + So(nni_aio_count(aio) == strlen(doc)); + So(memcmp(chunk, doc, strlen(doc)) == 0); + }); + + }); + }); +}); diff --git a/tests/platform.c b/tests/platform.c index 74dbab50..10278e7d 100644 --- a/tests/platform.c +++ b/tests/platform.c @@ -52,7 +52,7 @@ TestMain("Platform Operations", { nng_msleep(100); So((getms() - now) >= 100); // cannot be *shorter*!! - So((getms() - now) < 150); // crummy clock resolution? + So((getms() - now) < 200); // crummy clock resolution? }); Convey("times work", { uint64_t msend; @@ -69,8 +69,8 @@ TestMain("Platform Operations", { usdelta = (int) (usend - usnow); msdelta = (int) (msend - now); So(usdelta >= 200); - So(usdelta < 220); - So(abs(msdelta - usdelta) < 20); + So(usdelta < 250); // increased tolerance for CIs + So(abs(msdelta - usdelta) < 50); }); }); Convey("Mutexes work", { diff --git a/tests/sha1.c b/tests/sha1.c index f26a5b9c..9f1901e0 100644 --- a/tests/sha1.c +++ b/tests/sha1.c @@ -9,7 +9,7 @@ // #include <stdint.h> -#include <strings.h> +#include <string.h> #include "convey.h" diff --git a/tests/tls.c b/tests/tls.c index 6ec249cf..fa44d9c9 100644 --- a/tests/tls.c +++ b/tests/tls.c @@ -128,6 +128,7 @@ TestMain("TLS Transport", { tt.init = init_tls; tt.tmpl = "tls+tcp://127.0.0.1:%u"; + atexit(nng_fini); trantest_test(&tt); @@ -170,7 +171,8 @@ TestMain("TLS Transport", { So(nng_tls_register() == 0); So(nng_pair_open(&s1) == 0); Reset({ nng_close(s1); }); - So(nng_dial(s1, "tls+tcp://127.0.0.1", NULL, 0) == NNG_EADDRINVAL); + So(nng_dial(s1, "tls+tcp://127.0.0.1", NULL, 0) == + NNG_EADDRINVAL); So(nng_dial(s1, "tls+tcp://127.0.0.1.32", NULL, 0) == NNG_EADDRINVAL); So(nng_dial(s1, "tls+tcp://127.0.x.1.32", NULL, 0) == @@ -183,5 +185,4 @@ TestMain("TLS Transport", { NNG_EADDRINVAL); }); - nng_fini(); }) diff --git a/tests/trantest.h b/tests/trantest.h index 4f6dfe7f..b1b0ff80 100644 --- a/tests/trantest.h +++ b/tests/trantest.h @@ -58,6 +58,9 @@ unsigned trantest_port = 0; #ifndef NNG_HAVE_TLS #define nng_tls_register notransport #endif +#ifndef NNG_HAVE_WEBSOCKET +#define nng_ws_register notransport +#endif int notransport(void) @@ -88,6 +91,9 @@ trantest_checktran(const char *url) #ifndef NNG_HAVE_TLS CHKTRAN(url, "tls+tcp:"); #endif +#ifndef NNG_HAVE_WEBSOCKET + CHKTRAN(url, "ws:"); +#endif (void) url; } @@ -99,7 +105,10 @@ trantest_next_address(char *out, const char *template) if (trantest_port == 0) { char *pstr; - trantest_port = 5555; + + // start at a different port each time -- 5000 - 10000 -- + // unless a specific port is given. + trantest_port = nni_clock() % 5000 + 5000; if (((pstr = ConveyGetEnv("TEST_PORT")) != NULL) && (atoi(pstr) != 0)) { trantest_port = atoi(pstr); @@ -218,7 +227,7 @@ trantest_send_recv(trantest *tt) So(l != 0); So(trantest_dial(tt) == 0); - nng_msleep(20); // listener may be behind slightly + nng_msleep(200); // listener may be behind slightly send = NULL; So(nng_msg_alloc(&send, 0) == 0); @@ -265,7 +274,7 @@ trantest_check_properties(trantest *tt, trantest_proptest_t f) So(nng_dial(tt->reqsock, tt->addr, &d, 0) == 0); So(d != 0); - nng_msleep(10); // listener may be behind slightly + nng_msleep(200); // listener may be behind slightly send = NULL; So(nng_msg_alloc(&send, 0) == 0); @@ -307,7 +316,7 @@ trantest_send_recv_large(trantest *tt) So(nng_dial(tt->reqsock, tt->addr, &d, 0) == 0); So(d != 0); - nng_msleep(10); // listener may be behind slightly + nng_msleep(200); // listener may be behind slightly send = NULL; So(nng_msg_alloc(&send, size) == 0); diff --git a/tests/ws.c b/tests/ws.c new file mode 100644 index 00000000..f6eac685 --- /dev/null +++ b/tests/ws.c @@ -0,0 +1,97 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/pair1/pair.h" +#include "trantest.h" + +#include "stubs.h" +// TCP tests. + +#ifndef _WIN32 +#include <arpa/inet.h> +#endif + +static int +check_props_v4(nng_msg *msg, nng_listener l, nng_dialer d) +{ +#if 0 + nng_pipe p; + size_t z; + p = nng_msg_get_pipe(msg); + So(p > 0); + + Convey("Local address property works", { + nng_sockaddr la; + z = sizeof(nng_sockaddr); + So(nng_pipe_getopt(p, NNG_OPT_LOCADDR, &la, &z) == 0); + So(z == sizeof(la)); + So(la.s_un.s_family == NNG_AF_INET); + So(la.s_un.s_in.sa_port == htons(trantest_port - 1)); + So(la.s_un.s_in.sa_port != 0); + So(la.s_un.s_in.sa_addr == htonl(0x7f000001)); + }); + + Convey("Remote address property works", { + nng_sockaddr ra; + z = sizeof(nng_sockaddr); + So(nng_pipe_getopt(p, NNG_OPT_REMADDR, &ra, &z) == 0); + So(z == sizeof(ra)); + So(ra.s_un.s_family == NNG_AF_INET); + So(ra.s_un.s_in.sa_port != 0); + So(ra.s_un.s_in.sa_addr == htonl(0x7f000001)); + }); +#endif + return (0); +} + +TestMain("WebSocket Transport", { + + trantest_test_extended("ws://127.0.0.1:%u/test", check_props_v4); + + Convey("Wild cards work", { + nng_socket s1; + nng_socket s2; + char addr[NNG_MAXADDRLEN]; + + So(nng_pair_open(&s1) == 0); + So(nng_pair_open(&s2) == 0); + Reset({ + nng_close(s2); + nng_close(s1); + }); + trantest_next_address(addr, "ws://*:%u/test"); + So(nng_listen(s1, addr, NULL, 0) == 0); + // reset port back one + trantest_prev_address(addr, "ws://127.0.0.1:%u/test"); + So(nng_dial(s2, addr, NULL, 0) == 0); + }); + + Convey("Incorrect URL paths do not work", { + nng_socket s1; + nng_socket s2; + char addr[NNG_MAXADDRLEN]; + + So(nng_pair_open(&s1) == 0); + So(nng_pair_open(&s2) == 0); + Reset({ + nng_close(s2); + nng_close(s1); + }); + trantest_next_address(addr, "ws://*:%u/test"); + So(nng_listen(s1, addr, NULL, 0) == 0); + // reset port back one + trantest_prev_address(addr, "ws://127.0.0.1:%u/nothere"); + So(nng_dial(s2, addr, NULL, 0) == NNG_ECONNREFUSED); + }); + + nng_fini(); +}) @@ -194,6 +194,7 @@ TestMain("ZeroTier Transport", { unsigned port; port = 5555; + atexit(nng_fini); Convey("We can register the zero tier transport", { So(nng_zt_register() == 0); }); @@ -353,5 +354,4 @@ TestMain("ZeroTier Transport", { trantest_test_extended("zt://" NWID "/*:%u", check_props); - nng_fini(); }) |
