diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-10 15:02:38 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-10 15:02:38 -0700 |
| commit | 795aebbee77bb74d8792df96dfe1aa79ec9548fc (patch) | |
| tree | 58c16424c16b9e71cebdceaee4507ab6608f80da /src/platform/posix/posix_pollq_poll.c | |
| parent | de90f97167d2df6739db47b2c6aad85f06250270 (diff) | |
| download | nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.tar.gz nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.tar.bz2 nng-795aebbee77bb74d8792df96dfe1aa79ec9548fc.zip | |
Give up on uncrustify; switch to clang-format.
Diffstat (limited to 'src/platform/posix/posix_pollq_poll.c')
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 104 |
1 files changed, 45 insertions, 59 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index cd2eea7b..fe1359ec 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -13,15 +13,14 @@ #ifdef NNG_USE_POSIX_POLLQ_POLL #include <errno.h> +#include <fcntl.h> +#include <poll.h> #include <stdlib.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <sys/uio.h> -#include <fcntl.h> #include <unistd.h> -#include <poll.h> - // POSIX AIO using poll(). We use a single poll thread to perform // I/O operations for the entire system. This isn't entirely scalable, @@ -33,33 +32,31 @@ // nni_posix_pollq is a work structure used by the poller thread, that keeps // track of all the underlying pipe handles and so forth being used by poll(). struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - struct pollfd * fds; - struct pollfd * newfds; - int nfds; - int nnewfds; - int wakewfd; // write side of waker pipe - int wakerfd; // read side of waker pipe - int close; // request for worker to exit - int started; - nni_thr thr; // worker thread - nni_list nodes; // poll list - nni_list notify; // notify list - int nnodes; // num of nodes in nodes list - int cancel; // waiters for cancellation - int inpoll; // poller asleep in poll - - nni_posix_pollq_node * active; // active node (in callback) + nni_mtx mtx; + nni_cv cv; + struct pollfd *fds; + struct pollfd *newfds; + int nfds; + int nnewfds; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + int close; // request for worker to exit + int started; + nni_thr thr; // worker thread + nni_list nodes; // poll list + nni_list notify; // notify list + int nnodes; // num of nodes in nodes list + int cancel; // waiters for cancellation + int inpoll; // poller asleep in poll + + nni_posix_pollq_node *active; // active node (in callback) }; - - static int nni_posix_pollq_poll_grow(nni_posix_pollq *pq) { - int grow = pq->nnodes + 2; // one for us, one for waker - int noldfds; + int grow = pq->nnodes + 2; // one for us, one for waker + int noldfds; struct pollfd *oldfds; struct pollfd *newfds; @@ -72,34 +69,32 @@ nni_posix_pollq_poll_grow(nni_posix_pollq *pq) // Maybe we are adding a *lot* of pipes at once, and have to grow // multiple times before the poller gets scheduled. In that case // toss the old array before we finish. - oldfds = pq->newfds; + oldfds = pq->newfds; noldfds = pq->nnewfds; - if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) { + if ((newfds = nni_alloc(grow * sizeof(struct pollfd))) == NULL) { return (NNG_ENOMEM); } - - pq->newfds = newfds; + pq->newfds = newfds; pq->nnewfds = grow; if (noldfds != 0) { - nni_free(oldfds, noldfds * sizeof (struct pollfd)); + nni_free(oldfds, noldfds * sizeof(struct pollfd)); } return (0); } - static void nni_posix_poll_thr(void *arg) { - nni_posix_pollq *pollq = arg; + nni_posix_pollq * pollq = arg; nni_posix_pollq_node *node, *nextnode; nni_mtx_lock(&pollq->mtx); for (;;) { - int rv; - int nfds; + int rv; + int nfds; struct pollfd *fds; if (pollq->close) { @@ -109,28 +104,28 @@ nni_posix_poll_thr(void *arg) if (pollq->newfds != NULL) { // We have "grown" by the caller. Free up the old // space, and start using the new. - nni_free(pollq->fds, - pollq->nfds * sizeof (struct pollfd)); - pollq->fds = pollq->newfds; - pollq->nfds = pollq->nnewfds; + nni_free( + pollq->fds, pollq->nfds * sizeof(struct pollfd)); + pollq->fds = pollq->newfds; + pollq->nfds = pollq->nnewfds; pollq->newfds = NULL; } - fds = pollq->fds; + fds = pollq->fds; nfds = 0; // The waker pipe is set up so that we will be woken // when it is written (this allows us to be signaled). - fds[nfds].fd = pollq->wakerfd; - fds[nfds].events = POLLIN; + fds[nfds].fd = pollq->wakerfd; + fds[nfds].events = POLLIN; fds[nfds].revents = 0; nfds++; // Set up the poll list. NNI_LIST_FOREACH (&pollq->nodes, node) { - fds[nfds].fd = node->fd; - fds[nfds].events = node->armed; + fds[nfds].fd = node->fd; + fds[nfds].events = node->armed; fds[nfds].revents = 0; - node->index = nfds; + node->index = nfds; nfds++; } @@ -150,7 +145,6 @@ nni_posix_poll_thr(void *arg) continue; } - // If the waker pipe was signaled, read from it. if (fds[0].revents & POLLIN) { NNI_ASSERT(fds[0].fd == pollq->wakerfd); @@ -183,7 +177,7 @@ nni_posix_poll_thr(void *arg) } // Clear the index for the next time around. - node->index = 0; + node->index = 0; node->revents = fds[index].revents; // Now we move this node to the callback list. @@ -216,7 +210,6 @@ nni_posix_poll_thr(void *arg) nni_mtx_unlock(&pollq->mtx); } - void nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node) { @@ -238,7 +231,6 @@ nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node) nni_mtx_unlock(&pq->mtx); } - int nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node) { @@ -278,13 +270,12 @@ nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node) return (0); } - static void nni_posix_pollq_fini(nni_posix_pollq *pq) { if (pq->started) { nni_mtx_lock(&pq->mtx); - pq->close = 1; + pq->close = 1; pq->started = 0; nni_plat_pipe_raise(pq->wakewfd); @@ -298,12 +289,11 @@ nni_posix_pollq_fini(nni_posix_pollq *pq) nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); pq->wakewfd = pq->wakerfd = -1; } - nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd)); - nni_free(pq->fds, pq->nfds * sizeof (struct pollfd)); + nni_free(pq->newfds, pq->nnewfds * sizeof(struct pollfd)); + nni_free(pq->fds, pq->nfds * sizeof(struct pollfd)); nni_mtx_fini(&pq->mtx); } - static int nni_posix_pollq_init(nni_posix_pollq *pq) { @@ -313,7 +303,7 @@ nni_posix_pollq_init(nni_posix_pollq *pq) NNI_LIST_INIT(&pq->notify, nni_posix_pollq_node, node); pq->wakewfd = -1; pq->wakerfd = -1; - pq->close = 0; + pq->close = 0; if (((rv = nni_mtx_init(&pq->mtx)) != 0) || ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) || @@ -328,7 +318,6 @@ nni_posix_pollq_init(nni_posix_pollq *pq) return (0); } - // We use a single pollq for the entire system, which means only a single // thread is polling. This may be somewhat less than optimally efficient, // and it may be worth investigating having multiple threads to improve @@ -344,7 +333,6 @@ nni_posix_pollq_get(int fd) return (&nni_posix_global_pollq); } - int nni_posix_pollq_sysinit(void) { @@ -354,14 +342,12 @@ nni_posix_pollq_sysinit(void) return (rv); } - void nni_posix_pollq_sysfini(void) { nni_posix_pollq_fini(&nni_posix_global_pollq); } - #else // Suppress empty symbols warnings in ranlib. |
