about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Garrett D'Amore <garrett@damore.org> 2024-12-30 17:42:32 -0800
committer: Garrett D'Amore <garrett@damore.org> 2024-12-30 17:42:32 -0800
commit: 0fb33189287f643b98760a663b75e6016537878a (patch)
tree: eeb3d8961590d2e457c167530a40eb7ee723b71a /src
parent: 7f7a8194a60e9f5883866dd8b8c22d4576fc1abc (diff)
download: nng-0fb33189287f643b98760a663b75e6016537878a.tar.gz
nng-0fb33189287f643b98760a663b75e6016537878a.tar.bz2
nng-0fb33189287f643b98760a663b75e6016537878a.zip
kqueue: support concurrent pollers
This allows greatly increased scalability for kqueue-based systems with lots of cores (more likely FreeBSD than Darwin, as most Macs have only a smattering of cores), but even on Macs we can engage a few cores for system calls, giving improvements. The implementation here is pretty simple -- each file descriptor is assigned to one of the kqueues by taking the numeric value of the file descriptor modulo the number of kqueues we have opened, so a descriptor stays on the same kqueue (and poller thread) for its whole lifetime. The same approach will be adopted for epoll and Solaris/illumos port events.
Diffstat (limited to 'src')
-rw-r--r-- src/platform/posix/posix_pollq_kqueue.c 99
1 file changed, 72 insertions, 27 deletions
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index f1f742db..a1479d25 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -30,15 +30,16 @@ typedef struct nni_posix_pollq {
int wake_wfd; // write side of wake pipe
int wake_rfd; // read side of wake pipe
bool closed; // request for worker to exit
- int kq; // kqueue handle
- nni_thr thr; // worker thread
- nni_list reapq; // items to reap
+ bool init;
+ int kq; // kqueue handle
+ nni_thr thr; // worker thread
+ nni_list reapq; // items to reap
} nni_posix_pollq;
#define NNI_MAX_KQUEUE_EVENTS 64
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
+static nni_posix_pollq *nni_kqueue_pqs;
+static int nni_kqueue_npq;
void
nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg)
@@ -58,7 +59,8 @@ nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg)
(void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
#endif
- pq = &nni_posix_global_pollq;
+ // select a pollq by FD number; hopefully FDs are distributed somewhat evenly
+ pq = &nni_kqueue_pqs[fd % nni_kqueue_npq];
nni_atomic_init(&pf->events);
nni_cv_init(&pf->cv, &pq->mtx);
@@ -247,40 +249,45 @@ nni_posix_poll_thr(void *arg)
}
static void
-nni_posix_pollq_destroy(nni_posix_pollq *pq)
+nni_kqueue_pollq_destroy(nni_posix_pollq *pq)
{
- nni_mtx_lock(&pq->mtx);
- pq->closed = true;
- nni_mtx_unlock(&pq->mtx);
- nni_plat_pipe_raise(pq->wake_wfd);
+ if (pq->init) {
+ nni_mtx_lock(&pq->mtx);
+ pq->closed = true;
+ nni_mtx_unlock(&pq->mtx);
+ nni_plat_pipe_raise(pq->wake_wfd);
- (void) close(pq->wake_wfd);
- nni_thr_fini(&pq->thr);
- (void) close(pq->wake_rfd);
+ (void) close(pq->wake_wfd);
+ nni_thr_fini(&pq->thr);
+ (void) close(pq->wake_rfd);
- if (pq->kq >= 0) {
- close(pq->kq);
- pq->kq = -1;
+ if (pq->kq >= 0) {
+ close(pq->kq);
+ pq->kq = -1;
+ }
+ nni_mtx_fini(&pq->mtx);
+ pq->init = false;
}
- nni_mtx_fini(&pq->mtx);
}
static int
-nni_posix_pollq_create(nni_posix_pollq *pq)
+nni_kqueue_pollq_create(nni_posix_pollq *pq)
{
int rv;
struct kevent ev;
+ nni_mtx_init(&pq->mtx);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
+ pq->kq = -1;
+ pq->init = true;
+
if ((pq->kq = kqueue()) < 0) {
return (nni_plat_errno(errno));
}
- nni_mtx_init(&pq->mtx);
- NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
-
if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) {
- nni_posix_pollq_destroy(pq);
+ nni_kqueue_pollq_destroy(pq);
return (rv);
}
@@ -291,7 +298,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
rv = nni_plat_errno(rv);
- nni_posix_pollq_destroy(pq);
+ nni_kqueue_pollq_destroy(pq);
return (rv);
}
@@ -299,17 +306,55 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (0);
}
+// Our concurrency model is simple. We assign PFDs to one of the pollq threads.
+// The PFD will be bound to that pollq for its entire lifetime. The way to get
+// good balance is to have lots of PFDs. If you don't have lots of PFDs, you
+// do not need this scalability anyway.
+
int
nni_posix_pollq_sysinit(nng_init_params *params)
{
- NNI_ARG_UNUSED(params);
- return (nni_posix_pollq_create(&nni_posix_global_pollq));
+ int16_t num_thr;
+ int16_t max_thr;
+
+ max_thr = params->max_poller_threads;
+ num_thr = params->num_poller_threads;
+
+ if ((max_thr > 0) && (num_thr > max_thr)) {
+ num_thr = max_thr;
+ }
+ if (num_thr < 1) {
+ num_thr = 1;
+ }
+ params->num_poller_threads = num_thr;
+ if ((nni_kqueue_pqs = NNI_ALLOC_STRUCTS(nni_kqueue_pqs, num_thr)) ==
+ NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_kqueue_npq = num_thr;
+ for (int i = 0; i < num_thr; i++) {
+ int rv;
+ if ((rv = nni_kqueue_pollq_create(&nni_kqueue_pqs[i])) != 0) {
+ nni_posix_pollq_sysfini();
+ return (rv);
+ }
+ }
+
+ return (0);
}
void
nni_posix_pollq_sysfini(void)
{
- nni_posix_pollq_destroy(&nni_posix_global_pollq);
+ if (nni_kqueue_npq > 0) {
+ for (int i = 0; i < nni_kqueue_npq; i++) {
+ nni_kqueue_pollq_destroy(&nni_kqueue_pqs[i]);
+ }
+ NNI_FREE_STRUCTS(nni_kqueue_pqs, nni_kqueue_npq);
+ nni_kqueue_pqs = NULL;
+ nni_kqueue_npq = 0;
+ }
}
#endif // NNG_HAVE_KQUEUE