diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-30 13:33:22 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-30 13:33:22 -0700 |
| commit | 1a2efa40eeeb140982e11932019dd165fe6fcdd5 (patch) | |
| tree | 1d4f88099f2c3baae08cc4ddcba5b12fc28e8b06 /src/platform | |
| parent | 69c309ec479900f9389aacba18d8c1d3026ece46 (diff) | |
| download | nng-1a2efa40eeeb140982e11932019dd165fe6fcdd5.tar.gz nng-1a2efa40eeeb140982e11932019dd165fe6fcdd5.tar.bz2 nng-1a2efa40eeeb140982e11932019dd165fe6fcdd5.zip | |
More progress on POSIX async connect stuff.
Note that we're going to refactor this again, for both TCP and
IPC, to actually push the endpoint abstraction further down
instead of using a combined "socket" abstraction. This may help
solve other problems, such as parallel outgoing connections.
Nonetheless, most of the work to make POSIX sockets fully async
is now done.
Diffstat (limited to 'src/platform')
| -rw-r--r-- | src/platform/posix/posix_aio.h | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_poll.c | 248 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.c | 9 |
3 files changed, 219 insertions, 44 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 26ef19b9..3bb59b95 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -30,4 +30,10 @@ extern void nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_close(nni_posix_pipedesc *); +extern int nni_posix_epdesc_init(nni_posix_epdesc **, int); +extern void nni_posix_epdesc_fini(nni_posix_epdesc *); +extern void nni_posix_epdesc_close(nni_posix_epdesc *); +extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *); +extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); + #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c index 751044f0..07ab1dc5 100644 --- a/src/platform/posix/posix_poll.c +++ b/src/platform/posix/posix_poll.c @@ -55,6 +55,8 @@ struct nni_posix_epdesc { nni_posix_pollq * pq; struct sockaddr_storage locaddr; struct sockaddr_storage remaddr; + socklen_t loclen; + socklen_t remlen; }; @@ -80,6 +82,41 @@ struct nni_posix_pollq { static nni_posix_pollq nni_posix_global_pollq; +static int +nni_posix_poll_grow(nni_posix_pollq *pq) +{ + int grow = pq->npds + pq->neds + 2; // one for us, one for waker + int noldfds; + struct pollfd *oldfds; + struct pollfd *newfds; + + if ((grow < pq->nfds) || (grow < pq->nnewfds)) { + return (0); + } + + grow = grow + 128; + + // Maybe we are adding a *lot* of pipes at once, and have to grow + // multiple times before the poller gets scheduled. In that case + // toss the old array before we finish. + oldfds = pq->newfds; + noldfds = pq->nnewfds; + + if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) { + return (NNG_ENOMEM); + } + + + pq->newfds = newfds; + pq->nnewfds = grow; + + if (noldfds != 0) { + nni_free(oldfds, noldfds * sizeof (struct pollfd)); + } + return (0); +} + + static void nni_posix_epdesc_cancel(nni_aio *aio) { @@ -131,9 +168,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed) rv = -1; sz = sizeof (rv); if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { - nni_posix_epdesc_finish(aio, - nni_plat_errno(errno), 0); - continue; + rv = errno; } switch (rv) { case 0: @@ -178,6 +213,7 @@ nni_posix_poll_accept(nni_posix_epdesc *ed) if (newfd >= 0) { // successful connection request! + // We abuse the count to hold our new file descriptor. nni_posix_epdesc_finish(aio, 0, newfd); continue; } @@ -218,6 +254,165 @@ nni_posix_poll_epclose(nni_posix_epdesc *ed) } +static int +nni_posix_epdesc_add(nni_posix_pollq *pq, nni_posix_epdesc *ed) +{ + int rv; + + // Add epdesc to the pollq if it isn't already there. + if (!nni_list_active(&pq->eds, ed)) { + if ((rv = nni_posix_poll_grow(pq)) != 0) { + return (rv); + } + nni_list_append(&pq->eds, ed); + pq->neds++; + } + return (0); +} + + +void +nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) +{ + // NB: We assume that the FD is already set to nonblocking mode. + int rv; + nni_posix_pollq *pq = ed->pq; + int wake; + + nni_mtx_lock(&pq->mtx); + // If we can't start, it means that the AIO was stopped. + if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { + nni_mtx_unlock(&pq->mtx); + return; + } + if (ed->fd < 0) { + nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&pq->mtx); + return; + } + rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); + if (rv == 0) { + // Immediate connect, cool! This probably only happens on + // loopback, and probably not on every platform. + nni_posix_epdesc_finish(aio, 0, 0); + nni_mtx_unlock(&pq->mtx); + return; + } + if (errno != EINPROGRESS) { + // Some immediate failure occurred. + nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); + nni_mtx_unlock(&pq->mtx); + return; + } + + // We have to submit to the pollq, because the connection is pending. + if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) { + nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_unlock(&pq->mtx); + return; + } + + NNI_ASSERT(!nni_list_active(&ed->connectq, aio)); + wake = nni_list_first(&ed->connectq) == NULL ? 1 : 0; + nni_list_append(&ed->connectq, aio); + if (wake) { + nni_plat_pipe_raise(pq->wakewfd); + } + nni_mtx_unlock(&pq->mtx); +} + + +void +nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) +{ + // NB: We assume that the FD is already set to nonblocking mode. + int rv; + int wake; + nni_posix_pollq *pq = ed->pq; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + nni_mtx_lock(&pq->mtx); + // If we can't start, it means that the AIO was stopped. + if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { + nni_mtx_unlock(&pq->mtx); + return; + } + + if (ed->fd < 0) { + nni_mtx_unlock(&pq->mtx); + nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + return; + } + + // We have to submit to the pollq, because the connection is pending. + if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) { + nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_lock(&pq->mtx); + } + NNI_ASSERT(!nni_list_active(&ed->acceptq, aio)); + wake = nni_list_first(&ed->acceptq) == NULL ? 1 : 0; + nni_list_append(&ed->acceptq, aio); + if (wake) { + nni_plat_pipe_raise(pq->wakewfd); + } + nni_mtx_unlock(&pq->mtx); +} + + +int +nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) +{ + nni_posix_epdesc *ed; + + + if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { + return (NNG_ENOMEM); + } + + // We could randomly choose a different pollq, or for efficiencies + // sake we could take a modulo of the file desc number to choose + // one. For now we just have a global pollq. Note that by tying + // the ed to a single pollq we may get some kind of cache warmth. + + ed->pq = &nni_posix_global_pollq; + ed->fd = fd; + ed->index = 0; + + // Ensure we are in non-blocking mode. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + + NNI_LIST_INIT(&ed->connectq, nni_aio, a_prov_node); + NNI_LIST_INIT(&ed->acceptq, nni_aio, a_prov_node); + + *edp = ed; + return (0); +} + + +void +nni_posix_epdesc_fini(nni_posix_epdesc *ed) +{ + nni_aio *aio; + nni_posix_pollq *pq = ed->pq; + + nni_mtx_lock(&pq->mtx); + + // This removes any aios from our list. + nni_posix_poll_epclose(ed); + + if (nni_list_active(&pq->eds, ed)) { + nni_list_remove(&pq->eds, ed); + pq->neds--; + } + nni_mtx_unlock(&pq->mtx); + + NNI_FREE_STRUCT(ed); +} + + static void nni_posix_pipedesc_finish(nni_aio *aio, int rv) { @@ -568,41 +763,6 @@ nni_posix_pipedesc_cancel(nni_aio *aio) } -static int -nni_posix_poll_grow(nni_posix_pollq *pq) -{ - int grow = pq->npds + pq->neds + 2; // one for us, one for waker - int noldfds; - struct pollfd *oldfds; - struct pollfd *newfds; - - if ((grow < pq->nfds) || (grow < pq->nnewfds)) { - return (0); - } - - grow = grow + 128; - - // Maybe we are adding a *lot* of pipes at once, and have to grow - // multiple times before the poller gets scheduled. In that case - // toss the old array before we finish. - oldfds = pq->newfds; - noldfds = pq->nnewfds; - - if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) { - return (NNG_ENOMEM); - } - - - pq->newfds = newfds; - pq->nnewfds = grow; - - if (noldfds != 0) { - nni_free(oldfds, noldfds * sizeof (struct pollfd)); - } - return (0); -} - - static void nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio) { @@ -611,9 +771,13 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio) nni_posix_pollq *pq = pd->pq; nni_mtx_lock(&pq->mtx); + if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) { + nni_mtx_unlock(&pq->mtx); + return; + } if (pd->fd < 0) { + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); nni_mtx_unlock(&pq->mtx); - nni_aio_finish(aio, NNG_ECLOSED, aio->a_count); return; } // XXX: We really should just make all the FDs nonblocking, but we @@ -622,13 +786,9 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio) (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK); pd->nonblocking = 1; } - if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) { - nni_mtx_unlock(&pq->mtx); - return; - } if (!nni_list_active(&pq->pds, pd)) { if ((rv = nni_posix_poll_grow(pq)) != 0) { - nni_aio_finish(aio, rv, aio->a_count); + nni_posix_pipedesc_finish(aio, rv); nni_mtx_unlock(&pq->mtx); return; } diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c index ce887a78..6d2b6c29 100644 --- a/src/platform/posix/posix_socket.c +++ b/src/platform/posix/posix_socket.c @@ -46,6 +46,7 @@ struct nni_posix_sock { int devnull; // for shutting down accept() char * unlink; // path to unlink at unbind nni_posix_pipedesc * pd; + nni_posix_epdesc * ed; int tcpnodelay; }; @@ -213,6 +214,9 @@ nni_posix_sock_fini(nni_posix_sock *s) if (s->pd != NULL) { nni_posix_pipedesc_fini(s->pd); } + if (s->ed != NULL) { + nni_posix_epdesc_fini(s->ed); + } if (s->unlink != NULL) { (void) unlink(s->unlink); nni_free(s->unlink, strlen(s->unlink) + 1); @@ -498,6 +502,11 @@ nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr, (void) close(fd); return (rv); } + if (s->pd != NULL) { + // If we had a prior pipedesc hanging around, nuke it. + nni_posix_pipedesc_fini(s->pd); + s->pd = NULL; + } if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { (void) close(fd); return (rv); |
