aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_pollq_poll.c
diff options
context:
space:
mode:
author: Garrett D'Amore <garrett@damore.org> 2017-07-04 16:15:02 -0700
committer: Garrett D'Amore <garrett@damore.org> 2017-07-04 16:15:02 -0700
commit: 16a43040ef29f77375d226f669770e64a42d278c (patch)
tree: 1fd80d0d761fbdf812c257817a3b23ae1dc6519c /src/platform/posix/posix_pollq_poll.c
parent: 58c5fbb731f50a952864bc500a8efd3b7077ee65 (diff)
download: nng-16a43040ef29f77375d226f669770e64a42d278c.tar.gz
nng-16a43040ef29f77375d226f669770e64a42d278c.tar.bz2
nng-16a43040ef29f77375d226f669770e64a42d278c.zip
Separate out poller/pollq from basic socket operations.
Diffstat (limited to 'src/platform/posix/posix_pollq_poll.c')
-rw-r--r-- src/platform/posix/posix_pollq_poll.c 367
1 files changed, 367 insertions, 0 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
new file mode 100644
index 00000000..a70c902d
--- /dev/null
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -0,0 +1,367 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
+
+#ifdef NNG_USE_POSIX_POLLQ_POLL
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// POSIX AIO using poll(). We use a single poll thread to perform
+// I/O operations for the entire system. This isn't entirely scalable,
+// and it might be a good idea to create a few threads and group the
+// I/O operations into separate pollers to limit the amount of work each
+// thread does, and to scale across multiple cores. For now we don't
+// worry about it.
+
+// nni_posix_pollq is the work structure used by the poller thread. It
+// tracks the file descriptors handed to poll(), the nodes that are waiting
+// for events, and the state used to coordinate with submitters and cancelers.
+// Every function in this file accesses it with mtx held.
+struct nni_posix_pollq {
+ nni_mtx mtx; // protects every field below
+ nni_cv cv; // bound to mtx; cancelers wait on it, the worker wakes it
+ struct pollfd * fds; // array currently passed to poll()
+ struct pollfd * newfds; // larger replacement array, adopted by the worker
+ int nfds; // number of entries allocated in fds
+ int nnewfds; // number of entries allocated in newfds
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ int close; // request for worker to exit
+ int started; // set once the worker thread has been launched
+ nni_thr thr; // worker thread
+ nni_list nodes; // poll list (nodes armed and waiting on poll)
+ nni_list notify; // notify list (nodes whose callbacks are pending)
+ int nnodes; // num of nodes in nodes list
+ int cancel; // waiters for cancellation
+ int inpoll; // poller asleep in poll
+
+ nni_posix_pollq_node * active; // active node (in callback)
+};
+
+
+
+// nni_posix_pollq_poll_grow makes sure a pollfd array large enough for the
+// current node count (plus the node about to be added and the waker pipe)
+// is available. The new array is parked in pq->newfds; the poller thread
+// switches over to it the next time it rebuilds its poll set.
+//
+// Returns 0 on success (including when no growth is required), or
+// NNG_ENOMEM if the allocation fails, in which case pq is left untouched.
+static int
+nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
+{
+    int            needed = pq->nnodes + 2; // one for us, one for waker
+    int            prevcount;
+    struct pollfd *prev;
+    struct pollfd *fresh;
+
+    // Either the live array or an already pending replacement is big
+    // enough, so there is nothing to do.
+    if ((pq->nfds > needed) || (pq->nnewfds > needed)) {
+        return (0);
+    }
+
+    // Over-allocate so that a burst of submissions does not reallocate
+    // on every call.
+    needed += 128;
+
+    fresh = nni_alloc(needed * sizeof (struct pollfd));
+    if (fresh == NULL) {
+        return (NNG_ENOMEM);
+    }
+
+    // A previous replacement may still be pending if we grew several
+    // times before the poller ran; it is superseded by the new array.
+    prev = pq->newfds;
+    prevcount = pq->nnewfds;
+
+    pq->newfds = fresh;
+    pq->nnewfds = needed;
+
+    if (prevcount != 0) {
+        nni_free(prev, prevcount * sizeof (struct pollfd));
+    }
+    return (0);
+}
+
+
+// nni_posix_poll_thr is the body of the poller worker thread. arg is the
+// nni_posix_pollq to service. The thread loops until pollq->close is set.
+// pollq->mtx is held throughout, except while blocked in poll() and while
+// a node callback is running.
+static void
+nni_posix_poll_thr(void *arg)
+{
+ nni_posix_pollq *pollq = arg;
+ nni_posix_pollq_node *node, *nextnode;
+
+ nni_mtx_lock(&pollq->mtx);
+ for (;;) {
+ int rv;
+ int nfds;
+ struct pollfd *fds;
+
+ if (pollq->close) {
+ break;
+ }
+
+ if (pollq->newfds != NULL) {
+ // A submitter has allocated a larger array for us. Free
+ // the old space, and start using the new. Note that
+ // nnewfds is left as is; only the pointer is cleared.
+ nni_free(pollq->fds,
+ pollq->nfds * sizeof (struct pollfd));
+ pollq->fds = pollq->newfds;
+ pollq->nfds = pollq->nnewfds;
+ pollq->newfds = NULL;
+ }
+ fds = pollq->fds;
+ nfds = 0;
+
+ // The waker pipe is set up so that we will be woken
+ // when it is written (this allows us to be signaled).
+ // It always occupies slot 0 of the array.
+ fds[nfds].fd = pollq->wakerfd;
+ fds[nfds].events = POLLIN;
+ fds[nfds].revents = 0;
+ nfds++;
+
+ // Set up the poll list. Each node remembers the slot it was
+ // given, so slots for real nodes start at 1.
+ NNI_LIST_FOREACH (&pollq->nodes, node) {
+ fds[nfds].fd = node->fd;
+ fds[nfds].events = node->armed;
+ fds[nfds].revents = 0;
+ node->index = nfds;
+ nfds++;
+ }
+
+ // Now poll it. We block indefinitely, since we use separate
+ // timeouts to wake and remove the elements from the list.
+ // inpoll tells submitters that they must raise the waker pipe.
+ pollq->inpoll = 1;
+ nni_mtx_unlock(&pollq->mtx);
+ rv = poll(fds, nfds, -1);
+ nni_mtx_lock(&pollq->mtx);
+ pollq->inpoll = 0;
+
+ if (rv < 0) {
+ // This shouldn't happen really. If it does, we
+ // just try again. (EINTR is probably the only
+ // reasonable failure here, unless internal memory
+ // allocations fail in the kernel, leading to EAGAIN.)
+ continue;
+ }
+
+
+ // If the waker pipe was signaled, read from it.
+ if (fds[0].revents & POLLIN) {
+ NNI_ASSERT(fds[0].fd == pollq->wakerfd);
+ nni_plat_pipe_clear(pollq->wakerfd);
+ }
+
+ // Now we iterate through all the nodes. Note that some may
+ // have been added or removed while we were in poll(). Nodes
+ // that were not part of this poll are expected to have an
+ // index below 1; removed ones will just be absent.
+ // Note that we may remove the node from the list, so we
+ // have to use a custom iterator.
+ // NOTE(review): this relies on submitters leaving index < 1
+ // for nodes that are not in the pollfd array -- verify.
+ nextnode = nni_list_first(&pollq->nodes);
+ while ((node = nextnode) != NULL) {
+ int index;
+
+ // Save the next node for our next iteration. This
+ // way we can remove the node from the list without
+ // breaking the iteration.
+
+ nextnode = nni_list_next(&pollq->nodes, node);
+
+ // If index is less than 1, then we have just added
+ // this and there is no FD for it in the pollfds.
+ if ((index = node->index) < 1) {
+ continue;
+ }
+
+ // Clear the index for the next time around.
+ node->index = 0;
+
+ node->revents = fds[index].revents;
+
+ // Now we move this node to the callback list. This is
+ // done for every polled node, whether or not revents
+ // is zero. NOTE(review): callbacks are presumably
+ // expected to cope with revents == 0 and to resubmit
+ // -- confirm against the callers.
+ node->armed = 0;
+ nni_list_remove(&pollq->nodes, node);
+ nni_list_append(&pollq->notify, node);
+ pollq->nnodes--;
+ }
+
+ // Finally we can call the callbacks. We record the
+ // active callback so any attempt to cancel blocks until
+ // the callback is finished. The lock is dropped around
+ // the callback itself.
+ while ((node = nni_list_first(&pollq->notify)) != NULL) {
+ nni_list_remove(&pollq->notify, node);
+ if (node->cb != NULL) {
+ pollq->active = node;
+ nni_mtx_unlock(&pollq->mtx);
+ node->cb(node->data);
+ nni_mtx_lock(&pollq->mtx);
+ pollq->active = NULL;
+ }
+ }
+
+ // Wake any cancelers. This happens once per pass, after the
+ // whole notify list has been drained.
+ if (pollq->cancel != 0) {
+ pollq->cancel = 0;
+ nni_cv_wake(&pollq->cv);
+ }
+ }
+ nni_mtx_unlock(&pollq->mtx);
+}
+
+
+// nni_posix_pollq_cancel withdraws node from pq. If the node's callback
+// is executing right now, the caller blocks until the poller thread signals
+// that the callback pass has finished. Afterwards the node is unlinked if
+// it is still on a list, and disarmed.
+void
+nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+{
+    nni_mtx_lock(&pq->mtx);
+    while (pq->active == node) {
+        pq->cancel++;
+        nni_cv_wait(&pq->cv);
+    }
+    if (nni_list_active(&pq->nodes, node)) {
+        // Keep nnodes in step with the poll list. A node is only
+        // counted while it is armed: the poller zeroes armed and
+        // decrements nnodes itself when it moves a node over to the
+        // notify list, so a disarmed node must not be counted again.
+        // Without this the count only ever grows, and the pollfd
+        // arrays are sized from it.
+        if (node->armed != 0) {
+            pq->nnodes--;
+        }
+        node->armed = 0;
+        nni_list_remove(&pq->nodes, node);
+    }
+    // Since we're not removing the fd from the outstanding poll, we
+    // may get an event. In that case, we'll wake and rebuild the
+    // pollset without it, with no further action. Otherwise having the
+    // poll present does no harm beyond the "spurious" wake of the poller
+    // thread. (If we had port_disassociate or somesuch, this would be
+    // a great time for that.)
+    nni_mtx_unlock(&pq->mtx);
+}
+
+
+// nni_posix_pollq_submit arms node on pq for the events in node->events.
+// A node that is not yet armed is appended to the poll list (growing the
+// pollfd storage first if required); a node that is already armed just has
+// the additional events merged in. The poller is woken if it is currently
+// blocked in poll(), so that it picks up the change.
+//
+// Returns 0 on success, including when node->events is 0 and there is
+// nothing to do, or the error from growing the pollfd storage.
+int
+nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+{
+    int rv;
+
+    nni_mtx_lock(&pq->mtx);
+
+    if (node->events == 0) {
+        // Nothing to schedule?
+        nni_mtx_unlock(&pq->mtx);
+        return (0);
+    }
+
+    if (node->armed == 0) {
+        NNI_ASSERT(!nni_list_active(&pq->nodes, node));
+
+        rv = nni_posix_pollq_poll_grow(pq);
+        if (rv != 0) {
+            nni_mtx_unlock(&pq->mtx);
+            return (rv);
+        }
+
+        // The poller treats any node with index >= 1 as having a
+        // slot in the pollfd array of the poll that just returned.
+        // A node that was cancelled while a poll was outstanding
+        // still carries its old slot number, so clear it here;
+        // otherwise it could be handed the revents of whatever
+        // descriptor occupies that slot now.
+        node->index = 0;
+
+        nni_list_append(&pq->nodes, node);
+        pq->nnodes++;
+    }
+
+    node->armed |= node->events;
+
+    // Wake up the poller since we're adding a new poll, but only bother
+    // if it's already asleep. (Frequently it will *not* be.)
+    if (pq->inpoll) {
+        nni_plat_pipe_raise(pq->wakewfd);
+    }
+    nni_mtx_unlock(&pq->mtx);
+    return (0);
+}
+
+
+// nni_posix_pollq_fini tears down pq: it asks the worker thread to exit
+// (if it was started), reaps the thread, closes the waker pipe and releases
+// the pollfd arrays. It is also used to unwind a partially completed
+// nni_posix_pollq_init.
+static void
+nni_posix_pollq_fini(nni_posix_pollq *pq)
+{
+    if (pq->started) {
+        nni_mtx_lock(&pq->mtx);
+        pq->close = 1;
+        pq->started = 0;
+        // The worker may be blocked in poll(); kick it so that it
+        // notices the close request.
+        nni_plat_pipe_raise(pq->wakewfd);
+
+        // All pipes should have been closed before this is called.
+        NNI_ASSERT(nni_list_empty(&pq->nodes));
+        nni_mtx_unlock(&pq->mtx);
+    }
+
+    nni_thr_fini(&pq->thr);
+    if (pq->wakewfd >= 0) {
+        nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+        pq->wakewfd = pq->wakerfd = -1;
+    }
+    nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd));
+    nni_free(pq->fds, pq->nfds * sizeof (struct pollfd));
+
+    // Forget the arrays we just released. The pollq lives in static
+    // storage and may be initialized again later; stale pointers and
+    // counts would make the grow logic believe storage already exists,
+    // and the worker would then free and use memory that is gone.
+    pq->newfds = NULL;
+    pq->fds = NULL;
+    pq->nnewfds = 0;
+    pq->nfds = 0;
+    pq->nnodes = 0;
+
+    // NOTE(review): pq->cv is initialized in nni_posix_pollq_init but
+    // nothing here tears it down -- confirm whether the condition
+    // variable needs its own fini call before the mutex is destroyed.
+    nni_mtx_fini(&pq->mtx);
+}
+
+
+// nni_posix_pollq_init prepares pq for use: it resets all bookkeeping,
+// creates the lock and condition variable, allocates the initial pollfd
+// storage, opens the waker pipe, and finally creates and starts the worker
+// thread.
+//
+// Returns 0 on success. On failure everything set up so far is released
+// via nni_posix_pollq_fini and the error code is returned.
+static int
+nni_posix_pollq_init(nni_posix_pollq *pq)
+{
+    int rv;
+
+    NNI_LIST_INIT(&pq->nodes, nni_posix_pollq_node, node);
+    NNI_LIST_INIT(&pq->notify, nni_posix_pollq_node, node);
+
+    // Start from a known state rather than relying on the structure
+    // being zero filled; the pollq may be initialized again after a
+    // previous fini, and must not inherit pointers or counts from its
+    // earlier life.
+    pq->fds = NULL;
+    pq->newfds = NULL;
+    pq->nfds = 0;
+    pq->nnewfds = 0;
+    pq->nnodes = 0;
+    pq->cancel = 0;
+    pq->inpoll = 0;
+    pq->started = 0;
+    pq->active = NULL;
+    pq->wakewfd = -1;
+    pq->wakerfd = -1;
+    pq->close = 0;
+
+    if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
+        ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) ||
+        ((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
+        ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
+        ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
+        nni_posix_pollq_fini(pq);
+        return (rv);
+    }
+    // Mark the pollq as started before the thread is released, so that
+    // fini knows it has to ask the worker to exit.
+    pq->started = 1;
+    nni_thr_run(&pq->thr);
+    return (0);
+}
+
+
+// One pollq serves the whole process, so exactly one thread sits in
+// poll(). That is probably not the most efficient arrangement: several
+// pollers would shorten the per-poller lists (helping C10K style loads)
+// and let the work spread over multiple cores. How many threads would be
+// "optimal" is an open question.
+static nni_posix_pollq nni_posix_global_pollq;
+
+// nni_posix_pollq_get returns the pollq that services descriptor fd.
+nni_posix_pollq *
+nni_posix_pollq_get(int fd)
+{
+    nni_posix_pollq *pq = &nni_posix_global_pollq;
+
+    // fd is not consulted yet; this is the place where a pollq could
+    // be selected based on the descriptor.
+    (void) fd;
+    return (pq);
+}
+
+
+// nni_posix_pollq_sysinit brings up the global pollq. It returns 0 on
+// success, or the error reported by nni_posix_pollq_init.
+int
+nni_posix_pollq_sysinit(void)
+{
+    return (nni_posix_pollq_init(&nni_posix_global_pollq));
+}
+
+
+// nni_posix_pollq_sysfini shuts down the global pollq.
+void
+nni_posix_pollq_sysfini(void)
+{
+    nni_posix_pollq *pq = &nni_posix_global_pollq;
+
+    nni_posix_pollq_fini(pq);
+}
+
+
+#else
+
+// When NNG_USE_POSIX_POLLQ_POLL is not defined, nothing above is compiled.
+// Define one symbol so the object file is not empty, which suppresses the
+// empty symbols warnings in ranlib.
+int nni_posix_pollq_poll_used = 0;
+
+#endif // NNG_USE_POSIX_POLLQ_POLL