aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/platform/posix/posix_pollq_poll.c169
1 files changed, 84 insertions, 85 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index f5a4e153..7adef5ed 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -17,6 +17,7 @@
#include <poll.h>
#include <stdlib.h>
#include <string.h>
+#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
@@ -32,26 +33,18 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().
-// Locking strategy: We use the pollq lock to guard the lists on the pollq,
-// the nfds (which counts the number of items in the pollq), the pollq
-// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We
-// use a lock on the pfd only to protect the the events field (which we treat
-// as an atomic bitfield), and the cb and arg pointers. Note that the pfd
-// lock is therefore a leaf lock, which is sometimes acquired while holding
-// the pq lock. Every reasonable effort is made to minimize holding locks.
-// (Btw, pfd->fd is not guarded, because it is set at pfd creation and
-// persists until the pfd is destroyed.)
-
typedef struct nni_posix_pollq {
- nni_mtx mtx;
- int nfds;
- int wakewfd; // write side of waker pipe
- int wakerfd; // read side of waker pipe
- bool closing; // request for worker to exit
- bool closed;
- nni_thr thr; // worker thread
- nni_list pollq; // armed nodes
- nni_list reapq;
+ nni_mtx mtx;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ nni_thr thr; // worker thread
+ nni_list pollq; // armed nodes
+ nni_list reapq;
+ nni_list addq;
+ struct pollfd *fds;
+ nni_posix_pfd **pfds;
+ int nalloc;
+ bool closed;
} nni_posix_pollq;
static nni_posix_pollq nni_posix_global_pollq;
@@ -87,16 +80,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pfd->arg = NULL;
pfd->pq = pq;
nni_mtx_lock(&pq->mtx);
- if (pq->closing) {
- nni_mtx_unlock(&pq->mtx);
- nni_cv_fini(&pfd->cv);
- nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
- return (NNG_ECLOSED);
- }
- nni_list_append(&pq->pollq, pfd);
- pq->nfds++;
+ nni_list_append(&pq->addq, pfd);
nni_mtx_unlock(&pq->mtx);
+ nni_plat_pipe_raise(pq->wakewfd);
*pfdp = pfd;
return (0);
}
@@ -132,7 +118,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
nni_mtx_lock(&pq->mtx);
if (nni_list_active(&pq->pollq, pfd)) {
nni_list_remove(&pq->pollq, pfd);
- pq->nfds--;
}
if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
@@ -170,44 +155,41 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
}
static void
+posix_poll_add(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pfd;
+ // Also lets reap anything that was in the reaplist!
+ while ((pfd = nni_list_first(&pq->addq)) != NULL) {
+ nni_list_remove(&pq->addq, pfd);
+ nni_list_append(&pq->pollq, pfd);
+ }
+}
+
+static void
+posix_poll_reap(nni_posix_pollq *pq, unsigned events)
+{
+ nni_posix_pfd *pfd;
+ if (events & POLLHUP) {
+ pq->closed = true;
+ }
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pfd);
+ nni_cv_wake(&pfd->cv);
+ }
+}
+
+static void
nni_posix_poll_thr(void *arg)
{
- nni_posix_pollq *pq = arg;
- int nalloc = 0;
- struct pollfd *fds = NULL;
- nni_posix_pfd **pfds = NULL;
+ nni_posix_pollq *pq = arg;
+ struct pollfd *fds = pq->fds;
+ nni_posix_pfd **pfds = pq->pfds;
for (;;) {
int nfds;
unsigned events;
nni_posix_pfd *pfd;
- nni_mtx_lock(&pq->mtx);
- while (nalloc < (pq->nfds + 1)) {
- int n = pq->nfds + 128;
-
- // Drop the lock while we sleep or allocate. This
- // allows additional items to be added or removed (!)
- // while we wait.
- nni_mtx_unlock(&pq->mtx);
-
- // Toss the old ones first; avoids *doubling* memory
- // consumption during alloc.
- NNI_FREE_STRUCTS(fds, nalloc);
- NNI_FREE_STRUCTS(pfds, nalloc);
- nalloc = 0;
-
- if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) {
- nni_msleep(10); // sleep for a bit, try later
- } else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) {
- NNI_FREE_STRUCTS(pfds, n);
- nni_msleep(10);
- } else {
- nalloc = n;
- }
- nni_mtx_lock(&pq->mtx);
- }
-
// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
fds[0].fd = pq->wakerfd;
@@ -216,21 +198,6 @@ nni_posix_poll_thr(void *arg)
pfds[pq->wakerfd] = NULL;
nfds = 1;
- // Also lets reap anything that was in the reaplist!
- while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
- nni_list_remove(&pq->reapq, pfd);
- nni_cv_wake(&pfd->cv);
- }
-
- // If we're closing down, bail now. This is done *after* we
- // have ensured that the reapq is empty. Anything still in
- // the pollq is not going to receive further callbacks.
- if (pq->closing) {
- pq->closed = true;
- nni_mtx_unlock(&pq->mtx);
- break;
- }
-
// Set up the poll list.
NNI_LIST_FOREACH (&pq->pollq, pfd) {
@@ -246,7 +213,6 @@ nni_posix_poll_thr(void *arg)
nfds++;
}
}
- nni_mtx_unlock(&pq->mtx);
// We could get the result from poll, and avoid iterating
// over the entire set of pollfds, but since on average we
@@ -267,6 +233,10 @@ nni_posix_poll_thr(void *arg)
}
if (pfd == NULL || fd == pq->wakerfd) {
nni_plat_pipe_clear(pq->wakerfd);
+ nni_mtx_lock(&pq->mtx);
+ posix_poll_add(pq);
+ posix_poll_reap(pq, events);
+ nni_mtx_unlock(&pq->mtx);
if (events & POLLHUP) {
return;
}
@@ -291,23 +261,25 @@ nni_posix_poll_thr(void *arg)
}
}
}
-
- NNI_FREE_STRUCTS(fds, nalloc);
- NNI_FREE_STRUCTS(pfds, nalloc);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- nni_mtx_lock(&pq->mtx);
- pq->closing = true;
- nni_mtx_unlock(&pq->mtx);
-
nni_plat_pipe_raise(pq->wakewfd);
+ (void) close(pq->wakewfd);
nni_thr_fini(&pq->thr);
- nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ (void) close(pq->wakerfd);
nni_mtx_fini(&pq->mtx);
+ if (pq->fds != NULL) {
+ NNI_FREE_STRUCTS(pq->fds, pq->nalloc);
+ pq->fds = NULL;
+ }
+ if (pq->pfds != NULL) {
+ NNI_FREE_STRUCTS(pq->pfds, pq->nalloc);
+ pq->pfds = NULL;
+ }
}
static int
@@ -317,8 +289,35 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node);
NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
- pq->closing = false;
- pq->closed = false;
+ NNI_LIST_INIT(&pq->addq, nni_posix_pfd, node);
+
+ pq->closed = false;
+#if NNG_MAX_OPEN
+ pq->nalloc = NNG_MAX_OPEN;
+#else
+ struct rlimit limits;
+ pq->nalloc = 0;
+ if (getrlimit(RLIMIT_NOFILE, &limits) == 0) {
+ if (limits.rlim_cur != RLIM_INFINITY) {
+ pq->nalloc = (int) limits.rlim_cur;
+ } else if (limits.rlim_max != RLIM_INFINITY) {
+ pq->nalloc = (int) limits.rlim_max;
+ }
+ pq->nalloc = (int) limits.rlim_max;
+ }
+#endif
+ if (pq->nalloc == 0) {
+ // 5K files default. If you need more, either set
+ // rlimit properly, or
+ pq->nalloc = 5000;
+ }
+ if (pq->nalloc < 20) { // minimum allowed per POSIX
+ pq->nalloc = 20;
+ }
+ if (((pq->pfds = NNI_ALLOC_STRUCTS(pq->pfds, pq->nalloc)) == NULL) ||
+ ((pq->fds = NNI_ALLOC_STRUCTS(pq->fds, pq->nalloc)) == NULL)) {
+ return (NNG_ENOMEM);
+ }
if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) {
return (rv);