diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-18 19:52:08 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-18 19:52:08 -0700 |
| commit | 5fb832e06fd4ded6ccc45f943837fd374a9cea7a (patch) | |
| tree | 41c306c297911d740e92f38b98685207f77758c6 /src/platform/posix/posix_pollq_poll.c | |
| parent | 3eb60946ae8b5ad7d8a95233ffe946432acdb837 (diff) | |
| download | nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.gz nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.bz2 nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.zip | |
Fixes most of the raaces in posix; but at least one remains outstanding.
Apparently there are circumstances when a pipedesc may get orphaned form the
pollq. This triggers an assertion failure when it occurs. I am still
trying to understand how this can occur. Stay tuned.
Diffstat (limited to 'src/platform/posix/posix_pollq_poll.c')
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 144 |
1 files changed, 85 insertions, 59 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index d378c45a..1bc814bf 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -32,20 +32,21 @@ // nni_posix_pollq is a work structure used by the poller thread, that keeps // track of all the underlying pipe handles and so forth being used by poll(). struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - struct pollfd *fds; - int nfds; - int wakewfd; // write side of waker pipe - int wakerfd; // read side of waker pipe - int close; // request for worker to exit - int started; - nni_thr thr; // worker thread - nni_list armed; // armed nodes - nni_list idle; // idle nodes - int nnodes; // num of nodes in nodes list - int inpoll; // poller asleep in poll - + nni_mtx mtx; + nni_cv cv; + struct pollfd * fds; + int nfds; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + int close; // request for worker to exit + int started; + nni_thr thr; // worker thread + nni_list polled; // polled nodes + nni_list armed; // armed nodes + nni_list idle; // idle nodes + int nnodes; // num of nodes in nodes list + int inpoll; // poller asleep in poll + nni_posix_pollq_node *wait; // cancel waiting on this nni_posix_pollq_node *active; // active node (in callback) }; @@ -78,7 +79,7 @@ static void nni_posix_poll_thr(void *arg) { nni_posix_pollq * pollq = arg; - nni_posix_pollq_node *node, *nextnode; + nni_posix_pollq_node *node; nni_mtx_lock(&pollq->mtx); for (;;) { @@ -101,7 +102,9 @@ nni_posix_poll_thr(void *arg) nfds++; // Set up the poll list. - NNI_LIST_FOREACH (&pollq->armed, node) { + while ((node = nni_list_first(&pollq->armed)) != NULL) { + nni_list_remove(&pollq->armed, node); + nni_list_append(&pollq->polled, node); fds[nfds].fd = node->fd; fds[nfds].events = node->events; fds[nfds].revents = 0; @@ -131,48 +134,53 @@ nni_posix_poll_thr(void *arg) nni_plat_pipe_clear(pollq->wakerfd); } - // Now we iterate through all the nodes. Note that one - // may have been added or removed. New pipedescs will have - // their index set to -1. Removed ones will just be absent. - // Note that we may remove the pipedesc from the list, so we - // have to use a custom iterator. - nextnode = nni_list_first(&pollq->armed); - while ((node = nextnode) != NULL) { - int index; - - // Save the next node, so that we can remove this - // one if needed. - nextnode = nni_list_next(&pollq->armed, node); - - // If index is less than 1, then we have just added - // this and there is no FD for it in the pollfds. - if ((index = node->index) < 1) { - continue; - } - // Was there any activity? + while ((node = nni_list_first(&pollq->polled)) != NULL) { + int index = node->index; + + // We remove ourselves from the polled list, and + // then put it on either the idle or armed list + // depending on whether it remains armed. + node->index = 0; + nni_list_remove(&pollq->polled, node); + NNI_ASSERT(index > 0); if (fds[index].revents == 0) { + // If still watching for events, return it + // to the armed list. + if (node->events) { + nni_list_append(&pollq->armed, node); + } else { + nni_list_append(&pollq->idle, node); + } continue; } - // Clear the index for the next time around. - node->index = 0; + // We are calling the callback, so disarm + // all events; the node can rearm them in its + // callback. node->revents = fds[index].revents; - - // Execute callbacks. Note that these occur with - // the lock held. - if (node->cb != NULL) { - node->cb(node->data); + node->events &= ~node->revents; + if (node->events == 0) { + nni_list_append(&pollq->idle, node); } else { - // No further events for you! - node->events = 0; + nni_list_append(&pollq->armed, node); } - // Callback should clear events. If none were - // rearmed, then move to the idle list so we won't - // keep looking at it. - if (node->events == 0) { - nni_list_remove(&pollq->armed, node); - nni_list_append(&pollq->idle, node); + // Save the active node; we can notice this way + // when it is busy, and avoid freeing it until + // we are sure that it is not in use. + pollq->active = node; + + // Execute the callback -- without locks held. + nni_mtx_unlock(&pollq->mtx); + node->cb(node->data); + nni_mtx_lock(&pollq->mtx); + + // We finished with this node. If something + // was blocked waiting for that, wake it up. + pollq->active = NULL; + if (pollq->wait == node) { + pollq->wait = NULL; + nni_cv_wake(&pollq->cv); } } } @@ -188,6 +196,10 @@ nni_posix_pollq_add_cb(nni_posix_pollq *pq, nni_posix_pollq_node *node) int rv; NNI_ASSERT(!nni_list_node_active(&node->node)); + if (pq->close) { + // This shouldn't happen! + return (NNG_ECLOSED); + } node->pq = pq; if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) { return (rv); @@ -221,9 +233,17 @@ nni_posix_pollq_remove(nni_posix_pollq_node *node) return; } nni_mtx_lock(&pq->mtx); - NNI_ASSERT(nni_list_node_active(&node->node)); - nni_list_node_remove(&node->node); - pq->nnodes--; + while (pq->active == node) { + pq->wait = node; + nni_cv_wait(&pq->cv); + } + if (nni_list_node_active(&node->node)) { + nni_list_node_remove(&node->node); + pq->nnodes--; + } + if (pq->close) { + nni_cv_wake(&pq->cv); + } nni_mtx_unlock(&pq->mtx); } @@ -241,7 +261,10 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) oevents = node->events; node->events |= events; - if ((oevents == 0) && (events != 0)) { + // We move this to the armed list if its not armed, or already + // on the polled list. The polled list would be the case where + // the index is set to a positive value. + if ((oevents == 0) && (events != 0) && (node->index < 1)) { if (nni_list_node_active(&node->node)) { nni_list_node_remove(&node->node); } @@ -289,15 +312,17 @@ nni_posix_pollq_fini(nni_posix_pollq *pq) pq->close = 1; pq->started = 0; nni_plat_pipe_raise(pq->wakewfd); - - // All pipes should have been closed before this is called. - NNI_ASSERT(nni_list_empty(&pq->armed)); - NNI_ASSERT(nni_list_empty(&pq->idle)); - NNI_ASSERT(pq->nnodes == 0); nni_mtx_unlock(&pq->mtx); } nni_thr_fini(&pq->thr); + + // All pipes should have been closed before this is called. + NNI_ASSERT(nni_list_empty(&pq->polled)); + NNI_ASSERT(nni_list_empty(&pq->armed)); + NNI_ASSERT(nni_list_empty(&pq->idle)); + NNI_ASSERT(pq->nnodes == 0); + if (pq->wakewfd >= 0) { nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); pq->wakewfd = pq->wakerfd = -1; @@ -313,6 +338,7 @@ nni_posix_pollq_init(nni_posix_pollq *pq) { int rv; + NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node); NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node); NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node); pq->wakewfd = -1; |
