aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt22
-rw-r--r--include/nng/nng.h234
-rw-r--r--src/core/CMakeLists.txt3
-rw-r--r--src/core/aio.c29
-rw-r--r--src/core/init.c100
-rw-r--r--src/core/init.h14
-rw-r--r--src/core/init_test.c140
-rw-r--r--src/core/taskq.c32
-rw-r--r--src/nng.c6
-rw-r--r--src/platform/posix/posix_resolv_gai.c38
-rw-r--r--src/platform/windows/win_io.c36
-rw-r--r--src/platform/windows/win_resolv.c54
12 files changed, 574 insertions, 134 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 226bb2eb..7fe8f712 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
# Copyright (c) 2012 Martin Sustrik All rights reserved.
# Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
# Copyright (c) 2015-2016 Jack R. Dunaway. All rights reserved.
@@ -114,17 +114,19 @@ endif ()
nng_defines_if(NNG_ENABLE_STATS NNG_ENABLE_STATS)
+set(NNG_RESOLV_CONCURRENCY 4 CACHE STRING "Resolver (DNS) concurrency.")
+mark_as_advanced(NNG_RESOLV_CONCURRENCY)
if (NNG_RESOLV_CONCURRENCY)
add_definitions(-DNNG_RESOLV_CONCURRENCY=${NNG_RESOLV_CONCURRENCY})
endif ()
-mark_as_advanced(NNG_RESOLV_CONCURRENCY)
+set(NNG_NUM_TASKQ_THREADS 0 CACHE STRING "Fixed number of task threads, 0 for automatic")
+mark_as_advanced(NNG_NUM_TASKQ_THREADS)
if (NNG_NUM_TASKQ_THREADS)
add_definitions(-DNNG_NUM_TASKQ_THREADS=${NNG_NUM_TASKQ_THREADS})
endif ()
-mark_as_advanced(NNG_NUM_TASKQ_THREADS)
-set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on taskq threads, 0 for no limit")
+set(NNG_MAX_TASKQ_THREADS 16 CACHE STRING "Upper bound on task threads, 0 for no limit")
mark_as_advanced(NNG_MAX_TASKQ_THREADS)
if (NNG_MAX_TASKQ_THREADS)
add_definitions(-DNNG_MAX_TASKQ_THREADS=${NNG_MAX_TASKQ_THREADS})
@@ -132,6 +134,12 @@ endif ()
# Expire threads. This runs the timeout handling, and having more of them
# reduces contention on the common locks used for aio expiration.
+set(NNG_NUM_EXPIRE_THREADS 0 CACHE STRING "Fixed number of expire threads, 0 for automatic")
+mark_as_advanced(NNG_NUM_EXPIRE_THREADS)
+if (NNG_NUM_EXPIRE_THREADS)
+ add_definitions(-DNNG_NUM_EXPIRE_THREADS=${NNG_NUM_EXPIRE_THREADS})
+endif ()
+
set(NNG_MAX_EXPIRE_THREADS 8 CACHE STRING "Upper bound on expire threads, 0 for no limit")
mark_as_advanced(NNG_MAX_EXPIRE_THREADS)
if (NNG_MAX_EXPIRE_THREADS)
@@ -140,6 +148,12 @@ endif()
# Poller threads. These threads run the pollers. This is mostly used
# on Windows right now, as the POSIX platforms use a single threaded poller.
+set(NNG_NUM_POLLER_THREADS 0 CACHE STRING "Fixed number of I/O poller threads, 0 for automatic")
+if (NNG_NUM_POLLER_THREADS)
+ add_definitions(-DNNG_NUM_POLLER_THREADS=${NNG_NUM_POLLER_THREADS})
+endif ()
+mark_as_advanced(NNG_NUM_POLLER_THREADS)
+
set(NNG_MAX_POLLER_THREADS 8 CACHE STRING "Upper bound on I/O poller threads, 0 for no limit")
mark_as_advanced(NNG_MAX_POLLER_THREADS)
if (NNG_MAX_POLLER_THREADS)
diff --git a/include/nng/nng.h b/include/nng/nng.h
index 98a9a843..394dd0fd 100644
--- a/include/nng/nng.h
+++ b/include/nng/nng.h
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -47,7 +47,7 @@ extern "C" {
#ifndef NNG_DEPRECATED
#if defined(__GNUC__) || defined(__clang__)
-#define NNG_DEPRECATED __attribute__ ((deprecated))
+#define NNG_DEPRECATED __attribute__((deprecated))
#else
#define NNG_DEPRECATED
#endif
@@ -59,7 +59,8 @@ extern "C" {
#define NNG_MAJOR_VERSION 1
#define NNG_MINOR_VERSION 7
#define NNG_PATCH_VERSION 0
-#define NNG_RELEASE_SUFFIX "pre" // if non-empty (i.e. "pre"), this is a pre-release
+#define NNG_RELEASE_SUFFIX \
+ "pre" // if non-empty (i.e. "pre"), this is a pre-release
// Maximum length of a socket address. This includes the terminating NUL.
// This limit is built into other implementations, so do not change it.
@@ -71,7 +72,7 @@ extern "C" {
// NNG_PROTOCOL_NUMBER is used by protocol headers to calculate their
// protocol number from a major and minor number. Applications should
// probably not need to use this.
-#define NNG_PROTOCOL_NUMBER(maj, min) (((x) *16) + (y))
+#define NNG_PROTOCOL_NUMBER(maj, min) (((x) * 16) + (y))
// Types common to nng.
@@ -101,7 +102,7 @@ typedef struct nng_socket_s {
uint32_t id;
} nng_socket;
-typedef int32_t nng_duration; // in milliseconds
+typedef int32_t nng_duration; // in milliseconds
// nng_time represents an absolute time since some arbitrary point in the
// past, measured in milliseconds. The values are always positive.
@@ -199,7 +200,7 @@ enum nng_sockaddr_family {
// Scatter/gather I/O.
typedef struct nng_iov {
- void * iov_buf;
+ void *iov_buf;
size_t iov_len;
} nng_iov;
@@ -616,7 +617,7 @@ NNG_DECL void nng_aio_finish(nng_aio *, int);
// final argument is passed to the cancelfn. The final argument of the
// cancellation function is the error number (will not be zero) corresponding
// to the reason for cancellation, e.g. NNG_ETIMEDOUT or NNG_ECANCELED.
-typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int);
+typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int);
NNG_DECL void nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *);
// nng_aio_sleep does a "sleeping" operation, basically does nothing
@@ -630,9 +631,9 @@ NNG_DECL void nng_msg_free(nng_msg *);
NNG_DECL int nng_msg_realloc(nng_msg *, size_t);
NNG_DECL int nng_msg_reserve(nng_msg *, size_t);
NNG_DECL size_t nng_msg_capacity(nng_msg *);
-NNG_DECL void * nng_msg_header(nng_msg *);
+NNG_DECL void *nng_msg_header(nng_msg *);
NNG_DECL size_t nng_msg_header_len(const nng_msg *);
-NNG_DECL void * nng_msg_body(nng_msg *);
+NNG_DECL void *nng_msg_body(nng_msg *);
NNG_DECL size_t nng_msg_len(const nng_msg *);
NNG_DECL int nng_msg_append(nng_msg *, const void *, size_t);
NNG_DECL int nng_msg_insert(nng_msg *, const void *, size_t);
@@ -693,7 +694,7 @@ NNG_DECL nng_dialer nng_pipe_dialer(nng_pipe);
NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
// Flags.
-#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer
+#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer
#define NNG_FLAG_NONBLOCK 2u // Non-blocking operations
// Options.
@@ -1250,7 +1251,6 @@ NNG_DECL int nng_stream_listener_set_ptr(
NNG_DECL int nng_stream_listener_set_addr(
nng_stream_listener *, const char *, const nng_sockaddr *);
-
#ifndef NNG_ELIDE_DEPRECATED
// These are legacy APIs that have been deprecated.
// Their use is strongly discouraged.
@@ -1260,95 +1260,205 @@ NNG_DECL int nng_stream_listener_set_addr(
NNG_DECL int nng_msg_getopt(nng_msg *, int, void *, size_t *) NNG_DEPRECATED;
// Socket options. Use nng_socket_get and nng_socket_set instead.
-NNG_DECL int nng_getopt(nng_socket, const char *, void *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_getopt(
+ nng_socket, const char *, void *, size_t *) NNG_DEPRECATED;
NNG_DECL int nng_getopt_bool(nng_socket, const char *, bool *) NNG_DEPRECATED;
NNG_DECL int nng_getopt_int(nng_socket, const char *, int *) NNG_DEPRECATED;
-NNG_DECL int nng_getopt_ms(nng_socket, const char *, nng_duration *) NNG_DEPRECATED;
-NNG_DECL int nng_getopt_size(nng_socket, const char *, size_t *) NNG_DEPRECATED;
-NNG_DECL int nng_getopt_uint64(nng_socket, const char *, uint64_t *) NNG_DEPRECATED;
+NNG_DECL int nng_getopt_ms(
+ nng_socket, const char *, nng_duration *) NNG_DEPRECATED;
+NNG_DECL int nng_getopt_size(
+ nng_socket, const char *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_getopt_uint64(
+ nng_socket, const char *, uint64_t *) NNG_DEPRECATED;
NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **) NNG_DEPRECATED;
-NNG_DECL int nng_getopt_string(nng_socket, const char *, char **) NNG_DEPRECATED;
-NNG_DECL int nng_setopt(nng_socket, const char *, const void *, size_t) NNG_DEPRECATED;
+NNG_DECL int nng_getopt_string(
+ nng_socket, const char *, char **) NNG_DEPRECATED;
+NNG_DECL int nng_setopt(
+ nng_socket, const char *, const void *, size_t) NNG_DEPRECATED;
NNG_DECL int nng_setopt_bool(nng_socket, const char *, bool) NNG_DEPRECATED;
NNG_DECL int nng_setopt_int(nng_socket, const char *, int) NNG_DEPRECATED;
-NNG_DECL int nng_setopt_ms(nng_socket, const char *, nng_duration) NNG_DEPRECATED;
+NNG_DECL int nng_setopt_ms(
+ nng_socket, const char *, nng_duration) NNG_DEPRECATED;
NNG_DECL int nng_setopt_size(nng_socket, const char *, size_t) NNG_DEPRECATED;
-NNG_DECL int nng_setopt_uint64(nng_socket, const char *, uint64_t) NNG_DEPRECATED;
-NNG_DECL int nng_setopt_string(nng_socket, const char *, const char *) NNG_DEPRECATED;
+NNG_DECL int nng_setopt_uint64(
+ nng_socket, const char *, uint64_t) NNG_DEPRECATED;
+NNG_DECL int nng_setopt_string(
+ nng_socket, const char *, const char *) NNG_DEPRECATED;
NNG_DECL int nng_setopt_ptr(nng_socket, const char *, void *) NNG_DEPRECATED;
// Context options. Use nng_ctx_get and nng_ctx_set instead.
-NNG_DECL int nng_ctx_getopt(nng_ctx, const char *, void *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_ctx_getopt(
+ nng_ctx, const char *, void *, size_t *) NNG_DEPRECATED;
NNG_DECL int nng_ctx_getopt_bool(nng_ctx, const char *, bool *) NNG_DEPRECATED;
NNG_DECL int nng_ctx_getopt_int(nng_ctx, const char *, int *) NNG_DEPRECATED;
-NNG_DECL int nng_ctx_getopt_ms(nng_ctx, const char *, nng_duration *) NNG_DEPRECATED;
-NNG_DECL int nng_ctx_getopt_size(nng_ctx, const char *, size_t *) NNG_DEPRECATED;
-NNG_DECL int nng_ctx_setopt(nng_ctx, const char *, const void *, size_t) NNG_DEPRECATED;
+NNG_DECL int nng_ctx_getopt_ms(
+ nng_ctx, const char *, nng_duration *) NNG_DEPRECATED;
+NNG_DECL int nng_ctx_getopt_size(
+ nng_ctx, const char *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_ctx_setopt(
+ nng_ctx, const char *, const void *, size_t) NNG_DEPRECATED;
NNG_DECL int nng_ctx_setopt_bool(nng_ctx, const char *, bool) NNG_DEPRECATED;
NNG_DECL int nng_ctx_setopt_int(nng_ctx, const char *, int) NNG_DEPRECATED;
-NNG_DECL int nng_ctx_setopt_ms(nng_ctx, const char *, nng_duration) NNG_DEPRECATED;
+NNG_DECL int nng_ctx_setopt_ms(
+ nng_ctx, const char *, nng_duration) NNG_DEPRECATED;
NNG_DECL int nng_ctx_setopt_size(nng_ctx, const char *, size_t) NNG_DEPRECATED;
// Dialer options. Use nng_dialer_get and nng_dialer_set instead.
-NNG_DECL int nng_dialer_getopt(nng_dialer, const char *, void *, size_t *) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_getopt_bool(nng_dialer, const char *, bool *) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_getopt_int(nng_dialer, const char *, int *) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_getopt_ms(nng_dialer, const char *, nng_duration *) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_getopt_size(nng_dialer, const char *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_getopt(
+ nng_dialer, const char *, void *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_getopt_bool(
+ nng_dialer, const char *, bool *) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_getopt_int(
+ nng_dialer, const char *, int *) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_getopt_ms(
+ nng_dialer, const char *, nng_duration *) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_getopt_size(
+ nng_dialer, const char *, size_t *) NNG_DEPRECATED;
NNG_DECL int nng_dialer_getopt_sockaddr(
nng_dialer, const char *, nng_sockaddr *) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_getopt_uint64(nng_dialer, const char *, uint64_t *) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_getopt_ptr(nng_dialer, const char *, void **) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_getopt_string(nng_dialer, const char *, char **) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_setopt(nng_dialer, const char *, const void *, size_t) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_setopt_bool(nng_dialer, const char *, bool) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_setopt_int(nng_dialer, const char *, int) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_setopt_ms(nng_dialer, const char *, nng_duration) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_setopt_size(nng_dialer, const char *, size_t) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_setopt_uint64(nng_dialer, const char *, uint64_t) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_setopt_ptr(nng_dialer, const char *, void *) NNG_DEPRECATED;
-NNG_DECL int nng_dialer_setopt_string(nng_dialer, const char *, const char *) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_getopt_uint64(
+ nng_dialer, const char *, uint64_t *) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_getopt_ptr(
+ nng_dialer, const char *, void **) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_getopt_string(
+ nng_dialer, const char *, char **) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_setopt(
+ nng_dialer, const char *, const void *, size_t) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_setopt_bool(
+ nng_dialer, const char *, bool) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_setopt_int(
+ nng_dialer, const char *, int) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_setopt_ms(
+ nng_dialer, const char *, nng_duration) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_setopt_size(
+ nng_dialer, const char *, size_t) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_setopt_uint64(
+ nng_dialer, const char *, uint64_t) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_setopt_ptr(
+ nng_dialer, const char *, void *) NNG_DEPRECATED;
+NNG_DECL int nng_dialer_setopt_string(
+ nng_dialer, const char *, const char *) NNG_DEPRECATED;
// Listener options. Use nng_listener_get and nng_listener_set instead.
-NNG_DECL int nng_listener_getopt(nng_listener, const char *, void *, size_t *) NNG_DEPRECATED;
-NNG_DECL int nng_listener_getopt_bool(nng_listener, const char *, bool *) NNG_DEPRECATED;
-NNG_DECL int nng_listener_getopt_int(nng_listener, const char *, int *) NNG_DEPRECATED;
+NNG_DECL int nng_listener_getopt(
+ nng_listener, const char *, void *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_listener_getopt_bool(
+ nng_listener, const char *, bool *) NNG_DEPRECATED;
+NNG_DECL int nng_listener_getopt_int(
+ nng_listener, const char *, int *) NNG_DEPRECATED;
NNG_DECL int nng_listener_getopt_ms(
nng_listener, const char *, nng_duration *) NNG_DEPRECATED;
-NNG_DECL int nng_listener_getopt_size(nng_listener, const char *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_listener_getopt_size(
+ nng_listener, const char *, size_t *) NNG_DEPRECATED;
NNG_DECL int nng_listener_getopt_sockaddr(
nng_listener, const char *, nng_sockaddr *) NNG_DEPRECATED;
NNG_DECL int nng_listener_getopt_uint64(
nng_listener, const char *, uint64_t *) NNG_DEPRECATED;
-NNG_DECL int nng_listener_getopt_ptr(nng_listener, const char *, void **) NNG_DEPRECATED;
-NNG_DECL int nng_listener_getopt_string(nng_listener, const char *, char **) NNG_DEPRECATED;
+NNG_DECL int nng_listener_getopt_ptr(
+ nng_listener, const char *, void **) NNG_DEPRECATED;
+NNG_DECL int nng_listener_getopt_string(
+ nng_listener, const char *, char **) NNG_DEPRECATED;
NNG_DECL int nng_listener_setopt(
nng_listener, const char *, const void *, size_t) NNG_DEPRECATED;
-NNG_DECL int nng_listener_setopt_bool(nng_listener, const char *, bool) NNG_DEPRECATED;
-NNG_DECL int nng_listener_setopt_int(nng_listener, const char *, int) NNG_DEPRECATED;
-NNG_DECL int nng_listener_setopt_ms(nng_listener, const char *, nng_duration) NNG_DEPRECATED;
-NNG_DECL int nng_listener_setopt_size(nng_listener, const char *, size_t) NNG_DEPRECATED;
-NNG_DECL int nng_listener_setopt_uint64(nng_listener, const char *, uint64_t) NNG_DEPRECATED;
-NNG_DECL int nng_listener_setopt_ptr(nng_listener, const char *, void *) NNG_DEPRECATED;
+NNG_DECL int nng_listener_setopt_bool(
+ nng_listener, const char *, bool) NNG_DEPRECATED;
+NNG_DECL int nng_listener_setopt_int(
+ nng_listener, const char *, int) NNG_DEPRECATED;
+NNG_DECL int nng_listener_setopt_ms(
+ nng_listener, const char *, nng_duration) NNG_DEPRECATED;
+NNG_DECL int nng_listener_setopt_size(
+ nng_listener, const char *, size_t) NNG_DEPRECATED;
+NNG_DECL int nng_listener_setopt_uint64(
+ nng_listener, const char *, uint64_t) NNG_DEPRECATED;
+NNG_DECL int nng_listener_setopt_ptr(
+ nng_listener, const char *, void *) NNG_DEPRECATED;
NNG_DECL int nng_listener_setopt_string(
nng_listener, const char *, const char *) NNG_DEPRECATED;
// Pipe options. Use nng_pipe_get instead.
-NNG_DECL int nng_pipe_getopt(nng_pipe, const char *, void *, size_t *) NNG_DEPRECATED;
-NNG_DECL int nng_pipe_getopt_bool(nng_pipe, const char *, bool *) NNG_DEPRECATED;
+NNG_DECL int nng_pipe_getopt(
+ nng_pipe, const char *, void *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_pipe_getopt_bool(
+ nng_pipe, const char *, bool *) NNG_DEPRECATED;
NNG_DECL int nng_pipe_getopt_int(nng_pipe, const char *, int *) NNG_DEPRECATED;
-NNG_DECL int nng_pipe_getopt_ms(nng_pipe, const char *, nng_duration *) NNG_DEPRECATED;
-NNG_DECL int nng_pipe_getopt_size(nng_pipe, const char *, size_t *) NNG_DEPRECATED;
-NNG_DECL int nng_pipe_getopt_sockaddr(nng_pipe, const char *, nng_sockaddr *) NNG_DEPRECATED;
-NNG_DECL int nng_pipe_getopt_uint64(nng_pipe, const char *, uint64_t *) NNG_DEPRECATED;
-NNG_DECL int nng_pipe_getopt_ptr(nng_pipe, const char *, void **) NNG_DEPRECATED;
-NNG_DECL int nng_pipe_getopt_string(nng_pipe, const char *, char **) NNG_DEPRECATED;
+NNG_DECL int nng_pipe_getopt_ms(
+ nng_pipe, const char *, nng_duration *) NNG_DEPRECATED;
+NNG_DECL int nng_pipe_getopt_size(
+ nng_pipe, const char *, size_t *) NNG_DEPRECATED;
+NNG_DECL int nng_pipe_getopt_sockaddr(
+ nng_pipe, const char *, nng_sockaddr *) NNG_DEPRECATED;
+NNG_DECL int nng_pipe_getopt_uint64(
+ nng_pipe, const char *, uint64_t *) NNG_DEPRECATED;
+NNG_DECL int nng_pipe_getopt_ptr(
+ nng_pipe, const char *, void **) NNG_DEPRECATED;
+NNG_DECL int nng_pipe_getopt_string(
+ nng_pipe, const char *, char **) NNG_DEPRECATED;
// nng_closeall closes all open sockets. Do not call this from
// a library; it will affect all sockets.
NNG_DECL void nng_closeall(void) NNG_DEPRECATED;
-#endif
+#endif // NNG_ELIDE_DEPRECATED
+
+// nng_init_parameter is used by applications to change a tunable setting.
+// This function must be called before any other NNG function for the setting
+// to have any effect. This function is also not thread-safe!
+//
+// The list of parameters supported is *not* documented, and subject to change.
+//
+// We try to provide sane defaults, so the use here is intended to provide
+// more control for applications that cannot use compile-time configuration.
+//
+// Applications should not depend on this API for correct operation.
+//
+// This API is intentionally undocumented.
+//
+// Parameter settings are lost after nng_fini() is called.
+typedef int nng_init_parameter;
+NNG_DECL void nng_init_set_parameter(nng_init_parameter, uint64_t);
+
+// The following list of parameters is not part of our API stability promise.
+// In particular the set of parameters that are supported, the default values,
+// the range of valid values, and semantics associated therein are subject to
+// change at any time. We won't go out of our way to break these, and we will
+// try to prevent changes here from breaking working applications, but this is
+// on a best effort basis only.
+//
+// NOTE: When removing a value, please leave the enumeration in place and add
+// a suffix _RETIRED ... this will preserve the binary values for binary compatibility.
+enum {
+ NNG_INIT_PARAMETER_NONE = 0, // ensure values start at 1.
+
+ // Fix the number of threads used for tasks (callbacks),
+ // Default is 2 threads per core, capped to NNG_INIT_MAX_TASK_THREADS.
+ // At least 2 threads will be created in any case.
+ NNG_INIT_NUM_TASK_THREADS,
+
+ // Fix the number of threads used for expiration. Default is one thread per
+ // core, capped to NNG_INIT_MAX_EXPIRE_THREADS. At least one thread will be created.
+ NNG_INIT_NUM_EXPIRE_THREADS,
+
+ // Fix the number of poller threads (used for I/O). Support varies
+ // by platform (many platforms only support a single poller thread.)
+ NNG_INIT_NUM_POLLER_THREADS,
+
+ // Fix the number of threads used for DNS resolution. At least one will be used.
+ // Default is controlled by NNG_RESOLV_CONCURRENCY compile time variable.
+ NNG_INIT_NUM_RESOLVER_THREADS,
+
+ // Limit the number of threads of created for tasks.
+ // NNG will always create at least 2 of these in order to prevent deadlocks.
+ // Zero means no limit. Default is determined by NNG_MAX_TASKQ_THREADS compile time variable.
+ NNG_INIT_MAX_TASK_THREADS,
+
+ // Limit the number of threads created for expiration. Zero means no limit.
+ // Default is determined by the NNG_MAX_EXPIRE_THREADS compile time variable.
+ NNG_INIT_MAX_EXPIRE_THREADS,
+
+ // Limit the number of poller/IO threads created. Zero means no limit.
+ // Default is determined by NNG_MAX_POLLER_THREADS compile time variable.
+ NNG_INIT_MAX_POLLER_THREADS,
+};
#ifdef __cplusplus
}
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt
index 9e5a6bec..009d6bb0 100644
--- a/src/core/CMakeLists.txt
+++ b/src/core/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2021 Staysail Systems, Inc. <info@staystail.tech>
+# Copyright 2024 Staysail Systems, Inc. <info@staystail.tech>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
@@ -78,6 +78,7 @@ nng_test(aio_test)
nng_test(buf_size_test)
nng_test(errors_test)
nng_test(id_test)
+nng_test(init_test)
nng_test(list_test)
nng_test(message_test)
nng_test(reconnect_test)
diff --git a/src/core/aio.c b/src/core/aio.c
index 3d4a56c1..084795bd 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -843,18 +843,29 @@ int
nni_aio_sys_init(void)
{
int num_thr;
+ int max_thr;
-#ifndef NNG_NUM_EXPIRE_THREADS
- num_thr = nni_plat_ncpu();
-#else
- num_thr = NNG_NUM_EXPIRE_THREADS;
+#ifndef NNG_MAX_EXPIRE_THREADS
+#define NNG_MAX_EXPIRE_THREADS 8
#endif
-#if NNG_MAX_EXPIRE_THREADS > 0
- if (num_thr > NNG_MAX_EXPIRE_THREADS) {
- num_thr = NNG_MAX_EXPIRE_THREADS;
- }
+
+#ifndef NNG_NUM_EXPIRE_THREADS
+#define NNG_NUM_EXPIRE_THREADS (nni_plat_ncpu())
#endif
+ max_thr = (int) nni_init_get_param(
+ NNG_INIT_MAX_EXPIRE_THREADS, NNG_MAX_EXPIRE_THREADS);
+
+ num_thr = (int) nni_init_get_param(
+ NNG_INIT_NUM_EXPIRE_THREADS, NNG_NUM_EXPIRE_THREADS);
+
+ if ((max_thr > 0) && (num_thr > max_thr)) {
+ num_thr = max_thr;
+ }
+ if (num_thr < 1) {
+ num_thr = 1;
+ }
+ nni_init_set_effective(NNG_INIT_NUM_EXPIRE_THREADS, num_thr);
nni_aio_expire_q_list =
nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr);
nni_aio_expire_q_cnt = num_thr;
diff --git a/src/core/init.c b/src/core/init.c
index f2195bcb..8f2e1056 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -54,10 +54,107 @@ nni_init(void)
return (nni_plat_init(nni_init_helper));
}
+// accessing the list of parameters
+typedef struct nni_init_param {
+ nni_list_node node;
+ nng_init_parameter param;
+ uint64_t value;
+#ifdef NNG_TEST_LIB
+ uint64_t effective;
+#endif
+} nni_init_param;
+
+static nni_list nni_init_params =
+ NNI_LIST_INITIALIZER(nni_init_params, nni_init_param, node);
+
+void
+nni_init_set_param(nng_init_parameter p, uint64_t value)
+{
+ if (nni_inited) {
+ // this is paranoia -- if some library code started already
+ // then we cannot safely change parameters, and modifying the
+ // list is not thread safe.
+ return;
+ }
+ nni_init_param *item;
+ NNI_LIST_FOREACH (&nni_init_params, item) {
+ if (item->param == p) {
+ item->value = value;
+ return;
+ }
+ }
+ if ((item = NNI_ALLOC_STRUCT(item)) != NULL) {
+ item->param = p;
+ item->value = value;
+ nni_list_append(&nni_init_params, item);
+ }
+}
+
+uint64_t
+nni_init_get_param(nng_init_parameter p, uint64_t default_value)
+{
+ nni_init_param *item;
+ NNI_LIST_FOREACH (&nni_init_params, item) {
+ if (item->param == p) {
+ return (item->value);
+ }
+ }
+ return (default_value);
+}
+
+void
+nni_init_set_effective(nng_init_parameter p, uint64_t value)
+{
+#ifdef NNG_TEST_LIB
+ nni_init_param *item;
+ NNI_LIST_FOREACH (&nni_init_params, item) {
+ if (item->param == p) {
+ item->effective = value;
+ return;
+ }
+ }
+ if ((item = NNI_ALLOC_STRUCT(item)) != NULL) {
+ item->param = p;
+ item->effective = value;
+ nni_list_append(&nni_init_params, item);
+ }
+#else
+ NNI_ARG_UNUSED(p);
+ NNI_ARG_UNUSED(value);
+#endif
+}
+
+#ifdef NNG_TEST_LIB
+uint64_t
+nni_init_get_effective(nng_init_parameter p)
+{
+ nni_init_param *item;
+ NNI_LIST_FOREACH (&nni_init_params, item) {
+ if (item->param == p) {
+ return (item->effective);
+ }
+ }
+ return ((uint64_t)-1);
+}
+#endif
+
+
+static void
+nni_init_params_fini(void)
+{
+ nni_init_param *item;
+ while ((item = nni_list_first(&nni_init_params)) != NULL) {
+ nni_list_remove(&nni_init_params, item);
+ NNI_FREE_STRUCT(item);
+ }
+}
+
void
nni_fini(void)
{
if (!nni_inited) {
+ // make sure we discard parameters even if we didn't startup
+ nni_init_params_fini();
return;
}
nni_sp_tran_sys_fini();
@@ -67,6 +164,7 @@ nni_fini(void)
nni_taskq_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_id_map_sys_fini();
+ nni_init_params_fini();
nni_plat_fini();
nni_inited = false;
diff --git a/src/core/init.h b/src/core/init.h
index 4340b15b..d20cf046 100644
--- a/src/core/init.h
+++ b/src/core/init.h
@@ -1,7 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
-// Copyright 2017 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -23,4 +22,15 @@ int nni_init(void);
// that all resources used by the library are released back to the system.
void nni_fini(void);
+// nni_init_param is used by applications (via nng_init_param) to configure
+// some tunable settings at runtime. It must be called before any other NNG
+// functions are called, in order to have any effect at all.
+void nni_init_set_param(nng_init_parameter, uint64_t value);
+
+// subsystems can call this to obtain a parameter value.
+uint64_t nni_init_get_param(nng_init_parameter parameter, uint64_t default_value);
+
+// subsystems can set this to facilitate tests (only used in test code)
+void nni_init_set_effective(nng_init_parameter p, uint64_t value);
+
#endif // CORE_INIT_H
diff --git a/src/core/init_test.c b/src/core/init_test.c
new file mode 100644
index 00000000..9b9e26b4
--- /dev/null
+++ b/src/core/init_test.c
@@ -0,0 +1,140 @@
+//
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <nuts.h>
+
+uint64_t nni_init_get_param(
+ nng_init_parameter parameter, uint64_t default_value);
+uint64_t nni_init_get_effective(nng_init_parameter p);
+void nni_init_set_effective(nng_init_parameter p, uint64_t);
+
+void
+test_init_param(void)
+{
+ NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 456) == 456);
+ nng_init_set_parameter(NNG_INIT_PARAMETER_NONE, 123);
+ NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123);
+ nni_init_set_effective(NNG_INIT_PARAMETER_NONE, 124);
+ NUTS_ASSERT(nni_init_get_effective(NNG_INIT_PARAMETER_NONE) == 124);
+ NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123);
+ nng_fini();
+ NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 567);
+}
+
+void
+test_set_effective(void)
+{
+ nni_init_set_effective(NNG_INIT_PARAMETER_NONE, 999);
+ NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 0) == 0);
+ NUTS_ASSERT(nni_init_get_effective(NNG_INIT_PARAMETER_NONE) == 999);
+ nng_fini();
+}
+
+void
+test_init_zero_resolvers(void)
+{
+ nng_socket s;
+ nng_init_set_parameter(NNG_INIT_NUM_RESOLVER_THREADS, 0);
+ NUTS_OPEN(s);
+ NUTS_CLOSE(s);
+ NUTS_ASSERT(
+ nni_init_get_effective(NNG_INIT_NUM_RESOLVER_THREADS) == 1);
+ nng_fini();
+}
+
+void
+test_init_one_task_thread(void)
+{
+ nng_socket s;
+ nng_init_set_parameter(NNG_INIT_NUM_TASK_THREADS, 0);
+ nng_init_set_parameter(NNG_INIT_MAX_TASK_THREADS, 1);
+ NUTS_OPEN(s);
+ NUTS_CLOSE(s);
+ NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_TASK_THREADS) == 2);
+ nng_fini();
+}
+
+void
+test_init_too_many_task_threads(void)
+{
+ nng_socket s;
+ nng_init_set_parameter(NNG_INIT_NUM_TASK_THREADS, 256);
+ nng_init_set_parameter(NNG_INIT_MAX_TASK_THREADS, 4);
+ NUTS_OPEN(s);
+ NUTS_CLOSE(s);
+ NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_TASK_THREADS) == 4);
+ nng_fini();
+}
+
+void
+test_init_no_expire_thread(void)
+{
+ nng_socket s;
+ nng_init_set_parameter(NNG_INIT_NUM_EXPIRE_THREADS, 0);
+ nng_init_set_parameter(NNG_INIT_MAX_EXPIRE_THREADS, 0);
+ NUTS_OPEN(s);
+ NUTS_CLOSE(s);
+ NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_EXPIRE_THREADS) == 1);
+ nng_fini();
+}
+
+void
+test_init_too_many_expire_threads(void)
+{
+ nng_socket s;
+ nng_init_set_parameter(NNG_INIT_NUM_EXPIRE_THREADS, 256);
+ nng_init_set_parameter(NNG_INIT_MAX_EXPIRE_THREADS, 2);
+ NUTS_OPEN(s);
+ NUTS_CLOSE(s);
+ NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_EXPIRE_THREADS) == 2);
+ nng_fini();
+}
+
+// poller tuning only supported on Windows right now
+#ifdef NNG_PLATFORM_WINDOWS
+void
+test_init_poller_no_threads(void)
+{
+ nng_socket s;
+ nng_init_set_parameter(NNG_INIT_NUM_POLLER_THREADS, 0);
+ nng_init_set_parameter(NNG_INIT_MAX_POLLER_THREADS, 0);
+ NUTS_OPEN(s);
+ NUTS_CLOSE(s);
+ NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_POLLER_THREADS) == 1);
+ nng_fini();
+}
+
+void
+test_init_too_many_poller_threads(void)
+{
+ nng_socket s;
+ nng_init_set_parameter(NNG_INIT_NUM_POLLER_THREADS, 256);
+ nng_init_set_parameter(NNG_INIT_MAX_POLLER_THREADS, 2);
+ NUTS_OPEN(s);
+ NUTS_CLOSE(s);
+ NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_POLLER_THREADS) == 2);
+ nng_fini();
+}
+#endif
+
+NUTS_TESTS = {
+ { "init parameter", test_init_param },
+ { "init set effective", test_set_effective },
+ { "init zero resolvers", test_init_zero_resolvers },
+ { "init one task thread", test_init_one_task_thread },
+ { "init too many task threads", test_init_too_many_task_threads },
+ { "init no expire thread", test_init_no_expire_thread },
+ { "init too many expire threads", test_init_too_many_expire_threads },
+#ifdef NNG_PLATFORM_WINDOWS
+ { "init no poller thread", test_init_poller_no_threads },
+ { "init too many poller threads", test_init_too_many_poller_threads },
+#endif
+
+ { NULL, NULL },
+}; \ No newline at end of file
diff --git a/src/core/taskq.c b/src/core/taskq.c
index d914093b..09886596 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -1,5 +1,5 @@
//
-// Copyright 2022 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -245,20 +245,32 @@ nni_task_fini(nni_task *task)
int
nni_taskq_sys_init(void)
{
- int nthrs;
+ int num_thr;
+ int max_thr;
#ifndef NNG_NUM_TASKQ_THREADS
- nthrs = nni_plat_ncpu() * 2;
-#else
- nthrs = NNG_NUM_TASKQ_THREADS;
+#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2)
#endif
-#if NNG_MAX_TASKQ_THREADS > 0
- if (nthrs > NNG_MAX_TASKQ_THREADS) {
- nthrs = NNG_MAX_TASKQ_THREADS;
- }
+
+#ifndef NNG_MAX_TASKQ_THREADS
+#define NNG_MAX_TASKQ_THREADS 16
#endif
- return (nni_taskq_init(&nni_taskq_systq, nthrs));
+ max_thr = (int) nni_init_get_param(
+ NNG_INIT_MAX_TASK_THREADS, NNG_MAX_TASKQ_THREADS);
+
+ num_thr = (int) nni_init_get_param(
+ NNG_INIT_NUM_TASK_THREADS, NNG_NUM_TASKQ_THREADS);
+
+ if ((max_thr > 0) && (num_thr > max_thr)) {
+ num_thr = max_thr;
+ }
+ if (num_thr < 2) {
+ num_thr = 2;
+ }
+ nni_init_set_effective(NNG_INIT_NUM_TASK_THREADS, num_thr);
+
+ return (nni_taskq_init(&nni_taskq_systq, num_thr));
}
void
diff --git a/src/nng.c b/src/nng.c
index ce75d832..965aab86 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -2011,3 +2011,9 @@ nng_version(void)
return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr(
NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX);
}
+
+void
+nng_init_set_parameter(nng_init_parameter p, uint64_t value)
+{
+ nni_init_set_param(p, value);
+} \ No newline at end of file
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index c6abee5f..8eaa29f2 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -25,14 +25,10 @@
// for it to ensure that names can be looked up concurrently. This isn't
// as elegant or scalable as a true asynchronous resolver would be, but
// it has the advantage of being fairly portable, and concurrent enough for
-// the vast, vast majority of use cases. The total thread count can be
+// the vast majority of use cases. The total thread count can be
// changed with this define. Note that some platforms may not have a
// thread-safe getaddrinfo(). In that case they should set this to 1.
-#ifndef NNG_RESOLV_CONCURRENCY
-#define NNG_RESOLV_CONCURRENCY 4
-#endif
-
#ifndef AI_NUMERICSERV
#define AI_NUMERICSERV 0
#endif
@@ -41,7 +37,8 @@ static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER;
static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx);
static bool resolv_fini = false;
static nni_list resolv_aios;
-static nni_thr resolv_thrs[NNG_RESOLV_CONCURRENCY];
+static nni_thr *resolv_thrs;
+static int resolv_num_thr;
typedef struct resolv_item resolv_item;
struct resolv_item {
@@ -450,14 +447,30 @@ nni_posix_resolv_sysinit(void)
resolv_fini = false;
nni_aio_list_init(&resolv_aios);
- for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
+#ifndef NNG_RESOLV_CONCURRENCY
+#define NNG_RESOLV_CONCURRENCY 4
+#endif
+
+ resolv_num_thr = (int) nni_init_get_param(
+ NNG_INIT_NUM_RESOLVER_THREADS, NNG_RESOLV_CONCURRENCY);
+ if (resolv_num_thr < 1) {
+ resolv_num_thr = 1;
+ }
+ // no limit on the maximum for now
+ nni_init_set_effective(NNG_INIT_NUM_RESOLVER_THREADS, resolv_num_thr);
+ resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr);
+ if (resolv_thrs == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ for (int i = 0; i < resolv_num_thr; i++) {
int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
if (rv != 0) {
nni_posix_resolv_sysfini();
return (rv);
}
}
- for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
+ for (int i = 0; i < resolv_num_thr; i++) {
nni_thr_run(&resolv_thrs[i]);
}
@@ -472,8 +485,11 @@ nni_posix_resolv_sysfini(void)
nni_cv_wake(&resolv_cv);
nni_mtx_unlock(&resolv_mtx);
- for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
- nni_thr_fini(&resolv_thrs[i]);
+ if (resolv_thrs != NULL) {
+ for (int i = 0; i < resolv_num_thr; i++) {
+ nni_thr_fini(&resolv_thrs[i]);
+ }
+ NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr);
}
}
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
index b08f2e4d..1e985130 100644
--- a/src/platform/windows/win_io.c
+++ b/src/platform/windows/win_io.c
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -89,26 +89,34 @@ nni_win_io_sysinit(void)
HANDLE h;
int i;
int rv;
- int nthr = nni_plat_ncpu() * 2;
+ int num_thr;
+ int max_thr;
- // Limits on the thread count. This is fairly arbitrary.
- if (nthr < 2) {
- nthr = 2;
- }
#ifndef NNG_MAX_POLLER_THREADS
#define NNG_MAX_POLLER_THREADS 8
#endif
-#if NNG_MAX_POLLER_THREADS > 0
- if (nthr > NNG_MAX_POLLER_THREADS) {
- nthr = NNG_MAX_POLLER_THREADS;
- }
+#ifndef NNG_NUM_POLLER_THREADS
+#define NNG_NUM_POLLER_THREADS (nni_plat_ncpu())
#endif
- if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) {
+ max_thr = (int) nni_init_get_param(
+ NNG_INIT_MAX_POLLER_THREADS, NNG_MAX_POLLER_THREADS);
+
+ num_thr = (int) nni_init_get_param(
+ NNG_INIT_NUM_POLLER_THREADS, NNG_NUM_POLLER_THREADS);
+
+ if ((max_thr > 0) && (num_thr > max_thr)) {
+ num_thr = max_thr;
+ }
+ if (num_thr < 1) {
+ num_thr = 1;
+ }
+ nni_init_set_effective(NNG_INIT_NUM_POLLER_THREADS, num_thr);
+ if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, num_thr)) == NULL) {
return (NNG_ENOMEM);
}
- win_io_nthr = nthr;
+ win_io_nthr = num_thr;
- h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nthr);
+ h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, num_thr);
if (h == NULL) {
return (nni_win_error(GetLastError()));
}
@@ -145,7 +153,7 @@ nni_win_io_sysfini(void)
nni_thr_fini(&win_io_thrs[i]);
}
- NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr);
+ NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr);
}
#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 528da451..92c7461f 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -22,23 +22,20 @@
// host file, WINS, or other naming services. As a result, we just build
// our own limited asynchronous resolver with threads.
-#ifndef NNG_RESOLV_CONCURRENCY
-#define NNG_RESOLV_CONCURRENCY 4
-#endif
-
-static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER;
-static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx);
+static nni_mtx resolv_mtx = NNI_MTX_INITIALIZER;
+static nni_cv resolv_cv = NNI_CV_INITIALIZER(&resolv_mtx);
static bool resolv_fini = false;
static nni_list resolv_aios;
-static nni_thr resolv_thrs[NNG_RESOLV_CONCURRENCY];
+static nni_thr *resolv_thrs;
+static int resolv_num_thr;
typedef struct resolv_item resolv_item;
struct resolv_item {
int family;
bool passive;
- char * host;
- char * serv;
- nni_aio * aio;
+ char *host;
+ char *serv;
+ nni_aio *aio;
nng_sockaddr *sa;
};
@@ -159,9 +156,9 @@ resolv_task(resolv_item *item)
nni_mtx_lock(&resolv_mtx);
if ((probe != NULL) && (item->aio != NULL)) {
- struct sockaddr_in * sin;
+ struct sockaddr_in *sin;
struct sockaddr_in6 *sin6;
- nni_sockaddr * sa;
+ nni_sockaddr *sa;
sa = item->sa;
@@ -270,7 +267,7 @@ resolv_worker(void *notused)
nni_mtx_lock(&resolv_mtx);
for (;;) {
- nni_aio * aio;
+ nni_aio *aio;
resolv_item *item;
int rv;
@@ -311,9 +308,9 @@ parse_ip(const char *addr, nng_sockaddr *sa, bool want_port)
int rv;
bool v6 = false;
bool wrapped = false;
- char * port;
- char * host;
- char * buf;
+ char *port;
+ char *host;
+ char *buf;
size_t buf_len;
if (addr == NULL) {
@@ -411,7 +408,23 @@ nni_win_resolv_sysinit(void)
nni_aio_list_init(&resolv_aios);
resolv_fini = false;
- for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
+#ifndef NNG_RESOLV_CONCURRENCY
+#define NNG_RESOLV_CONCURRENCY 4
+#endif
+
+ resolv_num_thr = (int) nni_init_get_param(
+ NNG_INIT_NUM_RESOLVER_THREADS, NNG_RESOLV_CONCURRENCY);
+ if (resolv_num_thr < 1) {
+ resolv_num_thr = 1;
+ }
+ // no limit on the maximum for now
+ nni_init_set_effective(NNG_INIT_NUM_RESOLVER_THREADS, resolv_num_thr);
+ resolv_thrs = NNI_ALLOC_STRUCTS(resolv_thrs, resolv_num_thr);
+ if (resolv_thrs == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ for (int i = 0; i < resolv_num_thr; i++) {
int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
if (rv != 0) {
nni_win_resolv_sysfini();
@@ -419,7 +432,7 @@ nni_win_resolv_sysinit(void)
}
nni_thr_set_name(&resolv_thrs[i], "nng:resolver");
}
- for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
+ for (int i = 0; i < resolv_num_thr; i++) {
nni_thr_run(&resolv_thrs[i]);
}
return (0);
@@ -432,9 +445,10 @@ nni_win_resolv_sysfini(void)
resolv_fini = true;
nni_cv_wake(&resolv_cv);
nni_mtx_unlock(&resolv_mtx);
- for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
+ for (int i = 0; i < resolv_num_thr; i++) {
nni_thr_fini(&resolv_thrs[i]);
}
+ NNI_FREE_STRUCTS(resolv_thrs, resolv_num_thr);
}
#endif // NNG_PLATFORM_WINDOWS