Diffstat (limited to 'src/platform')
-rw-r--r--  src/platform/posix/posix_aio.h      44
-rw-r--r--  src/platform/posix/posix_aiothr.c  323
-rw-r--r--  src/platform/posix/posix_config.h    6
-rw-r--r--  src/platform/posix/posix_impl.h      5
-rw-r--r--  src/platform/posix/posix_ipc.c      14
-rw-r--r--  src/platform/posix/posix_net.c      15
-rw-r--r--  src/platform/posix/posix_poll.c    575
-rw-r--r--  src/platform/posix/posix_thread.c   11
8 files changed, 616 insertions, 377 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 797f9e43..9ab322a0 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -18,43 +18,13 @@
#include "core/nng_impl.h"
-typedef struct nni_posix_aioq nni_posix_aioq;
-typedef struct nni_posix_aiof nni_posix_aiof;
-typedef struct nni_posix_aio_pipe nni_posix_aio_pipe;
-typedef struct nni_posix_aio_ep nni_posix_aio_ep;
-// Head structure representing file operations for read/write. We process
-// the list of aios serially, and each file has its own thread for now.
-struct nni_posix_aioq {
- nni_list aq_aios;
- int aq_fd;
- nni_mtx aq_lk;
- nni_cv aq_cv;
-#ifdef NNG_USE_POSIX_AIOTHR
- nni_thr aq_thr;
-#endif
-};
-
-struct nni_posix_aio_pipe {
- int ap_fd;
- nni_posix_aioq ap_readq;
- nni_posix_aioq ap_writeq;
-};
-
-struct nni_posix_aio_ep {
- int ap_fd;
- nni_posix_aioq ap_q;
-};
-
-extern int nni_posix_aio_pipe_init(nni_posix_aio_pipe *, int);
-extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *);
-
-// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
-// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-extern int nni_posix_aio_read(nni_posix_aio_pipe *, nni_aio *);
-extern int nni_posix_aio_write(nni_posix_aio_pipe *, nni_aio *);
-
-// extern int nni_posix_aio_connect();
-// extern int nni_posix_aio_accept();
+typedef struct nni_posix_pipedesc nni_posix_pipedesc;
+extern int nni_posix_pipedesc_sysinit(void);
+extern void nni_posix_pipedesc_sysfini(void);
+extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
+extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
+extern int nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *);
+extern int nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *);
#endif // PLATFORM_POSIX_AIO_H
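
A minimal usage sketch, not part of the patch, of how a transport might drive the new pipedesc interface: example_pipe_start is hypothetical, and the aio's iov list and callback are assumed to be set up by the caller, as the ipc and tcp code further down does.

#include "core/nng_impl.h"
#include "platform/posix/posix_aio.h"

static int
example_pipe_start(int fd, nni_aio *raio)
{
	nni_posix_pipedesc *pd;
	int                 rv;

	// Wrap the connected descriptor.  The pipedesc is heap allocated,
	// and registers with the global pollq on first submit.
	if ((rv = nni_posix_pipedesc_init(&pd, fd)) != 0) {
		return (rv);
	}
	// Submit an asynchronous read; completion runs raio's callback.
	if ((rv = nni_posix_pipedesc_read(pd, raio)) != 0) {
		nni_posix_pipedesc_fini(pd);
		return (rv);
	}
	return (0);
}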
diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c
deleted file mode 100644
index a01fa194..00000000
--- a/src/platform/posix/posix_aiothr.c
+++ /dev/null
@@ -1,323 +0,0 @@
-//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-#include "platform/posix/posix_aio.h"
-
-#ifdef NNG_USE_POSIX_AIOTHR
-
-#include <errno.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/uio.h>
-#include <fcntl.h>
-#include <unistd.h>
-
-// POSIX AIO using threads. This allows us to use normal synchronous AIO,
-// along with underlying threads, to simulate asynchronous I/O. This will be
-// unscalable for systems where threads are a finite resource, but it should
-// be sufficient for systems where threads are efficient, and cheap, or for
-// applications that do not need excessive amounts of open files. It also
-// serves as a model upon which we can build more scalable forms of asynch
-// I/O, using non-blocking I/O and pollers.
-
-
-// nni_plat_aiothr_write is used to attempt a write, sending
-// as much as it can. On success, it returns 0, otherwise an errno. It will
-// retry if EINTR is received.
-static int
-nni_plat_aiothr_write(int fd, nni_aio *aio)
-{
- int n;
- int rv;
- int i;
- struct iovec iovec[4];
- struct iovec *iovp;
- int niov = aio->a_niov;
- int progress = 0;
-
- for (i = 0; i < niov; i++) {
- iovec[i].iov_len = aio->a_iov[i].iov_len;
- iovec[i].iov_base = aio->a_iov[i].iov_buf;
- }
- iovp = &iovec[0];
- rv = 0;
-
- while (niov != 0) {
- n = writev(fd, iovp, niov);
- if (n < 0) {
- if (errno == EINTR) {
- continue;
- }
- rv = nni_plat_errno(errno);
- break;
- }
-
- aio->a_count += n;
- progress += n;
- while (n) {
- // If we didn't finish it yet, try again.
- if (n < iovp->iov_len) {
- iovp->iov_len -= n;
- iovp->iov_base += n;
- break;
- }
-
- n -= iovp->iov_len;
- iovp++;
- niov--;
- }
- }
-
- // Either we got it all, or we didn't.
- if ((rv != 0) && (progress != 0)) {
- for (i = 0; i < niov; i++) {
- aio->a_iov[i].iov_len = iovp[i].iov_len;
- aio->a_iov[i].iov_buf = iovp[i].iov_base;
- }
- aio->a_niov = niov;
- }
-
- return (rv);
-}
-
-
-// nni_plat_aiothr_read is used to attempt a read, sending as much as it can
-// (limited by the requested read). On success, it returns 0, otherwise an
-// errno. It will retry if EINTR is received.
-static int
-nni_plat_aiothr_read(int fd, nni_aio *aio)
-{
- int n;
- int rv;
- int i;
- struct iovec iovec[4];
- struct iovec *iovp;
- int niov = aio->a_niov;
- int progress = 0;
-
- for (i = 0; i < niov; i++) {
- iovec[i].iov_len = aio->a_iov[i].iov_len;
- iovec[i].iov_base = aio->a_iov[i].iov_buf;
- }
- iovp = &iovec[0];
- rv = 0;
-
- while (niov != 0) {
- n = readv(fd, iovp, niov);
- if (n < 0) {
- if (errno == EINTR) {
- continue;
- }
- rv = nni_plat_errno(errno);
- break;
- }
- if (n == 0) {
- rv = NNG_ECLOSED;
- break;
- }
-
- aio->a_count += n;
- progress += n;
- while (n) {
- // If we didn't finish it yet, try again.
- if (n < iovp->iov_len) {
- iovp->iov_len -= n;
- iovp->iov_base += n;
- break;
- }
-
- n -= iovp->iov_len;
- iovp++;
- niov--;
- }
- }
-
- // Either we got it all, or we didn't.
- if ((rv != 0) && (progress != 0)) {
- for (i = 0; i < niov; i++) {
- aio->a_iov[i].iov_len = iovp[i].iov_len;
- aio->a_iov[i].iov_buf = iovp[i].iov_base;
- }
- aio->a_niov = niov;
- }
-
- return (rv);
-}
-
-
-static void
-nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *))
-{
- nni_aio *aio;
- int rv;
-
- nni_mtx_lock(&q->aq_lk);
- for (;;) {
- if (q->aq_fd < 0) {
- break;
- }
- if ((aio = nni_list_first(&q->aq_aios)) == NULL) {
- nni_cv_wait(&q->aq_cv);
- continue;
- }
- rv = fn(q->aq_fd, aio);
- if (rv == NNG_EAGAIN) {
- continue;
- }
- if (rv == NNG_ECLOSED) {
- break;
- }
-
- nni_list_remove(&q->aq_aios, aio);
-
- // Call the callback.
- nni_aio_finish(aio, rv, aio->a_count);
- }
-
- while ((aio = nni_list_first(&q->aq_aios)) != NULL) {
- nni_list_remove(&q->aq_aios, aio);
- nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
- }
-
- nni_mtx_unlock(&q->aq_lk);
-}
-
-
-static void
-nni_plat_aiothr_readthr(void *arg)
-{
- nni_plat_aiothr_dothr(arg, nni_plat_aiothr_read);
-}
-
-
-static void
-nni_plat_aiothr_writethr(void *arg)
-{
- nni_plat_aiothr_dothr(arg, nni_plat_aiothr_write);
-}
-
-
-static int
-nni_posix_aioq_init(nni_posix_aioq *q, int fd, nni_cb cb)
-{
- int rv;
-
- NNI_LIST_INIT(&q->aq_aios, nni_aio, a_prov_node);
- if ((rv = nni_mtx_init(&q->aq_lk)) != 0) {
- return (rv);
- }
- if ((rv = nni_cv_init(&q->aq_cv, &q->aq_lk)) != 0) {
- nni_mtx_fini(&q->aq_lk);
- return (rv);
- }
- if ((rv = nni_thr_init(&q->aq_thr, cb, q)) != 0) {
- nni_cv_fini(&q->aq_cv);
- nni_mtx_fini(&q->aq_lk);
- return (rv);
- }
- q->aq_fd = fd;
- return (0);
-}
-
-
-static void
-nni_posix_aioq_start(nni_posix_aioq *q)
-{
- nni_thr_run(&q->aq_thr);
-}
-
-
-static void
-nni_posix_aioq_fini(nni_posix_aioq *q)
-{
- if (q->aq_fd > 0) {
- nni_mtx_lock(&q->aq_lk);
- q->aq_fd = -1;
- nni_cv_wake(&q->aq_cv);
- nni_mtx_unlock(&q->aq_lk);
-
- nni_thr_fini(&q->aq_thr);
- nni_cv_fini(&q->aq_cv);
- nni_mtx_fini(&q->aq_lk);
- }
-}
-
-
-int
-nni_posix_aio_pipe_init(nni_posix_aio_pipe *p, int fd)
-{
- int rv;
-
- rv = nni_posix_aioq_init(&p->ap_readq, fd, nni_plat_aiothr_readthr);
- if (rv != 0) {
- return (rv);
- }
- rv = nni_posix_aioq_init(&p->ap_writeq, fd, nni_plat_aiothr_writethr);
- if (rv != 0) {
- nni_posix_aioq_fini(&p->ap_readq);
- return (rv);
- }
- nni_posix_aioq_start(&p->ap_readq);
- nni_posix_aioq_start(&p->ap_writeq);
- return (0);
-}
-
-
-void
-nni_posix_aio_pipe_fini(nni_posix_aio_pipe *p)
-{
- nni_posix_aioq_fini(&p->ap_readq);
- nni_posix_aioq_fini(&p->ap_writeq);
-}
-
-
-// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
-// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-
-static int
-nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio)
-{
- nni_mtx_lock(&q->aq_lk);
- if (q->aq_fd < 0) {
- nni_mtx_unlock(&q->aq_lk);
- return (NNG_ECLOSED);
- }
- nni_list_append(&q->aq_aios, aio);
- nni_cv_wake(&q->aq_cv);
- nni_mtx_unlock(&q->aq_lk);
- return (0);
-}
-
-
-int
-nni_posix_aio_read(nni_posix_aio_pipe *p, nni_aio *aio)
-{
- return (nni_posix_aio_submit(&p->ap_readq, aio));
-}
-
-
-int
-nni_posix_aio_write(nni_posix_aio_pipe *p, nni_aio *aio)
-{
- return (nni_posix_aio_submit(&p->ap_writeq, aio));
-}
-
-
-// extern int nni_posix_aio_connect();
-// extern int nni_posix_aio_accept();
-
-#else
-
-// Suppress empty symbols warnings in ranlib.
-int nni_posix_aiothr_not_used = 0;
-
-#endif // NNG_USE_POSIX_AIOTHR
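
Both the deleted thread code above and the new poller below share the same partial-I/O bookkeeping: copy the aio's iovs into a local struct iovec array, call writev() or readv(), then advance past fully consumed vectors and trim the first incomplete one so a retry resumes exactly where the kernel stopped. A standalone sketch of that technique, assuming a blocking descriptor as the thread code used (the poller instead returns and waits for POLLOUT):

#include <errno.h>
#include <sys/uio.h>
#include <unistd.h>

static int
write_all(int fd, struct iovec *iov, int niov)
{
	while (niov > 0) {
		ssize_t n = writev(fd, iov, niov);
		if (n < 0) {
			if (errno == EINTR) {
				continue; // retry, as the thread code did
			}
			return (-1); // caller maps errno
		}
		while (n > 0) {
			if ((size_t) n < iov->iov_len) {
				// Partial vector: trim it so the next
				// writev() resumes exactly here.
				iov->iov_base = (char *) iov->iov_base + n;
				iov->iov_len -= (size_t) n;
				break;
			}
			// Vector fully written; drop it and move on.
			n -= iov->iov_len;
			iov++;
			niov--;
		}
	}
	return (0);
}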
diff --git a/src/platform/posix/posix_config.h b/src/platform/posix/posix_config.h
index 34ab2c6d..bf121243 100644
--- a/src/platform/posix/posix_config.h
+++ b/src/platform/posix/posix_config.h
@@ -53,9 +53,9 @@
#ifndef CLOCK_REALTIME
#define NNG_USE_GETTIMEOFDAY
#elif !defined(NNG_USE_CLOCKID)
-#define NNG_USE_CLOCKID CLOCK_MONOTONIC
+#define NNG_USE_CLOCKID CLOCK_MONOTONIC
#else
-#define NNG_USE_CLOCKID CLOCK_REALTIME
+#define NNG_USE_CLOCKID CLOCK_REALTIME
#endif // CLOCK_REALTIME
-#define NNG_USE_POSIX_AIOTHR 1
+#define NNG_USE_POSIX_AIOPOLL 1
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 1030b6de..9c8d00dc 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -34,7 +34,6 @@ extern int nni_plat_errno(int);
#endif
-
// Define types that this platform uses.
#ifdef PLATFORM_POSIX_THREAD
@@ -64,4 +63,8 @@ struct nni_plat_cv {
#endif
+
+extern int nni_posix_pipedesc_sysinit(void);
+extern void nni_posix_pipedesc_sysfini(void);
+
#endif // PLATFORM_POSIX_IMPL_H
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index e75edeca..ccf19fed 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -34,7 +34,7 @@ struct nni_plat_ipcsock {
int fd;
int devnull; // for shutting down accept()
char * unlink; // path to unlink at fini
- nni_posix_aio_pipe aiop;
+ nni_posix_pipedesc * pd;
};
#ifdef SOCK_CLOEXEC
@@ -69,14 +69,14 @@ nni_plat_ipc_path_to_sockaddr(struct sockaddr_un *sun, const char *path)
int
nni_plat_ipc_aio_send(nni_plat_ipcsock *isp, nni_aio *aio)
{
- return (nni_posix_aio_write(&isp->aiop, aio));
+ return (nni_posix_pipedesc_write(isp->pd, aio));
}
int
nni_plat_ipc_aio_recv(nni_plat_ipcsock *isp, nni_aio *aio)
{
- return (nni_posix_aio_read(&isp->aiop, aio));
+ return (nni_posix_pipedesc_read(isp->pd, aio));
}
@@ -225,7 +225,9 @@ nni_plat_ipc_fini(nni_plat_ipcsock *isp)
nni_free(isp->unlink, strlen(isp->unlink) + 1);
}
- nni_posix_aio_pipe_fini(&isp->aiop);
+ if (isp->pd != NULL) {
+ nni_posix_pipedesc_fini(isp->pd);
+ }
NNI_FREE_STRUCT(isp);
}
@@ -338,7 +340,7 @@ nni_plat_ipc_connect(nni_plat_ipcsock *isp, const char *path)
return (rv);
}
- if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) {
+ if ((rv = nni_posix_pipedesc_init(&isp->pd, fd)) != 0) {
(void) close(fd);
return (rv);
}
@@ -380,7 +382,7 @@ nni_plat_ipc_accept(nni_plat_ipcsock *isp, nni_plat_ipcsock *server)
nni_plat_ipc_setopts(fd);
- if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) {
+ if ((rv = nni_posix_pipedesc_init(&isp->pd, fd)) != 0) {
(void) close(fd);
return (rv);
}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 94dc2667..c8b7766e 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -34,7 +34,7 @@
struct nni_plat_tcpsock {
int fd;
int devnull; // for shutting down accept()
- nni_posix_aio_pipe aiop;
+ nni_posix_pipedesc * pd;
};
static int
@@ -173,14 +173,14 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
int
nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
{
- return (nni_posix_aio_write(&s->aiop, aio));
+ return (nni_posix_pipedesc_write(s->pd, aio));
}
int
nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio)
{
- return (nni_posix_aio_read(&s->aiop, aio));
+ return (nni_posix_pipedesc_read(s->pd, aio));
}
@@ -282,7 +282,9 @@ nni_plat_tcp_fini(nni_plat_tcpsock *tsp)
(void) close(tsp->fd);
tsp->fd = -1;
}
- nni_posix_aio_pipe_fini(&tsp->aiop);
+ if (tsp->pd != NULL) {
+ nni_posix_pipedesc_fini(tsp->pd);
+ }
NNI_FREE_STRUCT(tsp);
}
@@ -387,7 +389,7 @@ nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr,
(void) close(fd);
return (rv);
}
- if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+ if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) {
(void) close(fd);
return (rv);
}
@@ -421,11 +423,10 @@ nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server)
nni_plat_tcp_setopts(fd);
- if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+ if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) {
close(fd);
return (rv);
}
-
tsp->fd = fd;
return (0);
}
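
One consequence of the rework shows up in both transports above: the pipedesc is now held by pointer and allocated only after connect or accept succeeds, so teardown must tolerate a NULL pd. A sketch of that pattern, with a hypothetical conn type standing in for nni_plat_tcpsock/nni_plat_ipcsock:

#include <unistd.h>

#include "core/nng_impl.h"
#include "platform/posix/posix_aio.h"

typedef struct conn {
	int                  fd;
	nni_posix_pipedesc * pd; // NULL until connect/accept succeeds
} conn;

static void
conn_fini(conn *c)
{
	// The pipedesc is allocated lazily, so guard its fini.
	if (c->pd != NULL) {
		nni_posix_pipedesc_fini(c->pd);
	}
	if (c->fd >= 0) {
		(void) close(c->fd);
	}
	NNI_FREE_STRUCT(c);
}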
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
new file mode 100644
index 00000000..a6024db0
--- /dev/null
+++ b/src/platform/posix/posix_poll.c
@@ -0,0 +1,575 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
+
+#ifdef NNG_USE_POSIX_AIOPOLL
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// POSIX AIO using poll(). We use a single poll thread to perform
+// I/O operations for the entire system. This isn't entirely scalable,
+// and it might be a good idea to create a few threads and group the
+// I/O operations into separate pollers to limit the amount of work each
+// thread does, and to scale across multiple cores. For now we don't
+// worry about it.
+
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+// nni_posix_pipedesc is a descriptor kept per transport pipe (e.g. an open
+// file descriptor for a TCP socket).  It holds the lists of pending
+// aios for that underlying socket, as well as the socket itself.
+struct nni_posix_pipedesc {
+ int fd;
+ int index;
+ nni_list readq;
+ nni_list writeq;
+ nni_list_node node;
+ nni_posix_pollq * pq;
+};
+
+// nni_posix_pollq is a work structure used by the poller thread; it keeps
+// track of all the underlying pipe handles and so forth being used by poll().
+struct nni_posix_pollq {
+ nni_mtx mtx;
+ struct pollfd * fds;
+ struct pollfd * newfds;
+ int nfds;
+ int nnewfds;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ int close; // request for worker to exit
+ int started;
+ nni_thr thr; // worker thread
+ nni_list pds; // linked list of nni_posix_pipedescs.
+ int npds; // number of pipe descriptors
+};
+
+static nni_posix_pollq nni_posix_global_pollq;
+
+static void
+nni_posix_poll_finish(nni_aio *aio, int rv)
+{
+ nni_posix_pipedesc *pd;
+
+ pd = aio->a_prov_data;
+ if (nni_list_active(&pd->readq, aio)) {
+ nni_list_remove(&pd->readq, aio);
+ }
+ aio->a_prov_data = NULL;
+ aio->a_prov_cancel = NULL;
+ nni_aio_finish(aio, rv, aio->a_count);
+}
+
+
+static void
+nni_posix_poll_write(nni_posix_pipedesc *pd)
+{
+ int n;
+ int rv;
+ int i;
+ struct iovec iovec[4];
+ struct iovec *iovp;
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+ for (i = 0; i < aio->a_niov; i++) {
+ iovec[i].iov_len = aio->a_iov[i].iov_len;
+ iovec[i].iov_base = aio->a_iov[i].iov_buf;
+ }
+ iovp = &iovec[0];
+ rv = 0;
+
+ n = writev(pd->fd, iovp, aio->a_niov);
+ if (n < 0) {
+ if ((errno == EAGAIN) || (errno == EINTR)) {
+ // Can't write more right now. We're done
+ // on this fd for now.
+ return;
+ }
+ rv = nni_plat_errno(errno);
+ nni_list_remove(&pd->writeq, aio);
+
+ nni_posix_poll_finish(aio, rv);
+ return;
+ }
+
+ aio->a_count += n;
+
+ while (n > 0) {
+ // If we didn't write the first iov completely,
+ // then we're done for now. Record progress
+ // and return to the caller.
+ if (n < aio->a_iov[0].iov_len) {
+ aio->a_iov[0].iov_buf += n;
+ aio->a_iov[0].iov_len -= n;
+ return;
+ }
+
+ // We consumed the full iovec, so just move the
+ // remaining ones up, and decrement the count handled.
+ n -= aio->a_iov[0].iov_len;
+ for (i = 1; i < aio->a_niov; i++) {
+ aio->a_iov[i-1] = aio->a_iov[i];
+ }
+ NNI_ASSERT(aio->a_niov > 0);
+ aio->a_niov--;
+ }
+
+ // We completed the entire operation on this aio.
+ nni_list_remove(&pd->writeq, aio);
+ nni_posix_poll_finish(aio, 0);
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+
+static void
+nni_posix_poll_read(nni_posix_pipedesc *pd)
+{
+ int n;
+ int rv;
+ int i;
+ struct iovec iovec[4];
+ struct iovec *iovp;
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&pd->readq)) != NULL) {
+ for (i = 0; i < aio->a_niov; i++) {
+ iovec[i].iov_len = aio->a_iov[i].iov_len;
+ iovec[i].iov_base = aio->a_iov[i].iov_buf;
+ }
+ iovp = &iovec[0];
+ rv = 0;
+
+ n = readv(pd->fd, iovp, aio->a_niov);
+ if (n < 0) {
+ if ((errno == EAGAIN) || (errno == EINTR)) {
+ // Can't read more right now. We're done
+ // on this fd for now.
+ return;
+ }
+ rv = nni_plat_errno(errno);
+
+ nni_posix_poll_finish(aio, rv);
+ return;
+ }
+
+ if (n == 0) {
+ // A read of zero bytes indicates a closed descriptor.
+ nni_posix_poll_finish(aio, NNG_ECLOSED);
+ return;
+ }
+
+ aio->a_count += n;
+
+ while (n > 0) {
+ // If we didn't fill the first iov completely,
+ // then we're done for now. Record progress
+ // and return to the caller.
+ if (n < aio->a_iov[0].iov_len) {
+ aio->a_iov[0].iov_buf += n;
+ aio->a_iov[0].iov_len -= n;
+ return;
+ }
+
+ // We consumed the full iovec, so just move the
+ // remaining ones up, and decrement the count handled.
+ n -= aio->a_iov[0].iov_len;
+ for (i = 1; i < aio->a_niov; i++) {
+ aio->a_iov[i-1] = aio->a_iov[i];
+ }
+ NNI_ASSERT(aio->a_niov > 0);
+ aio->a_niov--;
+ }
+
+ // We completed the entire operation on this aio.
+ nni_posix_poll_finish(aio, 0);
+
+ // Go back to start of loop to see if there is another
+ // aio ready for us to process.
+ }
+}
+
+
+static void
+nni_posix_poll_close(nni_posix_pipedesc *pd)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&pd->readq)) != NULL) {
+ nni_posix_poll_finish(aio, NNG_ECLOSED);
+ }
+ while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+ nni_posix_poll_finish(aio, NNG_ECLOSED);
+ }
+}
+
+
+static void
+nni_posix_poll_thr(void *arg)
+{
+ nni_posix_pollq *pollq = arg;
+ nni_posix_pipedesc *pd, *nextpd;
+
+ nni_mtx_lock(&pollq->mtx);
+ for (;;) {
+ int rv;
+ int nfds;
+ struct pollfd *fds;
+
+ if (pollq->close) {
+ break;
+ }
+
+ if (pollq->newfds != NULL) {
+ // The caller has grown the fds array. Free up the old
+ // space, and start using the new.
+ nni_free(pollq->fds,
+ pollq->nfds * sizeof (struct pollfd));
+ pollq->fds = pollq->newfds;
+ pollq->nfds = pollq->nnewfds;
+ pollq->newfds = NULL;
+ }
+ fds = pollq->fds;
+ nfds = 0;
+
+ // The waker pipe is set up so that we will be woken
+ // when it is written, letting other threads signal us.
+ fds[nfds].fd = pollq->wakerfd;
+ fds[nfds].events = POLLIN;
+ fds[nfds].revents = 0;
+ nfds++;
+
+ // Set up the poll list.
+ NNI_LIST_FOREACH (&pollq->pds, pd) {
+ fds[nfds].fd = pd->fd;
+ fds[nfds].events = 0;
+ fds[nfds].revents = 0;
+ if (nni_list_first(&pd->readq) != NULL) {
+ fds[nfds].events |= POLLIN;
+ }
+ if (nni_list_first(&pd->writeq) != NULL) {
+ fds[nfds].events |= POLLOUT;
+ }
+ pd->index = nfds;
+ nfds++;
+ }
+
+
+ // Now poll it. We block indefinitely, since we use separate
+ // timeouts to wake and remove the elements from the list.
+ nni_mtx_unlock(&pollq->mtx);
+ rv = poll(fds, nfds, -1);
+ nni_mtx_lock(&pollq->mtx);
+
+ if (rv < 0) {
+ // This really shouldn't happen. If it does, we
+ // just try again. (EINTR is probably the only
+ // reasonable failure here, unless internal memory
+ // allocations fail in the kernel, leading to EAGAIN.)
+ continue;
+ }
+
+
+ // If the waker pipe was signaled, read from it.
+ if (fds[0].revents & POLLIN) {
+ NNI_ASSERT(fds[0].fd == pollq->wakerfd);
+ nni_plat_pipe_clear(pollq->wakerfd);
+ }
+
+ // Now we iterate through all the pipedescs. Note that one
+ // may have been added or removed. New pipedescs will have
+ // their index set to 0. Removed ones will just be absent.
+ // Note that we may remove the pipedesc from the list, so we
+ // have to use a custom iterator.
+ nextpd = nni_list_first(&pollq->pds);
+ while ((pd = nextpd) != NULL) {
+ int index;
+
+ // Save the nextpd for our next iteration. This
+ // way we can remove the PD from the list without
+ // breaking the iteration.
+
+ nextpd = nni_list_next(&pollq->pds, pd);
+ if ((index = pd->index) < 1) {
+ continue;
+ }
+ pd->index = 0;
+ if (fds[index].revents & POLLIN) {
+ // process the read q.
+ nni_posix_poll_read(pd);
+ }
+ if (fds[index].revents & POLLOUT) {
+ // process the write q.
+ nni_posix_poll_write(pd);
+ }
+ if (fds[index].revents & (POLLHUP|POLLERR|POLLNVAL)) {
+ // the pipe is closed. wake all the
+ // aios with NNG_ECLOSED.
+ nni_posix_poll_close(pd);
+ }
+
+ // If we have completed all the AIOs outstanding,
+ // then remove this pipedesc from the pollq.
+ if ((nni_list_first(&pd->readq) == NULL) &&
+ (nni_list_first(&pd->writeq) == NULL)) {
+ nni_list_remove(&pollq->pds, pd);
+ }
+ }
+ }
+ nni_mtx_unlock(&pollq->mtx);
+}
+
+
+static void
+nni_posix_pipedesc_cancel(nni_aio *aio)
+{
+ nni_posix_pipedesc *pd;
+ nni_posix_pollq *pq;
+
+ pd = aio->a_prov_data;
+ pq = pd->pq;
+
+ nni_mtx_lock(&pq->mtx);
+ // Both queues share the same aio list node, so removing via
+ // the read queue works whichever queue holds the aio.
+ if (nni_list_active(&pd->readq, aio)) {
+ nni_list_remove(&pd->readq, aio);
+ }
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
+static int
+nni_posix_poll_grow(nni_posix_pollq *pq)
+{
+ int grow = pq->npds + 2; // one for the new pd, one for the waker
+ int noldfds;
+ struct pollfd *oldfds;
+ struct pollfd *newfds;
+
+ if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
+ return (0);
+ }
+
+ grow = grow + 128;
+
+ // Maybe we are adding a *lot* of pipes at once, and have to grow
+ // multiple times before the poller gets scheduled. In that case
+ // toss the old array before we finish.
+ oldfds = pq->newfds;
+ noldfds = pq->nnewfds;
+
+ if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+
+ pq->newfds = newfds;
+ pq->nnewfds = grow;
+
+ if (noldfds != 0) {
+ nni_free(oldfds, noldfds * sizeof (struct pollfd));
+ }
+ return (0);
+}
+
+
+static void
+nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
+{
+ int wake;
+ int rv;
+ nni_posix_pollq *pq = pd->pq;
+
+ (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
+
+ nni_mtx_lock(&pq->mtx);
+ if (!nni_list_active(&pq->pds, pd)) {
+ if ((rv = nni_posix_poll_grow(pq)) != 0) {
+ nni_aio_finish(aio, rv, aio->a_count);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+
+ nni_list_append(&pq->pds, pd);
+ }
+ NNI_ASSERT(!nni_list_active(l, aio));
+ aio->a_prov_data = pd;
+ aio->a_prov_cancel = nni_posix_pipedesc_cancel;
+ // Only wake if we aren't already waiting for this type of I/O on
+ // this descriptor.
+ wake = nni_list_first(l) == NULL ? 1 : 0;
+ nni_list_append(l, aio);
+
+ if (wake) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
+{
+ nni_posix_pipedesc *pd;
+
+
+ if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ // We could randomly choose a different pollq, or for efficiency's
+ // sake we could take a modulo of the file desc number to choose
+ // one. For now we just have a global pollq. Note that by tying
+ // the pd to a single pollq we may get some kind of cache warmth.
+
+ pd->pq = &nni_posix_global_pollq;
+ pd->fd = fd;
+ pd->index = 0;
+
+ NNI_LIST_INIT(&pd->readq, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&pd->writeq, nni_aio, a_prov_node);
+
+ *pdp = pd;
+ return (0);
+}
+
+
+void
+nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
+{
+ nni_aio *aio;
+ nni_posix_pollq *pq = pd->pq;
+
+ nni_mtx_lock(&pq->mtx);
+
+ // This finishes any aios still on our lists with NNG_ECLOSED.
+ nni_posix_poll_close(pd);
+
+ if (nni_list_active(&pq->pds, pd)) {
+ nni_list_remove(&pq->pds, pd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+
+ NNI_FREE_STRUCT(pd);
+}
+
+
+static void
+nni_posix_pollq_fini(nni_posix_pollq *pq)
+{
+ if (pq->started) {
+ nni_mtx_lock(&pq->mtx);
+ pq->close = 1;
+ pq->started = 0;
+ nni_plat_pipe_raise(pq->wakewfd);
+
+ // All pipes should have been closed before this is called.
+ NNI_ASSERT(nni_list_first(&pq->pds) == NULL);
+ nni_mtx_unlock(&pq->mtx);
+ }
+
+ nni_thr_fini(&pq->thr);
+ if (pq->wakewfd >= 0) {
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ pq->wakewfd = pq->wakerfd = -1;
+ }
+ nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd));
+ nni_free(pq->fds, pq->nfds * sizeof (struct pollfd));
+ nni_mtx_fini(&pq->mtx);
+}
+
+
+static int
+nni_posix_pollq_init(nni_posix_pollq *pq)
+{
+ int rv;
+
+ NNI_LIST_INIT(&pq->pds, nni_posix_pipedesc, node);
+ pq->wakewfd = -1;
+ pq->wakerfd = -1;
+ pq->close = 0;
+
+ if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
+ ((rv = nni_posix_poll_grow(pq)) != 0) ||
+ ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
+ ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
+ nni_posix_pollq_fini(pq);
+ return (rv);
+ }
+ pq->started = 1;
+ nni_thr_run(&pq->thr);
+ return (0);
+}
+
+
+int
+nni_posix_pipedesc_sysinit(void)
+{
+ int rv;
+
+ rv = nni_posix_pollq_init(&nni_posix_global_pollq);
+ return (rv);
+}
+
+
+void
+nni_posix_pipedesc_sysfini(void)
+{
+ nni_posix_pollq_fini(&nni_posix_global_pollq);
+}
+
+
+// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
+// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
+
+int
+nni_posix_pipedesc_read(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+ aio->a_count = 0;
+
+ nni_posix_pipedesc_submit(pd, &pd->readq, aio);
+ return (0);
+}
+
+
+int
+nni_posix_pipedesc_write(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+ aio->a_count = 0;
+ nni_posix_pipedesc_submit(pd, &pd->writeq, aio);
+ return (0);
+}
+
+
+// extern int nni_posix_aio_connect();
+// extern int nni_posix_aio_accept();
+
+#else
+
+// Suppress empty-symbol warnings from ranlib.
+int nni_posix_poll_not_used = 0;
+
+#endif // NNG_USE_POSIX_AIOPOLL
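
The poller's wakeup mechanism above is the classic self-pipe trick: slot 0 of the pollfd array always watches the read side of an internal pipe, and nni_posix_pipedesc_submit() writes to the other side (via nni_plat_pipe_raise) whenever the fd set must be rebuilt. Reduced to plain POSIX calls, and assuming the read side has been made O_NONBLOCK, the pattern looks like this:

#include <poll.h>
#include <unistd.h>

static void
poll_once(int wakerfd, struct pollfd *fds, int nfds)
{
	// Slot 0 is always the waker; remaining slots are pipe fds.
	fds[0].fd      = wakerfd;
	fds[0].events  = POLLIN;
	fds[0].revents = 0;

	(void) poll(fds, nfds, -1); // block until I/O or a wakeup

	if (fds[0].revents & POLLIN) {
		char buf[32];
		// Drain the waker (nonblocking read side assumed), so
		// the next poll() blocks until the next wakeup.
		while (read(wakerfd, buf, sizeof (buf)) > 0) {
		}
	}
}

static void
poll_wake(int wakewfd)
{
	// A single byte suffices; multiple wakeups coalesce.
	(void) write(wakewfd, "", 1);
}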
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index de491c01..b052ed53 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -299,8 +299,18 @@ nni_plat_init(int (*helper)(void))
// as scalable / thrifty with our use of VM.
(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+ if ((rv = nni_posix_pipedesc_sysinit()) != 0) {
+ pthread_mutex_unlock(&nni_plat_lock);
+ (void) close(nni_plat_devnull);
+ pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_condattr_destroy(&nni_cvattr);
+ pthread_attr_destroy(&nni_pthread_attr);
+ return (rv);
+ }
+
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
pthread_mutex_unlock(&nni_plat_lock);
+ nni_posix_pipedesc_sysfini();
(void) close(devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
@@ -322,6 +332,7 @@ nni_plat_fini(void)
{
pthread_mutex_lock(&nni_plat_lock);
if (nni_plat_inited) {
+ nni_posix_pipedesc_sysfini();
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
pthread_attr_destroy(&nni_pthread_attr);