aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-11-09 23:45:21 -0800
committerGarrett D'Amore <garrett@damore.org>2024-11-11 11:03:12 -0800
commit713b80f440cb414cd0b856bde0ea1b31f939777f (patch)
tree1186c42418559c85719023bde3e919aa2df7fcef /src/core
parentcbe9a27ef7485977fbc7c713376b096b6723da3d (diff)
downloadnng-713b80f440cb414cd0b856bde0ea1b31f939777f.tar.gz
nng-713b80f440cb414cd0b856bde0ea1b31f939777f.tar.bz2
nng-713b80f440cb414cd0b856bde0ea1b31f939777f.zip
refactor initialization/finalization
Applications must now call nng_init(), but they can supply a set of parameters optionally. The code is now safe for multiple libraries to do this concurrently, meaning nng_fini no longer can race against another instance starting up. The nni_init checks on all public APIs are removed now.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c23
-rw-r--r--src/core/aio.h2
-rw-r--r--src/core/dialer.c5
-rw-r--r--src/core/init.c210
-rw-r--r--src/core/init.h21
-rw-r--r--src/core/init_test.c135
-rw-r--r--src/core/listener.c5
-rw-r--r--src/core/platform.h21
-rw-r--r--src/core/socket.c13
-rw-r--r--src/core/stats.c4
-rw-r--r--src/core/stream.c19
-rw-r--r--src/core/taskq.c26
-rw-r--r--src/core/taskq.h5
-rw-r--r--src/core/tcp.c6
14 files changed, 191 insertions, 304 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 1c7e41d1..e1e9c42f 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -823,24 +823,13 @@ nni_aio_sys_fini(void)
}
int
-nni_aio_sys_init(void)
+nni_aio_sys_init(nng_init_params *params)
{
- int num_thr;
- int max_thr;
+ int16_t num_thr;
+ int16_t max_thr;
-#ifndef NNG_MAX_EXPIRE_THREADS
-#define NNG_MAX_EXPIRE_THREADS 8
-#endif
-
-#ifndef NNG_NUM_EXPIRE_THREADS
-#define NNG_NUM_EXPIRE_THREADS (nni_plat_ncpu())
-#endif
-
- max_thr = (int) nni_init_get_param(
- NNG_INIT_MAX_EXPIRE_THREADS, NNG_MAX_EXPIRE_THREADS);
-
- num_thr = (int) nni_init_get_param(
- NNG_INIT_NUM_EXPIRE_THREADS, NNG_NUM_EXPIRE_THREADS);
+ max_thr = params->max_expire_threads;
+ num_thr = params->num_expire_threads;
if ((max_thr > 0) && (num_thr > max_thr)) {
num_thr = max_thr;
@@ -848,7 +837,7 @@ nni_aio_sys_init(void)
if (num_thr < 1) {
num_thr = 1;
}
- nni_init_set_effective(NNG_INIT_NUM_EXPIRE_THREADS, num_thr);
+ params->num_expire_threads = num_thr;
nni_aio_expire_q_list =
nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr);
nni_aio_expire_q_cnt = num_thr;
diff --git a/src/core/aio.h b/src/core/aio.h
index cae8610f..50cb266b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -192,7 +192,7 @@ extern void nni_aio_completions_run(nni_aio_completions *);
extern void nni_aio_completions_add(
nni_aio_completions *, nni_aio *, int, size_t);
-extern int nni_aio_sys_init(void);
+extern int nni_aio_sys_init(nng_init_params *);
extern void nni_aio_sys_fini(void);
typedef struct nni_aio_expire_q nni_aio_expire_q;
diff --git a/src/core/dialer.c b/src/core/dialer.c
index 0ee2d361..907da99d 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -285,13 +285,8 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str)
int
nni_dialer_find(nni_dialer **dp, uint32_t id)
{
- int rv;
nni_dialer *d;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
nni_mtx_lock(&dialers_lk);
if ((d = nni_id_get(&dialers, id)) != NULL) {
d->d_ref++;
diff --git a/src/core/init.c b/src/core/init.c
index c7a660b0..6672beb1 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -9,6 +9,8 @@
//
#include "core/nng_impl.h"
+#include "core/platform.h"
+#include "core/socket.h"
#include "nng/nng.h"
#include <stdbool.h>
@@ -18,152 +20,120 @@
extern int nni_tls_sys_init(void);
extern void nni_tls_sys_fini(void);
-static bool nni_inited = false;
+#ifndef NNG_NUM_EXPIRE_THREADS
+#define NNG_NUM_EXPIRE_THREADS (nni_plat_ncpu())
+#endif
-static int
-nni_init_helper(void)
-{
- int rv;
+#ifndef NNG_NUM_TASKQ_THREADS
+#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2)
+#endif
-#ifdef NNG_TEST_LIB
- static bool cleanup = false;
- if (!cleanup) {
- atexit(nng_fini);
- cleanup = true;
- }
+#ifndef NNG_NUM_POLLER_THREADS
+#define NNG_NUM_POLLER_THREADS (nni_plat_ncpu())
#endif
- if (((rv = nni_taskq_sys_init()) != 0) ||
- ((rv = nni_reap_sys_init()) != 0) ||
- ((rv = nni_aio_sys_init()) != 0) ||
- ((rv = nni_tls_sys_init()) != 0)) {
- nni_fini();
- return (rv);
- }
+#ifndef NNG_RESOLV_CONCURRENCY
+#define NNG_RESOLV_CONCURRENCY 4
+#endif
- // following never fail
- nni_sp_tran_sys_init();
+#ifndef NNG_MAX_TASKQ_THREADS
+#define NNG_MAX_TASKQ_THREADS 16
+#endif
- nni_inited = true;
- nng_log_notice(
- "NNG-INIT", "NNG library version %s initialized", nng_version());
+#ifndef NNG_MAX_EXPIRE_THREADS
+#define NNG_MAX_EXPIRE_THREADS 8
+#endif
- return (0);
-}
+static nng_init_params init_params;
+
+unsigned int init_count;
+nni_atomic_flag init_busy;
int
-nni_init(void)
+nng_init(nng_init_params *params)
{
- int rv;
- if ((rv = nni_plat_init(nni_init_helper)) != 0) {
- nng_log_err("NNG-INIT",
- "NNG library initialization failed: %s", nng_strerror(rv));
- }
- return (rv);
-}
-
-// accessing the list of parameters
-typedef struct nni_init_param {
- nni_list_node node;
- nng_init_parameter param;
- uint64_t value;
-#ifdef NNG_TEST_LIB
- uint64_t effective;
-#endif
-} nni_init_param;
+ nng_init_params zero = { 0 };
+ int rv;
-static nni_list nni_init_params =
- NNI_LIST_INITIALIZER(nni_init_params, nni_init_param, node);
-
-void
-nni_init_set_param(nng_init_parameter p, uint64_t value)
-{
- if (nni_inited) {
- // this is paranoia -- if some library code started already
- // then we cannot safely change parameters, and modifying the
- // list is not thread safe.
- return;
+ // cheap spin lock
+ while (nni_atomic_flag_test_and_set(&init_busy)) {
+ continue;
}
- nni_init_param *item;
- NNI_LIST_FOREACH (&nni_init_params, item) {
- if (item->param == p) {
- item->value = value;
- return;
+ if (init_count > 0) {
+ if (params != NULL) {
+ nni_atomic_flag_reset(&init_busy);
+ return (NNG_EBUSY);
}
+ init_count++;
+ nni_atomic_flag_reset(&init_busy);
+ return (0);
}
- if ((item = NNI_ALLOC_STRUCT(item)) != NULL) {
- item->param = p;
- item->value = value;
- nni_list_append(&nni_init_params, item);
+ if (params == NULL) {
+ params = &zero;
}
-}
-
-uint64_t
-nni_init_get_param(nng_init_parameter p, uint64_t default_value)
-{
- nni_init_param *item;
- NNI_LIST_FOREACH (&nni_init_params, item) {
- if (item->param == p) {
- return (item->value);
- }
+ init_params.num_task_threads = params->num_task_threads
+ ? params->num_task_threads
+ : NNG_NUM_TASKQ_THREADS;
+ init_params.max_task_threads = params->max_task_threads
+ ? params->max_task_threads
+ : NNG_MAX_TASKQ_THREADS;
+ init_params.num_expire_threads = params->num_expire_threads
+ ? params->num_expire_threads
+ : NNG_NUM_EXPIRE_THREADS;
+ init_params.max_expire_threads = params->max_expire_threads
+ ? params->max_expire_threads
+ : NNG_MAX_EXPIRE_THREADS;
+ init_params.num_poller_threads = params->num_poller_threads
+ ? params->num_poller_threads
+ : NNG_NUM_POLLER_THREADS;
+ init_params.max_poller_threads = params->max_poller_threads
+ ? params->max_poller_threads
+ : NNG_MAX_POLLER_THREADS;
+ init_params.num_resolver_threads = params->num_resolver_threads
+ ? params->num_resolver_threads
+ : NNG_RESOLV_CONCURRENCY;
+
+ if (((rv = nni_plat_init(&init_params)) != 0) ||
+ ((rv = nni_taskq_sys_init(&init_params)) != 0) ||
+ ((rv = nni_reap_sys_init()) != 0) ||
+ ((rv = nni_aio_sys_init(&init_params)) != 0) ||
+ ((rv = nni_tls_sys_init()) != 0)) {
+ nni_atomic_flag_reset(&init_busy);
+ nng_fini();
+ return (rv);
}
- return (default_value);
-}
-void
-nni_init_set_effective(nng_init_parameter p, uint64_t value)
-{
-#ifdef NNG_TEST_LIB
- nni_init_param *item;
- NNI_LIST_FOREACH (&nni_init_params, item) {
- if (item->param == p) {
- item->effective = value;
- return;
- }
- }
- if ((item = NNI_ALLOC_STRUCT(item)) != NULL) {
- item->param = p;
- item->effective = value;
- nni_list_append(&nni_init_params, item);
- }
-#else
- NNI_ARG_UNUSED(p);
- NNI_ARG_UNUSED(value);
-#endif
+ // following never fails
+ nni_sp_tran_sys_init();
+
+ nng_log_notice(
+ "NNG-INIT", "NNG library version %s initialized", nng_version());
+ init_count++;
+ nni_atomic_flag_reset(&init_busy);
+ return (rv);
}
+// Undocumented, for test code only
#ifdef NNG_TEST_LIB
-uint64_t
-nni_init_get_effective(nng_init_parameter p)
+nng_init_params *
+nng_init_get_params(void)
{
- nni_init_param *item;
- NNI_LIST_FOREACH (&nni_init_params, item) {
- if (item->param == p) {
- return (item->effective);
- }
- }
- return ((uint64_t) -1);
+ return &init_params;
}
#endif
-static void
-nni_init_params_fini(void)
-{
- nni_init_param *item;
- while ((item = nni_list_first(&nni_init_params)) != NULL) {
- nni_list_remove(&nni_init_params, item);
- NNI_FREE_STRUCT(item);
- }
-}
-
void
-nni_fini(void)
+nng_fini(void)
{
- if (!nni_inited) {
- // make sure we discard parameters even if we didn't startup
- nni_init_params_fini();
+ while (nni_atomic_flag_test_and_set(&init_busy)) {
+ continue;
+ }
+ init_count--;
+ if (init_count > 0) {
+ nni_atomic_flag_reset(&init_busy);
return;
}
+ nni_sock_closeall();
nni_sp_tran_sys_fini();
nni_tls_sys_fini();
nni_reap_drain();
@@ -171,8 +141,6 @@ nni_fini(void)
nni_taskq_sys_fini();
nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_id_map_sys_fini();
- nni_init_params_fini();
-
nni_plat_fini();
- nni_inited = false;
+ nni_atomic_flag_reset(&init_busy);
}
diff --git a/src/core/init.h b/src/core/init.h
index d20cf046..2826870b 100644
--- a/src/core/init.h
+++ b/src/core/init.h
@@ -11,26 +11,9 @@
#ifndef CORE_INIT_H
#define CORE_INIT_H
-#include "core/nng_impl.h"
-
-// nni_init is called each time the user enters the library. It ensures that
-// the library is initialized properly, and also deals with checks such as
-// whether the process has forked since last initialization.
-int nni_init(void);
-
-// nni_fini tears everything down. In the future it may be used to ensure
-// that all resources used by the library are released back to the system.
-void nni_fini(void);
-
-// nni_init_param is used by applications (via nng_init_param) to configure
-// some tunable settings at runtime. It must be called before any other NNG
-// functions are called, in order to have any effect at all.
-void nni_init_set_param(nng_init_parameter, uint64_t value);
+#include "nng/nng.h"
// subsystems can call this to obtain a parameter value.
-uint64_t nni_init_get_param(nng_init_parameter parameter, uint64_t default_value);
-
-// subsystems can set this to facilitate tests (only used in test code)
-void nni_init_set_effective(nng_init_parameter p, uint64_t value);
+nng_init_params *nni_init_get_params(void);
#endif // CORE_INIT_H
diff --git a/src/core/init_test.c b/src/core/init_test.c
index 9b9e26b4..0ef13543 100644
--- a/src/core/init_test.c
+++ b/src/core/init_test.c
@@ -7,93 +7,97 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "nng/nng.h"
#include <nuts.h>
-uint64_t nni_init_get_param(
- nng_init_parameter parameter, uint64_t default_value);
-uint64_t nni_init_get_effective(nng_init_parameter p);
-void nni_init_set_effective(nng_init_parameter p, uint64_t);
+nng_init_params *nng_init_get_params(void);
void
test_init_param(void)
{
- NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 456) == 456);
- nng_init_set_parameter(NNG_INIT_PARAMETER_NONE, 123);
- NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123);
- nni_init_set_effective(NNG_INIT_PARAMETER_NONE, 124);
- NUTS_ASSERT(nni_init_get_effective(NNG_INIT_PARAMETER_NONE) == 124);
- NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123);
- nng_fini();
- NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 567);
-}
-
-void
-test_set_effective(void)
-{
- nni_init_set_effective(NNG_INIT_PARAMETER_NONE, 999);
- NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 0) == 0);
- NUTS_ASSERT(nni_init_get_effective(NNG_INIT_PARAMETER_NONE) == 999);
- nng_fini();
+ nng_init_params *p;
+ p = nng_init_get_params();
+ NUTS_ASSERT(p != NULL);
}
void
test_init_zero_resolvers(void)
{
- nng_socket s;
- nng_init_set_parameter(NNG_INIT_NUM_RESOLVER_THREADS, 0);
- NUTS_OPEN(s);
- NUTS_CLOSE(s);
- NUTS_ASSERT(
- nni_init_get_effective(NNG_INIT_NUM_RESOLVER_THREADS) == 1);
+ nng_init_params *pp;
+ nng_init_params p = { 0 };
+ p.num_resolver_threads = 0;
+ nng_fini();
+ NUTS_PASS(nng_init(&p));
+ pp = nng_init_get_params();
+ NUTS_ASSERT(pp->num_resolver_threads > 0);
nng_fini();
}
void
test_init_one_task_thread(void)
{
- nng_socket s;
- nng_init_set_parameter(NNG_INIT_NUM_TASK_THREADS, 0);
- nng_init_set_parameter(NNG_INIT_MAX_TASK_THREADS, 1);
- NUTS_OPEN(s);
- NUTS_CLOSE(s);
- NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_TASK_THREADS) == 2);
+ nng_init_params *pp;
+ nng_init_params p = { 0 };
+
nng_fini();
+ p.max_task_threads = 1;
+ NUTS_PASS(nng_init(&p));
+ pp = nng_init_get_params();
+ NUTS_ASSERT(pp->max_task_threads == 1);
}
void
test_init_too_many_task_threads(void)
{
- nng_socket s;
- nng_init_set_parameter(NNG_INIT_NUM_TASK_THREADS, 256);
- nng_init_set_parameter(NNG_INIT_MAX_TASK_THREADS, 4);
+ nng_socket s;
+ nng_init_params *pp;
+ nng_init_params p = { 0 };
+
+ p.num_task_threads = 256;
+ p.max_task_threads = 4;
+
+ nng_fini();
+ NUTS_PASS(nng_init(&p));
NUTS_OPEN(s);
NUTS_CLOSE(s);
- NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_TASK_THREADS) == 4);
- nng_fini();
+ pp = nng_init_get_params();
+ NUTS_TRUE(pp->num_task_threads == 4);
}
void
test_init_no_expire_thread(void)
{
- nng_socket s;
- nng_init_set_parameter(NNG_INIT_NUM_EXPIRE_THREADS, 0);
- nng_init_set_parameter(NNG_INIT_MAX_EXPIRE_THREADS, 0);
+ nng_socket s;
+ nng_init_params *pp;
+ nng_init_params p = { 0 };
+
+ nng_fini();
+ p.num_expire_threads = 0;
+ p.max_expire_threads = 0;
+ NUTS_PASS(nng_init(&p));
NUTS_OPEN(s);
NUTS_CLOSE(s);
- NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_EXPIRE_THREADS) == 1);
- nng_fini();
+ pp = nng_init_get_params();
+ NUTS_TRUE(pp->num_expire_threads > 0);
+ NUTS_MSG("Got %d expire threads", pp->num_expire_threads);
}
void
test_init_too_many_expire_threads(void)
{
- nng_socket s;
- nng_init_set_parameter(NNG_INIT_NUM_EXPIRE_THREADS, 256);
- nng_init_set_parameter(NNG_INIT_MAX_EXPIRE_THREADS, 2);
+ nng_socket s;
+ nng_init_params *pp;
+ nng_init_params p = { 0 };
+
+ nng_fini();
+ p.num_expire_threads = 256;
+ p.max_expire_threads = 2;
+ NUTS_PASS(nng_init(&p));
NUTS_OPEN(s);
NUTS_CLOSE(s);
- NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_EXPIRE_THREADS) == 2);
- nng_fini();
+ pp = nng_init_get_params();
+ NUTS_TRUE(pp->num_expire_threads == 2);
+ NUTS_MSG("Got %d expire threads", pp->num_expire_threads);
}
// poller tuning only supported on Windows right now
@@ -101,31 +105,42 @@ test_init_too_many_expire_threads(void)
void
test_init_poller_no_threads(void)
{
- nng_socket s;
- nng_init_set_parameter(NNG_INIT_NUM_POLLER_THREADS, 0);
- nng_init_set_parameter(NNG_INIT_MAX_POLLER_THREADS, 0);
+ nng_socket s;
+ nng_init_params *pp;
+ nng_init_params p = { 0 };
+
+ nng_fini();
+ p.num_poller_threads = 0;
+ p.max_poller_threads = 0;
+ NUTS_PASS(nng_init(&p));
NUTS_OPEN(s);
NUTS_CLOSE(s);
- NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_POLLER_THREADS) == 1);
- nng_fini();
+ pp = nng_init_get_params();
+ NUTS_TRUE(pp->num_poller_threads > 0);
+ NUTS_MSG("Got %d poller threads", pp->num_expire_threads);
}
void
test_init_too_many_poller_threads(void)
{
- nng_socket s;
- nng_init_set_parameter(NNG_INIT_NUM_POLLER_THREADS, 256);
- nng_init_set_parameter(NNG_INIT_MAX_POLLER_THREADS, 2);
+ nng_socket s;
+ nng_init_params *pp;
+ nng_init_params p = { 0 };
+
+ nng_fini();
+ p.num_poller_threads = 256;
+ p.max_poller_threads = 2;
+ NUTS_PASS(nng_init(&p));
NUTS_OPEN(s);
NUTS_CLOSE(s);
- NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_POLLER_THREADS) == 2);
- nng_fini();
+ pp = nng_init_get_params();
+ NUTS_TRUE(pp->num_poller_threads == 2);
+ NUTS_MSG("Got %d poller threads", pp->num_expire_threads);
}
#endif
NUTS_TESTS = {
{ "init parameter", test_init_param },
- { "init set effective", test_set_effective },
{ "init zero resolvers", test_init_zero_resolvers },
{ "init one task thread", test_init_one_task_thread },
{ "init too many task threads", test_init_too_many_task_threads },
@@ -137,4 +152,4 @@ NUTS_TESTS = {
#endif
{ NULL, NULL },
-}; \ No newline at end of file
+};
diff --git a/src/core/listener.c b/src/core/listener.c
index 38a7d323..88729f56 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -272,13 +272,8 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str)
int
nni_listener_find(nni_listener **lp, uint32_t id)
{
- int rv;
nni_listener *l;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
nni_mtx_lock(&listeners_lk);
if ((l = nni_id_get(&listeners, id)) != NULL) {
l->l_ref++;
diff --git a/src/core/platform.h b/src/core/platform.h
index d7a88238..62a321bf 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -47,11 +47,6 @@
// for abnormal programs on the platform, such as calling abort().
extern void nni_plat_abort(void);
-// nni_plat_println is used to emit debug messages. Typically this is used
-// during core debugging, or to emit panic messages. Message content will
-// not contain newlines, but the output will add them.
-extern void nni_plat_println(const char *);
-
// nni_plat_printf is like printf. It should conform to C99 standard printf,
// but is a function to allow platform ports to redirect. It should go to
// the same place that nni_plat_println does.
@@ -263,18 +258,10 @@ extern void nni_msleep(nni_duration);
uint32_t nni_random(void);
// nni_plat_init is called to allow the platform the chance to
-// do any necessary initialization. This routine MUST be idempotent,
-// and thread-safe, and will be called before any other API calls, and
-// may be called at any point thereafter. It is permitted to return
-// an error if some critical failure initializing the platform occurs,
-// but once this succeeds, all future calls must succeed as well, unless
-// nni_plat_fini has been called.
-//
-// The function argument should be called if the platform has not initialized
-// (i.e. exactly once), and its result passed back to the caller. If it
-// does not return 0 (success), then it may be called again to try to
-// initialize the platform again at a later date.
-extern int nni_plat_init(int (*)(void));
+// do any necessary initialization. This will be called before any other API
+// calls. It is permitted to return an error if some critical failure
+// initializing the platform occurs.
+extern int nni_plat_init(nng_init_params *);
// nni_plat_fini is called to clean up resources. It is intended to
// be called as the last thing executed in the library, and no other functions
diff --git a/src/core/socket.c b/src/core/socket.c
index 2b94e573..c92a1c3b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -364,12 +364,9 @@ nni_sock_recvq(nni_sock *s)
int
nni_sock_find(nni_sock **sockp, uint32_t id)
{
- int rv;
+ int rv = 0;
nni_sock *s;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
nni_mtx_lock(&sock_lk);
if ((s = nni_id_get(&sock_ids, id)) != NULL) {
if (s->s_closed) {
@@ -621,8 +618,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
return (NNG_ENOTSUP);
}
- if (((rv = nni_init()) != 0) ||
- ((rv = nni_sock_create(&s, proto)) != 0)) {
+ if ((rv = nni_sock_create(&s, proto)) != 0) {
return (rv);
}
@@ -1076,12 +1072,9 @@ nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg)
int
nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing)
{
- int rv;
+ int rv = 0;
nni_ctx *ctx;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
nni_mtx_lock(&sock_lk);
if ((ctx = nni_id_get(&ctx_ids, id)) != NULL) {
// We refuse a reference if either the socket is
diff --git a/src/core/stats.c b/src/core/stats.c
index c049d611..f01f642e 100644
--- a/src/core/stats.c
+++ b/src/core/stats.c
@@ -385,10 +385,6 @@ int
nng_stats_get(nng_stat **statp)
{
#ifdef NNG_ENABLE_STATS
- int rv;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
return (nni_stat_snapshot(statp, &stats_root));
#else
NNI_ARG_UNUSED(statp);
diff --git a/src/core/stream.c b/src/core/stream.c
index 78029ddc..d900329a 100644
--- a/src/core/stream.c
+++ b/src/core/stream.c
@@ -169,12 +169,6 @@ nng_stream_dialer_dial(nng_stream_dialer *d, nng_aio *aio)
int
nng_stream_dialer_alloc_url(nng_stream_dialer **dp, const nng_url *url)
{
- int rv;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
for (int i = 0; stream_drivers[i].scheme != NULL; i++) {
if (strcmp(stream_drivers[i].scheme, url->u_scheme) == 0) {
return (stream_drivers[i].dialer_alloc(dp, url));
@@ -189,9 +183,6 @@ nng_stream_dialer_alloc(nng_stream_dialer **dp, const char *uri)
nng_url *url;
int rv;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
if ((rv = nng_url_parse(&url, uri)) != 0) {
return (rv);
}
@@ -291,12 +282,6 @@ nni_stream_listener_set_tls(nng_stream_listener *l, nng_tls_config *cfg)
int
nng_stream_listener_alloc_url(nng_stream_listener **lp, const nng_url *url)
{
- int rv;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
for (int i = 0; stream_drivers[i].scheme != NULL; i++) {
if (strcmp(stream_drivers[i].scheme, url->u_scheme) == 0) {
return (stream_drivers[i].listener_alloc(lp, url));
@@ -311,10 +296,6 @@ nng_stream_listener_alloc(nng_stream_listener **lp, const char *uri)
nng_url *url;
int rv;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
if ((rv = nng_url_parse(&url, uri)) != 0) {
return (rv);
}
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 09886596..496c2fab 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "nng/nng.h"
typedef struct nni_taskq_thr nni_taskq_thr;
struct nni_taskq_thr {
@@ -243,24 +244,13 @@ nni_task_fini(nni_task *task)
}
int
-nni_taskq_sys_init(void)
+nni_taskq_sys_init(nng_init_params *params)
{
- int num_thr;
- int max_thr;
+ int16_t num_thr;
+ int16_t max_thr;
-#ifndef NNG_NUM_TASKQ_THREADS
-#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2)
-#endif
-
-#ifndef NNG_MAX_TASKQ_THREADS
-#define NNG_MAX_TASKQ_THREADS 16
-#endif
-
- max_thr = (int) nni_init_get_param(
- NNG_INIT_MAX_TASK_THREADS, NNG_MAX_TASKQ_THREADS);
-
- num_thr = (int) nni_init_get_param(
- NNG_INIT_NUM_TASK_THREADS, NNG_NUM_TASKQ_THREADS);
+ max_thr = params->max_task_threads;
+ num_thr = params->num_task_threads;
if ((max_thr > 0) && (num_thr > max_thr)) {
num_thr = max_thr;
@@ -268,9 +258,9 @@ nni_taskq_sys_init(void)
if (num_thr < 2) {
num_thr = 2;
}
- nni_init_set_effective(NNG_INIT_NUM_TASK_THREADS, num_thr);
+ params->num_task_threads = num_thr;
- return (nni_taskq_init(&nni_taskq_systq, num_thr));
+ return (nni_taskq_init(&nni_taskq_systq, (int) num_thr));
}
void
diff --git a/src/core/taskq.h b/src/core/taskq.h
index ccc54f61..498b4f37 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -1,5 +1,5 @@
//
-// Copyright 2022 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -14,6 +14,7 @@
#include "core/defs.h"
#include "core/list.h"
#include "core/platform.h"
+#include "nng/nng.h"
typedef struct nni_taskq nni_taskq;
typedef struct nni_task nni_task;
@@ -60,7 +61,7 @@ extern void nni_task_init(nni_task *, nni_taskq *, nni_cb, void *);
// it reschedules the task.)
extern void nni_task_fini(nni_task *);
-extern int nni_taskq_sys_init(void);
+extern int nni_taskq_sys_init(nng_init_params *);
extern void nni_taskq_sys_fini(void);
// nni_task implementation details are not to be used except by the
diff --git a/src/core/tcp.c b/src/core/tcp.c
index e54cdee7..7fb67228 100644
--- a/src/core/tcp.c
+++ b/src/core/tcp.c
@@ -369,9 +369,6 @@ tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa)
tcp_listener *l;
int rv;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
return (NNG_ENOMEM);
}
@@ -398,9 +395,6 @@ nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url)
int rv;
nng_sockaddr sa;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
if ((rv = nni_url_to_address(&sa, url)) != 0) {
return (rv);
}