aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_pollq_poll.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-10 15:02:38 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-10 15:02:38 -0700
commit795aebbee77bb74d8792df96dfe1aa79ec9548fc (patch)
tree58c16424c16b9e71cebdceaee4507ab6608f80da /src/platform/posix/posix_pollq_poll.c
parentde90f97167d2df6739db47b2c6aad85f06250270 (diff)
downloadnng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.tar.gz
nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.tar.bz2
nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.zip
Give up on uncrustify; switch to clang-format.
Diffstat (limited to 'src/platform/posix/posix_pollq_poll.c')
-rw-r--r--src/platform/posix/posix_pollq_poll.c104
1 files changed, 45 insertions, 59 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index cd2eea7b..fe1359ec 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -13,15 +13,14 @@
#ifdef NNG_USE_POSIX_POLLQ_POLL
#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
#include <stdlib.h>
#include <string.h>
-#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/types.h>
#include <sys/uio.h>
-#include <fcntl.h>
#include <unistd.h>
-#include <poll.h>
-
// POSIX AIO using poll(). We use a single poll thread to perform
// I/O operations for the entire system. This isn't entirely scalable,
@@ -33,33 +32,31 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- struct pollfd * fds;
- struct pollfd * newfds;
- int nfds;
- int nnewfds;
- int wakewfd; // write side of waker pipe
- int wakerfd; // read side of waker pipe
- int close; // request for worker to exit
- int started;
- nni_thr thr; // worker thread
- nni_list nodes; // poll list
- nni_list notify; // notify list
- int nnodes; // num of nodes in nodes list
- int cancel; // waiters for cancellation
- int inpoll; // poller asleep in poll
-
- nni_posix_pollq_node * active; // active node (in callback)
+ nni_mtx mtx;
+ nni_cv cv;
+ struct pollfd *fds;
+ struct pollfd *newfds;
+ int nfds;
+ int nnewfds;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ int close; // request for worker to exit
+ int started;
+ nni_thr thr; // worker thread
+ nni_list nodes; // poll list
+ nni_list notify; // notify list
+ int nnodes; // num of nodes in nodes list
+ int cancel; // waiters for cancellation
+ int inpoll; // poller asleep in poll
+
+ nni_posix_pollq_node *active; // active node (in callback)
};
-
-
static int
nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
{
- int grow = pq->nnodes + 2; // one for us, one for waker
- int noldfds;
+ int grow = pq->nnodes + 2; // one for us, one for waker
+ int noldfds;
struct pollfd *oldfds;
struct pollfd *newfds;
@@ -72,34 +69,32 @@ nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
// Maybe we are adding a *lot* of pipes at once, and have to grow
// multiple times before the poller gets scheduled. In that case
// toss the old array before we finish.
- oldfds = pq->newfds;
+ oldfds = pq->newfds;
noldfds = pq->nnewfds;
- if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
+ if ((newfds = nni_alloc(grow * sizeof(struct pollfd))) == NULL) {
return (NNG_ENOMEM);
}
-
- pq->newfds = newfds;
+ pq->newfds = newfds;
pq->nnewfds = grow;
if (noldfds != 0) {
- nni_free(oldfds, noldfds * sizeof (struct pollfd));
+ nni_free(oldfds, noldfds * sizeof(struct pollfd));
}
return (0);
}
-
static void
nni_posix_poll_thr(void *arg)
{
- nni_posix_pollq *pollq = arg;
+ nni_posix_pollq * pollq = arg;
nni_posix_pollq_node *node, *nextnode;
nni_mtx_lock(&pollq->mtx);
for (;;) {
- int rv;
- int nfds;
+ int rv;
+ int nfds;
struct pollfd *fds;
if (pollq->close) {
@@ -109,28 +104,28 @@ nni_posix_poll_thr(void *arg)
if (pollq->newfds != NULL) {
// We have "grown" by the caller. Free up the old
// space, and start using the new.
- nni_free(pollq->fds,
- pollq->nfds * sizeof (struct pollfd));
- pollq->fds = pollq->newfds;
- pollq->nfds = pollq->nnewfds;
+ nni_free(
+ pollq->fds, pollq->nfds * sizeof(struct pollfd));
+ pollq->fds = pollq->newfds;
+ pollq->nfds = pollq->nnewfds;
pollq->newfds = NULL;
}
- fds = pollq->fds;
+ fds = pollq->fds;
nfds = 0;
// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
- fds[nfds].fd = pollq->wakerfd;
- fds[nfds].events = POLLIN;
+ fds[nfds].fd = pollq->wakerfd;
+ fds[nfds].events = POLLIN;
fds[nfds].revents = 0;
nfds++;
// Set up the poll list.
NNI_LIST_FOREACH (&pollq->nodes, node) {
- fds[nfds].fd = node->fd;
- fds[nfds].events = node->armed;
+ fds[nfds].fd = node->fd;
+ fds[nfds].events = node->armed;
fds[nfds].revents = 0;
- node->index = nfds;
+ node->index = nfds;
nfds++;
}
@@ -150,7 +145,6 @@ nni_posix_poll_thr(void *arg)
continue;
}
-
// If the waker pipe was signaled, read from it.
if (fds[0].revents & POLLIN) {
NNI_ASSERT(fds[0].fd == pollq->wakerfd);
@@ -183,7 +177,7 @@ nni_posix_poll_thr(void *arg)
}
// Clear the index for the next time around.
- node->index = 0;
+ node->index = 0;
node->revents = fds[index].revents;
// Now we move this node to the callback list.
@@ -216,7 +210,6 @@ nni_posix_poll_thr(void *arg)
nni_mtx_unlock(&pollq->mtx);
}
-
void
nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node)
{
@@ -238,7 +231,6 @@ nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node)
nni_mtx_unlock(&pq->mtx);
}
-
int
nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node)
{
@@ -278,13 +270,12 @@ nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node)
return (0);
}
-
static void
nni_posix_pollq_fini(nni_posix_pollq *pq)
{
if (pq->started) {
nni_mtx_lock(&pq->mtx);
- pq->close = 1;
+ pq->close = 1;
pq->started = 0;
nni_plat_pipe_raise(pq->wakewfd);
@@ -298,12 +289,11 @@ nni_posix_pollq_fini(nni_posix_pollq *pq)
nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
pq->wakewfd = pq->wakerfd = -1;
}
- nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd));
- nni_free(pq->fds, pq->nfds * sizeof (struct pollfd));
+ nni_free(pq->newfds, pq->nnewfds * sizeof(struct pollfd));
+ nni_free(pq->fds, pq->nfds * sizeof(struct pollfd));
nni_mtx_fini(&pq->mtx);
}
-
static int
nni_posix_pollq_init(nni_posix_pollq *pq)
{
@@ -313,7 +303,7 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
NNI_LIST_INIT(&pq->notify, nni_posix_pollq_node, node);
pq->wakewfd = -1;
pq->wakerfd = -1;
- pq->close = 0;
+ pq->close = 0;
if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) ||
@@ -328,7 +318,6 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
return (0);
}
-
// We use a single pollq for the entire system, which means only a single
// thread is polling. This may be somewhat less than optimally efficient,
// and it may be worth investigating having multiple threads to improve
@@ -344,7 +333,6 @@ nni_posix_pollq_get(int fd)
return (&nni_posix_global_pollq);
}
-
int
nni_posix_pollq_sysinit(void)
{
@@ -354,14 +342,12 @@ nni_posix_pollq_sysinit(void)
return (rv);
}
-
void
nni_posix_pollq_sysfini(void)
{
nni_posix_pollq_fini(&nni_posix_global_pollq);
}
-
#else
// Suppress empty symbols warnings in ranlib.