aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_poll.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-30 13:33:22 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-30 13:33:22 -0700
commit1a2efa40eeeb140982e11932019dd165fe6fcdd5 (patch)
tree1d4f88099f2c3baae08cc4ddcba5b12fc28e8b06 /src/platform/posix/posix_poll.c
parent69c309ec479900f9389aacba18d8c1d3026ece46 (diff)
downloadnng-1a2efa40eeeb140982e11932019dd165fe6fcdd5.tar.gz
nng-1a2efa40eeeb140982e11932019dd165fe6fcdd5.tar.bz2
nng-1a2efa40eeeb140982e11932019dd165fe6fcdd5.zip
More progress on POSIX async connect stuff.
Note that we're going to refactor this again, for both TCP and IPC, to actually push the endpoint abstraction further down instead of using a combined "socket" abstraction. This may help solve other problems, such as parallel outgoing connections. Nonetheless, most of the work to make POSIX sockets fully async is now done.
Diffstat (limited to 'src/platform/posix/posix_poll.c')
-rw-r--r--src/platform/posix/posix_poll.c248
1 files changed, 204 insertions, 44 deletions
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
index 751044f0..07ab1dc5 100644
--- a/src/platform/posix/posix_poll.c
+++ b/src/platform/posix/posix_poll.c
@@ -55,6 +55,8 @@ struct nni_posix_epdesc {
nni_posix_pollq * pq;
struct sockaddr_storage locaddr;
struct sockaddr_storage remaddr;
+ socklen_t loclen;
+ socklen_t remlen;
};
@@ -80,6 +82,41 @@ struct nni_posix_pollq {
static nni_posix_pollq nni_posix_global_pollq;
+static int
+nni_posix_poll_grow(nni_posix_pollq *pq)
+{
+ int grow = pq->npds + pq->neds + 2; // one for us, one for waker
+ int noldfds;
+ struct pollfd *oldfds;
+ struct pollfd *newfds;
+
+ if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
+ return (0);
+ }
+
+ grow = grow + 128;
+
+ // Maybe we are adding a *lot* of pipes at once, and have to grow
+ // multiple times before the poller gets scheduled. In that case
+ // toss the old array before we finish.
+ oldfds = pq->newfds;
+ noldfds = pq->nnewfds;
+
+ if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+
+ pq->newfds = newfds;
+ pq->nnewfds = grow;
+
+ if (noldfds != 0) {
+ nni_free(oldfds, noldfds * sizeof (struct pollfd));
+ }
+ return (0);
+}
+
+
static void
nni_posix_epdesc_cancel(nni_aio *aio)
{
@@ -131,9 +168,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed)
rv = -1;
sz = sizeof (rv);
if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- nni_posix_epdesc_finish(aio,
- nni_plat_errno(errno), 0);
- continue;
+ rv = errno;
}
switch (rv) {
case 0:
@@ -178,6 +213,7 @@ nni_posix_poll_accept(nni_posix_epdesc *ed)
if (newfd >= 0) {
// successful connection request!
+ // We abuse the count to hold our new file descriptor.
nni_posix_epdesc_finish(aio, 0, newfd);
continue;
}
@@ -218,6 +254,165 @@ nni_posix_poll_epclose(nni_posix_epdesc *ed)
}
+static int
+nni_posix_epdesc_add(nni_posix_pollq *pq, nni_posix_epdesc *ed)
+{
+ int rv;
+
+ // Add epdesc to the pollq if it isn't already there.
+ if (!nni_list_active(&pq->eds, ed)) {
+ if ((rv = nni_posix_poll_grow(pq)) != 0) {
+ return (rv);
+ }
+ nni_list_append(&pq->eds, ed);
+ pq->neds++;
+ }
+ return (0);
+}
+
+
+void
+nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+{
+ // NB: We assume that the FD is already set to nonblocking mode.
+ int rv;
+ nni_posix_pollq *pq = ed->pq;
+ int wake;
+
+ nni_mtx_lock(&pq->mtx);
+ // If we can't start, it means that the AIO was stopped.
+ if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+ if (ed->fd < 0) {
+ nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+ rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen);
+ if (rv == 0) {
+ // Immediate connect, cool! This probably only happens on
+ // loopback, and probably not on every platform.
+ nni_posix_epdesc_finish(aio, 0, 0);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+ if (errno != EINPROGRESS) {
+ // Some immediate failure occurred.
+ nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+
+ // We have to submit to the pollq, because the connection is pending.
+ if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
+ nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+
+ NNI_ASSERT(!nni_list_active(&ed->connectq, aio));
+ wake = nni_list_first(&ed->connectq) == NULL ? 1 : 0;
+ nni_list_append(&ed->connectq, aio);
+ if (wake) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
+void
+nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
+{
+ // NB: We assume that the FD is already set to nonblocking mode.
+ int rv;
+ int wake;
+ nni_posix_pollq *pq = ed->pq;
+
+ // Accept is simpler than the connect case. With accept we just
+ // need to wait for the socket to be readable to indicate an incoming
+ // connection is ready for us. There isn't anything else for us to
+ // do really, as that will have been done in listen.
+ nni_mtx_lock(&pq->mtx);
+ // If we can't start, it means that the AIO was stopped.
+ if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+
+ if (ed->fd < 0) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ return;
+ }
+
+ // We have to submit to the pollq, because the connection is pending.
+ if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
+ nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_lock(&pq->mtx);
+ }
+ NNI_ASSERT(!nni_list_active(&ed->acceptq, aio));
+ wake = nni_list_first(&ed->acceptq) == NULL ? 1 : 0;
+ nni_list_append(&ed->acceptq, aio);
+ if (wake) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
+int
+nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
+{
+ nni_posix_epdesc *ed;
+
+
+ if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ // We could randomly choose a different pollq, or for efficiencies
+ // sake we could take a modulo of the file desc number to choose
+ // one. For now we just have a global pollq. Note that by tying
+ // the ed to a single pollq we may get some kind of cache warmth.
+
+ ed->pq = &nni_posix_global_pollq;
+ ed->fd = fd;
+ ed->index = 0;
+
+ // Ensure we are in non-blocking mode.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+ NNI_LIST_INIT(&ed->connectq, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&ed->acceptq, nni_aio, a_prov_node);
+
+ *edp = ed;
+ return (0);
+}
+
+
+void
+nni_posix_epdesc_fini(nni_posix_epdesc *ed)
+{
+ nni_aio *aio;
+ nni_posix_pollq *pq = ed->pq;
+
+ nni_mtx_lock(&pq->mtx);
+
+ // This removes any aios from our list.
+ nni_posix_poll_epclose(ed);
+
+ if (nni_list_active(&pq->eds, ed)) {
+ nni_list_remove(&pq->eds, ed);
+ pq->neds--;
+ }
+ nni_mtx_unlock(&pq->mtx);
+
+ NNI_FREE_STRUCT(ed);
+}
+
+
static void
nni_posix_pipedesc_finish(nni_aio *aio, int rv)
{
@@ -568,41 +763,6 @@ nni_posix_pipedesc_cancel(nni_aio *aio)
}
-static int
-nni_posix_poll_grow(nni_posix_pollq *pq)
-{
- int grow = pq->npds + pq->neds + 2; // one for us, one for waker
- int noldfds;
- struct pollfd *oldfds;
- struct pollfd *newfds;
-
- if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
- return (0);
- }
-
- grow = grow + 128;
-
- // Maybe we are adding a *lot* of pipes at once, and have to grow
- // multiple times before the poller gets scheduled. In that case
- // toss the old array before we finish.
- oldfds = pq->newfds;
- noldfds = pq->nnewfds;
-
- if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
- return (NNG_ENOMEM);
- }
-
-
- pq->newfds = newfds;
- pq->nnewfds = grow;
-
- if (noldfds != 0) {
- nni_free(oldfds, noldfds * sizeof (struct pollfd));
- }
- return (0);
-}
-
-
static void
nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
{
@@ -611,9 +771,13 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
nni_posix_pollq *pq = pd->pq;
nni_mtx_lock(&pq->mtx);
+ if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
if (pd->fd < 0) {
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pq->mtx);
- nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
return;
}
// XXX: We really should just make all the FDs nonblocking, but we
@@ -622,13 +786,9 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
(void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
pd->nonblocking = 1;
}
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return;
- }
if (!nni_list_active(&pq->pds, pd)) {
if ((rv = nni_posix_poll_grow(pq)) != 0) {
- nni_aio_finish(aio, rv, aio->a_count);
+ nni_posix_pipedesc_finish(aio, rv);
nni_mtx_unlock(&pq->mtx);
return;
}