diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 23 | ||||
| -rw-r--r-- | src/core/aio.h | 2 | ||||
| -rw-r--r-- | src/core/dialer.c | 5 | ||||
| -rw-r--r-- | src/core/init.c | 210 | ||||
| -rw-r--r-- | src/core/init.h | 21 | ||||
| -rw-r--r-- | src/core/init_test.c | 135 | ||||
| -rw-r--r-- | src/core/listener.c | 5 | ||||
| -rw-r--r-- | src/core/platform.h | 21 | ||||
| -rw-r--r-- | src/core/socket.c | 13 | ||||
| -rw-r--r-- | src/core/stats.c | 4 | ||||
| -rw-r--r-- | src/core/stream.c | 19 | ||||
| -rw-r--r-- | src/core/taskq.c | 26 | ||||
| -rw-r--r-- | src/core/taskq.h | 5 | ||||
| -rw-r--r-- | src/core/tcp.c | 6 |
14 files changed, 191 insertions, 304 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 1c7e41d1..e1e9c42f 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -823,24 +823,13 @@ nni_aio_sys_fini(void) } int -nni_aio_sys_init(void) +nni_aio_sys_init(nng_init_params *params) { - int num_thr; - int max_thr; + int16_t num_thr; + int16_t max_thr; -#ifndef NNG_MAX_EXPIRE_THREADS -#define NNG_MAX_EXPIRE_THREADS 8 -#endif - -#ifndef NNG_NUM_EXPIRE_THREADS -#define NNG_NUM_EXPIRE_THREADS (nni_plat_ncpu()) -#endif - - max_thr = (int) nni_init_get_param( - NNG_INIT_MAX_EXPIRE_THREADS, NNG_MAX_EXPIRE_THREADS); - - num_thr = (int) nni_init_get_param( - NNG_INIT_NUM_EXPIRE_THREADS, NNG_NUM_EXPIRE_THREADS); + max_thr = params->max_expire_threads; + num_thr = params->num_expire_threads; if ((max_thr > 0) && (num_thr > max_thr)) { num_thr = max_thr; @@ -848,7 +837,7 @@ nni_aio_sys_init(void) if (num_thr < 1) { num_thr = 1; } - nni_init_set_effective(NNG_INIT_NUM_EXPIRE_THREADS, num_thr); + params->num_expire_threads = num_thr; nni_aio_expire_q_list = nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr); nni_aio_expire_q_cnt = num_thr; diff --git a/src/core/aio.h b/src/core/aio.h index cae8610f..50cb266b 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -192,7 +192,7 @@ extern void nni_aio_completions_run(nni_aio_completions *); extern void nni_aio_completions_add( nni_aio_completions *, nni_aio *, int, size_t); -extern int nni_aio_sys_init(void); +extern int nni_aio_sys_init(nng_init_params *); extern void nni_aio_sys_fini(void); typedef struct nni_aio_expire_q nni_aio_expire_q; diff --git a/src/core/dialer.c b/src/core/dialer.c index 0ee2d361..907da99d 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -285,13 +285,8 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str) int nni_dialer_find(nni_dialer **dp, uint32_t id) { - int rv; nni_dialer *d; - if ((rv = nni_init()) != 0) { - return (rv); - } - nni_mtx_lock(&dialers_lk); if ((d = nni_id_get(&dialers, id)) != NULL) { d->d_ref++; diff --git a/src/core/init.c b/src/core/init.c index c7a660b0..6672beb1 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -9,6 +9,8 @@ // #include "core/nng_impl.h" +#include "core/platform.h" +#include "core/socket.h" #include "nng/nng.h" #include <stdbool.h> @@ -18,152 +20,120 @@ extern int nni_tls_sys_init(void); extern void nni_tls_sys_fini(void); -static bool nni_inited = false; +#ifndef NNG_NUM_EXPIRE_THREADS +#define NNG_NUM_EXPIRE_THREADS (nni_plat_ncpu()) +#endif -static int -nni_init_helper(void) -{ - int rv; +#ifndef NNG_NUM_TASKQ_THREADS +#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2) +#endif -#ifdef NNG_TEST_LIB - static bool cleanup = false; - if (!cleanup) { - atexit(nng_fini); - cleanup = true; - } +#ifndef NNG_NUM_POLLER_THREADS +#define NNG_NUM_POLLER_THREADS (nni_plat_ncpu()) #endif - if (((rv = nni_taskq_sys_init()) != 0) || - ((rv = nni_reap_sys_init()) != 0) || - ((rv = nni_aio_sys_init()) != 0) || - ((rv = nni_tls_sys_init()) != 0)) { - nni_fini(); - return (rv); - } +#ifndef NNG_RESOLV_CONCURRENCY +#define NNG_RESOLV_CONCURRENCY 4 +#endif - // following never fail - nni_sp_tran_sys_init(); +#ifndef NNG_MAX_TASKQ_THREADS +#define NNG_MAX_TASKQ_THREADS 16 +#endif - nni_inited = true; - nng_log_notice( - "NNG-INIT", "NNG library version %s initialized", nng_version()); +#ifndef NNG_MAX_EXPIRE_THREADS +#define NNG_MAX_EXPIRE_THREADS 8 +#endif - return (0); -} +static nng_init_params init_params; + +unsigned int init_count; +nni_atomic_flag init_busy; int -nni_init(void) +nng_init(nng_init_params *params) { - int rv; - if ((rv = nni_plat_init(nni_init_helper)) != 0) { - nng_log_err("NNG-INIT", - "NNG library initialization failed: %s", nng_strerror(rv)); - } - return (rv); -} - -// accessing the list of parameters -typedef struct nni_init_param { - nni_list_node node; - nng_init_parameter param; - uint64_t value; -#ifdef NNG_TEST_LIB - uint64_t effective; -#endif -} nni_init_param; + nng_init_params zero = { 0 }; + int rv; -static nni_list nni_init_params = - NNI_LIST_INITIALIZER(nni_init_params, nni_init_param, node); - -void -nni_init_set_param(nng_init_parameter p, uint64_t value) -{ - if (nni_inited) { - // this is paranoia -- if some library code started already - // then we cannot safely change parameters, and modifying the - // list is not thread safe. - return; + // cheap spin lock + while (nni_atomic_flag_test_and_set(&init_busy)) { + continue; } - nni_init_param *item; - NNI_LIST_FOREACH (&nni_init_params, item) { - if (item->param == p) { - item->value = value; - return; + if (init_count > 0) { + if (params != NULL) { + nni_atomic_flag_reset(&init_busy); + return (NNG_EBUSY); } + init_count++; + nni_atomic_flag_reset(&init_busy); + return (0); } - if ((item = NNI_ALLOC_STRUCT(item)) != NULL) { - item->param = p; - item->value = value; - nni_list_append(&nni_init_params, item); + if (params == NULL) { + params = &zero; } -} - -uint64_t -nni_init_get_param(nng_init_parameter p, uint64_t default_value) -{ - nni_init_param *item; - NNI_LIST_FOREACH (&nni_init_params, item) { - if (item->param == p) { - return (item->value); - } + init_params.num_task_threads = params->num_task_threads + ? params->num_task_threads + : NNG_NUM_TASKQ_THREADS; + init_params.max_task_threads = params->max_task_threads + ? params->max_task_threads + : NNG_MAX_TASKQ_THREADS; + init_params.num_expire_threads = params->num_expire_threads + ? params->num_expire_threads + : NNG_NUM_EXPIRE_THREADS; + init_params.max_expire_threads = params->max_expire_threads + ? params->max_expire_threads + : NNG_MAX_EXPIRE_THREADS; + init_params.num_poller_threads = params->num_poller_threads + ? params->num_poller_threads + : NNG_NUM_POLLER_THREADS; + init_params.max_poller_threads = params->max_poller_threads + ? params->max_poller_threads + : NNG_MAX_POLLER_THREADS; + init_params.num_resolver_threads = params->num_resolver_threads + ? params->num_resolver_threads + : NNG_RESOLV_CONCURRENCY; + + if (((rv = nni_plat_init(&init_params)) != 0) || + ((rv = nni_taskq_sys_init(&init_params)) != 0) || + ((rv = nni_reap_sys_init()) != 0) || + ((rv = nni_aio_sys_init(&init_params)) != 0) || + ((rv = nni_tls_sys_init()) != 0)) { + nni_atomic_flag_reset(&init_busy); + nng_fini(); + return (rv); } - return (default_value); -} -void -nni_init_set_effective(nng_init_parameter p, uint64_t value) -{ -#ifdef NNG_TEST_LIB - nni_init_param *item; - NNI_LIST_FOREACH (&nni_init_params, item) { - if (item->param == p) { - item->effective = value; - return; - } - } - if ((item = NNI_ALLOC_STRUCT(item)) != NULL) { - item->param = p; - item->effective = value; - nni_list_append(&nni_init_params, item); - } -#else - NNI_ARG_UNUSED(p); - NNI_ARG_UNUSED(value); -#endif + // following never fails + nni_sp_tran_sys_init(); + + nng_log_notice( + "NNG-INIT", "NNG library version %s initialized", nng_version()); + init_count++; + nni_atomic_flag_reset(&init_busy); + return (rv); } +// Undocumented, for test code only #ifdef NNG_TEST_LIB -uint64_t -nni_init_get_effective(nng_init_parameter p) +nng_init_params * +nng_init_get_params(void) { - nni_init_param *item; - NNI_LIST_FOREACH (&nni_init_params, item) { - if (item->param == p) { - return (item->effective); - } - } - return ((uint64_t) -1); + return &init_params; } #endif -static void -nni_init_params_fini(void) -{ - nni_init_param *item; - while ((item = nni_list_first(&nni_init_params)) != NULL) { - nni_list_remove(&nni_init_params, item); - NNI_FREE_STRUCT(item); - } -} - void -nni_fini(void) +nng_fini(void) { - if (!nni_inited) { - // make sure we discard parameters even if we didn't startup - nni_init_params_fini(); + while (nni_atomic_flag_test_and_set(&init_busy)) { + continue; + } + init_count--; + if (init_count > 0) { + nni_atomic_flag_reset(&init_busy); return; } + nni_sock_closeall(); nni_sp_tran_sys_fini(); nni_tls_sys_fini(); nni_reap_drain(); @@ -171,8 +141,6 @@ nni_fini(void) nni_taskq_sys_fini(); nni_reap_sys_fini(); // must be before timer and aio (expire) nni_id_map_sys_fini(); - nni_init_params_fini(); - nni_plat_fini(); - nni_inited = false; + nni_atomic_flag_reset(&init_busy); } diff --git a/src/core/init.h b/src/core/init.h index d20cf046..2826870b 100644 --- a/src/core/init.h +++ b/src/core/init.h @@ -11,26 +11,9 @@ #ifndef CORE_INIT_H #define CORE_INIT_H -#include "core/nng_impl.h" - -// nni_init is called each time the user enters the library. It ensures that -// the library is initialized properly, and also deals with checks such as -// whether the process has forked since last initialization. -int nni_init(void); - -// nni_fini tears everything down. In the future it may be used to ensure -// that all resources used by the library are released back to the system. -void nni_fini(void); - -// nni_init_param is used by applications (via nng_init_param) to configure -// some tunable settings at runtime. It must be called before any other NNG -// functions are called, in order to have any effect at all. -void nni_init_set_param(nng_init_parameter, uint64_t value); +#include "nng/nng.h" // subsystems can call this to obtain a parameter value. -uint64_t nni_init_get_param(nng_init_parameter parameter, uint64_t default_value); - -// subsystems can set this to facilitate tests (only used in test code) -void nni_init_set_effective(nng_init_parameter p, uint64_t value); +nng_init_params *nni_init_get_params(void); #endif // CORE_INIT_H diff --git a/src/core/init_test.c b/src/core/init_test.c index 9b9e26b4..0ef13543 100644 --- a/src/core/init_test.c +++ b/src/core/init_test.c @@ -7,93 +7,97 @@ // found online at https://opensource.org/licenses/MIT. // +#include "nng/nng.h" #include <nuts.h> -uint64_t nni_init_get_param( - nng_init_parameter parameter, uint64_t default_value); -uint64_t nni_init_get_effective(nng_init_parameter p); -void nni_init_set_effective(nng_init_parameter p, uint64_t); +nng_init_params *nng_init_get_params(void); void test_init_param(void) { - NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 456) == 456); - nng_init_set_parameter(NNG_INIT_PARAMETER_NONE, 123); - NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123); - nni_init_set_effective(NNG_INIT_PARAMETER_NONE, 124); - NUTS_ASSERT(nni_init_get_effective(NNG_INIT_PARAMETER_NONE) == 124); - NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 123); - nng_fini(); - NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 567) == 567); -} - -void -test_set_effective(void) -{ - nni_init_set_effective(NNG_INIT_PARAMETER_NONE, 999); - NUTS_ASSERT(nni_init_get_param(NNG_INIT_PARAMETER_NONE, 0) == 0); - NUTS_ASSERT(nni_init_get_effective(NNG_INIT_PARAMETER_NONE) == 999); - nng_fini(); + nng_init_params *p; + p = nng_init_get_params(); + NUTS_ASSERT(p != NULL); } void test_init_zero_resolvers(void) { - nng_socket s; - nng_init_set_parameter(NNG_INIT_NUM_RESOLVER_THREADS, 0); - NUTS_OPEN(s); - NUTS_CLOSE(s); - NUTS_ASSERT( - nni_init_get_effective(NNG_INIT_NUM_RESOLVER_THREADS) == 1); + nng_init_params *pp; + nng_init_params p = { 0 }; + p.num_resolver_threads = 0; + nng_fini(); + NUTS_PASS(nng_init(&p)); + pp = nng_init_get_params(); + NUTS_ASSERT(pp->num_resolver_threads > 0); nng_fini(); } void test_init_one_task_thread(void) { - nng_socket s; - nng_init_set_parameter(NNG_INIT_NUM_TASK_THREADS, 0); - nng_init_set_parameter(NNG_INIT_MAX_TASK_THREADS, 1); - NUTS_OPEN(s); - NUTS_CLOSE(s); - NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_TASK_THREADS) == 2); + nng_init_params *pp; + nng_init_params p = { 0 }; + nng_fini(); + p.max_task_threads = 1; + NUTS_PASS(nng_init(&p)); + pp = nng_init_get_params(); + NUTS_ASSERT(pp->max_task_threads == 1); } void test_init_too_many_task_threads(void) { - nng_socket s; - nng_init_set_parameter(NNG_INIT_NUM_TASK_THREADS, 256); - nng_init_set_parameter(NNG_INIT_MAX_TASK_THREADS, 4); + nng_socket s; + nng_init_params *pp; + nng_init_params p = { 0 }; + + p.num_task_threads = 256; + p.max_task_threads = 4; + + nng_fini(); + NUTS_PASS(nng_init(&p)); NUTS_OPEN(s); NUTS_CLOSE(s); - NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_TASK_THREADS) == 4); - nng_fini(); + pp = nng_init_get_params(); + NUTS_TRUE(pp->num_task_threads == 4); } void test_init_no_expire_thread(void) { - nng_socket s; - nng_init_set_parameter(NNG_INIT_NUM_EXPIRE_THREADS, 0); - nng_init_set_parameter(NNG_INIT_MAX_EXPIRE_THREADS, 0); + nng_socket s; + nng_init_params *pp; + nng_init_params p = { 0 }; + + nng_fini(); + p.num_expire_threads = 0; + p.max_expire_threads = 0; + NUTS_PASS(nng_init(&p)); NUTS_OPEN(s); NUTS_CLOSE(s); - NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_EXPIRE_THREADS) == 1); - nng_fini(); + pp = nng_init_get_params(); + NUTS_TRUE(pp->num_expire_threads > 0); + NUTS_MSG("Got %d expire threads", pp->num_expire_threads); } void test_init_too_many_expire_threads(void) { - nng_socket s; - nng_init_set_parameter(NNG_INIT_NUM_EXPIRE_THREADS, 256); - nng_init_set_parameter(NNG_INIT_MAX_EXPIRE_THREADS, 2); + nng_socket s; + nng_init_params *pp; + nng_init_params p = { 0 }; + + nng_fini(); + p.num_expire_threads = 256; + p.max_expire_threads = 2; + NUTS_PASS(nng_init(&p)); NUTS_OPEN(s); NUTS_CLOSE(s); - NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_EXPIRE_THREADS) == 2); - nng_fini(); + pp = nng_init_get_params(); + NUTS_TRUE(pp->num_expire_threads == 2); + NUTS_MSG("Got %d expire threads", pp->num_expire_threads); } // poller tuning only supported on Windows right now @@ -101,31 +105,42 @@ test_init_too_many_expire_threads(void) void test_init_poller_no_threads(void) { - nng_socket s; - nng_init_set_parameter(NNG_INIT_NUM_POLLER_THREADS, 0); - nng_init_set_parameter(NNG_INIT_MAX_POLLER_THREADS, 0); + nng_socket s; + nng_init_params *pp; + nng_init_params p = { 0 }; + + nng_fini(); + p.num_poller_threads = 0; + p.max_poller_threads = 0; + NUTS_PASS(nng_init(&p)); NUTS_OPEN(s); NUTS_CLOSE(s); - NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_POLLER_THREADS) == 1); - nng_fini(); + pp = nng_init_get_params(); + NUTS_TRUE(pp->num_poller_threads > 0); + NUTS_MSG("Got %d poller threads", pp->num_expire_threads); } void test_init_too_many_poller_threads(void) { - nng_socket s; - nng_init_set_parameter(NNG_INIT_NUM_POLLER_THREADS, 256); - nng_init_set_parameter(NNG_INIT_MAX_POLLER_THREADS, 2); + nng_socket s; + nng_init_params *pp; + nng_init_params p = { 0 }; + + nng_fini(); + p.num_poller_threads = 256; + p.max_poller_threads = 2; + NUTS_PASS(nng_init(&p)); NUTS_OPEN(s); NUTS_CLOSE(s); - NUTS_ASSERT(nni_init_get_effective(NNG_INIT_NUM_POLLER_THREADS) == 2); - nng_fini(); + pp = nng_init_get_params(); + NUTS_TRUE(pp->num_poller_threads == 2); + NUTS_MSG("Got %d poller threads", pp->num_expire_threads); } #endif NUTS_TESTS = { { "init parameter", test_init_param }, - { "init set effective", test_set_effective }, { "init zero resolvers", test_init_zero_resolvers }, { "init one task thread", test_init_one_task_thread }, { "init too many task threads", test_init_too_many_task_threads }, @@ -137,4 +152,4 @@ NUTS_TESTS = { #endif { NULL, NULL }, -};
\ No newline at end of file +}; diff --git a/src/core/listener.c b/src/core/listener.c index 38a7d323..88729f56 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -272,13 +272,8 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str) int nni_listener_find(nni_listener **lp, uint32_t id) { - int rv; nni_listener *l; - if ((rv = nni_init()) != 0) { - return (rv); - } - nni_mtx_lock(&listeners_lk); if ((l = nni_id_get(&listeners, id)) != NULL) { l->l_ref++; diff --git a/src/core/platform.h b/src/core/platform.h index d7a88238..62a321bf 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -47,11 +47,6 @@ // for abnormal programs on the platform, such as calling abort(). extern void nni_plat_abort(void); -// nni_plat_println is used to emit debug messages. Typically this is used -// during core debugging, or to emit panic messages. Message content will -// not contain newlines, but the output will add them. -extern void nni_plat_println(const char *); - // nni_plat_printf is like printf. It should conform to C99 standard printf, // but is a function to allow platform ports to redirect. It should go to // the same place that nni_plat_println does. @@ -263,18 +258,10 @@ extern void nni_msleep(nni_duration); uint32_t nni_random(void); // nni_plat_init is called to allow the platform the chance to -// do any necessary initialization. This routine MUST be idempotent, -// and thread-safe, and will be called before any other API calls, and -// may be called at any point thereafter. It is permitted to return -// an error if some critical failure initializing the platform occurs, -// but once this succeeds, all future calls must succeed as well, unless -// nni_plat_fini has been called. -// -// The function argument should be called if the platform has not initialized -// (i.e. exactly once), and its result passed back to the caller. If it -// does not return 0 (success), then it may be called again to try to -// initialize the platform again at a later date. -extern int nni_plat_init(int (*)(void)); +// do any necessary initialization. This will be called before any other API +// calls. It is permitted to return an error if some critical failure +// initializing the platform occurs. +extern int nni_plat_init(nng_init_params *); // nni_plat_fini is called to clean up resources. It is intended to // be called as the last thing executed in the library, and no other functions diff --git a/src/core/socket.c b/src/core/socket.c index 2b94e573..c92a1c3b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -364,12 +364,9 @@ nni_sock_recvq(nni_sock *s) int nni_sock_find(nni_sock **sockp, uint32_t id) { - int rv; + int rv = 0; nni_sock *s; - if ((rv = nni_init()) != 0) { - return (rv); - } nni_mtx_lock(&sock_lk); if ((s = nni_id_get(&sock_ids, id)) != NULL) { if (s->s_closed) { @@ -621,8 +618,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) return (NNG_ENOTSUP); } - if (((rv = nni_init()) != 0) || - ((rv = nni_sock_create(&s, proto)) != 0)) { + if ((rv = nni_sock_create(&s, proto)) != 0) { return (rv); } @@ -1076,12 +1072,9 @@ nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg) int nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing) { - int rv; + int rv = 0; nni_ctx *ctx; - if ((rv = nni_init()) != 0) { - return (rv); - } nni_mtx_lock(&sock_lk); if ((ctx = nni_id_get(&ctx_ids, id)) != NULL) { // We refuse a reference if either the socket is diff --git a/src/core/stats.c b/src/core/stats.c index c049d611..f01f642e 100644 --- a/src/core/stats.c +++ b/src/core/stats.c @@ -385,10 +385,6 @@ int nng_stats_get(nng_stat **statp) { #ifdef NNG_ENABLE_STATS - int rv; - if ((rv = nni_init()) != 0) { - return (rv); - } return (nni_stat_snapshot(statp, &stats_root)); #else NNI_ARG_UNUSED(statp); diff --git a/src/core/stream.c b/src/core/stream.c index 78029ddc..d900329a 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -169,12 +169,6 @@ nng_stream_dialer_dial(nng_stream_dialer *d, nng_aio *aio) int nng_stream_dialer_alloc_url(nng_stream_dialer **dp, const nng_url *url) { - int rv; - - if ((rv = nni_init()) != 0) { - return (rv); - } - for (int i = 0; stream_drivers[i].scheme != NULL; i++) { if (strcmp(stream_drivers[i].scheme, url->u_scheme) == 0) { return (stream_drivers[i].dialer_alloc(dp, url)); @@ -189,9 +183,6 @@ nng_stream_dialer_alloc(nng_stream_dialer **dp, const char *uri) nng_url *url; int rv; - if ((rv = nni_init()) != 0) { - return (rv); - } if ((rv = nng_url_parse(&url, uri)) != 0) { return (rv); } @@ -291,12 +282,6 @@ nni_stream_listener_set_tls(nng_stream_listener *l, nng_tls_config *cfg) int nng_stream_listener_alloc_url(nng_stream_listener **lp, const nng_url *url) { - int rv; - - if ((rv = nni_init()) != 0) { - return (rv); - } - for (int i = 0; stream_drivers[i].scheme != NULL; i++) { if (strcmp(stream_drivers[i].scheme, url->u_scheme) == 0) { return (stream_drivers[i].listener_alloc(lp, url)); @@ -311,10 +296,6 @@ nng_stream_listener_alloc(nng_stream_listener **lp, const char *uri) nng_url *url; int rv; - if ((rv = nni_init()) != 0) { - return (rv); - } - if ((rv = nng_url_parse(&url, uri)) != 0) { return (rv); } diff --git a/src/core/taskq.c b/src/core/taskq.c index 09886596..496c2fab 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "nng/nng.h" typedef struct nni_taskq_thr nni_taskq_thr; struct nni_taskq_thr { @@ -243,24 +244,13 @@ nni_task_fini(nni_task *task) } int -nni_taskq_sys_init(void) +nni_taskq_sys_init(nng_init_params *params) { - int num_thr; - int max_thr; + int16_t num_thr; + int16_t max_thr; -#ifndef NNG_NUM_TASKQ_THREADS -#define NNG_NUM_TASKQ_THREADS (nni_plat_ncpu() * 2) -#endif - -#ifndef NNG_MAX_TASKQ_THREADS -#define NNG_MAX_TASKQ_THREADS 16 -#endif - - max_thr = (int) nni_init_get_param( - NNG_INIT_MAX_TASK_THREADS, NNG_MAX_TASKQ_THREADS); - - num_thr = (int) nni_init_get_param( - NNG_INIT_NUM_TASK_THREADS, NNG_NUM_TASKQ_THREADS); + max_thr = params->max_task_threads; + num_thr = params->num_task_threads; if ((max_thr > 0) && (num_thr > max_thr)) { num_thr = max_thr; @@ -268,9 +258,9 @@ nni_taskq_sys_init(void) if (num_thr < 2) { num_thr = 2; } - nni_init_set_effective(NNG_INIT_NUM_TASK_THREADS, num_thr); + params->num_task_threads = num_thr; - return (nni_taskq_init(&nni_taskq_systq, num_thr)); + return (nni_taskq_init(&nni_taskq_systq, (int) num_thr)); } void diff --git a/src/core/taskq.h b/src/core/taskq.h index ccc54f61..498b4f37 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -1,5 +1,5 @@ // -// Copyright 2022 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -14,6 +14,7 @@ #include "core/defs.h" #include "core/list.h" #include "core/platform.h" +#include "nng/nng.h" typedef struct nni_taskq nni_taskq; typedef struct nni_task nni_task; @@ -60,7 +61,7 @@ extern void nni_task_init(nni_task *, nni_taskq *, nni_cb, void *); // it reschedules the task.) extern void nni_task_fini(nni_task *); -extern int nni_taskq_sys_init(void); +extern int nni_taskq_sys_init(nng_init_params *); extern void nni_taskq_sys_fini(void); // nni_task implementation details are not to be used except by the diff --git a/src/core/tcp.c b/src/core/tcp.c index e54cdee7..7fb67228 100644 --- a/src/core/tcp.c +++ b/src/core/tcp.c @@ -369,9 +369,6 @@ tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa) tcp_listener *l; int rv; - if ((rv = nni_init()) != 0) { - return (rv); - } if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { return (NNG_ENOMEM); } @@ -398,9 +395,6 @@ nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url) int rv; nng_sockaddr sa; - if ((rv = nni_init()) != 0) { - return (rv); - } if ((rv = nni_url_to_address(&sa, url)) != 0) { return (rv); } |
