aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-19 13:05:23 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-19 13:05:23 -0800
commit60f63557d87528497fe1392fa6a676b2a51efb16 (patch)
tree36efdb0fa027594b2e293c21f1a8115cd359eb0f /src/platform/posix
parent5e18eb4f18af570abf84a615db5235e2e9415c75 (diff)
downloadnng-60f63557d87528497fe1392fa6a676b2a51efb16.tar.gz
nng-60f63557d87528497fe1392fa6a676b2a51efb16.tar.bz2
nng-60f63557d87528497fe1392fa6a676b2a51efb16.zip
poll: performance improvements, simplifications
We preallocate the arrays used for pollfds, based on what the system can tolerate (tunable with NNG_MAX_OPEN), and we change the code for inserting and removing pollfds from the list so that it can run without acquiring the locks during the main loop, only when adding or removing files. The poll() implementation is very nearly lock free in the hot code path, and soon will be.
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_pollq_poll.c169
1 files changed, 84 insertions, 85 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index f5a4e153..7adef5ed 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -17,6 +17,7 @@
#include <poll.h>
#include <stdlib.h>
#include <string.h>
+#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
@@ -32,26 +33,18 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().
-// Locking strategy: We use the pollq lock to guard the lists on the pollq,
-// the nfds (which counts the number of items in the pollq), the pollq
-// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We
-// use a lock on the pfd only to protect the the events field (which we treat
-// as an atomic bitfield), and the cb and arg pointers. Note that the pfd
-// lock is therefore a leaf lock, which is sometimes acquired while holding
-// the pq lock. Every reasonable effort is made to minimize holding locks.
-// (Btw, pfd->fd is not guarded, because it is set at pfd creation and
-// persists until the pfd is destroyed.)
-
typedef struct nni_posix_pollq {
- nni_mtx mtx;
- int nfds;
- int wakewfd; // write side of waker pipe
- int wakerfd; // read side of waker pipe
- bool closing; // request for worker to exit
- bool closed;
- nni_thr thr; // worker thread
- nni_list pollq; // armed nodes
- nni_list reapq;
+ nni_mtx mtx;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ nni_thr thr; // worker thread
+ nni_list pollq; // armed nodes
+ nni_list reapq;
+ nni_list addq;
+ struct pollfd *fds;
+ nni_posix_pfd **pfds;
+ int nalloc;
+ bool closed;
} nni_posix_pollq;
static nni_posix_pollq nni_posix_global_pollq;
@@ -87,16 +80,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pfd->arg = NULL;
pfd->pq = pq;
nni_mtx_lock(&pq->mtx);
- if (pq->closing) {
- nni_mtx_unlock(&pq->mtx);
- nni_cv_fini(&pfd->cv);
- nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
- return (NNG_ECLOSED);
- }
- nni_list_append(&pq->pollq, pfd);
- pq->nfds++;
+ nni_list_append(&pq->addq, pfd);
nni_mtx_unlock(&pq->mtx);
+ nni_plat_pipe_raise(pq->wakewfd);
*pfdp = pfd;
return (0);
}
@@ -132,7 +118,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
nni_mtx_lock(&pq->mtx);
if (nni_list_active(&pq->pollq, pfd)) {
nni_list_remove(&pq->pollq, pfd);
- pq->nfds--;
}
if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
@@ -170,44 +155,41 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
}
static void
+posix_poll_add(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pfd;
+ // Also lets reap anything that was in the reaplist!
+ while ((pfd = nni_list_first(&pq->addq)) != NULL) {
+ nni_list_remove(&pq->addq, pfd);
+ nni_list_append(&pq->pollq, pfd);
+ }
+}
+
+static void
+posix_poll_reap(nni_posix_pollq *pq, unsigned events)
+{
+ nni_posix_pfd *pfd;
+ if (events & POLLHUP) {
+ pq->closed = true;
+ }
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pfd);
+ nni_cv_wake(&pfd->cv);
+ }
+}
+
+static void
nni_posix_poll_thr(void *arg)
{
- nni_posix_pollq *pq = arg;
- int nalloc = 0;
- struct pollfd *fds = NULL;
- nni_posix_pfd **pfds = NULL;
+ nni_posix_pollq *pq = arg;
+ struct pollfd *fds = pq->fds;
+ nni_posix_pfd **pfds = pq->pfds;
for (;;) {
int nfds;
unsigned events;
nni_posix_pfd *pfd;
- nni_mtx_lock(&pq->mtx);
- while (nalloc < (pq->nfds + 1)) {
- int n = pq->nfds + 128;
-
- // Drop the lock while we sleep or allocate. This
- // allows additional items to be added or removed (!)
- // while we wait.
- nni_mtx_unlock(&pq->mtx);
-
- // Toss the old ones first; avoids *doubling* memory
- // consumption during alloc.
- NNI_FREE_STRUCTS(fds, nalloc);
- NNI_FREE_STRUCTS(pfds, nalloc);
- nalloc = 0;
-
- if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) {
- nni_msleep(10); // sleep for a bit, try later
- } else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) {
- NNI_FREE_STRUCTS(pfds, n);
- nni_msleep(10);
- } else {
- nalloc = n;
- }
- nni_mtx_lock(&pq->mtx);
- }
-
// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
fds[0].fd = pq->wakerfd;
@@ -216,21 +198,6 @@ nni_posix_poll_thr(void *arg)
pfds[pq->wakerfd] = NULL;
nfds = 1;
- // Also lets reap anything that was in the reaplist!
- while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
- nni_list_remove(&pq->reapq, pfd);
- nni_cv_wake(&pfd->cv);
- }
-
- // If we're closing down, bail now. This is done *after* we
- // have ensured that the reapq is empty. Anything still in
- // the pollq is not going to receive further callbacks.
- if (pq->closing) {
- pq->closed = true;
- nni_mtx_unlock(&pq->mtx);
- break;
- }
-
// Set up the poll list.
NNI_LIST_FOREACH (&pq->pollq, pfd) {
@@ -246,7 +213,6 @@ nni_posix_poll_thr(void *arg)
nfds++;
}
}
- nni_mtx_unlock(&pq->mtx);
// We could get the result from poll, and avoid iterating
// over the entire set of pollfds, but since on average we
@@ -267,6 +233,10 @@ nni_posix_poll_thr(void *arg)
}
if (pfd == NULL || fd == pq->wakerfd) {
nni_plat_pipe_clear(pq->wakerfd);
+ nni_mtx_lock(&pq->mtx);
+ posix_poll_add(pq);
+ posix_poll_reap(pq, events);
+ nni_mtx_unlock(&pq->mtx);
if (events & POLLHUP) {
return;
}
@@ -291,23 +261,25 @@ nni_posix_poll_thr(void *arg)
}
}
}
-
- NNI_FREE_STRUCTS(fds, nalloc);
- NNI_FREE_STRUCTS(pfds, nalloc);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- nni_mtx_lock(&pq->mtx);
- pq->closing = true;
- nni_mtx_unlock(&pq->mtx);
-
nni_plat_pipe_raise(pq->wakewfd);
+ (void) close(pq->wakewfd);
nni_thr_fini(&pq->thr);
- nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ (void) close(pq->wakerfd);
nni_mtx_fini(&pq->mtx);
+ if (pq->fds != NULL) {
+ NNI_FREE_STRUCTS(pq->fds, pq->nalloc);
+ pq->fds = NULL;
+ }
+ if (pq->pfds != NULL) {
+ NNI_FREE_STRUCTS(pq->pfds, pq->nalloc);
+ pq->pfds = NULL;
+ }
}
static int
@@ -317,8 +289,35 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node);
NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
- pq->closing = false;
- pq->closed = false;
+ NNI_LIST_INIT(&pq->addq, nni_posix_pfd, node);
+
+ pq->closed = false;
+#if NNG_MAX_OPEN
+ pq->nalloc = NNG_MAX_OPEN;
+#else
+ struct rlimit limits;
+ pq->nalloc = 0;
+ if (getrlimit(RLIMIT_NOFILE, &limits) == 0) {
+ if (limits.rlim_cur != RLIM_INFINITY) {
+ pq->nalloc = (int) limits.rlim_cur;
+ } else if (limits.rlim_max != RLIM_INFINITY) {
+ pq->nalloc = (int) limits.rlim_max;
+ }
+ pq->nalloc = (int) limits.rlim_max;
+ }
+#endif
+ if (pq->nalloc == 0) {
+ // 5K files default. If you need more, either set
+ // rlimit properly, or
+ pq->nalloc = 5000;
+ }
+ if (pq->nalloc < 20) { // minimum allowed per POSIX
+ pq->nalloc = 20;
+ }
+ if (((pq->pfds = NNI_ALLOC_STRUCTS(pq->pfds, pq->nalloc)) == NULL) ||
+ ((pq->fds = NNI_ALLOC_STRUCTS(pq->fds, pq->nalloc)) == NULL)) {
+ return (NNG_ENOMEM);
+ }
if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) {
return (rv);