aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-21 12:41:29 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-21 16:54:26 -0800
commitbcd57a6d7ac799e7f04000a84b6868948fb137c6 (patch)
tree58a0cb0c06fc88912b39839a77176dd90097eff6 /src/platform
parentc6babe615b8d96acf25721dc8cdf1c5bf37130b5 (diff)
downloadnng-bcd57a6d7ac799e7f04000a84b6868948fb137c6.tar.gz
nng-bcd57a6d7ac799e7f04000a84b6868948fb137c6.tar.bz2
nng-bcd57a6d7ac799e7f04000a84b6868948fb137c6.zip
poll: fix poll based poller
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_pollq.h2
-rw-r--r--src/platform/posix/posix_pollq_poll.c123
-rw-r--r--src/platform/posix/posix_pollq_poll.h19
3 files changed, 78 insertions, 66 deletions
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index 121834cf..a6306643 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -31,7 +31,7 @@ typedef void (*nni_posix_pfd_cb)(void *, unsigned);
#elif defined(NNG_POLLQ_EPOLL)
#include "posix_pollq_epoll.h"
#elif defined(NNG_POLLQ_POLL)
-#include "posix_pollq_epoll.h"
+#include "posix_pollq_poll.h"
#elif defined(NNG_POLLQ_SELECT)
#include "posix_pollq_select.h"
#else
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index 6011ab50..1da1f5df 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -35,12 +35,13 @@
typedef struct nni_posix_pollq {
nni_mtx mtx;
+ nni_cv cv;
int wakewfd; // write side of waker pipe
int wakerfd; // read side of waker pipe
nni_thr thr; // worker thread
- nni_list pollq; // armed nodes
- nni_list reapq;
- nni_list addq;
+ nni_list pollq; // armed nodes - only updated by thread
+ nni_list reapq; // list of nodes to reap, protected by mtx
+ nni_list addq; // list of nodes to add, protected by mtx
struct pollfd *fds;
nni_posix_pfd **pfds;
int nalloc;
@@ -68,13 +69,14 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
#endif
NNI_LIST_NODE_INIT(&pfd->node);
+ NNI_LIST_NODE_INIT(&pfd->reap);
nni_mtx_init(&pfd->mtx);
- nni_cv_init(&pfd->cv, &pq->mtx);
pfd->fd = fd;
pfd->events = 0;
pfd->cb = cb;
pfd->arg = arg;
pfd->pq = pq;
+ pfd->reaped = false;
nni_mtx_lock(&pq->mtx);
nni_list_append(&pq->addq, pfd);
nni_mtx_unlock(&pq->mtx);
@@ -105,17 +107,19 @@ nni_posix_pfd_stop(nni_posix_pfd *pfd)
nni_posix_pfd_close(pfd);
nni_mtx_lock(&pq->mtx);
- if (!nni_list_active(&pq->pollq, pfd)) {
- nni_mtx_unlock(&pq->mtx);
- return;
- }
- nni_list_remove(&pq->pollq, pfd);
-
- if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
- nni_list_append(&pq->reapq, pfd);
- nni_plat_pipe_raise(pq->wakewfd);
- while (nni_list_active(&pq->reapq, pfd)) {
- nni_cv_wait(&pfd->cv);
+ if (!pfd->reaped) {
+ if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ // it might have been on the addq
+ nni_list_node_remove(&pfd->reap);
+ nni_list_append(&pq->reapq, pfd);
+ nni_plat_pipe_raise(pq->wakewfd);
+ while (!pfd->reaped) {
+ nni_cv_wait(&pq->cv);
+ }
+ } else {
+ nni_list_node_remove(&pfd->node);
+ nni_list_node_remove(&pfd->reap);
+ pfd->reaped = true;
}
}
nni_mtx_unlock(&pq->mtx);
@@ -133,7 +137,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
// We're exclusive now.
(void) close(pfd->fd);
- nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
}
@@ -156,40 +159,16 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
}
static void
-posix_poll_add(nni_posix_pollq *pq)
-{
- nni_posix_pfd *pfd;
- // Also lets reap anything that was in the reaplist!
- while ((pfd = nni_list_first(&pq->addq)) != NULL) {
- nni_list_remove(&pq->addq, pfd);
- nni_list_append(&pq->pollq, pfd);
- }
-}
-
-static void
-posix_poll_reap(nni_posix_pollq *pq, unsigned events)
-{
- nni_posix_pfd *pfd;
- if (events & POLLHUP) {
- pq->closed = true;
- }
- while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
- nni_list_remove(&pq->reapq, pfd);
- nni_cv_wake(&pfd->cv);
- }
-}
-
-static void
nni_posix_poll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
struct pollfd *fds = pq->fds;
nni_posix_pfd **pfds = pq->pfds;
+ nni_posix_pfd *pfd;
+ int nfds;
+ unsigned events;
for (;;) {
- int nfds;
- unsigned events;
- nni_posix_pfd *pfd;
// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
@@ -210,7 +189,6 @@ nni_posix_poll_thr(void *arg)
fds[nfds].fd = pfd->fd;
fds[nfds].events = events;
fds[nfds].revents = 0;
- pfds[pfd->fd] = pfd;
nfds++;
}
}
@@ -225,6 +203,8 @@ nni_posix_poll_thr(void *arg)
// If the waker pipe was signaled, read from it.
+ bool update = false;
+ bool stop = false;
for (int i = 0; i < nfds; i++) {
int fd = fds[i].fd;
events = fds[i].revents;
@@ -233,16 +213,12 @@ nni_posix_poll_thr(void *arg)
continue;
}
if (pfd == NULL || fd == pq->wakerfd) {
- nni_plat_pipe_clear(pq->wakerfd);
- nni_mtx_lock(&pq->mtx);
- posix_poll_add(pq);
- posix_poll_reap(pq, events);
- nni_mtx_unlock(&pq->mtx);
- if (events & POLLHUP) {
- return;
+ update = true;
+ if (fd == pq->wakerfd &&
+ ((events & POLLHUP) != 0)) {
+ stop = true;
}
} else {
-
if ((events & (POLLIN | POLLOUT)) != 0) {
// don't emit pollhup yet, we want
// to finish reading.
@@ -255,7 +231,42 @@ nni_posix_poll_thr(void *arg)
pfd->cb(pfd->arg, events);
}
}
+
+ if (stop) {
+ break;
+ }
+ if (update) {
+ // process adds first
+ nni_mtx_lock(&pq->mtx);
+ nni_plat_pipe_clear(pq->wakerfd);
+ while ((pfd = nni_list_first(&pq->addq)) != NULL) {
+ nni_list_remove(&pq->addq, pfd);
+ nni_list_append(&pq->pollq, pfd);
+ pfds[pfd->fd] = pfd;
+ }
+ // then reaps
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_node_remove(&pfd->node);
+ nni_list_node_remove(&pfd->reap);
+ pfd->reaped = true;
+ pfds[pfd->fd] = NULL;
+ }
+ nni_cv_wake(&pq->cv);
+ nni_mtx_unlock(&pq->mtx);
+ }
+ }
+ nni_mtx_lock(&pq->mtx);
+ nni_plat_pipe_clear(pq->wakerfd);
+ while (((pfd = nni_list_first(&pq->reapq)) != NULL) ||
+ ((pfd = nni_list_first(&pq->addq)) != NULL) ||
+ ((pfd = nni_list_first(&pq->pollq)) != NULL)) {
+ nni_list_node_remove(&pfd->node);
+ nni_list_node_remove(&pfd->reap);
+ pfd->reaped = true;
+ pfds[pfd->fd] = NULL;
}
+ nni_cv_wake(&pq->cv);
+ nni_mtx_unlock(&pq->mtx);
}
static void
@@ -266,6 +277,7 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq)
(void) close(pq->wakewfd);
nni_thr_fini(&pq->thr);
(void) close(pq->wakerfd);
+ nni_cv_fini(&pq->cv);
nni_mtx_fini(&pq->mtx);
if (pq->fds != NULL) {
NNI_FREE_STRUCTS(pq->fds, pq->nalloc);
@@ -283,8 +295,10 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
int rv;
NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node);
- NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
- NNI_LIST_INIT(&pq->addq, nni_posix_pfd, node);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, reap);
+ NNI_LIST_INIT(&pq->addq, nni_posix_pfd, reap);
+ nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, &pq->mtx);
pq->closed = false;
#if NNG_MAX_OPEN
@@ -322,7 +336,6 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (rv);
}
nni_thr_set_name(&pq->thr, "nng:poll:poll");
- nni_mtx_init(&pq->mtx);
nni_thr_run(&pq->thr);
return (0);
}
diff --git a/src/platform/posix/posix_pollq_poll.h b/src/platform/posix/posix_pollq_poll.h
index 6ad3cc5b..82467642 100644
--- a/src/platform/posix/posix_pollq_poll.h
+++ b/src/platform/posix/posix_pollq_poll.h
@@ -13,19 +13,18 @@
#include <poll.h>
-typedef struct nni_posix_pollq nni_posix_pollq;
-
// nni_posix_pfd is the handle used by the poller. It's internals are private
// to the poller.
struct nni_posix_pfd {
- nni_posix_pollq *pq;
- int fd;
- nni_list_node node;
- nni_cv cv;
- nni_mtx mtx;
- unsigned events;
- nni_posix_pfd_cb cb;
- void *arg;
+ struct nni_posix_pollq *pq;
+ int fd;
+ nni_list_node node;
+ nni_list_node reap;
+ nni_mtx mtx;
+ unsigned events;
+ nni_posix_pfd_cb cb;
+ void *arg;
+ bool reaped;
};
#define NNI_POLL_IN ((unsigned) POLLIN)