diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/platform/posix/posix_poll.c | 94 |
1 files changed, 78 insertions, 16 deletions
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c index 76e6267f..751044f0 100644 --- a/src/platform/posix/posix_poll.c +++ b/src/platform/posix/posix_poll.c @@ -74,7 +74,7 @@ struct nni_posix_pollq { nni_list pds; // nni_posix_pipedescs. int npds; // length of pds list nni_list eds; // nni_posix_epdescs - nni_list neds; // length of eds list + int neds; // length of eds list }; static nni_posix_pollq nni_posix_global_pollq; @@ -100,6 +100,21 @@ nni_posix_epdesc_cancel(nni_aio *aio) static void +nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) +{ + nni_posix_epdesc *ed; + + ed = aio->a_prov_data; + if (nni_list_active(&ed->connectq, aio)) { + nni_list_remove(&ed->connectq, aio); + } + + // Abuse the count to hold our new fd. This is only for accept. + nni_aio_finish(aio, rv, newfd); +} + + +static void nni_posix_poll_connect(nni_posix_epdesc *ed) { nni_aio *aio; @@ -116,15 +131,14 @@ nni_posix_poll_connect(nni_posix_epdesc *ed) rv = -1; sz = sizeof (rv); if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { - nni_list_remove(&ed->connectq, aio); - nni_aio_finish(aio, nni_plat_errno(errno), 0); + nni_posix_epdesc_finish(aio, + nni_plat_errno(errno), 0); continue; } switch (rv) { case 0: // Success! - nni_list_remove(&ed->connectq, aio); - nni_aio_finish(aio, 0, 0); + nni_posix_epdesc_finish(aio, 0, 0); continue; case EINPROGRESS: @@ -132,8 +146,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed) return; default: - nni_list_remove(&ed->connectq, aio); - nni_aio_finish(aio, nni_plat_errno(rv), 0); + nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); continue; } } @@ -165,9 +178,7 @@ nni_posix_poll_accept(nni_posix_epdesc *ed) if (newfd >= 0) { // successful connection request! - nni_list_remove(&ed->acceptq, aio); - // Abuse the count to hold our new fd. - nni_aio_finish(aio, 0, newfd); + nni_posix_epdesc_finish(aio, 0, newfd); continue; } @@ -188,8 +199,21 @@ nni_posix_poll_accept(nni_posix_epdesc *ed) continue; } - nni_list_remove(&ed->acceptq, aio); - nni_aio_finish(aio, nni_plat_errno(errno), 0); + nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); + } +} + + +static void +nni_posix_poll_epclose(nni_posix_epdesc *ed) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&ed->acceptq)) != NULL) { + nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + } + while ((aio = nni_list_first(&ed->connectq)) != NULL) { + nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); } } @@ -203,8 +227,6 @@ nni_posix_pipedesc_finish(nni_aio *aio, int rv) if (nni_list_active(&pd->readq, aio)) { nni_list_remove(&pd->readq, aio); } - aio->a_prov_data = NULL; - aio->a_prov_cancel = NULL; nni_aio_finish(aio, rv, aio->a_count); } @@ -376,6 +398,8 @@ nni_posix_poll_thr(void *arg) { nni_posix_pollq *pollq = arg; nni_posix_pipedesc *pd, *nextpd; + nni_posix_epdesc *ed, *nexted; + nni_mtx_lock(&pollq->mtx); for (;;) { @@ -420,7 +444,19 @@ nni_posix_poll_thr(void *arg) pd->index = nfds; nfds++; } - + NNI_LIST_FOREACH (&pollq->eds, ed) { + fds[nfds].fd = ed->fd; + fds[nfds].events = 0; + fds[nfds].revents = 0; + if (nni_list_first(&ed->connectq) != NULL) { + fds[nfds].events |= POLLOUT; + } + if (nni_list_first(&ed->acceptq) != NULL) { + fds[nfds].events |= POLLIN; + } + ed->index = nfds; + nfds++; + } // Now poll it. We block indefinitely, since we use separate // timeouts to wake and remove the elements from the list. @@ -480,6 +516,32 @@ nni_posix_poll_thr(void *arg) if ((nni_list_first(&pd->readq) == NULL) && (nni_list_first(&pd->writeq) == NULL)) { nni_list_remove(&pollq->pds, pd); + pollq->npds--; + } + } + // Same thing for ep descs. + nexted = nni_list_first(&pollq->eds); + while ((ed = nexted) != NULL) { + int index; + + nexted = nni_list_next(&pollq->eds, ed); + if ((index = ed->index) < 1) { + continue; + } + ed->index = 0; + if (fds[index].revents & POLLIN) { + nni_posix_poll_accept(ed); + } + if (fds[index].revents & POLLOUT) { + nni_posix_poll_connect(ed); + } + if (fds[index].revents & (POLLHUP|POLLERR|POLLNVAL)) { + nni_posix_poll_epclose(ed); + } + if ((nni_list_first(&ed->connectq) == NULL) && + (nni_list_first(&ed->acceptq) == NULL)) { + nni_list_remove(&pollq->eds, ed); + pollq->neds--; } } } @@ -509,7 +571,7 @@ nni_posix_pipedesc_cancel(nni_aio *aio) static int nni_posix_poll_grow(nni_posix_pollq *pq) { - int grow = pq->npds + 2; // one for us, one for waker + int grow = pq->npds + pq->neds + 2; // one for us, one for waker int noldfds; struct pollfd *oldfds; struct pollfd *newfds; |
