aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/platform.h5
-rw-r--r--src/platform/posix/posix_atomic.c51
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c23
-rw-r--r--src/platform/posix/posix_pollq_kqueue.h3
-rw-r--r--src/platform/posix/posix_pollq_poll.c22
-rw-r--r--src/platform/posix/posix_pollq_poll.h3
-rw-r--r--src/platform/windows/win_thread.c12
7 files changed, 84 insertions, 35 deletions
diff --git a/src/core/platform.h b/src/core/platform.h
index 2f5b5c17..8b7aabbc 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -217,6 +217,11 @@ extern int nni_atomic_get(nni_atomic_int *);
extern void nni_atomic_set(nni_atomic_int *, int);
extern int nni_atomic_swap(nni_atomic_int *, int);
+// These are used with acquire release semantics.
+// Used for pollers in the POSIX code. They return the previous value.
+extern int nni_atomic_or(nni_atomic_int *, int);
+extern int nni_atomic_and(nni_atomic_int *, int);
+
// These versions are tuned for use as reference
// counters. Relaxed order when possible to increase
// reference count, acquire-release order for dropping
diff --git a/src/platform/posix/posix_atomic.c b/src/platform/posix/posix_atomic.c
index 69c2a2c1..ad608d45 100644
--- a/src/platform/posix/posix_atomic.c
+++ b/src/platform/posix/posix_atomic.c
@@ -1,5 +1,5 @@
//
-// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -72,6 +72,21 @@ nni_atomic_sub(nni_atomic_int *v, int bump)
(void) atomic_fetch_sub_explicit(&v->v, bump, memory_order_relaxed);
}
+// atomic or is used to set events in the posix pollers.. no other use at
+// present. We use acquire release semantics.
+int
+nni_atomic_or(nni_atomic_int *v, int mask)
+{
+ return (atomic_fetch_or_explicit(&v->v, mask, memory_order_acq_rel));
+}
+
+// and is used in the pollers to clear the events that we have processed
+int
+nni_atomic_and(nni_atomic_int *v, int mask)
+{
+ return (atomic_fetch_and_explicit(&v->v, mask, memory_order_acq_rel));
+}
+
int
nni_atomic_get(nni_atomic_int *v)
{
@@ -338,6 +353,18 @@ nni_atomic_dec_nv(nni_atomic_int *v)
return (__atomic_sub_fetch(&v->v, 1, __ATOMIC_SEQ_CST));
}
+int
+nni_atomic_or(nni_atomic_int *v, int mask)
+{
+ return (__atomic_fetch_or(&v->v, mask, __ATOMIC_ACQ_REL));
+}
+
+int
+nni_atomic_and(nni_atomic_int *v, int mask)
+{
+ return (__atomic_fetch_and(&v->v, mask, __ATOMIC_ACQ_REL));
+}
+
bool
nni_atomic_cas(nni_atomic_int *v, int comp, int new)
{
@@ -571,6 +598,28 @@ nni_atomic_swap(nni_atomic_int *v, int i)
return (rv);
}
+int
+nni_atomic_or(nni_atomic_int *v, int mask)
+{
+ int rv;
+ pthread_mutex_lock(&plat_atomic_lock);
+ rv = v->v;
+ v->v |= mask;
+ pthread_mutex_unlock(&plat_atomic_lock);
+ return (rv);
+}
+
+int
+nni_atomic_and(nni_atomic_int *v, int mask)
+{
+ int rv;
+ pthread_mutex_lock(&plat_atomic_lock);
+ rv = v->v;
+ v->v &= mask;
+ pthread_mutex_unlock(&plat_atomic_lock);
+ return (rv);
+}
+
void
nni_atomic_inc(nni_atomic_int *v)
{
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index c754535a..f1f742db 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -60,14 +60,13 @@ nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg)
pq = &nni_posix_global_pollq;
- nni_mtx_init(&pf->mtx);
+ nni_atomic_init(&pf->events);
nni_cv_init(&pf->cv, &pq->mtx);
- pf->pq = pq;
- pf->fd = fd;
- pf->cb = cb;
- pf->arg = arg;
- pf->events = 0;
+ pf->pq = pq;
+ pf->fd = fd;
+ pf->cb = cb;
+ pf->arg = arg;
nni_atomic_flag_reset(&pf->closed);
nni_atomic_flag_reset(&pf->stopped);
@@ -150,7 +149,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pf)
(void) close(pf->fd);
nni_cv_fini(&pf->cv);
- nni_mtx_fini(&pf->mtx);
}
int
@@ -167,16 +165,13 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
unsigned flags = EV_ENABLE | EV_DISPATCH | EV_CLEAR;
nni_posix_pollq *pq = pf->pq;
- nni_mtx_lock(&pf->mtx);
- pf->events |= events;
- events = pf->events;
- nni_mtx_unlock(&pf->mtx);
-
if (events == 0) {
// No events, and kqueue is oneshot, so nothing to do.
return (0);
}
+ nni_atomic_or(&pf->events, (int) events);
+
if (events & NNI_POLL_IN) {
EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
@@ -244,9 +239,7 @@ nni_posix_poll_thr(void *arg)
revents |= NNI_POLL_HUP;
}
- nni_mtx_lock(&pf->mtx);
- pf->events &= ~(revents);
- nni_mtx_unlock(&pf->mtx);
+ nni_atomic_and(&pf->events, (int) (~revents));
pf->cb(pf->arg, revents);
}
diff --git a/src/platform/posix/posix_pollq_kqueue.h b/src/platform/posix/posix_pollq_kqueue.h
index a153b363..69cf6dae 100644
--- a/src/platform/posix/posix_pollq_kqueue.h
+++ b/src/platform/posix/posix_pollq_kqueue.h
@@ -22,9 +22,8 @@ struct nni_posix_pfd {
nni_atomic_flag closed;
nni_atomic_flag stopped;
bool reaped;
- unsigned events;
+ nni_atomic_int events;
nni_cv cv; // signaled when poller has unregistered
- nni_mtx mtx;
};
#define NNI_POLL_IN (0x0001)
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index e5b095ff..76a9d3bc 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -70,13 +70,12 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
NNI_LIST_NODE_INIT(&pfd->node);
NNI_LIST_NODE_INIT(&pfd->reap);
- nni_mtx_init(&pfd->mtx);
pfd->fd = fd;
- pfd->events = 0;
pfd->cb = cb;
pfd->arg = arg;
pfd->pq = pq;
pfd->reaped = false;
+ nni_atomic_init(&pfd->events);
nni_mtx_lock(&pq->mtx);
nni_list_append(&pq->addq, pfd);
nni_mtx_unlock(&pq->mtx);
@@ -137,7 +136,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
// We're exclusive now.
(void) close(pfd->fd);
- nni_mtx_fini(&pfd->mtx);
}
int
@@ -145,9 +143,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
{
nni_posix_pollq *pq = pfd->pq;
- nni_mtx_lock(&pfd->mtx);
- pfd->events |= events;
- nni_mtx_unlock(&pfd->mtx);
+ (void) nni_atomic_or(&pfd->events, (int) events);
// If we're running on the callback, then don't bother to kick
// the pollq again. This is necessary because we cannot modify
@@ -166,7 +162,7 @@ nni_posix_poll_thr(void *arg)
nni_posix_pfd **pfds = pq->pfds;
nni_posix_pfd *pfd;
int nfds;
- unsigned events;
+ int events;
for (;;) {
@@ -181,13 +177,11 @@ nni_posix_poll_thr(void *arg)
// Set up the poll list.
NNI_LIST_FOREACH (&pq->pollq, pfd) {
- nni_mtx_lock(&pfd->mtx);
- events = pfd->events;
- nni_mtx_unlock(&pfd->mtx);
+ events = nni_atomic_get(&pfd->events);
if (events != 0) {
fds[nfds].fd = pfd->fd;
- fds[nfds].events = events;
+ fds[nfds].events = (unsigned) events;
fds[nfds].revents = 0;
nfds++;
}
@@ -207,7 +201,7 @@ nni_posix_poll_thr(void *arg)
bool stop = false;
for (int i = 0; i < nfds; i++) {
int fd = fds[i].fd;
- events = fds[i].revents;
+ events = (int) fds[i].revents;
pfd = pfds[fd];
if (events == 0) {
continue;
@@ -224,9 +218,7 @@ nni_posix_poll_thr(void *arg)
// to finish reading.
events &= ~POLLHUP;
}
- nni_mtx_lock(&pfd->mtx);
- pfd->events &= ~events;
- nni_mtx_unlock(&pfd->mtx);
+ (void) nni_atomic_and(&pfd->events, ~events);
pfd->cb(pfd->arg, events);
}
diff --git a/src/platform/posix/posix_pollq_poll.h b/src/platform/posix/posix_pollq_poll.h
index 82467642..b96d2451 100644
--- a/src/platform/posix/posix_pollq_poll.h
+++ b/src/platform/posix/posix_pollq_poll.h
@@ -20,8 +20,7 @@ struct nni_posix_pfd {
int fd;
nni_list_node node;
nni_list_node reap;
- nni_mtx mtx;
- unsigned events;
+ nni_atomic_int events;
nni_posix_pfd_cb cb;
void *arg;
bool reaped;
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index 9e74056d..c76209c1 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -325,6 +325,18 @@ nni_atomic_init(nni_atomic_int *v)
InterlockedExchange(&v->v, 0);
}
+int
+nni_atomic_or(nni_atomic_int *v, int mask)
+{
+ return (InterlockedOr(&v->v, mask));
+}
+
+int
+nni_atomic_and(nni_atomic_int *v, int mask)
+{
+ return (InterlockedAnd(&v->v, mask));
+}
+
void
nni_atomic_inc(nni_atomic_int *v)
{