aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_aio.h6
-rw-r--r--src/platform/posix/posix_poll.c248
-rw-r--r--src/platform/posix/posix_socket.c9
3 files changed, 219 insertions, 44 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 26ef19b9..3bb59b95 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -30,4 +30,10 @@ extern void nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_close(nni_posix_pipedesc *);
+extern int nni_posix_epdesc_init(nni_posix_epdesc **, int);
+extern void nni_posix_epdesc_fini(nni_posix_epdesc *);
+extern void nni_posix_epdesc_close(nni_posix_epdesc *);
+extern void nni_posix_epdesc_connect(nni_posix_epdesc *, nni_aio *);
+extern void nni_posix_epdesc_accept(nni_posix_epdesc *, nni_aio *);
+
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
index 751044f0..07ab1dc5 100644
--- a/src/platform/posix/posix_poll.c
+++ b/src/platform/posix/posix_poll.c
@@ -55,6 +55,8 @@ struct nni_posix_epdesc {
nni_posix_pollq * pq;
struct sockaddr_storage locaddr;
struct sockaddr_storage remaddr;
+ socklen_t loclen;
+ socklen_t remlen;
};
@@ -80,6 +82,41 @@ struct nni_posix_pollq {
static nni_posix_pollq nni_posix_global_pollq;
+static int
+nni_posix_poll_grow(nni_posix_pollq *pq)
+{
+ int grow = pq->npds + pq->neds + 2; // one for us, one for waker
+ int noldfds;
+ struct pollfd *oldfds;
+ struct pollfd *newfds;
+
+ if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
+ return (0);
+ }
+
+ grow = grow + 128;
+
+ // Maybe we are adding a *lot* of pipes at once, and have to grow
+ // multiple times before the poller gets scheduled. In that case
+ // toss the old array before we finish.
+ oldfds = pq->newfds;
+ noldfds = pq->nnewfds;
+
+ if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+
+ pq->newfds = newfds;
+ pq->nnewfds = grow;
+
+ if (noldfds != 0) {
+ nni_free(oldfds, noldfds * sizeof (struct pollfd));
+ }
+ return (0);
+}
+
+
static void
nni_posix_epdesc_cancel(nni_aio *aio)
{
@@ -131,9 +168,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed)
rv = -1;
sz = sizeof (rv);
if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- nni_posix_epdesc_finish(aio,
- nni_plat_errno(errno), 0);
- continue;
+ rv = errno;
}
switch (rv) {
case 0:
@@ -178,6 +213,7 @@ nni_posix_poll_accept(nni_posix_epdesc *ed)
if (newfd >= 0) {
// successful connection request!
+ // We abuse the count to hold our new file descriptor.
nni_posix_epdesc_finish(aio, 0, newfd);
continue;
}
@@ -218,6 +254,165 @@ nni_posix_poll_epclose(nni_posix_epdesc *ed)
}
+static int
+nni_posix_epdesc_add(nni_posix_pollq *pq, nni_posix_epdesc *ed)
+{
+ int rv;
+
+ // Add epdesc to the pollq if it isn't already there.
+ if (!nni_list_active(&pq->eds, ed)) {
+ if ((rv = nni_posix_poll_grow(pq)) != 0) {
+ return (rv);
+ }
+ nni_list_append(&pq->eds, ed);
+ pq->neds++;
+ }
+ return (0);
+}
+
+
+void
+nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+{
+ // NB: We assume that the FD is already set to nonblocking mode.
+ int rv;
+ nni_posix_pollq *pq = ed->pq;
+ int wake;
+
+ nni_mtx_lock(&pq->mtx);
+ // If we can't start, it means that the AIO was stopped.
+ if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+ if (ed->fd < 0) {
+ nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+ rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen);
+ if (rv == 0) {
+ // Immediate connect, cool! This probably only happens on
+ // loopback, and probably not on every platform.
+ nni_posix_epdesc_finish(aio, 0, 0);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+ if (errno != EINPROGRESS) {
+ // Some immediate failure occurred.
+ nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+
+ // We have to submit to the pollq, because the connection is pending.
+ if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
+ nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+
+ NNI_ASSERT(!nni_list_active(&ed->connectq, aio));
+ wake = nni_list_first(&ed->connectq) == NULL ? 1 : 0;
+ nni_list_append(&ed->connectq, aio);
+ if (wake) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
+void
+nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
+{
+ // NB: We assume that the FD is already set to nonblocking mode.
+ int rv;
+ int wake;
+ nni_posix_pollq *pq = ed->pq;
+
+ // Accept is simpler than the connect case. With accept we just
+ // need to wait for the socket to be readable to indicate an incoming
+ // connection is ready for us. There isn't anything else for us to
+ // do really, as that will have been done in listen.
+ nni_mtx_lock(&pq->mtx);
+ // If we can't start, it means that the AIO was stopped.
+ if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+
+ if (ed->fd < 0) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ return;
+ }
+
+ // We have to submit to the pollq, because the connection is pending.
+ if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
+ nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_lock(&pq->mtx);
+ }
+ NNI_ASSERT(!nni_list_active(&ed->acceptq, aio));
+ wake = nni_list_first(&ed->acceptq) == NULL ? 1 : 0;
+ nni_list_append(&ed->acceptq, aio);
+ if (wake) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
+int
+nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
+{
+ nni_posix_epdesc *ed;
+
+
+ if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ // We could randomly choose a different pollq, or for efficiencies
+ // sake we could take a modulo of the file desc number to choose
+ // one. For now we just have a global pollq. Note that by tying
+ // the ed to a single pollq we may get some kind of cache warmth.
+
+ ed->pq = &nni_posix_global_pollq;
+ ed->fd = fd;
+ ed->index = 0;
+
+ // Ensure we are in non-blocking mode.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+ NNI_LIST_INIT(&ed->connectq, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&ed->acceptq, nni_aio, a_prov_node);
+
+ *edp = ed;
+ return (0);
+}
+
+
+void
+nni_posix_epdesc_fini(nni_posix_epdesc *ed)
+{
+ nni_aio *aio;
+ nni_posix_pollq *pq = ed->pq;
+
+ nni_mtx_lock(&pq->mtx);
+
+ // This removes any aios from our list.
+ nni_posix_poll_epclose(ed);
+
+ if (nni_list_active(&pq->eds, ed)) {
+ nni_list_remove(&pq->eds, ed);
+ pq->neds--;
+ }
+ nni_mtx_unlock(&pq->mtx);
+
+ NNI_FREE_STRUCT(ed);
+}
+
+
static void
nni_posix_pipedesc_finish(nni_aio *aio, int rv)
{
@@ -568,41 +763,6 @@ nni_posix_pipedesc_cancel(nni_aio *aio)
}
-static int
-nni_posix_poll_grow(nni_posix_pollq *pq)
-{
- int grow = pq->npds + pq->neds + 2; // one for us, one for waker
- int noldfds;
- struct pollfd *oldfds;
- struct pollfd *newfds;
-
- if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
- return (0);
- }
-
- grow = grow + 128;
-
- // Maybe we are adding a *lot* of pipes at once, and have to grow
- // multiple times before the poller gets scheduled. In that case
- // toss the old array before we finish.
- oldfds = pq->newfds;
- noldfds = pq->nnewfds;
-
- if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
- return (NNG_ENOMEM);
- }
-
-
- pq->newfds = newfds;
- pq->nnewfds = grow;
-
- if (noldfds != 0) {
- nni_free(oldfds, noldfds * sizeof (struct pollfd));
- }
- return (0);
-}
-
-
static void
nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
{
@@ -611,9 +771,13 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
nni_posix_pollq *pq = pd->pq;
nni_mtx_lock(&pq->mtx);
+ if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
if (pd->fd < 0) {
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pq->mtx);
- nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
return;
}
// XXX: We really should just make all the FDs nonblocking, but we
@@ -622,13 +786,9 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
(void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
pd->nonblocking = 1;
}
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return;
- }
if (!nni_list_active(&pq->pds, pd)) {
if ((rv = nni_posix_poll_grow(pq)) != 0) {
- nni_aio_finish(aio, rv, aio->a_count);
+ nni_posix_pipedesc_finish(aio, rv);
nni_mtx_unlock(&pq->mtx);
return;
}
diff --git a/src/platform/posix/posix_socket.c b/src/platform/posix/posix_socket.c
index ce887a78..6d2b6c29 100644
--- a/src/platform/posix/posix_socket.c
+++ b/src/platform/posix/posix_socket.c
@@ -46,6 +46,7 @@ struct nni_posix_sock {
int devnull; // for shutting down accept()
char * unlink; // path to unlink at unbind
nni_posix_pipedesc * pd;
+ nni_posix_epdesc * ed;
int tcpnodelay;
};
@@ -213,6 +214,9 @@ nni_posix_sock_fini(nni_posix_sock *s)
if (s->pd != NULL) {
nni_posix_pipedesc_fini(s->pd);
}
+ if (s->ed != NULL) {
+ nni_posix_epdesc_fini(s->ed);
+ }
if (s->unlink != NULL) {
(void) unlink(s->unlink);
nni_free(s->unlink, strlen(s->unlink) + 1);
@@ -498,6 +502,11 @@ nni_posix_sock_connect_sync(nni_posix_sock *s, const nni_sockaddr *addr,
(void) close(fd);
return (rv);
}
+ if (s->pd != NULL) {
+ // If we had a prior pipedesc hanging around, nuke it.
+ nni_posix_pipedesc_fini(s->pd);
+ s->pd = NULL;
+ }
if ((rv = nni_posix_pipedesc_init(&s->pd, fd)) != 0) {
(void) close(fd);
return (rv);