diff options
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 169 |
1 files changed, 84 insertions, 85 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index f5a4e153..7adef5ed 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -17,6 +17,7 @@ #include <poll.h> #include <stdlib.h> #include <string.h> +#include <sys/resource.h> #include <sys/socket.h> #include <sys/types.h> #include <sys/uio.h> @@ -32,26 +33,18 @@ // nni_posix_pollq is a work structure used by the poller thread, that keeps // track of all the underlying pipe handles and so forth being used by poll(). -// Locking strategy: We use the pollq lock to guard the lists on the pollq, -// the nfds (which counts the number of items in the pollq), the pollq -// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We -// use a lock on the pfd only to protect the the events field (which we treat -// as an atomic bitfield), and the cb and arg pointers. Note that the pfd -// lock is therefore a leaf lock, which is sometimes acquired while holding -// the pq lock. Every reasonable effort is made to minimize holding locks. -// (Btw, pfd->fd is not guarded, because it is set at pfd creation and -// persists until the pfd is destroyed.) - typedef struct nni_posix_pollq { - nni_mtx mtx; - int nfds; - int wakewfd; // write side of waker pipe - int wakerfd; // read side of waker pipe - bool closing; // request for worker to exit - bool closed; - nni_thr thr; // worker thread - nni_list pollq; // armed nodes - nni_list reapq; + nni_mtx mtx; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + nni_thr thr; // worker thread + nni_list pollq; // armed nodes + nni_list reapq; + nni_list addq; + struct pollfd *fds; + nni_posix_pfd **pfds; + int nalloc; + bool closed; } nni_posix_pollq; static nni_posix_pollq nni_posix_global_pollq; @@ -87,16 +80,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) pfd->arg = NULL; pfd->pq = pq; nni_mtx_lock(&pq->mtx); - if (pq->closing) { - nni_mtx_unlock(&pq->mtx); - nni_cv_fini(&pfd->cv); - nni_mtx_fini(&pfd->mtx); - NNI_FREE_STRUCT(pfd); - return (NNG_ECLOSED); - } - nni_list_append(&pq->pollq, pfd); - pq->nfds++; + nni_list_append(&pq->addq, pfd); nni_mtx_unlock(&pq->mtx); + nni_plat_pipe_raise(pq->wakewfd); *pfdp = pfd; return (0); } @@ -132,7 +118,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) nni_mtx_lock(&pq->mtx); if (nni_list_active(&pq->pollq, pfd)) { nni_list_remove(&pq->pollq, pfd); - pq->nfds--; } if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { @@ -170,44 +155,41 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) } static void +posix_poll_add(nni_posix_pollq *pq) +{ + nni_posix_pfd *pfd; + // Also lets reap anything that was in the reaplist! + while ((pfd = nni_list_first(&pq->addq)) != NULL) { + nni_list_remove(&pq->addq, pfd); + nni_list_append(&pq->pollq, pfd); + } +} + +static void +posix_poll_reap(nni_posix_pollq *pq, unsigned events) +{ + nni_posix_pfd *pfd; + if (events & POLLHUP) { + pq->closed = true; + } + while ((pfd = nni_list_first(&pq->reapq)) != NULL) { + nni_list_remove(&pq->reapq, pfd); + nni_cv_wake(&pfd->cv); + } +} + +static void nni_posix_poll_thr(void *arg) { - nni_posix_pollq *pq = arg; - int nalloc = 0; - struct pollfd *fds = NULL; - nni_posix_pfd **pfds = NULL; + nni_posix_pollq *pq = arg; + struct pollfd *fds = pq->fds; + nni_posix_pfd **pfds = pq->pfds; for (;;) { int nfds; unsigned events; nni_posix_pfd *pfd; - nni_mtx_lock(&pq->mtx); - while (nalloc < (pq->nfds + 1)) { - int n = pq->nfds + 128; - - // Drop the lock while we sleep or allocate. This - // allows additional items to be added or removed (!) - // while we wait. - nni_mtx_unlock(&pq->mtx); - - // Toss the old ones first; avoids *doubling* memory - // consumption during alloc. - NNI_FREE_STRUCTS(fds, nalloc); - NNI_FREE_STRUCTS(pfds, nalloc); - nalloc = 0; - - if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) { - nni_msleep(10); // sleep for a bit, try later - } else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) { - NNI_FREE_STRUCTS(pfds, n); - nni_msleep(10); - } else { - nalloc = n; - } - nni_mtx_lock(&pq->mtx); - } - // The waker pipe is set up so that we will be woken // when it is written (this allows us to be signaled). fds[0].fd = pq->wakerfd; @@ -216,21 +198,6 @@ nni_posix_poll_thr(void *arg) pfds[pq->wakerfd] = NULL; nfds = 1; - // Also lets reap anything that was in the reaplist! - while ((pfd = nni_list_first(&pq->reapq)) != NULL) { - nni_list_remove(&pq->reapq, pfd); - nni_cv_wake(&pfd->cv); - } - - // If we're closing down, bail now. This is done *after* we - // have ensured that the reapq is empty. Anything still in - // the pollq is not going to receive further callbacks. - if (pq->closing) { - pq->closed = true; - nni_mtx_unlock(&pq->mtx); - break; - } - // Set up the poll list. NNI_LIST_FOREACH (&pq->pollq, pfd) { @@ -246,7 +213,6 @@ nni_posix_poll_thr(void *arg) nfds++; } } - nni_mtx_unlock(&pq->mtx); // We could get the result from poll, and avoid iterating // over the entire set of pollfds, but since on average we @@ -267,6 +233,10 @@ nni_posix_poll_thr(void *arg) } if (pfd == NULL || fd == pq->wakerfd) { nni_plat_pipe_clear(pq->wakerfd); + nni_mtx_lock(&pq->mtx); + posix_poll_add(pq); + posix_poll_reap(pq, events); + nni_mtx_unlock(&pq->mtx); if (events & POLLHUP) { return; } @@ -291,23 +261,25 @@ nni_posix_poll_thr(void *arg) } } } - - NNI_FREE_STRUCTS(fds, nalloc); - NNI_FREE_STRUCTS(pfds, nalloc); } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - nni_mtx_lock(&pq->mtx); - pq->closing = true; - nni_mtx_unlock(&pq->mtx); - nni_plat_pipe_raise(pq->wakewfd); + (void) close(pq->wakewfd); nni_thr_fini(&pq->thr); - nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); + (void) close(pq->wakerfd); nni_mtx_fini(&pq->mtx); + if (pq->fds != NULL) { + NNI_FREE_STRUCTS(pq->fds, pq->nalloc); + pq->fds = NULL; + } + if (pq->pfds != NULL) { + NNI_FREE_STRUCTS(pq->pfds, pq->nalloc); + pq->pfds = NULL; + } } static int @@ -317,8 +289,35 @@ nni_posix_pollq_create(nni_posix_pollq *pq) NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node); NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); - pq->closing = false; - pq->closed = false; + NNI_LIST_INIT(&pq->addq, nni_posix_pfd, node); + + pq->closed = false; +#if NNG_MAX_OPEN + pq->nalloc = NNG_MAX_OPEN; +#else + struct rlimit limits; + pq->nalloc = 0; + if (getrlimit(RLIMIT_NOFILE, &limits) == 0) { + if (limits.rlim_cur != RLIM_INFINITY) { + pq->nalloc = (int) limits.rlim_cur; + } else if (limits.rlim_max != RLIM_INFINITY) { + pq->nalloc = (int) limits.rlim_max; + } + pq->nalloc = (int) limits.rlim_max; + } +#endif + if (pq->nalloc == 0) { + // 5K files default. If you need more, either set + // rlimit properly, or + pq->nalloc = 5000; + } + if (pq->nalloc < 20) { // minimum allowed per POSIX + pq->nalloc = 20; + } + if (((pq->pfds = NNI_ALLOC_STRUCTS(pq->pfds, pq->nalloc)) == NULL) || + ((pq->fds = NNI_ALLOC_STRUCTS(pq->fds, pq->nalloc)) == NULL)) { + return (NNG_ENOMEM); + } if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) { return (rv); |
