diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 3 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq.h | 18 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 218 |
3 files changed, 127 insertions, 112 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index c6512eb4..f9927e9d 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -137,9 +137,6 @@ nni_aio_wait(nni_aio *aio) int nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data) { - NNI_ASSERT(aio->a_prov_data == NULL); - NNI_ASSERT(aio->a_prov_cancel == NULL); - nni_mtx_lock(&aio->a_lk); aio->a_flags &= ~(NNI_AIO_DONE | NNI_AIO_WAKE); if (aio->a_flags & NNI_AIO_FINI) { diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index e42b120d..9fa7c92e 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -25,22 +25,26 @@ typedef struct nni_posix_pollq_node nni_posix_pollq_node; typedef struct nni_posix_pollq nni_posix_pollq; struct nni_posix_pollq_node { - nni_posix_pollq *pq; // associated pollq nni_list_node node; // linkage into the pollq list + nni_posix_pollq *pq; // associated pollq int index; // used by the poller impl int armed; // used by the poller impl int fd; // file descriptor to poll int events; // events to watch for int revents; // events received - void * data; // user data - nni_cb cb; // user callback on event + nni_taskq_ent task; + void * data; // user data + nni_cb cb; // user callback on event }; extern nni_posix_pollq *nni_posix_pollq_get(int); -extern int nni_posix_pollq_submit(nni_posix_pollq *, nni_posix_pollq_node *); -extern void nni_posix_pollq_cancel(nni_posix_pollq *, nni_posix_pollq_node *); -extern int nni_posix_pollq_sysinit(void); -extern void nni_posix_pollq_sysfini(void); +extern int nni_posix_pollq_sysinit(void); +extern void nni_posix_pollq_sysfini(void); + +extern int nni_posix_pollq_add(nni_posix_pollq *, nni_posix_pollq_node *); +extern void nni_posix_pollq_remove(nni_posix_pollq_node *); +extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int); +extern void nni_posix_pollq_disarm(nni_posix_pollq_node *, int); #endif // PLATFORM_POSIX diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index fe1359ec..d378c45a 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -35,18 +35,15 @@ struct nni_posix_pollq { nni_mtx mtx; nni_cv cv; struct pollfd *fds; - struct pollfd *newfds; int nfds; - int nnewfds; int wakewfd; // write side of waker pipe int wakerfd; // read side of waker pipe int close; // request for worker to exit int started; nni_thr thr; // worker thread - nni_list nodes; // poll list - nni_list notify; // notify list + nni_list armed; // armed nodes + nni_list idle; // idle nodes int nnodes; // num of nodes in nodes list - int cancel; // waiters for cancellation int inpoll; // poller asleep in poll nni_posix_pollq_node *active; // active node (in callback) @@ -56,32 +53,24 @@ static int nni_posix_pollq_poll_grow(nni_posix_pollq *pq) { int grow = pq->nnodes + 2; // one for us, one for waker - int noldfds; - struct pollfd *oldfds; struct pollfd *newfds; - if ((grow < pq->nfds) || (grow < pq->nnewfds)) { + if (grow < pq->nfds) { return (0); } grow = grow + 128; - // Maybe we are adding a *lot* of pipes at once, and have to grow - // multiple times before the poller gets scheduled. In that case - // toss the old array before we finish. - oldfds = pq->newfds; - noldfds = pq->nnewfds; - - if ((newfds = nni_alloc(grow * sizeof(struct pollfd))) == NULL) { + if ((newfds = NNI_ALLOC_STRUCTS(newfds, grow)) == NULL) { return (NNG_ENOMEM); } - pq->newfds = newfds; - pq->nnewfds = grow; - - if (noldfds != 0) { - nni_free(oldfds, noldfds * sizeof(struct pollfd)); + if (pq->nfds != 0) { + NNI_FREE_STRUCTS(pq->fds, pq->nfds); } + pq->fds = newfds; + pq->nfds = grow; + return (0); } @@ -101,15 +90,6 @@ nni_posix_poll_thr(void *arg) break; } - if (pollq->newfds != NULL) { - // We have "grown" by the caller. Free up the old - // space, and start using the new. - nni_free( - pollq->fds, pollq->nfds * sizeof(struct pollfd)); - pollq->fds = pollq->newfds; - pollq->nfds = pollq->nnewfds; - pollq->newfds = NULL; - } fds = pollq->fds; nfds = 0; @@ -121,9 +101,9 @@ nni_posix_poll_thr(void *arg) nfds++; // Set up the poll list. - NNI_LIST_FOREACH (&pollq->nodes, node) { + NNI_LIST_FOREACH (&pollq->armed, node) { fds[nfds].fd = node->fd; - fds[nfds].events = node->armed; + fds[nfds].events = node->events; fds[nfds].revents = 0; node->index = nfds; nfds++; @@ -156,22 +136,20 @@ nni_posix_poll_thr(void *arg) // their index set to -1. Removed ones will just be absent. // Note that we may remove the pipedesc from the list, so we // have to use a custom iterator. - nextnode = nni_list_first(&pollq->nodes); + nextnode = nni_list_first(&pollq->armed); while ((node = nextnode) != NULL) { int index; - // Save the nextpd for our next iteration. This - // way we can remove the PD from the list without - // breaking the iteration. - - nextnode = nni_list_next(&pollq->nodes, node); + // Save the next node, so that we can remove this + // one if needed. + nextnode = nni_list_next(&pollq->armed, node); // If index is less than 1, then we have just added // this and there is no FD for it in the pollfds. if ((index = node->index) < 1) { continue; } - + // Was there any activity? if (fds[index].revents == 0) { continue; } @@ -180,94 +158,127 @@ nni_posix_poll_thr(void *arg) node->index = 0; node->revents = fds[index].revents; - // Now we move this node to the callback list. - node->armed = 0; - nni_list_remove(&pollq->nodes, node); - nni_list_append(&pollq->notify, node); - pollq->nnodes--; - } - - // Finally we can call the callbacks. We record the - // active callback so any attempt to cancel blocks until - // the callback is finished. - while ((node = nni_list_first(&pollq->notify)) != NULL) { - nni_list_remove(&pollq->notify, node); + // Execute callbacks. Note that these occur with + // the lock held. if (node->cb != NULL) { - pollq->active = node; - nni_mtx_unlock(&pollq->mtx); node->cb(node->data); - nni_mtx_lock(&pollq->mtx); - pollq->active = NULL; + } else { + // No further events for you! + node->events = 0; } - } - // Wake any cancelers. - if (pollq->cancel != 0) { - pollq->cancel = 0; - nni_cv_wake(&pollq->cv); + // Callback should clear events. If none were + // rearmed, then move to the idle list so we won't + // keep looking at it. + if (node->events == 0) { + nni_list_remove(&pollq->armed, node); + nni_list_append(&pollq->idle, node); + } } } nni_mtx_unlock(&pollq->mtx); } -void -nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node) +// nni_posix_pollq_add_cb is intended to be uesd during endpoint +// operations that create new pollq nodes in callbacks. The lock +// for the pollq must be held. +int +nni_posix_pollq_add_cb(nni_posix_pollq *pq, nni_posix_pollq_node *node) { - nni_mtx_lock(&pq->mtx); - while (pq->active == node) { - pq->cancel++; - nni_cv_wait(&pq->cv); + int rv; + NNI_ASSERT(!nni_list_node_active(&node->node)); + + node->pq = pq; + if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) { + return (rv); } - if (nni_list_active(&pq->nodes, node)) { - node->armed = 0; - nni_list_remove(&pq->nodes, node); + pq->nnodes++; + if (node->events != 0) { + nni_list_append(&pq->armed, node); + } else { + nni_list_append(&pq->idle, node); } - // Since we're not removing the fd from the outstanding poll, we - // may get an event. In that case, we'll wake and rebuild the - // pollset without it, with no further action. Otherwise having the - // poll present does no harm beyond the "spurious" wake of the poller - // thread. (If we had port_disassociate or somesuch, this would be - // a great time for that.) - nni_mtx_unlock(&pq->mtx); + return (0); } int -nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node) +nni_posix_pollq_add(nni_posix_pollq *pq, nni_posix_pollq_node *node) { - int wake; int rv; - int evs; nni_mtx_lock(&pq->mtx); + rv = nni_posix_pollq_add_cb(pq, node); + nni_mtx_unlock(&pq->mtx); + return (rv); +} - if (node->events == 0) { - // Nothing to schedule? - nni_mtx_unlock(&pq->mtx); - return (0); +void +nni_posix_pollq_remove(nni_posix_pollq_node *node) +{ + nni_posix_pollq *pq = node->pq; + + if (pq == NULL) { + return; } + nni_mtx_lock(&pq->mtx); + NNI_ASSERT(nni_list_node_active(&node->node)); + nni_list_node_remove(&node->node); + pq->nnodes--; + nni_mtx_unlock(&pq->mtx); +} - if (node->armed == 0) { - NNI_ASSERT(!nni_list_active(&pq->nodes, node)); +void +nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +{ + nni_posix_pollq *pq = node->pq; + int oevents; - rv = nni_posix_pollq_poll_grow(pq); - if (rv != 0) { - nni_mtx_unlock(&pq->mtx); - return (rv); - } + if (pq == NULL) { + return; + } + + nni_mtx_lock(&pq->mtx); + oevents = node->events; + node->events |= events; - nni_list_append(&pq->nodes, node); - pq->nnodes++; + if ((oevents == 0) && (events != 0)) { + if (nni_list_node_active(&node->node)) { + nni_list_node_remove(&node->node); + } + nni_list_append(&pq->armed, node); } + if ((events != 0) && (oevents != events)) { + // Possibly wake up the poller since we're looking for + // new events. + if (pq->inpoll) { + nni_plat_pipe_raise(pq->wakewfd); + } + } + nni_mtx_unlock(&pq->mtx); +} - node->armed |= node->events; +void +nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events) +{ + nni_posix_pollq *pq = node->pq; + int oevents; - // Wake up the poller since we're adding a new poll, but only bother - // if it's already asleep. (Frequently it will *not* be.) - if (pq->inpoll) { - nni_plat_pipe_raise(pq->wakewfd); + if (pq == NULL) { + return; } + + nni_mtx_lock(&pq->mtx); + oevents = node->events; + node->events &= ~events; + if ((node->events == 0) && (oevents != 0)) { + if (nni_list_node_active(&node->node)) { + nni_list_node_remove(&node->node); + } + nni_list_append(&pq->idle, node); + } + // No need to wake anything, we might get a spurious wake up but + // that's harmless. nni_mtx_unlock(&pq->mtx); - return (0); } static void @@ -280,7 +291,9 @@ nni_posix_pollq_fini(nni_posix_pollq *pq) nni_plat_pipe_raise(pq->wakewfd); // All pipes should have been closed before this is called. - NNI_ASSERT(nni_list_empty(&pq->nodes)); + NNI_ASSERT(nni_list_empty(&pq->armed)); + NNI_ASSERT(nni_list_empty(&pq->idle)); + NNI_ASSERT(pq->nnodes == 0); nni_mtx_unlock(&pq->mtx); } @@ -289,8 +302,9 @@ nni_posix_pollq_fini(nni_posix_pollq *pq) nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); pq->wakewfd = pq->wakerfd = -1; } - nni_free(pq->newfds, pq->nnewfds * sizeof(struct pollfd)); - nni_free(pq->fds, pq->nfds * sizeof(struct pollfd)); + if (pq->nfds != 0) { + NNI_FREE_STRUCTS(pq->fds, pq->nfds); + } nni_mtx_fini(&pq->mtx); } @@ -299,8 +313,8 @@ nni_posix_pollq_init(nni_posix_pollq *pq) { int rv; - NNI_LIST_INIT(&pq->nodes, nni_posix_pollq_node, node); - NNI_LIST_INIT(&pq->notify, nni_posix_pollq_node, node); + NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node); + NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node); pq->wakewfd = -1; pq->wakerfd = -1; pq->close = 0; |
