aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_pollq_poll.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-18 19:52:08 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-18 19:52:08 -0700
commit5fb832e06fd4ded6ccc45f943837fd374a9cea7a (patch)
tree41c306c297911d740e92f38b98685207f77758c6 /src/platform/posix/posix_pollq_poll.c
parent3eb60946ae8b5ad7d8a95233ffe946432acdb837 (diff)
downloadnng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.gz
nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.bz2
nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.zip
Fixes most of the raaces in posix; but at least one remains outstanding.
Apparently there are circumstances when a pipedesc may get orphaned form the pollq. This triggers an assertion failure when it occurs. I am still trying to understand how this can occur. Stay tuned.
Diffstat (limited to 'src/platform/posix/posix_pollq_poll.c')
-rw-r--r--src/platform/posix/posix_pollq_poll.c144
1 files changed, 85 insertions, 59 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index d378c45a..1bc814bf 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -32,20 +32,21 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- struct pollfd *fds;
- int nfds;
- int wakewfd; // write side of waker pipe
- int wakerfd; // read side of waker pipe
- int close; // request for worker to exit
- int started;
- nni_thr thr; // worker thread
- nni_list armed; // armed nodes
- nni_list idle; // idle nodes
- int nnodes; // num of nodes in nodes list
- int inpoll; // poller asleep in poll
-
+ nni_mtx mtx;
+ nni_cv cv;
+ struct pollfd * fds;
+ int nfds;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ int close; // request for worker to exit
+ int started;
+ nni_thr thr; // worker thread
+ nni_list polled; // polled nodes
+ nni_list armed; // armed nodes
+ nni_list idle; // idle nodes
+ int nnodes; // num of nodes in nodes list
+ int inpoll; // poller asleep in poll
+ nni_posix_pollq_node *wait; // cancel waiting on this
nni_posix_pollq_node *active; // active node (in callback)
};
@@ -78,7 +79,7 @@ static void
nni_posix_poll_thr(void *arg)
{
nni_posix_pollq * pollq = arg;
- nni_posix_pollq_node *node, *nextnode;
+ nni_posix_pollq_node *node;
nni_mtx_lock(&pollq->mtx);
for (;;) {
@@ -101,7 +102,9 @@ nni_posix_poll_thr(void *arg)
nfds++;
// Set up the poll list.
- NNI_LIST_FOREACH (&pollq->armed, node) {
+ while ((node = nni_list_first(&pollq->armed)) != NULL) {
+ nni_list_remove(&pollq->armed, node);
+ nni_list_append(&pollq->polled, node);
fds[nfds].fd = node->fd;
fds[nfds].events = node->events;
fds[nfds].revents = 0;
@@ -131,48 +134,53 @@ nni_posix_poll_thr(void *arg)
nni_plat_pipe_clear(pollq->wakerfd);
}
- // Now we iterate through all the nodes. Note that one
- // may have been added or removed. New pipedescs will have
- // their index set to -1. Removed ones will just be absent.
- // Note that we may remove the pipedesc from the list, so we
- // have to use a custom iterator.
- nextnode = nni_list_first(&pollq->armed);
- while ((node = nextnode) != NULL) {
- int index;
-
- // Save the next node, so that we can remove this
- // one if needed.
- nextnode = nni_list_next(&pollq->armed, node);
-
- // If index is less than 1, then we have just added
- // this and there is no FD for it in the pollfds.
- if ((index = node->index) < 1) {
- continue;
- }
- // Was there any activity?
+ while ((node = nni_list_first(&pollq->polled)) != NULL) {
+ int index = node->index;
+
+ // We remove ourselves from the polled list, and
+ // then put it on either the idle or armed list
+ // depending on whether it remains armed.
+ node->index = 0;
+ nni_list_remove(&pollq->polled, node);
+ NNI_ASSERT(index > 0);
if (fds[index].revents == 0) {
+ // If still watching for events, return it
+ // to the armed list.
+ if (node->events) {
+ nni_list_append(&pollq->armed, node);
+ } else {
+ nni_list_append(&pollq->idle, node);
+ }
continue;
}
- // Clear the index for the next time around.
- node->index = 0;
+ // We are calling the callback, so disarm
+ // all events; the node can rearm them in its
+ // callback.
node->revents = fds[index].revents;
-
- // Execute callbacks. Note that these occur with
- // the lock held.
- if (node->cb != NULL) {
- node->cb(node->data);
+ node->events &= ~node->revents;
+ if (node->events == 0) {
+ nni_list_append(&pollq->idle, node);
} else {
- // No further events for you!
- node->events = 0;
+ nni_list_append(&pollq->armed, node);
}
- // Callback should clear events. If none were
- // rearmed, then move to the idle list so we won't
- // keep looking at it.
- if (node->events == 0) {
- nni_list_remove(&pollq->armed, node);
- nni_list_append(&pollq->idle, node);
+ // Save the active node; we can notice this way
+ // when it is busy, and avoid freeing it until
+ // we are sure that it is not in use.
+ pollq->active = node;
+
+ // Execute the callback -- without locks held.
+ nni_mtx_unlock(&pollq->mtx);
+ node->cb(node->data);
+ nni_mtx_lock(&pollq->mtx);
+
+ // We finished with this node. If something
+ // was blocked waiting for that, wake it up.
+ pollq->active = NULL;
+ if (pollq->wait == node) {
+ pollq->wait = NULL;
+ nni_cv_wake(&pollq->cv);
}
}
}
@@ -188,6 +196,10 @@ nni_posix_pollq_add_cb(nni_posix_pollq *pq, nni_posix_pollq_node *node)
int rv;
NNI_ASSERT(!nni_list_node_active(&node->node));
+ if (pq->close) {
+ // This shouldn't happen!
+ return (NNG_ECLOSED);
+ }
node->pq = pq;
if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) {
return (rv);
@@ -221,9 +233,17 @@ nni_posix_pollq_remove(nni_posix_pollq_node *node)
return;
}
nni_mtx_lock(&pq->mtx);
- NNI_ASSERT(nni_list_node_active(&node->node));
- nni_list_node_remove(&node->node);
- pq->nnodes--;
+ while (pq->active == node) {
+ pq->wait = node;
+ nni_cv_wait(&pq->cv);
+ }
+ if (nni_list_node_active(&node->node)) {
+ nni_list_node_remove(&node->node);
+ pq->nnodes--;
+ }
+ if (pq->close) {
+ nni_cv_wake(&pq->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
@@ -241,7 +261,10 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
oevents = node->events;
node->events |= events;
- if ((oevents == 0) && (events != 0)) {
+ // We move this to the armed list if its not armed, or already
+ // on the polled list. The polled list would be the case where
+ // the index is set to a positive value.
+ if ((oevents == 0) && (events != 0) && (node->index < 1)) {
if (nni_list_node_active(&node->node)) {
nni_list_node_remove(&node->node);
}
@@ -289,15 +312,17 @@ nni_posix_pollq_fini(nni_posix_pollq *pq)
pq->close = 1;
pq->started = 0;
nni_plat_pipe_raise(pq->wakewfd);
-
- // All pipes should have been closed before this is called.
- NNI_ASSERT(nni_list_empty(&pq->armed));
- NNI_ASSERT(nni_list_empty(&pq->idle));
- NNI_ASSERT(pq->nnodes == 0);
nni_mtx_unlock(&pq->mtx);
}
nni_thr_fini(&pq->thr);
+
+ // All pipes should have been closed before this is called.
+ NNI_ASSERT(nni_list_empty(&pq->polled));
+ NNI_ASSERT(nni_list_empty(&pq->armed));
+ NNI_ASSERT(nni_list_empty(&pq->idle));
+ NNI_ASSERT(pq->nnodes == 0);
+
if (pq->wakewfd >= 0) {
nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
pq->wakewfd = pq->wakerfd = -1;
@@ -313,6 +338,7 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
{
int rv;
+ NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node);
NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node);
NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node);
pq->wakewfd = -1;