aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-22 18:58:46 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-22 18:58:46 -0800
commite3c017d8b4b81ce8d9436e1db11904da452d360e (patch)
tree001ab7b0239e552ce945ed49b8536493c12203ee /src/platform
parentbe379a29b849b1d3e759e14d97ad9b977ed3da4f (diff)
downloadnng-e3c017d8b4b81ce8d9436e1db11904da452d360e.tar.gz
nng-e3c017d8b4b81ce8d9436e1db11904da452d360e.tar.bz2
nng-e3c017d8b4b81ce8d9436e1db11904da452d360e.zip
port events poller (illumos/Solaris): use atomic events mask
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_pollq_port.c112
-rw-r--r--src/platform/posix/posix_pollq_port.h5
2 files changed, 51 insertions, 66 deletions
diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c
index 49658f3b..2e3debad 100644
--- a/src/platform/posix/posix_pollq_port.c
+++ b/src/platform/posix/posix_pollq_port.c
@@ -30,6 +30,8 @@
typedef struct nni_posix_pollq {
int port; // port id (from port_create)
nni_thr thr; // worker thread
+ nni_mtx mtx;
+ nni_cv cv;
} nni_posix_pollq;
// single global instance for now
@@ -45,15 +47,13 @@ nni_posix_pfd_init(nni_posix_pfd *pfdp, int fd, nni_posix_pfd_cb cb, void *arg)
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- nni_mtx_init(&pfd->mtx);
- nni_cv_init(&pfd->cv, &pfd->mtx);
- pfd->closed = false;
- pfd->closing = false;
- pfd->fd = fd;
- pfd->pq = pq;
- pfd->cb = cb;
- pfd->arg = arg;
- pfd->data = NULL;
+ nni_atomic_init(&pfd->events);
+ pfd->closed = false;
+ pfd->fd = fd;
+ pfd->pq = pq;
+ pfd->cb = cb;
+ pfd->arg = arg;
+ pfd->data = NULL;
}
int
@@ -71,13 +71,8 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
return;
}
- nni_mtx_lock(&pfd->mtx);
- if (!pfd->closing) {
- pfd->closing = true;
- (void) shutdown(pfd->fd, SHUT_RDWR);
- port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd);
- }
- nni_mtx_unlock(&pfd->mtx);
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd);
// Send the wake event to the poller to synchronize with it.
// Note that port_send should only really fail if out of memory
@@ -102,11 +97,11 @@ nni_posix_pfd_stop(nni_posix_pfd *pfd)
}
sched_yield(); // try again later...
}
- nni_mtx_lock(&pfd->mtx);
+ nni_mtx_lock(&pq->mtx);
while (!pfd->closed) {
- nni_cv_wait(&pfd->cv);
+ nni_cv_wait(&pq->cv);
}
- nni_mtx_unlock(&pfd->mtx);
+ nni_mtx_unlock(&pq->mtx);
}
void
@@ -122,27 +117,21 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
// We're exclusive now.
(void) close(pfd->fd);
- nni_cv_fini(&pfd->cv);
- nni_mtx_fini(&pfd->mtx);
}
int
nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
{
nni_posix_pollq *pq = pfd->pq;
+ int rv;
+ int ev = (int) events;
- nni_mtx_lock(&pfd->mtx);
- if (!pfd->closing) {
- pfd->events |= events;
- if (port_associate(pq->port, PORT_SOURCE_FD, pfd->fd,
- (int) pfd->events, pfd) != 0) {
- int rv = nni_plat_errno(errno);
- nni_mtx_unlock(&pfd->mtx);
- return (rv);
- }
+ ev |= ni_atomic_or(&pfd->events, ev);
+ rv = port_associate(pq->port, PORT_SOURCE_FD, pfd->fd, ev, pfd);
+ if (rv != 0) {
+ nni_plat_errno(errno);
}
- nni_mtx_unlock(&pfd->mtx);
- return (0);
+ return (rv);
}
static void
@@ -152,7 +141,7 @@ nni_posix_poll_thr(void *arg)
nni_posix_pollq *pq = arg;
port_event_t ev[NNI_MAX_PORTEV];
nni_posix_pfd *pfd;
- unsigned events;
+ int events;
nni_posix_pfd_cb cb;
void *arg;
unsigned n;
@@ -168,40 +157,35 @@ nni_posix_poll_thr(void *arg)
// We run through the returned ports twice. First we
// get the callbacks. Then we do the reaps. This way
// we ensure that we only reap *after* callbacks have run.
+ bool user_wake = false;
for (unsigned i = 0; i < n; i++) {
- if (ev[i].portev_source != PORT_SOURCE_FD) {
+ switch (ev[i].portev_source) {
+ case PORT_SOURCE_USER:
+ user_wake = true;
continue;
- }
- pfd = ev[i].portev_user;
- events = ev[i].portev_events;
-
- nni_mtx_lock(&pfd->mtx);
- cb = pfd->cb;
- arg = pfd->data;
- pfd->events &= ~events;
- nni_mtx_unlock(&pfd->mtx);
-
- if (cb != NULL) {
- cb(pfd, events, arg);
+ case PORT_SOURCE_FD:
+ if (ev[i].portev_source != PORT_SOURCE_FD) {
+ continue;
+ }
+ pfd = ev[i].portev_user;
+ events = ev[i].portev_events;
+
+ cb = pfd->cb;
+ arg = pfd->data;
+ nni_atomic_and(&pfd->events, ~events);
+
+ cb(pfd, (unsigned) events, arg);
}
}
- for (unsigned i = 0; i < n; i++) {
- if (ev[i].portev_source != PORT_SOURCE_USER) {
- continue;
+ if (user_wake) {
+ nni_mtx_lock(&pq->mtx);
+ for (unsigned i = 0; i < n; i++) {
+ if (ev[i].portev_source == PORT_SOURCE_USER) {
+ pfd->closed = true;
+ }
}
-
- // User event telling us to stop doing things.
- // We signal back to use this as a coordination
- // event between the pollq and the thread
- // handler. NOTE: It is absolutely critical
- // that there is only a single thread per
- // pollq. Otherwise we cannot be sure that we
- // are blocked completely,
- pfd = ev[i].portev_user;
- nni_mtx_lock(&pfd->mtx);
- pfd->closed = true;
- nni_cv_wake(&pfd->cv);
- nni_mtx_unlock(&pfd->mtx);
+ nni_cv_wake(&pq->cv);
+ nni_mtx_unlock(&pq->mtx);
}
}
}
@@ -210,6 +194,8 @@ static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
(void) close(pq->port);
+ nni_cv_destroy(&pq->cv);
+ nni_mtx_fini(&pq->mtx);
nni_thr_fini(&pq->thr);
}
@@ -228,6 +214,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
}
nni_thr_set_name(&pq->thr, "nng:poll:port");
+ nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, pq->mtx);
nni_thr_run(&pq->thr);
return (0);
}
diff --git a/src/platform/posix/posix_pollq_port.h b/src/platform/posix/posix_pollq_port.h
index cbeab694..d8771527 100644
--- a/src/platform/posix/posix_pollq_port.h
+++ b/src/platform/posix/posix_pollq_port.h
@@ -20,11 +20,8 @@ typedef struct nni_posix_pollq nni_posix_pollq;
struct nni_posix_pfd {
nni_posix_pollq *pq;
int fd;
- nni_mtx mtx;
- nni_cv cv;
- unsigned events;
+ nni_atomic_int events;
bool closed;
- bool closing;
nni_posix_pfd_cb cb;
void *data;
};