From 1514a532617b1fa7d4568a79a9d8ae24ceba1be9 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 21 Dec 2024 10:57:52 -0800 Subject: epoll: fixes for races and early wakeups --- src/platform/posix/posix_pollq_epoll.c | 110 ++++++++++++++------------------- src/platform/posix/posix_pollq_epoll.h | 6 +- 2 files changed, 51 insertions(+), 65 deletions(-) (limited to 'src/platform') diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index e789e5d2..675d0b22 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -32,25 +32,11 @@ #define NNI_MAX_EPOLL_EVENTS 64 -// flags we always want enabled as long as at least one event is active -#define NNI_EPOLL_FLAGS ((unsigned) EPOLLONESHOT | (unsigned) EPOLLERR) - -// Locking strategy: -// -// The pollq mutex protects its own reapq, close state, and the close -// state of the individual pfds. It also protects the pfd cv, which is -// only signaled when the pfd is closed. This mutex is only acquired -// when shutting down the pollq, or closing a pfd. For normal hot-path -// operations we don't need it. -// -// The pfd mutex protects the pfd's own "closing" flag (test and set), -// the callback and arg, and its event mask. This mutex is used a lot, -// but it should be uncontended excepting possibly when closing. 
- // nni_posix_pollq is a work structure that manages state for the epoll-based // pollq implementation typedef struct nni_posix_pollq { nni_mtx mtx; + nni_cv cv; int epfd; // epoll handle int evfd; // event fd (to wake us for other stuff) bool close; // request for worker to exit @@ -64,8 +50,7 @@ static nni_posix_pollq nni_posix_global_pollq; void nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pollq *pq; - struct epoll_event ev; + nni_posix_pollq *pq; pq = &nni_posix_global_pollq; @@ -73,33 +58,25 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) (void) fcntl(fd, F_SETFL, O_NONBLOCK); nni_mtx_init(&pfd->mtx); - nni_cv_init(&pfd->cv, &pq->mtx); nni_atomic_flag_reset(&pfd->stopped); - nni_atomic_flag_reset(&pfd->closed); + nni_atomic_flag_reset(&pfd->closing); pfd->pq = pq; pfd->fd = fd; pfd->cb = cb; pfd->arg = arg; pfd->events = 0; - pfd->reap = false; + pfd->closed = false; + pfd->added = false; NNI_LIST_NODE_INIT(&pfd->node); - - // notifications disabled to begin with - memset(&ev, 0, sizeof(ev)); - ev.events = 0; - ev.data.ptr = pfd; - - // if this fails the system is probably out of memory - it will fail in - // arm - (void) epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev); } int nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) { nni_posix_pollq *pq = pfd->pq; + int rv; // NB: We depend on epoll event flags being the same as their POLLIN // equivalents. I.e. 
POLLIN == EPOLLIN, POLLOUT == EPOLLOUT, and so @@ -108,20 +85,33 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) nni_mtx_lock(&pfd->mtx); struct epoll_event ev; + + if (pfd->closed) { + nni_mtx_unlock(&pfd->mtx); + return (NNG_ECLOSED); + } pfd->events |= events; events = pfd->events; memset(&ev, 0, sizeof(ev)); - ev.events = events | NNI_EPOLL_FLAGS; + ev.events = events | EPOLLONESHOT; ev.data.ptr = pfd; - if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) { - int rv = nni_plat_errno(errno); - nni_mtx_unlock(&pfd->mtx); - return (rv); + // The first arm registers the fd with epoll; later arms modify it. + // If the add fails (probably out of memory), the next arm retries it. + if (!pfd->added) { + rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, pfd->fd, &ev); + if (rv == 0) { + pfd->added = true; + } + } else { + rv = epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev); + } + if (rv != 0) { + rv = nni_plat_errno(errno); } nni_mtx_unlock(&pfd->mtx); - return (0); + return (rv); } int @@ -137,7 +127,7 @@ nni_posix_pfd_close(nni_posix_pfd *pfd) if (pq == NULL) { return; } - if (nni_atomic_flag_test_and_set(&pfd->closed)) { + if (nni_atomic_flag_test_and_set(&pfd->closing)) { return; } @@ -145,6 +135,7 @@ nni_posix_pfd_close(nni_posix_pfd *pfd) struct epoll_event ev; // Not actually used. (void) shutdown(pfd->fd, SHUT_RDWR); + pfd->closed = true; (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev); nni_mtx_unlock(&pfd->mtx); } @@ -162,28 +153,31 @@ nni_posix_pfd_stop(nni_posix_pfd *pfd) return; } + nni_posix_pfd_close(pfd); + // We have to synchronize with the pollq thread (unless we are // on that thread!) NNI_ASSERT(!nni_thr_is_self(&pq->thr)); nni_mtx_lock(&pq->mtx); - nni_list_append(&pq->reapq, pfd); - pfd->reap = true; - - // Wake the remote side. For now we assume this always - // succeeds. The only failure modes here occur when we - // have already excessively signaled this (2^64 times - // with no read!!), or when the evfd is closed, or some - // kernel bug occurs. 
Those errors would manifest as - // a hang waiting for the poller to reap the pfd in fini, - // if it were possible for them to occur. (Barring other - // bugs, it isn't.) - if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) { - nni_panic("BUG! write to epoll fd incorrect!"); - } + if (!pq->close) { + nni_list_append(&pq->reapq, pfd); + + // Wake the remote side. For now we assume this always + // succeeds. The only failure modes here occur when we + // have already excessively signaled this (2^64 times + // with no read!!), or when the evfd is closed, or some + // kernel bug occurs. Those errors would manifest as + // a hang waiting for the poller to reap the pfd in fini, + // if it were possible for them to occur. (Barring other + // bugs, it isn't.) + if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) { + nni_panic("BUG! write to epoll fd incorrect!"); + } - while (pfd->reap) { - nni_cv_wait(&pfd->cv); + while (nni_list_node_active(&pfd->node)) { + nni_cv_wait(&pq->cv); + } } nni_mtx_unlock(&pq->mtx); } @@ -196,12 +190,7 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) return; } - nni_posix_pfd_stop(pfd); - - // We're exclusive now. - (void) close(pfd->fd); - nni_cv_fini(&pfd->cv); nni_mtx_fini(&pfd->mtx); } @@ -211,12 +200,8 @@ nni_posix_pollq_reap(nni_posix_pollq *pq) nni_posix_pfd *pfd; while ((pfd = nni_list_first(&pq->reapq)) != NULL) { nni_list_remove(&pq->reapq, pfd); - - // Let fini know we're done with it, and it's safe to - // remove. 
- pfd->reap = false; - nni_cv_wake(&pfd->cv); } + nni_cv_wake(&pq->cv); } static void @@ -347,6 +332,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq) NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, &pq->mtx); if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) { (void) close(pq->epfd); diff --git a/src/platform/posix/posix_pollq_epoll.h b/src/platform/posix/posix_pollq_epoll.h index 0be2a4c5..1ce7e3cb 100644 --- a/src/platform/posix/posix_pollq_epoll.h +++ b/src/platform/posix/posix_pollq_epoll.h @@ -21,12 +21,12 @@ struct nni_posix_pfd { int fd; nni_posix_pfd_cb cb; void *arg; - bool reap; unsigned events; nni_mtx mtx; - nni_cv cv; + bool added; + bool closed; nni_atomic_flag stopped; - nni_atomic_flag closed; + nni_atomic_flag closing; }; #define NNI_POLL_IN ((unsigned) POLLIN) -- cgit v1.2.3-70-g09d2