From 8dbcb5def2656798af46da835a90421a99cd6f36 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 30 Dec 2024 18:38:06 -0800 Subject: poller: port events scalability (illumos/Solaris) --- src/platform/posix/posix_pollq_port.c | 66 +++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 14 deletions(-) diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c index 2e3debad..58691613 100644 --- a/src/platform/posix/posix_pollq_port.c +++ b/src/platform/posix/posix_pollq_port.c @@ -32,17 +32,18 @@ typedef struct nni_posix_pollq { nni_thr thr; // worker thread nni_mtx mtx; nni_cv cv; + bool init; } nni_posix_pollq; -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; +static nni_posix_pollq *nni_port_pqs; +static int nni_port_npq; void nni_posix_pfd_init(nni_posix_pfd *pfdp, int fd, nni_posix_pfd_cb cb, void *arg) { nni_posix_pollq *pq; - pq = &nni_posix_global_pollq; + pq = &nni_port_pqs[fd % nni_port_npq]; (void) fcntl(fd, F_SETFD, FD_CLOEXEC); (void) fcntl(fd, F_SETFL, O_NONBLOCK); @@ -135,7 +136,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) } static void -nni_posix_poll_thr(void *arg) +nni_port_thr(void *arg) { for (;;) { nni_posix_pollq *pq = arg; @@ -193,10 +194,12 @@ nni_posix_poll_thr(void *arg) static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - (void) close(pq->port); - nni_cv_destroy(&pq->cv); - nni_mtx_fini(&pq->mtx); - nni_thr_fini(&pq->thr); + if (pq->init) { + (void) close(pq->port); + nni_cv_destroy(&pq->cv); + nni_mtx_fini(&pq->mtx); + nni_thr_fini(&pq->thr); + } } static int @@ -204,18 +207,21 @@ nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; + nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, pq->mtx); + pq->init = true; + pq->port = -1; + if ((pq->port = port_create()) < 0) { return (nni_plat_errno(errno)); } - if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + if ((rv = nni_thr_init(&pq->thr, nni_port_thr, pq)) != 0) { nni_posix_pollq_destroy(pq); return (rv); } nni_thr_set_name(&pq->thr, "nng:poll:port"); - nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, pq->mtx); nni_thr_run(&pq->thr); return (0); } @@ -223,14 +229,46 @@ nni_posix_pollq_create(nni_posix_pollq *pq) int nni_posix_pollq_sysinit(nng_init_params *params) { - NNI_ARG_UNUSED(params); - return (nni_posix_pollq_create(&nni_posix_global_pollq)); + int16_t num_thr; + int16_t max_thr; + + max_thr = params->max_poller_threads; + num_thr = params->num_poller_threads; + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 1) { + num_thr = 1; + } + params->num_poller_threads = num_thr; + if ((nni_port_pqs = NNI_ALLOC_STRUCTS(nni_port_pqs, num_thr)) == + NULL) { + return (NNG_ENOMEM); + } + + nni_port_npq = num_thr; + for (int i = 0; i < num_thr; i++) { + int rv; + if ((rv = nni_port_pq_create(&nni_port_pqs[i])) != 0) { + nni_posix_pollq_sysfini(); + return (rv); + } + } + return (0); } void nni_posix_pollq_sysfini(void) { - nni_posix_pollq_destroy(&nni_posix_global_pollq); + if (nni_port_npq > 0) { + for (int i = 0; i < nni_port_npq; i++) { + nni_port_pq_destroy(&nni_port_pqs[i]); + } + NNI_FREE_STRUCTS(nni_port_pqs, nni_port_npq); + nni_port_pqs = NULL; + nni_port_npq = 0; + } } #endif // NNG_HAVE_PORT_CREATE -- cgit v1.2.3-70-g09d2