aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-30 17:55:47 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-30 18:01:20 -0800
commit90faa53628ebef4b800a9dccd6cf36f52916ebd2 (patch)
treea194447c915be96261811d96cb9d21e7188fb706 /src/platform/posix
parent0fb33189287f643b98760a663b75e6016537878a (diff)
downloadnng-90faa53628ebef4b800a9dccd6cf36f52916ebd2.tar.gz
nng-90faa53628ebef4b800a9dccd6cf36f52916ebd2.tar.bz2
nng-90faa53628ebef4b800a9dccd6cf36f52916ebd2.zip
fixes #530 POSIX pollqs should scale horizontally (epoll)
This should help Linux platforms scale even further with NNG. Port events and *maybe* poll are the last to do. Probably select will remain left in the cold, because honestly select based systems are already performance constrained.
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_pollq_epoll.c94
1 files changed, 66 insertions, 28 deletions
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index 90d10926..49c93627 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -40,19 +40,21 @@ typedef struct nni_posix_pollq {
int epfd; // epoll handle
int evfd; // event fd (to wake us for other stuff)
bool close; // request for worker to exit
- nni_thr thr; // worker thread
+ bool init;
+ nni_thr thr; // worker thread
nni_list reapq;
} nni_posix_pollq;
// single global instance for now.
-static nni_posix_pollq nni_posix_global_pollq;
+static nni_posix_pollq *nni_epoll_pqs;
+static int nni_epoll_npq;
void
nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
nni_posix_pollq *pq;
- pq = &nni_posix_global_pollq;
+ pq = &nni_epoll_pqs[fd % nni_epoll_npq];
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
@@ -191,7 +193,7 @@ nni_posix_pollq_reap(nni_posix_pollq *pq)
}
static void
-nni_posix_poll_thr(void *arg)
+nni_epoll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
struct epoll_event events[NNI_MAX_EPOLL_EVENTS];
@@ -245,30 +247,32 @@ nni_posix_poll_thr(void *arg)
}
static void
-nni_posix_pollq_destroy(nni_posix_pollq *pq)
+nni_epoll_pq_destroy(nni_posix_pollq *pq)
{
uint64_t one = 1;
- nni_mtx_lock(&pq->mtx);
- pq->close = true;
+ if (pq->init) {
+ nni_mtx_lock(&pq->mtx);
+ pq->close = true;
- if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) {
- // This should never occur, and if it does it could
- // lead to a hang.
- nni_panic("BUG! unable to write to evfd!");
- }
- nni_mtx_unlock(&pq->mtx);
+ if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) {
+ // This should never occur, and if it does it could
+ // lead to a hang.
+ nni_panic("BUG! unable to write to evfd!");
+ }
+ nni_mtx_unlock(&pq->mtx);
- nni_thr_fini(&pq->thr);
+ nni_thr_fini(&pq->thr);
- close(pq->evfd);
- close(pq->epfd);
+ close(pq->evfd);
+ close(pq->epfd);
- nni_mtx_fini(&pq->mtx);
+ nni_mtx_fini(&pq->mtx);
+ }
}
static int
-nni_posix_pollq_add_eventfd(nni_posix_pollq *pq)
+nni_epoll_pq_add_eventfd(nni_posix_pollq *pq)
{
// add event fd so we can wake ourself on exit
struct epoll_event ev;
@@ -295,10 +299,16 @@ nni_posix_pollq_add_eventfd(nni_posix_pollq *pq)
}
static int
-nni_posix_pollq_create(nni_posix_pollq *pq)
+nni_epoll_pq_create(nni_posix_pollq *pq)
{
int rv;
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
+ nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, &pq->mtx);
+ pq->epfd = -1;
+ pq->init = true;
+
#if NNG_HAVE_EPOLL_CREATE1
if ((pq->epfd = epoll_create1(EPOLL_CLOEXEC)) < 0) {
return (nni_plat_errno(errno));
@@ -314,16 +324,12 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
pq->close = false;
- NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
- nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
-
- if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) {
+ if ((rv = nni_epoll_pq_add_eventfd(pq)) != 0) {
(void) close(pq->epfd);
nni_mtx_fini(&pq->mtx);
return (rv);
}
- if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ if ((rv = nni_thr_init(&pq->thr, nni_epoll_thr, pq)) != 0) {
(void) close(pq->epfd);
(void) close(pq->evfd);
nni_mtx_fini(&pq->mtx);
@@ -337,14 +343,46 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
int
nni_posix_pollq_sysinit(nng_init_params *params)
{
- NNI_ARG_UNUSED(params);
- return (nni_posix_pollq_create(&nni_posix_global_pollq));
+ int16_t num_thr;
+ int16_t max_thr;
+
+ max_thr = params->max_poller_threads;
+ num_thr = params->num_poller_threads;
+
+ if ((max_thr > 0) && (num_thr > max_thr)) {
+ num_thr = max_thr;
+ }
+ if (num_thr < 1) {
+ num_thr = 1;
+ }
+ params->num_poller_threads = num_thr;
+ if ((nni_epoll_pqs = NNI_ALLOC_STRUCTS(nni_epoll_pqs, num_thr)) ==
+ NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_epoll_npq = num_thr;
+ for (int i = 0; i < num_thr; i++) {
+ int rv;
+ if ((rv = nni_epoll_pq_create(&nni_epoll_pqs[i])) != 0) {
+ nni_posix_pollq_sysfini();
+ return (rv);
+ }
+ }
+ return (0);
}
void
nni_posix_pollq_sysfini(void)
{
- nni_posix_pollq_destroy(&nni_posix_global_pollq);
+ if (nni_epoll_npq > 0) {
+ for (int i = 0; i < nni_epoll_npq; i++) {
+ nni_epoll_pq_destroy(&nni_epoll_pqs[i]);
+ }
+ NNI_FREE_STRUCTS(nni_epoll_pqs, nni_epoll_npq);
+ nni_epoll_pqs = NULL;
+ nni_epoll_npq = 0;
+ }
}
#endif // NNG_HAVE_EPOLL