about summary refs log tree commit diff
diff options
context:
space:
mode:
author Garrett D'Amore <garrett@damore.org> 2024-12-21 10:57:52 -0800
committer Garrett D'Amore <garrett@damore.org> 2024-12-21 11:21:47 -0800
commit 1514a532617b1fa7d4568a79a9d8ae24ceba1be9 (patch)
tree b50db6f1cf9d73786ad5b9a440c679876a191897
parent 5d88c0e80556effb03bc343a1bcd6847500cdac2 (diff)
download nng-1514a532617b1fa7d4568a79a9d8ae24ceba1be9.tar.gz
nng-1514a532617b1fa7d4568a79a9d8ae24ceba1be9.tar.bz2
nng-1514a532617b1fa7d4568a79a9d8ae24ceba1be9.zip
epoll: fixes for races and early wakeups
-rw-r--r-- src/platform/posix/posix_pollq_epoll.c 110
-rw-r--r-- src/platform/posix/posix_pollq_epoll.h 6
2 files changed, 51 insertions, 65 deletions
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index e789e5d2..675d0b22 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -32,25 +32,11 @@
#define NNI_MAX_EPOLL_EVENTS 64
-// flags we always want enabled as long as at least one event is active
-#define NNI_EPOLL_FLAGS ((unsigned) EPOLLONESHOT | (unsigned) EPOLLERR)
-
-// Locking strategy:
-//
-// The pollq mutex protects its own reapq, close state, and the close
-// state of the individual pfds. It also protects the pfd cv, which is
-// only signaled when the pfd is closed. This mutex is only acquired
-// when shutting down the pollq, or closing a pfd. For normal hot-path
-// operations we don't need it.
-//
-// The pfd mutex protects the pfd's own "closing" flag (test and set),
-// the callback and arg, and its event mask. This mutex is used a lot,
-// but it should be uncontended excepting possibly when closing.
-
// nni_posix_pollq is a work structure that manages state for the epoll-based
// pollq implementation
typedef struct nni_posix_pollq {
nni_mtx mtx;
+ nni_cv cv;
int epfd; // epoll handle
int evfd; // event fd (to wake us for other stuff)
bool close; // request for worker to exit
@@ -64,8 +50,7 @@ static nni_posix_pollq nni_posix_global_pollq;
void
nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pollq *pq;
- struct epoll_event ev;
+ nni_posix_pollq *pq;
pq = &nni_posix_global_pollq;
@@ -73,33 +58,25 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
nni_mtx_init(&pfd->mtx);
- nni_cv_init(&pfd->cv, &pq->mtx);
nni_atomic_flag_reset(&pfd->stopped);
- nni_atomic_flag_reset(&pfd->closed);
+ nni_atomic_flag_reset(&pfd->closing);
pfd->pq = pq;
pfd->fd = fd;
pfd->cb = cb;
pfd->arg = arg;
pfd->events = 0;
- pfd->reap = false;
+ pfd->closed = false;
+ pfd->added = false;
NNI_LIST_NODE_INIT(&pfd->node);
-
- // notifications disabled to begin with
- memset(&ev, 0, sizeof(ev));
- ev.events = 0;
- ev.data.ptr = pfd;
-
- // if this fails the system is probably out of memory - it will fail in
- // arm
- (void) epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev);
}
int
nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
{
nni_posix_pollq *pq = pfd->pq;
+ int rv;
// NB: We depend on epoll event flags being the same as their POLLIN
// equivalents. I.e. POLLIN == EPOLLIN, POLLOUT == EPOLLOUT, and so
@@ -108,20 +85,33 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
nni_mtx_lock(&pfd->mtx);
struct epoll_event ev;
+
+ if (pfd->closed) {
+ nni_mtx_unlock(&pfd->mtx);
+ return (NNG_ECLOSED);
+ }
pfd->events |= events;
events = pfd->events;
memset(&ev, 0, sizeof(ev));
- ev.events = events | NNI_EPOLL_FLAGS;
+ ev.events = events | EPOLLONESHOT;
ev.data.ptr = pfd;
- if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) {
- int rv = nni_plat_errno(errno);
- nni_mtx_unlock(&pfd->mtx);
- return (rv);
+ // Register the fd with epoll on the first arm. If that fails (the system
+ // is probably out of memory), the translated errno is returned to the caller.
+ if (!pfd->added) {
+ rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, pfd->fd, &ev);
+ if (rv == 0) {
+ pfd->added = true;
+ }
+ } else {
+ rv = epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev);
+ }
+ if (rv != 0) {
+ rv = nni_plat_errno(errno);
}
nni_mtx_unlock(&pfd->mtx);
- return (0);
+ return (rv);
}
int
@@ -137,7 +127,7 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
if (pq == NULL) {
return;
}
- if (nni_atomic_flag_test_and_set(&pfd->closed)) {
+ if (nni_atomic_flag_test_and_set(&pfd->closing)) {
return;
}
@@ -145,6 +135,7 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
struct epoll_event ev; // Not actually used.
(void) shutdown(pfd->fd, SHUT_RDWR);
+ pfd->closed = true;
(void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
nni_mtx_unlock(&pfd->mtx);
}
@@ -162,28 +153,31 @@ nni_posix_pfd_stop(nni_posix_pfd *pfd)
return;
}
+ nni_posix_pfd_close(pfd);
+
// We have to synchronize with the pollq thread (unless we are
// on that thread!)
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
nni_mtx_lock(&pq->mtx);
- nni_list_append(&pq->reapq, pfd);
- pfd->reap = true;
-
- // Wake the remote side. For now we assume this always
- // succeeds. The only failure modes here occur when we
- // have already excessively signaled this (2^64 times
- // with no read!!), or when the evfd is closed, or some
- // kernel bug occurs. Those errors would manifest as
- // a hang waiting for the poller to reap the pfd in fini,
- // if it were possible for them to occur. (Barring other
- // bugs, it isn't.)
- if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) {
- nni_panic("BUG! write to epoll fd incorrect!");
- }
+ if (!pq->close) {
+ nni_list_append(&pq->reapq, pfd);
+
+ // Wake the remote side. For now we assume this always
+ // succeeds. The only failure modes here occur when we
+ // have already excessively signaled this (2^64 times
+ // with no read!!), or when the evfd is closed, or some
+ // kernel bug occurs. Those errors would manifest as
+ // a hang waiting for the poller to reap the pfd in fini,
+ // if it were possible for them to occur. (Barring other
+ // bugs, it isn't.)
+ if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) {
+ nni_panic("BUG! write to epoll fd incorrect!");
+ }
- while (pfd->reap) {
- nni_cv_wait(&pfd->cv);
+ while (nni_list_node_active(&pfd->node)) {
+ nni_cv_wait(&pq->cv);
+ }
}
nni_mtx_unlock(&pq->mtx);
}
@@ -196,12 +190,7 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
return;
}
- nni_posix_pfd_stop(pfd);
-
- // We're exclusive now.
-
(void) close(pfd->fd);
- nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
}
@@ -211,12 +200,8 @@ nni_posix_pollq_reap(nni_posix_pollq *pq)
nni_posix_pfd *pfd;
while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
nni_list_remove(&pq->reapq, pfd);
-
- // Let fini know we're done with it, and it's safe to
- // remove.
- pfd->reap = false;
- nni_cv_wake(&pfd->cv);
}
+ nni_cv_wake(&pq->cv);
}
static void
@@ -347,6 +332,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, &pq->mtx);
if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) {
(void) close(pq->epfd);
diff --git a/src/platform/posix/posix_pollq_epoll.h b/src/platform/posix/posix_pollq_epoll.h
index 0be2a4c5..1ce7e3cb 100644
--- a/src/platform/posix/posix_pollq_epoll.h
+++ b/src/platform/posix/posix_pollq_epoll.h
@@ -21,12 +21,12 @@ struct nni_posix_pfd {
int fd;
nni_posix_pfd_cb cb;
void *arg;
- bool reap;
unsigned events;
nni_mtx mtx;
- nni_cv cv;
+ bool added;
+ bool closed;
nni_atomic_flag stopped;
- nni_atomic_flag closed;
+ nni_atomic_flag closing;
};
#define NNI_POLL_IN ((unsigned) POLLIN)