aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/platform/posix/posix_poll.c94
1 files changed, 78 insertions, 16 deletions
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
index 76e6267f..751044f0 100644
--- a/src/platform/posix/posix_poll.c
+++ b/src/platform/posix/posix_poll.c
@@ -74,7 +74,7 @@ struct nni_posix_pollq {
nni_list pds; // nni_posix_pipedescs.
int npds; // length of pds list
nni_list eds; // nni_posix_epdescs
- nni_list neds; // length of eds list
+ int neds; // length of eds list
};
static nni_posix_pollq nni_posix_global_pollq;
@@ -100,6 +100,21 @@ nni_posix_epdesc_cancel(nni_aio *aio)
static void
+nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
+{
+ nni_posix_epdesc *ed;
+
+ ed = aio->a_prov_data;
+ if (nni_list_active(&ed->connectq, aio)) {
+ nni_list_remove(&ed->connectq, aio);
+ }
+
+ // Abuse the count to hold our new fd. This is only for accept.
+ nni_aio_finish(aio, rv, newfd);
+}
+
+
+static void
nni_posix_poll_connect(nni_posix_epdesc *ed)
{
nni_aio *aio;
@@ -116,15 +131,14 @@ nni_posix_poll_connect(nni_posix_epdesc *ed)
rv = -1;
sz = sizeof (rv);
if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- nni_list_remove(&ed->connectq, aio);
- nni_aio_finish(aio, nni_plat_errno(errno), 0);
+ nni_posix_epdesc_finish(aio,
+ nni_plat_errno(errno), 0);
continue;
}
switch (rv) {
case 0:
// Success!
- nni_list_remove(&ed->connectq, aio);
- nni_aio_finish(aio, 0, 0);
+ nni_posix_epdesc_finish(aio, 0, 0);
continue;
case EINPROGRESS:
@@ -132,8 +146,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed)
return;
default:
- nni_list_remove(&ed->connectq, aio);
- nni_aio_finish(aio, nni_plat_errno(rv), 0);
+ nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
continue;
}
}
@@ -165,9 +178,7 @@ nni_posix_poll_accept(nni_posix_epdesc *ed)
if (newfd >= 0) {
// successful connection request!
- nni_list_remove(&ed->acceptq, aio);
- // Abuse the count to hold our new fd.
- nni_aio_finish(aio, 0, newfd);
+ nni_posix_epdesc_finish(aio, 0, newfd);
continue;
}
@@ -188,8 +199,21 @@ nni_posix_poll_accept(nni_posix_epdesc *ed)
continue;
}
- nni_list_remove(&ed->acceptq, aio);
- nni_aio_finish(aio, nni_plat_errno(errno), 0);
+ nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+ }
+}
+
+
+static void
+nni_posix_poll_epclose(nni_posix_epdesc *ed)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
+ nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ }
+ while ((aio = nni_list_first(&ed->connectq)) != NULL) {
+ nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
}
@@ -203,8 +227,6 @@ nni_posix_pipedesc_finish(nni_aio *aio, int rv)
if (nni_list_active(&pd->readq, aio)) {
nni_list_remove(&pd->readq, aio);
}
- aio->a_prov_data = NULL;
- aio->a_prov_cancel = NULL;
nni_aio_finish(aio, rv, aio->a_count);
}
@@ -376,6 +398,8 @@ nni_posix_poll_thr(void *arg)
{
nni_posix_pollq *pollq = arg;
nni_posix_pipedesc *pd, *nextpd;
+ nni_posix_epdesc *ed, *nexted;
+
nni_mtx_lock(&pollq->mtx);
for (;;) {
@@ -420,7 +444,19 @@ nni_posix_poll_thr(void *arg)
pd->index = nfds;
nfds++;
}
-
+ NNI_LIST_FOREACH (&pollq->eds, ed) {
+ fds[nfds].fd = ed->fd;
+ fds[nfds].events = 0;
+ fds[nfds].revents = 0;
+ if (nni_list_first(&ed->connectq) != NULL) {
+ fds[nfds].events |= POLLOUT;
+ }
+ if (nni_list_first(&ed->acceptq) != NULL) {
+ fds[nfds].events |= POLLIN;
+ }
+ ed->index = nfds;
+ nfds++;
+ }
// Now poll it. We block indefinitely, since we use separate
// timeouts to wake and remove the elements from the list.
@@ -480,6 +516,32 @@ nni_posix_poll_thr(void *arg)
if ((nni_list_first(&pd->readq) == NULL) &&
(nni_list_first(&pd->writeq) == NULL)) {
nni_list_remove(&pollq->pds, pd);
+ pollq->npds--;
+ }
+ }
+ // Same thing for ep descs.
+ nexted = nni_list_first(&pollq->eds);
+ while ((ed = nexted) != NULL) {
+ int index;
+
+ nexted = nni_list_next(&pollq->eds, ed);
+ if ((index = ed->index) < 1) {
+ continue;
+ }
+ ed->index = 0;
+ if (fds[index].revents & POLLIN) {
+ nni_posix_poll_accept(ed);
+ }
+ if (fds[index].revents & POLLOUT) {
+ nni_posix_poll_connect(ed);
+ }
+ if (fds[index].revents & (POLLHUP|POLLERR|POLLNVAL)) {
+ nni_posix_poll_epclose(ed);
+ }
+ if ((nni_list_first(&ed->connectq) == NULL) &&
+ (nni_list_first(&ed->acceptq) == NULL)) {
+ nni_list_remove(&pollq->eds, ed);
+ pollq->neds--;
}
}
}
@@ -509,7 +571,7 @@ nni_posix_pipedesc_cancel(nni_aio *aio)
static int
nni_posix_poll_grow(nni_posix_pollq *pq)
{
- int grow = pq->npds + 2; // one for us, one for waker
+ int grow = pq->npds + pq->neds + 2; // one for us, one for waker
int noldfds;
struct pollfd *oldfds;
struct pollfd *newfds;