diff options
| -rw-r--r-- | src/platform/posix/posix_aio.h | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_poll.c | 248 | ||||
| -rw-r--r-- | src/platform/posix/posix_socket.c | 9 |
3 files changed, 219 insertions, 44 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 26ef19b9..3bb59b95 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -30,4 +30,10 @@ extern void nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_close(nni_posix_pipedesc *); +extern int nni_posix_epdesc_init(nni_posix_epdesc **, int); +extern void nni_posix_epdesc_fini(nni_posix_epdesc *); +extern void nni_posix_epdesc_close(nni_posix_epdesc *); +extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *); +extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *); + #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c index 751044f0..07ab1dc5 100644 --- a/src/platform/posix/posix_poll.c +++ b/src/platform/posix/posix_poll.c @@ -55,6 +55,8 @@ struct nni_posix_epdesc { nni_posix_pollq * pq; struct sockaddr_storage locaddr; struct sockaddr_storage remaddr; + socklen_t loclen; + socklen_t remlen; }; @@ -80,6 +82,41 @@ struct nni_posix_pollq { static nni_posix_pollq nni_posix_global_pollq; +static int +nni_posix_poll_grow(nni_posix_pollq *pq) +{ + int grow = pq->npds + pq->neds + 2; // one for us, one for waker + int noldfds; + struct pollfd *oldfds; + struct pollfd *newfds; + + if ((grow < pq->nfds) || (grow < pq->nnewfds)) { + return (0); + } + + grow = grow + 128; + + // Maybe we are adding a *lot* of pipes at once, and have to grow + // multiple times before the poller gets scheduled. In that case + // toss the old array before we finish. + oldfds = pq->newfds; + noldfds = pq->nnewfds; + + if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) { + return (NNG_ENOMEM); + } + + + pq->newfds = newfds; + pq->nnewfds = grow; + + if (noldfds != 0) { + nni_free(oldfds, noldfds * sizeof (struct pollfd)); + } + return (0); +} + + static void nni_posix_epdesc_cancel(nni_aio *aio) { @@ -131,9 +168,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed) rv = -1; sz = sizeof (rv); if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { - nni_posix_epdesc_finish(aio, - nni_plat_errno(errno), 0); - continue; + rv = errno; } switch (rv) { case 0: @@ -178,6 +213,7 @@ nni_posix_poll_accept(nni_posix_epdesc *ed) if (newfd >= 0) { // successful connection request! + // We abuse the count to hold our new file descriptor. nni_posix_epdesc_finish(aio, 0, newfd); continue; } @@ -218,6 +254,165 @@ nni_posix_poll_epclose(nni_posix_epdesc *ed) } +static int +nni_posix_epdesc_add(nni_posix_pollq *pq, nni_posix_epdesc *ed) +{ + int rv; + + // Add epdesc to the pollq if it isn't already there. + if (!nni_list_active(&pq->eds, ed)) { + if ((rv = nni_posix_poll_grow(pq)) != 0) { + return (rv); + } + nni_list_append(&pq->eds, ed); + pq->neds++; + } + return (0); +} + + +void +nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) +{ + // NB: We assume that the FD is already set to nonblocking mode. + int rv; + nni_posix_pollq *pq = ed->pq; + int wake; + + nni_mtx_lock(&pq->mtx); + // If we can't start, it means that the AIO was stopped. + if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { + nni_mtx_unlock(&pq->mtx); + return; + } + if (ed->fd < 0) { + nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&pq->mtx); + return; + } + rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); + if (rv == 0) { + // Immediate connect, cool! This probably only happens on + // loopback, and probably not on every platform. + nni_posix_epdesc_finish(aio, 0, 0); + nni_mtx_unlock(&pq->mtx); + return; + } + if (errno != EINPROGRESS) { + // Some immediate failure occurred. + nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); + nni_mtx_unlock(&pq->mtx); + return; + } + + // We have to submit to the pollq, because the connection is pending. + if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) { + nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_unlock(&pq->mtx); + return; + } + + NNI_ASSERT(!nni_list_active(&ed->connectq, aio)); + wake = nni_list_first(&ed->connectq) == NULL ? 1 : 0; + nni_list_append(&ed->connectq, aio); + if (wake) { + nni_plat_pipe_raise(pq->wakewfd); + } + nni_mtx_unlock(&pq->mtx); +} + + +void +nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) +{ + // NB: We assume that the FD is already set to nonblocking mode. + int rv; + int wake; + nni_posix_pollq *pq = ed->pq; + + // Accept is simpler than the connect case. With accept we just + // need to wait for the socket to be readable to indicate an incoming + // connection is ready for us. There isn't anything else for us to + // do really, as that will have been done in listen. + nni_mtx_lock(&pq->mtx); + // If we can't start, it means that the AIO was stopped. + if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { + nni_mtx_unlock(&pq->mtx); + return; + } + + if (ed->fd < 0) { + nni_mtx_unlock(&pq->mtx); + nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + return; + } + + // We have to submit to the pollq, because the connection is pending. + if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) { + nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_lock(&pq->mtx); + } + NNI_ASSERT(!nni_list_active(&ed->acceptq, aio)); + wake = nni_list_first(&ed->acceptq) == NULL ? 1 : 0; + nni_list_append(&ed->acceptq, aio); + if (wake) { + nni_plat_pipe_raise(pq->wakewfd); + } + nni_mtx_unlock(&pq->mtx); +} + + +int +nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) +{ + nni_posix_epdesc *ed; + + + if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { + return (NNG_ENOMEM); + } + + // We could randomly choose a different pollq, or for efficiencies + // sake we could take a modulo of the file desc number to choose + // one. For now we just have a global pollq. Note that by tying + // the ed to a single pollq we may get some kind of cache warmth. + + ed->pq = &nni_posix_global_pollq; + ed->fd = fd; + ed->index = 0; + + // Ensure we are in non-blocking mode. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + + NNI_LIST_INIT(&ed->connectq, nni_aio, a_prov_node); + NNI_LIST_INIT(&ed->acceptq, nni_aio, a_prov_node); + + *edp = ed; + return (0); +} + + +void +nni_posix_epdesc_fini(nni_posix_epdesc *ed) +{ + nni_aio *aio; + nni_posix_pollq *pq = ed->pq; + + nni_mtx_lock(&pq->mtx); + + // This removes any aios from our list. + nni_posix_poll_epclose(ed); + + if (nni_list_active(&pq->eds, ed)) { + nni_list_remove(&pq->eds, ed); + pq->neds--; + } + nni_mtx_unlock(&pq->mtx); + + NNI_FREE_STRUCT(ed); +} + + static void nni_posix_pipedesc_finish(nni_aio *aio, int rv) { @@ -568,41 +763,6 @@ nni_posix_pipedesc_cancel(nni_aio *aio) } -static int -nni_posix_poll_grow(nni_posix_pollq *pq) -{ - int grow = pq->npds + pq->neds + 2; // one for us, one for waker - int noldfds; - struct pollfd *oldfds; - struct pollfd *newfds; - - if ((grow < pq->nfds) || (grow < pq->nnewfds)) { - return (0); - } - - grow = grow + 128; - - // Maybe we are adding a *lot* of pipes at once, and have to grow - // multiple times before the poller gets scheduled. In that case - // toss the old array before we finish. - oldfds = pq->newfds; - noldfds = pq->nnewfds; - - if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) { - return (NNG_ENOMEM); - } - - - pq->newfds = newfds; - pq->nnewfds = grow; - - if (noldfds != 0) { - nni_free(oldfds, noldfds * sizeof (struct pollfd)); - } - return (0); -} - - static void nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio) { @@ -611,9 +771,13 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio) nni_posix_pollq *pq = pd->pq; nni_mtx_lock(&pq->mtx); + if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) { + nni_mtx_unlock(&pq->mtx); + return; + } if (pd->fd < 0) { + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); nni_mtx_unlock(&pq->mtx); - nni_aio_finish(aio, NNG_ECLOSED, aio->a_count); return; } // XXX: We really should just make all the FDs nonblocking, but we @@ -622,13 +786,9 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio) (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK); pd->nonblocking = 1; } - if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) { - nni_mtx_unlock(&pq->mtx); - return; - } if (!nni_list_active(&pq->pds, pd)) { if ((rv = nni_posix_poll_grow(pq)) != 0) { - nni_aio_finish(aio, rv, aio->a_count); + nni_posix_pipedesc_finish(aio, rv); nni_mtx_unlock(&pq->mtx); return; } diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c index ce887a78..6d2b6c29 100644 --- a/src/platform/posix/posix_socket.c +++ b/src/platform/posix/posix_socket.c @@ -46,6 +46,7 @@ struct nni_posix_sock { int devnull; // for shutting down accept() char * unlink; // path to unlink at unbind nni_posix_pipedesc * pd; + nni_posix_epdesc * ed; int tcpnodelay; }; @@ -213,6 +214,9 @@ nni_posix_sock_fini(nni_posix_sock *s) if (s->pd != NULL) { nni_posix_pipedesc_fini(s->pd); } + if (s->ed != NULL) { + nni_posix_epdesc_fini(s->ed); + } if (s->unlink != NULL) { (void) unlink(s->unlink); nni_free(s->unlink, strlen(s->unlink) + 1); @@ -498,6 +502,11 @@ nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr, (void) close(fd); return (rv); } + if (s->pd != NULL) { + // If we had a prior pipedesc hanging around, nuke it. + nni_posix_pipedesc_fini(s->pd); + s->pd = NULL; + } if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) { (void) close(fd); return (rv); |
