From 0fb33189287f643b98760a663b75e6016537878a Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 30 Dec 2024 17:42:32 -0800 Subject: kqueue: support concurrent pollers This allows greatly increased scalability for kqueue based systems with lots of cores (more likely FreeBSD than Darwin, as most macs only have a smattering of cores), but even for macs we can engage a few cores for system calls giving improvements. The implementation here is pretty simple -- each file descriptor gets assigned to its own kqueue by taking the numeric value of the file descriptor modulo the number of kqueues we have opened. The same approach will be adopted for epoll and Solaris/illumos port events. --- src/platform/posix/posix_pollq_kqueue.c | 99 ++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 27 deletions(-) (limited to 'src/platform') diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index f1f742db..a1479d25 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -30,15 +30,16 @@ typedef struct nni_posix_pollq { int wake_wfd; // write side of wake pipe int wake_rfd; // read side of wake pipe bool closed; // request for worker to exit - int kq; // kqueue handle - nni_thr thr; // worker thread - nni_list reapq; // items to reap + bool init; + int kq; // kqueue handle + nni_thr thr; // worker thread + nni_list reapq; // items to reap } nni_posix_pollq; #define NNI_MAX_KQUEUE_EVENTS 64 -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; +static nni_posix_pollq *nni_kqueue_pqs; +static int nni_kqueue_npq; void nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg) @@ -58,7 +59,8 @@ nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg) (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); #endif - pq = &nni_posix_global_pollq; + // hopefully FDs are distributed somewhat + pq = &nni_kqueue_pqs[fd % nni_kqueue_npq]; nni_atomic_init(&pf->events); nni_cv_init(&pf->cv, &pq->mtx); @@ -247,40 +249,45 @@ nni_posix_poll_thr(void *arg) } static void -nni_posix_pollq_destroy(nni_posix_pollq *pq) +nni_kqueue_pollq_destroy(nni_posix_pollq *pq) { - nni_mtx_lock(&pq->mtx); - pq->closed = true; - nni_mtx_unlock(&pq->mtx); - nni_plat_pipe_raise(pq->wake_wfd); + if (pq->init) { + nni_mtx_lock(&pq->mtx); + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + nni_plat_pipe_raise(pq->wake_wfd); - (void) close(pq->wake_wfd); - nni_thr_fini(&pq->thr); - (void) close(pq->wake_rfd); + (void) close(pq->wake_wfd); + nni_thr_fini(&pq->thr); + (void) close(pq->wake_rfd); - if (pq->kq >= 0) { - close(pq->kq); - pq->kq = -1; + if (pq->kq >= 0) { + close(pq->kq); + pq->kq = -1; + } + nni_mtx_fini(&pq->mtx); + pq->init = false; } - nni_mtx_fini(&pq->mtx); } static int -nni_posix_pollq_create(nni_posix_pollq *pq) +nni_kqueue_pollq_create(nni_posix_pollq *pq) { int rv; struct kevent ev; + nni_mtx_init(&pq->mtx); + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); + pq->kq = -1; + pq->init = true; + if ((pq->kq = kqueue()) < 0) { return (nni_plat_errno(errno)); } - nni_mtx_init(&pq->mtx); - NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); - if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || ((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) { - nni_posix_pollq_destroy(pq); + nni_kqueue_pollq_destroy(pq); return (rv); } @@ -291,7 +298,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq) if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { rv = nni_plat_errno(rv); - nni_posix_pollq_destroy(pq); + nni_kqueue_pollq_destroy(pq); return (rv); } @@ -299,17 +306,55 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (0); } +// Our concurrency model is simple. We assign PFDs to one of pollq threads. +// The PFD will be bound to that pollq for its entire lifetime. The way to get +// good balance is to have lots of PFDs. If you don't have lots of PFDs, you +// do not need this scalability anyway. + int nni_posix_pollq_sysinit(nng_init_params *params) { - NNI_ARG_UNUSED(params); - return (nni_posix_pollq_create(&nni_posix_global_pollq)); + int16_t num_thr; + int16_t max_thr; + + max_thr = params->max_poller_threads; + num_thr = params->num_poller_threads; + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 1) { + num_thr = 1; + } + params->num_poller_threads = num_thr; + if ((nni_kqueue_pqs = NNI_ALLOC_STRUCTS(nni_kqueue_pqs, num_thr)) == + NULL) { + return (NNG_ENOMEM); + } + + nni_kqueue_npq = num_thr; + for (int i = 0; i < num_thr; i++) { + int rv; + if ((rv = nni_kqueue_pollq_create(&nni_kqueue_pqs[i])) != 0) { + nni_posix_pollq_sysfini(); + return (rv); + } + } + + return (0); } void nni_posix_pollq_sysfini(void) { - nni_posix_pollq_destroy(&nni_posix_global_pollq); + if (nni_kqueue_npq > 0) { + for (int i = 0; i < nni_kqueue_npq; i++) { + nni_kqueue_pollq_destroy(&nni_kqueue_pqs[i]); + } + NNI_FREE_STRUCTS(nni_kqueue_pqs, nni_kqueue_npq); + nni_kqueue_pqs = NULL; + nni_kqueue_npq = 0; + } } #endif // NNG_HAVE_KQUEUE -- cgit v1.2.3-70-g09d2