diff options
| -rw-r--r-- | CMakeLists.txt | 22 | ||||
| -rw-r--r-- | include/nng/nng.h | 234 | ||||
| -rw-r--r-- | src/core/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/core/aio.c | 29 | ||||
| -rw-r--r-- | src/core/init.c | 100 | ||||
| -rw-r--r-- | src/core/init.h | 14 | ||||
| -rw-r--r-- | src/core/init_test.c | 140 | ||||
| -rw-r--r-- | src/core/taskq.c | 32 | ||||
| -rw-r--r-- | src/nng.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 38 | ||||
| -rw-r--r-- | src/platform/windows/win_io.c | 36 | ||||
| -rw-r--r-- | src/platform/windows/win_resolv.c | 54 |
12 files changed, 574 insertions, 134 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 226bb2eb..7fe8f712 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> # Copyright (c) 2012 Martin Sustrik All rights reserved. # Copyright (c) 2013 GoPivotal, Inc. All rights reserved. # Copyright (c) 2015-2016 Jack R. Dunaway. All rights reserved. @@ -114,17 +114,19 @@ endif () nng_defines_if(NNG_ENABLE_STATS NNG_ENABLE_STATS) +set(NNG_RESOLV_CONCURRENCY 4 CACHE STRING "Resolver (DNS) concurrency.") +mark_as_advanced(NNG_RESOLV_CONCURRENCY) if (NNG_RESOLV_CONCURRENCY) add_definitions(-DNNG_RESOLV_CONCURRENCY=${NNG_RESOLV_CONCURRENCY}) endif () -mark_as_advanced(NNG_RESOLV_CONCURRENCY) +set(NNG_NUM_TASKQ_THREADS 0 CACHE STRING "Fixed number of task threads, 0 for automatic") +mark_as_advanced(NNG_NUM_TASKQ_THREADS) if (NNG_NUM_TASKQ_THREADS) add_definitions(-DNNG_NUM_TASKQ_THREADS=${NNG_NUM_TASKQ_THREADS}) endif () -mark_as_advanced(NNG_NUM_TASKQ_THREADS) -set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on taskq threads, 0 for no limit") +set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on task threads, 0 for no limit") mark_as_advanced(NNG_MAX_TASKQ_THREADS) if (NNG_MAX_TASKQ_THREADS) add_definitions(-DNNG_MAX_TASKQ_THREADS=${NNG_MAX_TASKQ_THREADS}) @@ -132,6 +134,12 @@ endif () # Expire threads. This runs the timeout handling, and having more of them # reduces contention on the common locks used for aio expiration. +set(NNG_NUM_EXPIRE_THREADS 0 CACHE STRING "Fixed number of expire threads, 0 for automatic") +mark_as_advanced(NNG_NUM_EXPIRE_THREADS) +if (NNG_NUM_EXPIRE_THREADS) + add_definitions(-DNNG_NUM_EXPIRE_THREADS=${NNG_NUM_EXPIRE_THREADS}) +endif () + set(NNG_MAX_EXPIRE_THREADS 8 CACHE STRING "Upper bound on expire threads, 0 for no limit") mark_as_advanced(NNG_MAX_EXPIRE_THREADS) if (NNG_MAX_EXPIRE_THREADS) @@ -140,6 +148,12 @@ endif() # Poller threads. These threads run the pollers. This is mostly used # on Windows right now, as the POSIX platforms use a single threaded poller. +set(NNG_NUM_POLLER_THREADS 0 CACHE STRING "Fixed number of I/O poller threads, 0 for automatic") +if (NNG_NUM_POLLER_THREADS) + add_definitions(-DNNG_NUM_POLLER_THREADS=${NNG_NUM_POLLER_THREADS}) +endif () +mark_as_advanced(NNG_NUM_POLLER_THREADS) + set(NNG_MAX_POLLER_THREADS 8 CACHE STRING "Upper bound on I/O poller threads, 0 for no limit") mark_as_advanced(NNG_MAX_POLLER_THREADS) if (NNG_MAX_POLLER_THREADS) diff --git a/include/nng/nng.h b/include/nng/nng.h index 98a9a843..394dd0fd 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -47,7 +47,7 @@ extern "C" { #ifndef NNG_DEPRECATED #if defined(__GNUC__) || defined(__clang__) -#define NNG_DEPRECATED __attribute__ ((deprecated)) +#define NNG_DEPRECATED __attribute__((deprecated)) #else #define NNG_DEPRECATED #endif @@ -59,7 +59,8 @@ extern "C" { #define NNG_MAJOR_VERSION 1 #define NNG_MINOR_VERSION 7 #define NNG_PATCH_VERSION 0 -#define NNG_RELEASE_SUFFIX "pre" // if non-empty (i.e. "pre"), this is a pre-release +#define NNG_RELEASE_SUFFIX \ + "pre" // if non-empty (i.e. "pre"), this is a pre-release // Maximum length of a socket address. This includes the terminating NUL. // This limit is built into other implementations, so do not change it. @@ -71,7 +72,7 @@ extern "C" { // NNG_PROTOCOL_NUMBER is used by protocol headers to calculate their // protocol number from a major and minor number. Applications should // probably not need to use this. -#define NNG_PROTOCOL_NUMBER(maj, min) (((x) *16) + (y)) +#define NNG_PROTOCOL_NUMBER(maj, min) (((x) * 16) + (y)) // Types common to nng. @@ -101,7 +102,7 @@ typedef struct nng_socket_s { uint32_t id; } nng_socket; -typedef int32_t nng_duration; // in milliseconds +typedef int32_t nng_duration; // in milliseconds // nng_time represents an absolute time since some arbitrary point in the // past, measured in milliseconds. The values are always positive. @@ -199,7 +200,7 @@ enum nng_sockaddr_family { // Scatter/gather I/O. typedef struct nng_iov { - void * iov_buf; + void *iov_buf; size_t iov_len; } nng_iov; @@ -616,7 +617,7 @@ NNG_DECL void nng_aio_finish(nng_aio *, int); // final argument is passed to the cancelfn. The final argument of the // cancellation function is the error number (will not be zero) corresponding // to the reason for cancellation, e.g. NNG_ETIMEDOUT or NNG_ECANCELED. -typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int); +typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int); NNG_DECL void nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *); // nng_aio_sleep does a "sleeping" operation, basically does nothing @@ -630,9 +631,9 @@ NNG_DECL void nng_msg_free(nng_msg *); NNG_DECL int nng_msg_realloc(nng_msg *, size_t); NNG_DECL int nng_msg_reserve(nng_msg *, size_t); NNG_DECL size_t nng_msg_capacity(nng_msg *); -NNG_DECL void * nng_msg_header(nng_msg *); +NNG_DECL void *nng_msg_header(nng_msg *); NNG_DECL size_t nng_msg_header_len(const nng_msg *); -NNG_DECL void * nng_msg_body(nng_msg *); +NNG_DECL void *nng_msg_body(nng_msg *); NNG_DECL size_t nng_msg_len(const nng_msg *); NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t); NNG_DECL int nng_msg_insert(nng_msg *, const void *, size_t); @@ -693,7 +694,7 @@ NNG_DECL nng_dialer nng_pipe_dialer(nng_pipe); NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // Flags. -#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer +#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer #define NNG_FLAG_NONBLOCK 2u // Non-blocking operations // Options. @@ -1250,7 +1251,6 @@ NNG_DECL int nng_stream_listener_set_ptr( NNG_DECL int nng_stream_listener_set_addr( nng_stream_listener *, const char *, const nng_sockaddr *); - #ifndef NNG_ELIDE_DEPRECATED // These are legacy APIs that have been deprecated. // Their use is strongly discouraged. @@ -1260,95 +1260,205 @@ NNG_DECL int nng_stream_listener_set_addr( NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *) NNG_DEPRECATED; // Socket options. Use nng_socket_get and nng_socket_set instead. -NNG_DECL int nng_getopt(nng_socket, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_getopt( + nng_socket, const char *, void *, size_t *) NNG_DEPRECATED; NNG_DECL int nng_getopt_bool(nng_socket, const char *, bool *) NNG_DEPRECATED; NNG_DECL int nng_getopt_int(nng_socket, const char *, int *) NNG_DEPRECATED; -NNG_DECL int nng_getopt_ms(nng_socket, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_getopt_size(nng_socket, const char *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_getopt_uint64(nng_socket, const char *, uint64_t *) NNG_DEPRECATED; +NNG_DECL int nng_getopt_ms( + nng_socket, const char *, nng_duration *) NNG_DEPRECATED; +NNG_DECL int nng_getopt_size( + nng_socket, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_getopt_uint64( + nng_socket, const char *, uint64_t *) NNG_DEPRECATED; NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **) NNG_DEPRECATED; -NNG_DECL int nng_getopt_string(nng_socket, const char *, char **) NNG_DEPRECATED; -NNG_DECL int nng_setopt(nng_socket, const char *, const void *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_getopt_string( + nng_socket, const char *, char **) NNG_DEPRECATED; +NNG_DECL int nng_setopt( + nng_socket, const char *, const void *, size_t) NNG_DEPRECATED; NNG_DECL int nng_setopt_bool(nng_socket, const char *, bool) NNG_DEPRECATED; NNG_DECL int nng_setopt_int(nng_socket, const char *, int) NNG_DEPRECATED; -NNG_DECL int nng_setopt_ms(nng_socket, const char *, nng_duration) NNG_DEPRECATED; +NNG_DECL int nng_setopt_ms( + nng_socket, const char *, nng_duration) NNG_DEPRECATED; NNG_DECL int nng_setopt_size(nng_socket, const char *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_setopt_uint64(nng_socket, const char *, uint64_t) NNG_DEPRECATED; -NNG_DECL int nng_setopt_string(nng_socket, const char *, const char *) NNG_DEPRECATED; +NNG_DECL int nng_setopt_uint64( + nng_socket, const char *, uint64_t) NNG_DEPRECATED; +NNG_DECL int nng_setopt_string( + nng_socket, const char *, const char *) NNG_DEPRECATED; NNG_DECL int nng_setopt_ptr(nng_socket, const char *, void *) NNG_DEPRECATED; // Context options. Use nng_ctx_get and nng_ctx_set instead. -NNG_DECL int nng_ctx_getopt(nng_ctx, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_ctx_getopt( + nng_ctx, const char *, void *, size_t *) NNG_DEPRECATED; NNG_DECL int nng_ctx_getopt_bool(nng_ctx, const char *, bool *) NNG_DEPRECATED; NNG_DECL int nng_ctx_getopt_int(nng_ctx, const char *, int *) NNG_DEPRECATED; -NNG_DECL int nng_ctx_getopt_ms(nng_ctx, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_ctx_getopt_size(nng_ctx, const char *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_ctx_setopt(nng_ctx, const char *, const void *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_ctx_getopt_ms( + nng_ctx, const char *, nng_duration *) NNG_DEPRECATED; +NNG_DECL int nng_ctx_getopt_size( + nng_ctx, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_ctx_setopt( + nng_ctx, const char *, const void *, size_t) NNG_DEPRECATED; NNG_DECL int nng_ctx_setopt_bool(nng_ctx, const char *, bool) NNG_DEPRECATED; NNG_DECL int nng_ctx_setopt_int(nng_ctx, const char *, int) NNG_DEPRECATED; -NNG_DECL int nng_ctx_setopt_ms(nng_ctx, const char *, nng_duration) NNG_DEPRECATED; +NNG_DECL int nng_ctx_setopt_ms( + nng_ctx, const char *, nng_duration) NNG_DEPRECATED; NNG_DECL int nng_ctx_setopt_size(nng_ctx, const char *, size_t) NNG_DEPRECATED; // Dialer options. Use nng_dialer_get and nng_dialer_set instead. -NNG_DECL int nng_dialer_getopt(nng_dialer, const char *, void *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_bool(nng_dialer, const char *, bool *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_int(nng_dialer, const char *, int *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_ms(nng_dialer, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_size(nng_dialer, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt( + nng_dialer, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_bool( + nng_dialer, const char *, bool *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_int( + nng_dialer, const char *, int *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_ms( + nng_dialer, const char *, nng_duration *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_size( + nng_dialer, const char *, size_t *) NNG_DEPRECATED; NNG_DECL int nng_dialer_getopt_sockaddr( nng_dialer, const char *, nng_sockaddr *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_uint64(nng_dialer, const char *, uint64_t *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_ptr(nng_dialer, const char *, void **) NNG_DEPRECATED; -NNG_DECL int nng_dialer_getopt_string(nng_dialer, const char *, char **) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt(nng_dialer, const char *, const void *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_bool(nng_dialer, const char *, bool) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_int(nng_dialer, const char *, int) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_ms(nng_dialer, const char *, nng_duration) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_size(nng_dialer, const char *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_uint64(nng_dialer, const char *, uint64_t) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_ptr(nng_dialer, const char *, void *) NNG_DEPRECATED; -NNG_DECL int nng_dialer_setopt_string(nng_dialer, const char *, const char *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_uint64( + nng_dialer, const char *, uint64_t *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_ptr( + nng_dialer, const char *, void **) NNG_DEPRECATED; +NNG_DECL int nng_dialer_getopt_string( + nng_dialer, const char *, char **) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt( + nng_dialer, const char *, const void *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_bool( + nng_dialer, const char *, bool) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_int( + nng_dialer, const char *, int) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_ms( + nng_dialer, const char *, nng_duration) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_size( + nng_dialer, const char *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_uint64( + nng_dialer, const char *, uint64_t) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_ptr( + nng_dialer, const char *, void *) NNG_DEPRECATED; +NNG_DECL int nng_dialer_setopt_string( + nng_dialer, const char *, const char *) NNG_DEPRECATED; // Listener options. Use nng_listener_get and nng_listener_set instead. -NNG_DECL int nng_listener_getopt(nng_listener, const char *, void *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_bool(nng_listener, const char *, bool *) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_int(nng_listener, const char *, int *) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt( + nng_listener, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_bool( + nng_listener, const char *, bool *) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_int( + nng_listener, const char *, int *) NNG_DEPRECATED; NNG_DECL int nng_listener_getopt_ms( nng_listener, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_size(nng_listener, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_size( + nng_listener, const char *, size_t *) NNG_DEPRECATED; NNG_DECL int nng_listener_getopt_sockaddr( nng_listener, const char *, nng_sockaddr *) NNG_DEPRECATED; NNG_DECL int nng_listener_getopt_uint64( nng_listener, const char *, uint64_t *) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_ptr(nng_listener, const char *, void **) NNG_DEPRECATED; -NNG_DECL int nng_listener_getopt_string(nng_listener, const char *, char **) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_ptr( + nng_listener, const char *, void **) NNG_DEPRECATED; +NNG_DECL int nng_listener_getopt_string( + nng_listener, const char *, char **) NNG_DEPRECATED; NNG_DECL int nng_listener_setopt( nng_listener, const char *, const void *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_bool(nng_listener, const char *, bool) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_int(nng_listener, const char *, int) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_ms(nng_listener, const char *, nng_duration) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_size(nng_listener, const char *, size_t) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_uint64(nng_listener, const char *, uint64_t) NNG_DEPRECATED; -NNG_DECL int nng_listener_setopt_ptr(nng_listener, const char *, void *) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_bool( + nng_listener, const char *, bool) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_int( + nng_listener, const char *, int) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_ms( + nng_listener, const char *, nng_duration) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_size( + nng_listener, const char *, size_t) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_uint64( + nng_listener, const char *, uint64_t) NNG_DEPRECATED; +NNG_DECL int nng_listener_setopt_ptr( + nng_listener, const char *, void *) NNG_DEPRECATED; NNG_DECL int nng_listener_setopt_string( nng_listener, const char *, const char *) NNG_DEPRECATED; // Pipe options. Use nng_pipe_get instead. -NNG_DECL int nng_pipe_getopt(nng_pipe, const char *, void *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_bool(nng_pipe, const char *, bool *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt( + nng_pipe, const char *, void *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_bool( + nng_pipe, const char *, bool *) NNG_DEPRECATED; NNG_DECL int nng_pipe_getopt_int(nng_pipe, const char *, int *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_ms(nng_pipe, const char *, nng_duration *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_size(nng_pipe, const char *, size_t *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_sockaddr(nng_pipe, const char *, nng_sockaddr *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_uint64(nng_pipe, const char *, uint64_t *) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_ptr(nng_pipe, const char *, void **) NNG_DEPRECATED; -NNG_DECL int nng_pipe_getopt_string(nng_pipe, const char *, char **) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_ms( + nng_pipe, const char *, nng_duration *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_size( + nng_pipe, const char *, size_t *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_sockaddr( + nng_pipe, const char *, nng_sockaddr *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_uint64( + nng_pipe, const char *, uint64_t *) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_ptr( + nng_pipe, const char *, void **) NNG_DEPRECATED; +NNG_DECL int nng_pipe_getopt_string( + nng_pipe, const char *, char **) NNG_DEPRECATED; // nng_closeall closes all open sockets. Do not call this from // a library; it will affect all sockets. NNG_DECL void nng_closeall(void) NNG_DEPRECATED; -#endif +#endif // NNG_ELIDE_DEPRECATED + +// nng_init_parameter is used by applications to change a tunable setting. +// This function must be called before any other NNG function for the setting +// to have any effect. This function is also not thread-safe! +// +// The list of parameters supported is *not* documented, and subject to change. +// +// We try to provide sane defaults, so the use here is intended to provide +// more control for applications that cannot use compile-time configuration. +// +// Applications should not depend on this API for correct operation. +// +// This API is intentionally undocumented. +// +// Parameter settings are lost after nng_fini() is called. +typedef int nng_init_parameter; +NNG_DECL void nng_init_set_parameter(nng_init_parameter, uint64_t); + +// The following list of parameters is not part of our API stability promise. +// In particular the set of parameters that are supported, the default values, +// the range of valid values, and semantics associated therein are subject to +// change at any time. We won't go out of our way to break these, and we will +// try to prevent changes here from breaking working applications, but this is +// on a best effort basis only. +// +// NOTE: When removing a value, please leave the enumeration in place and add +// a suffix _RETIRED ... this will preserve the binary values for binary compatibility. +enum { + NNG_INIT_PARAMETER_NONE = 0, // ensure values start at 1. + + // Fix the number of threads used for tasks (callbacks), + // Default is 2 threads per core, capped to NNG_INIT_MAX_TASK_THREADS. + // At least 2 threads will be created in any case. + NNG_INIT_NUM_TASK_THREADS, + + // Fix the number of threads used for expiration. Default is one thread per + // core, capped to NNG_INIT_MAX_EXPIRE_THREADS. At least one thread will be created. + NNG_INIT_NUM_EXPIRE_THREADS, + + // Fix the number of poller threads (used for I/O). Support varies + // by platform (many platforms only support a single poller thread.) + NNG_INIT_NUM_POLLER_THREADS, + + // Fix the number of threads used for DNS resolution. At least one will be used. + // Default is controlled by NNG_RESOLV_CONCURRENCY compile time variable. + NNG_INIT_NUM_RESOLVER_THREADS, + + // Limit the number of threads of created for tasks. + // NNG will always create at least 2 of these in order to prevent deadlocks. + // Zero means no limit. Default is determined by NNG_MAX_TASKQ_THREADS compile time variable. + NNG_INIT_MAX_TASK_THREADS, + + // Limit the number of threads created for expiration. Zero means no limit. + // Default is determined by the NNG_MAX_EXPIRE_THREADS compile time variable. + NNG_INIT_MAX_EXPIRE_THREADS, + + // Limit the number of poller/IO threads created. Zero means no limit. + // Default is determined by NNG_MAX_POLLER_THREADS compile time variable. + NNG_INIT_MAX_POLLER_THREADS, +}; #ifdef __cplusplus } diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 9e5a6bec..009d6bb0 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2021 Staysail Systems, Inc. <info@staystail.tech> +# Copyright 2024 Staysail Systems, Inc. <info@staystail.tech> # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -78,6 +78,7 @@ nng_test(aio_test) nng_test(buf_size_test) nng_test(errors_test) nng_test(id_test) +nng_test(init_test) nng_test(list_test) nng_test(message_test) nng_test(reconnect_test) diff --git a/src/core/aio.c b/src/core/aio.c index 3d4a56c1..084795bd 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -843,18 +843,29 @@ int nni_aio_sys_init(void) { int num_thr; + int max_thr; -#ifndef NNG_NUM_EXPIRE_THREADS - num_thr = nni_plat_ncpu(); -#else - num_thr = NNG_NUM_EXPIRE_THREADS; +#ifndef NNG_MAX_EXPIRE_THREADS +#define NNG_MAX_EXPIRE_THREADS 8 #endif -#if NNG_MAX_EXPIRE_THREADS > 0 - if (num_thr > NNG_MAX_EXPIRE_THREADS) { - num_thr = NNG_MAX_EXPIRE_THREADS; - } + +#ifndef NNG_NUM_EXPIRE_THREADS +#define NNG_NUM_EXPIRE_THREADS (nni_plat_ncpu()) #endif + max_thr = (int) nni_init_get_param( + NNG_INIT_MAX_EXPIRE_THREADS, NNG_MAX_EXPIRE_THREADS); + + num_thr = (int) nni_init_get_param( + NNG_INIT_NUM_EXPIRE_THREADS, NNG_NUM_EXPIRE_THREADS); + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 1) { + num_thr = 1; + } + nni_init_set_effective(NNG_INIT_NUM_EXPIRE_THREADS, num_thr); nni_aio_expire_q_list = nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr); nni_aio_expire_q_cnt = num_thr; diff --git a/src/core/init.c b/src/core/init.c index f2195bcb..8f2e1056 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -54,10 +54,107 @@ nni_init(void) return (nni_plat_init(nni_init_helper)); } +// accessing the list of parameters +typedef struct nni_init_param { + nni_list_node node; + nng_init_parameter param; + uint64_t value; +#ifdef NNG_TEST_LIB + uint64_t effective; +#endif +} nni_init_param; + +static nni_list nni_init_params = + NNI_LIST_INITIALIZER(nni_init_params, nni_init_param, node); + +void +nni_init_set_param(nng_init_parameter p, uint64_t value) +{ + if (nni_inited) { + // this is paranoia -- if some library code started already + // then we cannot safely change parameters, and modifying the + // list is not thread safe. + return; + } + nni_init_param *item; + NNI_LIST_FOREACH (&nni_init_params, item) { + if (item->param == p) { + item->value = value; + return; + } + } + if ((item = NNI_ALLOC_STRUCT(item)) != NULL) { + item->param = p; + item->value = value; + nni_list_append(&nni_init_params, item); + } +} + +uint64_t +nni_init_get_param(nng_init_parameter p, uint64_t default_value) +{ + nni_init_param *item; + NNI_LIST_FOREACH (&nni_init_params, item) { + if (item->param == p) { + return (item->value); + } + } + return (default_value); +} + +void +nni_init_set_effective(nng_init_parameter p, uint64_t value) +{ +#ifdef NNG_TEST_LIB + nni_init_param *item; + NNI_LIST_FOREACH (&nni_init_params, item) { + if (item->param == p) { + item->effective = value; + return; + } + } + if ((item = NNI_ALLOC_STRUCT(item)) != NULL) { + item->param = p; + item->effective = value; + nni_list_append(&nni_init_params, item); + } +#else + NNI_ARG_UNUSED(p); + NNI_ARG_UNUSED(value); +#endif +} + +#ifdef NNG_TEST_LIB +uint64_t +nni_init_get_effective(nng_init_parameter p) +{ + nni_init_param *item; + NNI_LIST_FOREACH (&nni_init_params, item) { + if (item->param == p) { + return (item->effective); + } + } + return ((uint64_t)-1); +} +#endif + + +static void +nni_init_params_fini(void) +{ + nni_init_param *item; + while ((item = nni_list_first(&nni_init_params)) != NULL) { + nni_list_remove(&nni_init_params, item); + NNI_FREE_STRUCT(item); + } +} + void nni_fini(void) { if (!nni_inited) { + // make sure we discard parameters even if we didn't startup + nni_init_params_fini(); return; } nni_sp_tran_sys_fini(); @@ -67,6 +164,7 @@ nni_fini(void) nni_taskq_sys_fini(); nni_reap_sys_fini(); // must be before timer and aio (expire) nni_id_map_sys_fini(); + nni_init_params_fini(); nni_plat_fini(); nni_inited = false; diff --git a/src/core/init.h b/src/core/init.h index 4340b15b..d20cf046 100644 --- a/src/core/init.h +++ b/src/core/init.h @@ -1,7 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2017 Capitar IT Group BV <info@capitar.com> -// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -23,4 +22,15 @@ int nni_init(void); // that all resources used by the library are released back to the system. void nni_fini(void); +// nni_init_param is used by applications (via nng_init_param) to configure +// some tunable settings at runtime. It must be called before any other NNG +// functions are called, in order to have any effect at all. +void nni_init_set_param(nng_init_parameter, uint64_t value); + +// subsystems can call this to obtain a parameter value. +uint64_t nni_init_get_param(nng_init_parameter parameter, uint64_t default_value); + +// subsystems can set this to facilitate tests (only used in test code) +void nni_init_set_effective(nng_init_parameter p, uint64_t value); + #endif // CORE_INIT_H diff --git a/src/core/init_test.c b/src/core/init_test.c new file mode 100644 index 00000000..9b9e26b4 --- /dev/null +++ b/src/core/init_test.c @@ -0,0 +1,140 @@ +// +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +uint64_t nni_init_get_param( + nng_init_parameter parameter, uint64_t default_value); +uint64_t nni_init_get_effective(nng_init_parameter p); +void nni_init_set_effective(nng_init_parameter p, uint64_t); + +void +test_init_param(void) +{ + NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 456) == 456); + nng_init_set_parameter(NNG_INIT_PARAMETER_NONE, 123); + NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123); + nni_init_set_effective(NNG_INIT_PARAMETER_NONE, 124); + NUTS_ASSERT(nni_init_get_effective(NNG_INIT_PARAMETER_NONE) == 124); + NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123); + nng_fini(); + NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 567); +} + +void +test_set_effective(void) +{ + nni_init_set_effective(NNG_INIT_PARAMETER_NONE, 999); + NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 0) == 0); + NUTS_ASSERT(nni_init_get_effective(NNG_INIT_PARAMETER_NONE) == 999); + nng_fini(); +} + +void +test_init_zero_resolvers(void) +{ + nng_socket s; + nng_init_set_parameter(NNG_INIT_NUM_RESOLVER_THREADS, 0); + NUTS_OPEN(s); + NUTS_CLOSE(s); + NUTS_ASSERT( + nni_init_get_effective(NNG_INIT_NUM_RESOLVER_THREADS) == 1); + nng_fini(); +} + +void +test_init_one_task_thread(void) +{ + nng_socket s; + nng_init_set_parameter(NNG_INIT_NUM_TASK_THREADS, 0); + nng_init_set_parameter(NNG_INIT_MAX_TASK_THREADS, 1); + NUTS_OPEN(s); + NUTS_CLOSE(s); + NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_TASK_THREADS) == 2); + nng_fini(); +} + +void +test_init_too_many_task_threads(void) +{ + nng_socket s; + nng_init_set_parameter(NNG_INIT_NUM_TASK_THREADS, 256); + nng_init_set_parameter(NNG_INIT_MAX_TASK_THREADS, 4); + NUTS_OPEN(s); + NUTS_CLOSE(s); + NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_TASK_THREADS) == 4); + nng_fini(); +} + +void +test_init_no_expire_thread(void) +{ + nng_socket s; + nng_init_set_parameter(NNG_INIT_NUM_EXPIRE_THREADS, 0); + nng_init_set_parameter(NNG_INIT_MAX_EXPIRE_THREADS, 0); + NUTS_OPEN(s); + NUTS_CLOSE(s); + NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_EXPIRE_THREADS) == 1); + nng_fini(); +} + +void +test_init_too_many_expire_threads(void) +{ + nng_socket s; + nng_init_set_parameter(NNG_INIT_NUM_EXPIRE_THREADS, 256); + nng_init_set_parameter(NNG_INIT_MAX_EXPIRE_THREADS, 2); + NUTS_OPEN(s); + NUTS_CLOSE(s); + NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_EXPIRE_THREADS) == 2); + nng_fini(); +} + +// poller tuning only supported on Windows right now +#ifdef NNG_PLATFORM_WINDOWS +void +test_init_poller_no_threads(void) +{ + nng_socket s; + nng_init_set_parameter(NNG_INIT_NUM_POLLER_THREADS, 0); + nng_init_set_parameter(NNG_INIT_MAX_POLLER_THREADS, 0); + NUTS_OPEN(s); + NUTS_CLOSE(s); + NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_POLLER_THREADS) == 1); + nng_fini(); +} + +void +test_init_too_many_poller_threads(void) +{ + nng_socket s; + nng_init_set_parameter(NNG_INIT_NUM_POLLER_THREADS, 256); + nng_init_set_parameter(NNG_INIT_MAX_POLLER_THREADS, 2); + NUTS_OPEN(s); + NUTS_CLOSE(s); + NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_POLLER_THREADS) == 2); + nng_fini(); +} +#endif + +NUTS_TESTS = { + { "init parameter", test_init_param }, + { "init set effective", test_set_effective }, + { "init zero resolvers", test_init_zero_resolvers }, + { "init one task thread", test_init_one_task_thread }, + { "init too many task threads", test_init_too_many_task_threads }, + { "init no expire thread", test_init_no_expire_thread }, + { "init too many expire threads", test_init_too_many_expire_threads }, +#ifdef NNG_PLATFORM_WINDOWS + { "init no poller thread", test_init_poller_no_threads }, + { "init too many poller threads", test_init_too_many_poller_threads }, +#endif + + { NULL, NULL }, +};
\ No newline at end of file diff --git a/src/core/taskq.c b/src/core/taskq.c index d914093b..09886596 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -1,5 +1,5 @@ // -// Copyright 2022 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -245,20 +245,32 @@ nni_task_fini(nni_task *task) int nni_taskq_sys_init(void) { - int nthrs; + int num_thr; + int max_thr; #ifndef NNG_NUM_TASKQ_THREADS - nthrs = nni_plat_ncpu() * 2; -#else - nthrs = NNG_NUM_TASKQ_THREADS; +#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2) #endif -#if NNG_MAX_TASKQ_THREADS > 0 - if (nthrs > NNG_MAX_TASKQ_THREADS) { - nthrs = NNG_MAX_TASKQ_THREADS; - } + +#ifndef NNG_MAX_TASKQ_THREADS +#define NNG_MAX_TASKQ_THREADS 16 #endif - return (nni_taskq_init(&nni_taskq_systq, nthrs)); + max_thr = (int) nni_init_get_param( + NNG_INIT_MAX_TASK_THREADS, NNG_MAX_TASKQ_THREADS); + + num_thr = (int) nni_init_get_param( + NNG_INIT_NUM_TASK_THREADS, NNG_NUM_TASKQ_THREADS); + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 2) { + num_thr = 2; + } + nni_init_set_effective(NNG_INIT_NUM_TASK_THREADS, num_thr); + + return (nni_taskq_init(&nni_taskq_systq, num_thr)); } void @@ -2011,3 +2011,9 @@ nng_version(void) return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr( NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX); } + +void +nng_init_set_parameter(nng_init_parameter p, uint64_t value) +{ + nni_init_set_param(p, value); +}
\ No newline at end of file diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index c6abee5f..8eaa29f2 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -25,14 +25,10 @@ // for it to ensure that names can be looked up concurrently. This isn't // as elegant or scalable as a true asynchronous resolver would be, but // it has the advantage of being fairly portable, and concurrent enough for -// the vast, vast majority of use cases. The total thread count can be +// the vast majority of use cases. The total thread count can be // changed with this define. Note that some platforms may not have a // thread-safe getaddrinfo(). In that case they should set this to 1. -#ifndef NNG_RESOLV_CONCURRENCY -#define NNG_RESOLV_CONCURRENCY 4 -#endif - #ifndef AI_NUMERICSERV #define AI_NUMERICSERV 0 #endif @@ -41,7 +37,8 @@ static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); static bool resolv_fini = false; static nni_list resolv_aios; -static nni_thr resolv_thrs[NNG_RESOLV_CONCURRENCY]; +static nni_thr *resolv_thrs; +static int resolv_num_thr; typedef struct resolv_item resolv_item; struct resolv_item { @@ -450,14 +447,30 @@ nni_posix_resolv_sysinit(void) resolv_fini = false; nni_aio_list_init(&resolv_aios); - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { +#ifndef NNG_RESOLV_CONCURRENCY +#define NNG_RESOLV_CONCURRENCY 4 +#endif + + resolv_num_thr = (int) nni_init_get_param( + NNG_INIT_NUM_RESOLVER_THREADS, NNG_RESOLV_CONCURRENCY); + if (resolv_num_thr < 1) { + resolv_num_thr = 1; + } + // no limit on the maximum for now + nni_init_set_effective(NNG_INIT_NUM_RESOLVER_THREADS, resolv_num_thr); + resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr); + if (resolv_thrs == NULL) { + return (NNG_ENOMEM); + } + + for (int i = 0; i < resolv_num_thr; i++) { int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); if (rv != 0) { nni_posix_resolv_sysfini(); return (rv); } } - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { + for (int i = 0; i < resolv_num_thr; i++) { nni_thr_run(&resolv_thrs[i]); } @@ -472,8 +485,11 @@ nni_posix_resolv_sysfini(void) nni_cv_wake(&resolv_cv); nni_mtx_unlock(&resolv_mtx); - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { - nni_thr_fini(&resolv_thrs[i]); + if (resolv_thrs != NULL) { + for (int i = 0; i < resolv_num_thr; i++) { + nni_thr_fini(&resolv_thrs[i]); + } + NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr); } } diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c index b08f2e4d..1e985130 100644 --- a/src/platform/windows/win_io.c +++ b/src/platform/windows/win_io.c @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -89,26 +89,34 @@ nni_win_io_sysinit(void) HANDLE h; int i; int rv; - int nthr = nni_plat_ncpu() * 2; + int num_thr; + int max_thr; - // Limits on the thread count. This is fairly arbitrary. - if (nthr < 2) { - nthr = 2; - } #ifndef NNG_MAX_POLLER_THREADS #define NNG_MAX_POLLER_THREADS 8 #endif -#if NNG_MAX_POLLER_THREADS > 0 - if (nthr > NNG_MAX_POLLER_THREADS) { - nthr = NNG_MAX_POLLER_THREADS; - } +#ifndef NNG_NUM_POLLER_THREADS +#define NNG_NUM_POLLER_THREADS (nni_plat_ncpu()) #endif - if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) { + max_thr = (int) nni_init_get_param( + NNG_INIT_MAX_POLLER_THREADS, NNG_MAX_POLLER_THREADS); + + num_thr = (int) nni_init_get_param( + NNG_INIT_NUM_POLLER_THREADS, NNG_NUM_POLLER_THREADS); + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 1) { + num_thr = 1; + } + nni_init_set_effective(NNG_INIT_NUM_POLLER_THREADS, num_thr); + if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, num_thr)) == NULL) { return (NNG_ENOMEM); } - win_io_nthr = nthr; + win_io_nthr = num_thr; - h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nthr); + h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, num_thr); if (h == NULL) { return (nni_win_error(GetLastError())); } @@ -145,7 +153,7 @@ nni_win_io_sysfini(void) nni_thr_fini(&win_io_thrs[i]); } - NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr); + NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr); } #endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 528da451..92c7461f 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -22,23 +22,20 @@ // host file, WINS, or other naming services. As a result, we just build // our own limited asynchronous resolver with threads. -#ifndef NNG_RESOLV_CONCURRENCY -#define NNG_RESOLV_CONCURRENCY 4 -#endif - -static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; -static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); +static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER; +static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx); static bool resolv_fini = false; static nni_list resolv_aios; -static nni_thr resolv_thrs[NNG_RESOLV_CONCURRENCY]; +static nni_thr *resolv_thrs; +static int resolv_num_thr; typedef struct resolv_item resolv_item; struct resolv_item { int family; bool passive; - char * host; - char * serv; - nni_aio * aio; + char *host; + char *serv; + nni_aio *aio; nng_sockaddr *sa; }; @@ -159,9 +156,9 @@ resolv_task(resolv_item *item) nni_mtx_lock(&resolv_mtx); if ((probe != NULL) && (item->aio != NULL)) { - struct sockaddr_in * sin; + struct sockaddr_in *sin; struct sockaddr_in6 *sin6; - nni_sockaddr * sa; + nni_sockaddr *sa; sa = item->sa; @@ -270,7 +267,7 @@ resolv_worker(void *notused) nni_mtx_lock(&resolv_mtx); for (;;) { - nni_aio * aio; + nni_aio *aio; resolv_item *item; int rv; @@ -311,9 +308,9 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port) int rv; bool v6 = false; bool wrapped = false; - char * port; - char * host; - char * buf; + char *port; + char *host; + char *buf; size_t buf_len; if (addr == NULL) { @@ -411,7 +408,23 @@ nni_win_resolv_sysinit(void) nni_aio_list_init(&resolv_aios); resolv_fini = false; - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { +#ifndef NNG_RESOLV_CONCURRENCY +#define NNG_RESOLV_CONCURRENCY 4 +#endif + + resolv_num_thr = (int) nni_init_get_param( + NNG_INIT_NUM_RESOLVER_THREADS, NNG_RESOLV_CONCURRENCY); + if (resolv_num_thr < 1) { + resolv_num_thr = 1; + } + // no limit on the maximum for now + nni_init_set_effective(NNG_INIT_NUM_RESOLVER_THREADS, resolv_num_thr); + resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr); + if (resolv_thrs == NULL) { + return (NNG_ENOMEM); + } + + for (int i = 0; i < resolv_num_thr; i++) { int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); if (rv != 0) { nni_win_resolv_sysfini(); @@ -419,7 +432,7 @@ nni_win_resolv_sysinit(void) } nni_thr_set_name(&resolv_thrs[i], "nng:resolver"); } - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { + for (int i = 0; i < resolv_num_thr; i++) { nni_thr_run(&resolv_thrs[i]); } return (0); @@ -432,9 +445,10 @@ nni_win_resolv_sysfini(void) resolv_fini = true; nni_cv_wake(&resolv_cv); nni_mtx_unlock(&resolv_mtx); - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { + for (int i = 0; i < resolv_num_thr; i++) { nni_thr_fini(&resolv_thrs[i]); } + NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr); } #endif // NNG_PLATFORM_WINDOWS |
