From bcd57a6d7ac799e7f04000a84b6868948fb137c6 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 21 Dec 2024 12:41:29 -0800 Subject: poll: fix poll based poller --- src/platform/posix/posix_pollq.h | 2 +- src/platform/posix/posix_pollq_poll.c | 123 +++++++++++++++++++--------------- src/platform/posix/posix_pollq_poll.h | 19 +++--- 3 files changed, 78 insertions(+), 66 deletions(-) (limited to 'src/platform') diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index 121834cf..a6306643 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -31,7 +31,7 @@ typedef void (*nni_posix_pfd_cb)(void *, unsigned); #elif defined(NNG_POLLQ_EPOLL) #include "posix_pollq_epoll.h" #elif defined(NNG_POLLQ_POLL) -#include "posix_pollq_epoll.h" +#include "posix_pollq_poll.h" #elif defined(NNG_POLLQ_SELECT) #include "posix_pollq_select.h" #else diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 6011ab50..1da1f5df 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -35,12 +35,13 @@ typedef struct nni_posix_pollq { nni_mtx mtx; + nni_cv cv; int wakewfd; // write side of waker pipe int wakerfd; // read side of waker pipe nni_thr thr; // worker thread - nni_list pollq; // armed nodes - nni_list reapq; - nni_list addq; + nni_list pollq; // armed nodes - only updated by thread + nni_list reapq; // list of nodes to reap, protected by mtx + nni_list addq; // list of nodes to add, protected by mtx struct pollfd *fds; nni_posix_pfd **pfds; int nalloc; @@ -68,13 +69,14 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) #endif NNI_LIST_NODE_INIT(&pfd->node); + NNI_LIST_NODE_INIT(&pfd->reap); nni_mtx_init(&pfd->mtx); - nni_cv_init(&pfd->cv, &pq->mtx); pfd->fd = fd; pfd->events = 0; pfd->cb = cb; pfd->arg = arg; pfd->pq = pq; + pfd->reaped = false; nni_mtx_lock(&pq->mtx); nni_list_append(&pq->addq, pfd); nni_mtx_unlock(&pq->mtx); @@ -105,17 +107,19 @@ nni_posix_pfd_stop(nni_posix_pfd *pfd) nni_posix_pfd_close(pfd); nni_mtx_lock(&pq->mtx); - if (!nni_list_active(&pq->pollq, pfd)) { - nni_mtx_unlock(&pq->mtx); - return; - } - nni_list_remove(&pq->pollq, pfd); - - if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { - nni_list_append(&pq->reapq, pfd); - nni_plat_pipe_raise(pq->wakewfd); - while (nni_list_active(&pq->reapq, pfd)) { - nni_cv_wait(&pfd->cv); + if (!pfd->reaped) { + if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + // it might have been on the addq + nni_list_node_remove(&pfd->reap); + nni_list_append(&pq->reapq, pfd); + nni_plat_pipe_raise(pq->wakewfd); + while (!pfd->reaped) { + nni_cv_wait(&pq->cv); + } + } else { + nni_list_node_remove(&pfd->node); + nni_list_node_remove(&pfd->reap); + pfd->reaped = true; } } nni_mtx_unlock(&pq->mtx); @@ -133,7 +137,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) // We're exclusive now. (void) close(pfd->fd); - nni_cv_fini(&pfd->cv); nni_mtx_fini(&pfd->mtx); } @@ -155,41 +158,17 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) return (0); } -static void -posix_poll_add(nni_posix_pollq *pq) -{ - nni_posix_pfd *pfd; - // Also lets reap anything that was in the reaplist! - while ((pfd = nni_list_first(&pq->addq)) != NULL) { - nni_list_remove(&pq->addq, pfd); - nni_list_append(&pq->pollq, pfd); - } -} - -static void -posix_poll_reap(nni_posix_pollq *pq, unsigned events) -{ - nni_posix_pfd *pfd; - if (events & POLLHUP) { - pq->closed = true; - } - while ((pfd = nni_list_first(&pq->reapq)) != NULL) { - nni_list_remove(&pq->reapq, pfd); - nni_cv_wake(&pfd->cv); - } -} - static void nni_posix_poll_thr(void *arg) { nni_posix_pollq *pq = arg; struct pollfd *fds = pq->fds; nni_posix_pfd **pfds = pq->pfds; + nni_posix_pfd *pfd; + int nfds; + unsigned events; for (;;) { - int nfds; - unsigned events; - nni_posix_pfd *pfd; // The waker pipe is set up so that we will be woken // when it is written (this allows us to be signaled). @@ -210,7 +189,6 @@ nni_posix_poll_thr(void *arg) fds[nfds].fd = pfd->fd; fds[nfds].events = events; fds[nfds].revents = 0; - pfds[pfd->fd] = pfd; nfds++; } } @@ -225,6 +203,8 @@ nni_posix_poll_thr(void *arg) // If the waker pipe was signaled, read from it. + bool update = false; + bool stop = false; for (int i = 0; i < nfds; i++) { int fd = fds[i].fd; events = fds[i].revents; @@ -233,16 +213,12 @@ nni_posix_poll_thr(void *arg) continue; } if (pfd == NULL || fd == pq->wakerfd) { - nni_plat_pipe_clear(pq->wakerfd); - nni_mtx_lock(&pq->mtx); - posix_poll_add(pq); - posix_poll_reap(pq, events); - nni_mtx_unlock(&pq->mtx); - if (events & POLLHUP) { - return; + update = true; + if (fd == pq->wakerfd && + ((events & POLLHUP) != 0)) { + stop = true; } } else { - if ((events & (POLLIN | POLLOUT)) != 0) { // don't emit pollhup yet, we want // to finish reading. @@ -255,7 +231,42 @@ nni_posix_poll_thr(void *arg) pfd->cb(pfd->arg, events); } } + + if (stop) { + break; + } + if (update) { + // process adds first + nni_mtx_lock(&pq->mtx); + nni_plat_pipe_clear(pq->wakerfd); + while ((pfd = nni_list_first(&pq->addq)) != NULL) { + nni_list_remove(&pq->addq, pfd); + nni_list_append(&pq->pollq, pfd); + pfds[pfd->fd] = pfd; + } + // then reaps + while ((pfd = nni_list_first(&pq->reapq)) != NULL) { + nni_list_node_remove(&pfd->node); + nni_list_node_remove(&pfd->reap); + pfd->reaped = true; + pfds[pfd->fd] = NULL; + } + nni_cv_wake(&pq->cv); + nni_mtx_unlock(&pq->mtx); + } + } + nni_mtx_lock(&pq->mtx); + nni_plat_pipe_clear(pq->wakerfd); + while (((pfd = nni_list_first(&pq->reapq)) != NULL) || + ((pfd = nni_list_first(&pq->addq)) != NULL) || + ((pfd = nni_list_first(&pq->pollq)) != NULL)) { + nni_list_node_remove(&pfd->node); + nni_list_node_remove(&pfd->reap); + pfd->reaped = true; + pfds[pfd->fd] = NULL; } + nni_cv_wake(&pq->cv); + nni_mtx_unlock(&pq->mtx); } static void @@ -266,6 +277,7 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq) (void) close(pq->wakewfd); nni_thr_fini(&pq->thr); (void) close(pq->wakerfd); + nni_cv_fini(&pq->cv); nni_mtx_fini(&pq->mtx); if (pq->fds != NULL) { NNI_FREE_STRUCTS(pq->fds, pq->nalloc); @@ -283,8 +295,10 @@ nni_posix_pollq_create(nni_posix_pollq *pq) int rv; NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node); - NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); - NNI_LIST_INIT(&pq->addq, nni_posix_pfd, node); + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, reap); + NNI_LIST_INIT(&pq->addq, nni_posix_pfd, reap); + nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, &pq->mtx); pq->closed = false; #if NNG_MAX_OPEN @@ -322,7 +336,6 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (rv); } nni_thr_set_name(&pq->thr, "nng:poll:poll"); - nni_mtx_init(&pq->mtx); nni_thr_run(&pq->thr); return (0); } diff --git a/src/platform/posix/posix_pollq_poll.h b/src/platform/posix/posix_pollq_poll.h index 6ad3cc5b..82467642 100644 --- a/src/platform/posix/posix_pollq_poll.h +++ b/src/platform/posix/posix_pollq_poll.h @@ -13,19 +13,18 @@ #include -typedef struct nni_posix_pollq nni_posix_pollq; - // nni_posix_pfd is the handle used by the poller. It's internals are private // to the poller. struct nni_posix_pfd { - nni_posix_pollq *pq; - int fd; - nni_list_node node; - nni_cv cv; - nni_mtx mtx; - unsigned events; - nni_posix_pfd_cb cb; - void *arg; + struct nni_posix_pollq *pq; + int fd; + nni_list_node node; + nni_list_node reap; + nni_mtx mtx; + unsigned events; + nni_posix_pfd_cb cb; + void *arg; + bool reaped; }; #define NNI_POLL_IN ((unsigned) POLLIN) -- cgit v1.2.3-70-g09d2