author     Garrett D'Amore <garrett@damore.org>   2017-06-27 20:06:42 -0700
committer  Garrett D'Amore <garrett@damore.org>   2017-06-27 20:06:42 -0700
commit     e2d17be2a7081888aaafea150d587a7ef9517e17 (patch)
tree       d438d8c0e401dd70d81a003846c53568f1730e3a
parent     ac80ef7c3b1caa2f1fe3b093bef825363675bcb3 (diff)
Convert to POSIX polled I/O for async; start of cancelable aio.
This eliminates the two threads per pipe that were being used to provide basic I/O handling, replacing them with a single global thread (for now) that uses poll() and non-blocking I/O. This should lead to much greater scalability.

The infrastructure is in place to expand easily to multiple polling worker threads. Some thought needs to be given to how to scale this to engage multiple CPUs; horizontal scaling would also shorten the poll() lists, easing the C10K problem. We should look into better solutions than poll() on platforms that have them (epoll on Linux, kqueue on the BSDs, and event ports on illumos).

Note that the file descriptors start out in blocking mode, and are only later placed into non-blocking mode. This is because the negotiation phase is not yet callback driven, and so needs to be synchronous.
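As background for the design above, here is a minimal, self-contained sketch (not nng code; the names wakefds, wake_poller, and poll_loop are illustrative) of the pattern this commit adopts: a single worker thread blocks in poll() over the interesting descriptors plus the read side of a "waker" pipe, and submitters write a byte to that pipe whenever the fd list needs rebuilding.

#include <poll.h>
#include <unistd.h>

static int wakefds[2];  // [0] read side, polled below; [1] write side

// Submitters call this after queueing new work, to knock the poller
// out of poll() so that it can rebuild its fd list.
static void
wake_poller(void)
{
	char c = 0;
	(void) write(wakefds[1], &c, 1);
}

static void
poll_loop(struct pollfd *fds, int nfds)
{
	// Slot 0 is reserved for the waker's read side.
	fds[0].fd = wakefds[0];
	fds[0].events = POLLIN;

	for (;;) {
		if (poll(fds, nfds, -1) < 0) {
			continue;  // EINTR: just retry
		}
		if (fds[0].revents & POLLIN) {
			char buf[32];
			// Drain the waker; the caller rebuilds the
			// fd list before the next iteration.
			(void) read(wakefds[0], buf, sizeof (buf));
		}
		// ... service fds[1..nfds-1] whose revents are set ...
	}
}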
-rw-r--r--  src/CMakeLists.txt                   2
-rw-r--r--  src/core/aio.h                       1
-rw-r--r--  src/core/timer.c                     1
-rw-r--r--  src/platform/posix/posix_aio.h      44
-rw-r--r--  src/platform/posix/posix_aiothr.c  323
-rw-r--r--  src/platform/posix/posix_config.h    6
-rw-r--r--  src/platform/posix/posix_impl.h      5
-rw-r--r--  src/platform/posix/posix_ipc.c      14
-rw-r--r--  src/platform/posix/posix_net.c      15
-rw-r--r--  src/platform/posix/posix_poll.c    575
-rw-r--r--  src/platform/posix/posix_thread.c   11
11 files changed, 619 insertions(+), 378 deletions(-)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ac45fd68..4c32d5ce 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -82,12 +82,12 @@ set (NNG_SOURCES
platform/posix/posix_aio.h
platform/posix/posix_alloc.c
- platform/posix/posix_aiothr.c
platform/posix/posix_clock.c
platform/posix/posix_debug.c
platform/posix/posix_ipc.c
platform/posix/posix_net.c
platform/posix/posix_pipe.c
+ platform/posix/posix_poll.c
platform/posix/posix_rand.c
platform/posix/posix_thread.c
diff --git a/src/core/aio.h b/src/core/aio.h
index c377ca93..a5f78b3f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -41,6 +41,7 @@ struct nni_aio {
// TBD: Resolver operations.
// Provider-use fields.
+ void (*a_prov_cancel)(nni_aio *);
void * a_prov_data;
nni_list_node a_prov_node;
};
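The a_prov_cancel hook above is the "start of cancelable aio" mentioned in the subject line: a provider records a function that can unlink a pending aio from its queues. A hedged sketch of how a consumer-facing cancel might eventually drive it (nni_aio_cancel as shown is hypothetical; this commit only adds the field and its poll-side implementation):

// Hypothetical consumer-side entry point; not part of this commit.
void
nni_aio_cancel(nni_aio *aio)
{
	void (*cancelfn)(nni_aio *);

	if ((cancelfn = aio->a_prov_cancel) != NULL) {
		// The provider unlinks the aio from its queues and
		// clears a_prov_data and a_prov_cancel; see
		// nni_posix_pipedesc_cancel in posix_poll.c below.
		cancelfn(aio);
	}
}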
diff --git a/src/core/timer.c b/src/core/timer.c
index 0224ce75..085a297a 100644
--- a/src/core/timer.c
+++ b/src/core/timer.c
@@ -14,6 +14,7 @@
static void nni_timer_loop(void *);
+// XXX: replace this timer list with a min-heap based priority queue.
struct nni_timer {
nni_mtx t_mx;
nni_cv t_cv;
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 797f9e43..9ab322a0 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -18,43 +18,13 @@
#include "core/nng_impl.h"
-typedef struct nni_posix_aioq nni_posix_aioq;
-typedef struct nni_posix_aiof nni_posix_aiof;
-typedef struct nni_posix_aio_pipe nni_posix_aio_pipe;
-typedef struct nni_posix_aio_ep nni_posix_aio_ep;
-// Head structure representing file operations for read/write. We process
-// the list of aios serially, and each file has its own thread for now.
-struct nni_posix_aioq {
- nni_list aq_aios;
- int aq_fd;
- nni_mtx aq_lk;
- nni_cv aq_cv;
-#ifdef NNG_USE_POSIX_AIOTHR
- nni_thr aq_thr;
-#endif
-};
-
-struct nni_posix_aio_pipe {
- int ap_fd;
- nni_posix_aioq ap_readq;
- nni_posix_aioq ap_writeq;
-};
-
-struct nni_posix_aio_ep {
- int ap_fd;
- nni_posix_aioq ap_q;
-};
-
-extern int nni_posix_aio_pipe_init(nni_posix_aio_pipe *, int);
-extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *);
-
-// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
-// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-extern int nni_posix_aio_read(nni_posix_aio_pipe *, nni_aio *);
-extern int nni_posix_aio_write(nni_posix_aio_pipe *, nni_aio *);
-
-// extern int nni_posix_aio_connect();
-// extern int nni_posix_aio_accept();
+typedef struct nni_posix_pipedesc nni_posix_pipedesc;
+extern int nni_posix_pipedesc_sysinit(void);
+extern void nni_posix_pipedesc_sysfini(void);
+extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
+extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
+extern int nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *);
+extern int nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *);
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c
deleted file mode 100644
index a01fa194..00000000
--- a/src/platform/posix/posix_aiothr.c
+++ /dev/null
@@ -1,323 +0,0 @@
-//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-#include "platform/posix/posix_aio.h"
-
-#ifdef NNG_USE_POSIX_AIOTHR
-
-#include <errno.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/uio.h>
-#include <fcntl.h>
-#include <unistd.h>
-
-// POSIX AIO using threads. This allows us to use normal synchronous AIO,
-// along with underlying threads, to simulate asynchronous I/O. This will be
-// unscalable for systems where threads are a finite resource, but it should
-// be sufficient for systems where threads are efficient, and cheap, or for
-// applications that do not need excessive amounts of open files. It also
-// serves as a model upon which we can build more scalable forms of asynch
-// I/O, using non-blocking I/O and pollers.
-
-
-// nni_plat_aiothr_write is used to attempt a write, sending
-// as much as it can. On success, it returns 0, otherwise an errno. It will
-// retry if EINTR is received.
-static int
-nni_plat_aiothr_write(int fd, nni_aio *aio)
-{
- int n;
- int rv;
- int i;
- struct iovec iovec[4];
- struct iovec *iovp;
- int niov = aio->a_niov;
- int progress = 0;
-
- for (i = 0; i < niov; i++) {
- iovec[i].iov_len = aio->a_iov[i].iov_len;
- iovec[i].iov_base = aio->a_iov[i].iov_buf;
- }
- iovp = &iovec[0];
- rv = 0;
-
- while (niov != 0) {
- n = writev(fd, iovp, niov);
- if (n < 0) {
- if (errno == EINTR) {
- continue;
- }
- rv = nni_plat_errno(errno);
- break;
- }
-
- aio->a_count += n;
- progress += n;
- while (n) {
- // If we didn't finish it yet, try again.
- if (n < iovp->iov_len) {
- iovp->iov_len -= n;
- iovp->iov_base += n;
- break;
- }
-
- n -= iovp->iov_len;
- iovp++;
- niov--;
- }
- }
-
- // Either we got it all, or we didn't.
- if ((rv != 0) && (progress != 0)) {
- for (i = 0; i < niov; i++) {
- aio->a_iov[i].iov_len = iovp[i].iov_len;
- aio->a_iov[i].iov_buf = iovp[i].iov_base;
- }
- aio->a_niov = niov;
- }
-
- return (rv);
-}
-
-
-// nni_plat_aiothr_read is used to attempt a read, sending as much as it can
-// (limited by the requested read). On success, it returns 0, otherwise an
-// errno. It will retry if EINTR is received.
-static int
-nni_plat_aiothr_read(int fd, nni_aio *aio)
-{
- int n;
- int rv;
- int i;
- struct iovec iovec[4];
- struct iovec *iovp;
- int niov = aio->a_niov;
- int progress = 0;
-
- for (i = 0; i < niov; i++) {
- iovec[i].iov_len = aio->a_iov[i].iov_len;
- iovec[i].iov_base = aio->a_iov[i].iov_buf;
- }
- iovp = &iovec[0];
- rv = 0;
-
- while (niov != 0) {
- n = readv(fd, iovp, niov);
- if (n < 0) {
- if (errno == EINTR) {
- continue;
- }
- rv = nni_plat_errno(errno);
- break;
- }
- if (n == 0) {
- rv = NNG_ECLOSED;
- break;
- }
-
- aio->a_count += n;
- progress += n;
- while (n) {
- // If we didn't finish it yet, try again.
- if (n < iovp->iov_len) {
- iovp->iov_len -= n;
- iovp->iov_base += n;
- break;
- }
-
- n -= iovp->iov_len;
- iovp++;
- niov--;
- }
- }
-
- // Either we got it all, or we didn't.
- if ((rv != 0) && (progress != 0)) {
- for (i = 0; i < niov; i++) {
- aio->a_iov[i].iov_len = iovp[i].iov_len;
- aio->a_iov[i].iov_buf = iovp[i].iov_base;
- }
- aio->a_niov = niov;
- }
-
- return (rv);
-}
-
-
-static void
-nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *))
-{
- nni_aio *aio;
- int rv;
-
- nni_mtx_lock(&q->aq_lk);
- for (;;) {
- if (q->aq_fd < 0) {
- break;
- }
- if ((aio = nni_list_first(&q->aq_aios)) == NULL) {
- nni_cv_wait(&q->aq_cv);
- continue;
- }
- rv = fn(q->aq_fd, aio);
- if (rv == NNG_EAGAIN) {
- continue;
- }
- if (rv == NNG_ECLOSED) {
- break;
- }
-
- nni_list_remove(&q->aq_aios, aio);
-
- // Call the callback.
- nni_aio_finish(aio, rv, aio->a_count);
- }
-
- while ((aio = nni_list_first(&q->aq_aios)) != NULL) {
- nni_list_remove(&q->aq_aios, aio);
- nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
- }
-
- nni_mtx_unlock(&q->aq_lk);
-}
-
-
-static void
-nni_plat_aiothr_readthr(void *arg)
-{
- nni_plat_aiothr_dothr(arg, nni_plat_aiothr_read);
-}
-
-
-static void
-nni_plat_aiothr_writethr(void *arg)
-{
- nni_plat_aiothr_dothr(arg, nni_plat_aiothr_write);
-}
-
-
-static int
-nni_posix_aioq_init(nni_posix_aioq *q, int fd, nni_cb cb)
-{
- int rv;
-
- NNI_LIST_INIT(&q->aq_aios, nni_aio, a_prov_node);
- if ((rv = nni_mtx_init(&q->aq_lk)) != 0) {
- return (rv);
- }
- if ((rv = nni_cv_init(&q->aq_cv, &q->aq_lk)) != 0) {
- nni_mtx_fini(&q->aq_lk);
- return (rv);
- }
- if ((rv = nni_thr_init(&q->aq_thr, cb, q)) != 0) {
- nni_cv_fini(&q->aq_cv);
- nni_mtx_fini(&q->aq_lk);
- return (rv);
- }
- q->aq_fd = fd;
- return (0);
-}
-
-
-static void
-nni_posix_aioq_start(nni_posix_aioq *q)
-{
- nni_thr_run(&q->aq_thr);
-}
-
-
-static void
-nni_posix_aioq_fini(nni_posix_aioq *q)
-{
- if (q->aq_fd > 0) {
- nni_mtx_lock(&q->aq_lk);
- q->aq_fd = -1;
- nni_cv_wake(&q->aq_cv);
- nni_mtx_unlock(&q->aq_lk);
-
- nni_thr_fini(&q->aq_thr);
- nni_cv_fini(&q->aq_cv);
- nni_mtx_fini(&q->aq_lk);
- }
-}
-
-
-int
-nni_posix_aio_pipe_init(nni_posix_aio_pipe *p, int fd)
-{
- int rv;
-
- rv = nni_posix_aioq_init(&p->ap_readq, fd, nni_plat_aiothr_readthr);
- if (rv != 0) {
- return (rv);
- }
- rv = nni_posix_aioq_init(&p->ap_writeq, fd, nni_plat_aiothr_writethr);
- if (rv != 0) {
- nni_posix_aioq_fini(&p->ap_readq);
- return (rv);
- }
- nni_posix_aioq_start(&p->ap_readq);
- nni_posix_aioq_start(&p->ap_writeq);
- return (0);
-}
-
-
-void
-nni_posix_aio_pipe_fini(nni_posix_aio_pipe *p)
-{
- nni_posix_aioq_fini(&p->ap_readq);
- nni_posix_aioq_fini(&p->ap_writeq);
-}
-
-
-// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
-// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-
-static int
-nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio)
-{
- nni_mtx_lock(&q->aq_lk);
- if (q->aq_fd < 0) {
- nni_mtx_unlock(&q->aq_lk);
- return (NNG_ECLOSED);
- }
- nni_list_append(&q->aq_aios, aio);
- nni_cv_wake(&q->aq_cv);
- nni_mtx_unlock(&q->aq_lk);
- return (0);
-}
-
-
-int
-nni_posix_aio_read(nni_posix_aio_pipe *p, nni_aio *aio)
-{
- return (nni_posix_aio_submit(&p->ap_readq, aio));
-}
-
-
-int
-nni_posix_aio_write(nni_posix_aio_pipe *p, nni_aio *aio)
-{
- return (nni_posix_aio_submit(&p->ap_writeq, aio));
-}
-
-
-// extern int nni_posix_aio_connect();
-// extern int nni_posix_aio_accept();
-
-#else
-
-// Suppress empty symbols warnings in ranlib.
-int nni_posix_aiothr_not_used = 0;
-
-#endif // NNG_USE_POSIX_AIOTHR
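Both the thread-based code removed above and the poller added below share the same bookkeeping after a partial readv() or writev(): fully transferred iovecs are consumed, and the first partially transferred one is advanced. A standalone sketch of that logic (advance_iovs is an illustrative name, not an nng function):

#include <sys/uio.h>

// After readv()/writev() reports n bytes transferred, drop the iovecs
// that were fully consumed and advance into the first partial one.
static void
advance_iovs(struct iovec **iovp, int *niovp, size_t n)
{
	struct iovec *iov = *iovp;
	int niov = *niovp;

	while ((n > 0) && (niov > 0)) {
		if (n < iov->iov_len) {
			// Partially consumed: advance within this iovec.
			iov->iov_base = (char *) iov->iov_base + n;
			iov->iov_len -= n;
			break;
		}
		// Fully consumed: drop this iovec and continue.
		n -= iov->iov_len;
		iov++;
		niov--;
	}
	*iovp = iov;
	*niovp = niov;
}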
diff --git a/src/platform/posix/posix_config.h b/src/platform/posix/posix_config.h
index 34ab2c6d..bf121243 100644
--- a/src/platform/posix/posix_config.h
+++ b/src/platform/posix/posix_config.h
@@ -53,9 +53,9 @@
#ifndef CLOCK_REALTIME
#define NNG_USE_GETTIMEOFDAY
#elif !defined(NNG_USE_CLOCKID)
-#define NNG_USE_CLOCKID CLOCK_MONOTONIC
+#define NNG_USE_CLOCKID CLOCK_MONOTONIC
#else
-#define NNG_USE_CLOCKID CLOCK_REALTIME
+#define NNG_USE_CLOCKID CLOCK_REALTIME
#endif // CLOCK_REALTIME
-#define NNG_USE_POSIX_AIOTHR 1
+#define NNG_USE_POSIX_AIOPOLL 1
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 1030b6de..9c8d00dc 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -34,7 +34,6 @@ extern int nni_plat_errno(int);
#endif
-
// Define types that this platform uses.
#ifdef PLATFORM_POSIX_THREAD
@@ -64,4 +63,8 @@ struct nni_plat_cv {
#endif
+
+extern int nni_posix_pipedesc_sysinit(void);
+extern void nni_posix_pipedesc_sysfini(void);
+
#endif // PLATFORM_POSIX_IMPL_H
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index e75edeca..ccf19fed 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -34,7 +34,7 @@ struct nni_plat_ipcsock {
int fd;
int devnull; // for shutting down accept()
char * unlink; // path to unlink at fini
- nni_posix_aio_pipe aiop;
+ nni_posix_pipedesc * pd;
};
#ifdef SOCK_CLOEXEC
@@ -69,14 +69,14 @@ nni_plat_ipc_path_to_sockaddr(struct sockaddr_un *sun, const char *path)
int
nni_plat_ipc_aio_send(nni_plat_ipcsock *isp, nni_aio *aio)
{
- return (nni_posix_aio_write(&isp->aiop, aio));
+ return (nni_posix_pipedesc_write(isp->pd, aio));
}
int
nni_plat_ipc_aio_recv(nni_plat_ipcsock *isp, nni_aio *aio)
{
- return (nni_posix_aio_read(&isp->aiop, aio));
+ return (nni_posix_pipedesc_read(isp->pd, aio));
}
@@ -225,7 +225,9 @@ nni_plat_ipc_fini(nni_plat_ipcsock *isp)
nni_free(isp->unlink, strlen(isp->unlink) + 1);
}
- nni_posix_aio_pipe_fini(&isp->aiop);
+ if (isp->pd != NULL) {
+ nni_posix_pipedesc_fini(isp->pd);
+ }
NNI_FREE_STRUCT(isp);
}
@@ -338,7 +340,7 @@ nni_plat_ipc_connect(nni_plat_ipcsock *isp, const char *path)
return (rv);
}
- if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) {
+ if ((rv = nni_posix_pipedesc_init(&isp->pd, fd)) != 0) {
(void) close(fd);
return (rv);
}
@@ -380,7 +382,7 @@ nni_plat_ipc_accept(nni_plat_ipcsock *isp, nni_plat_ipcsock *server)
nni_plat_ipc_setopts(fd);
- if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) {
+ if ((rv = nni_posix_pipedesc_init(&isp->pd, fd)) != 0) {
(void) close(fd);
return (rv);
}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 94dc2667..c8b7766e 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -34,7 +34,7 @@
struct nni_plat_tcpsock {
int fd;
int devnull; // for shutting down accept()
- nni_posix_aio_pipe aiop;
+ nni_posix_pipedesc * pd;
};
static int
@@ -173,14 +173,14 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
int
nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
{
- return (nni_posix_aio_write(&s->aiop, aio));
+ return (nni_posix_pipedesc_write(s->pd, aio));
}
int
nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio)
{
- return (nni_posix_aio_read(&s->aiop, aio));
+ return (nni_posix_pipedesc_read(s->pd, aio));
}
@@ -282,7 +282,9 @@ nni_plat_tcp_fini(nni_plat_tcpsock *tsp)
(void) close(tsp->fd);
tsp->fd = -1;
}
- nni_posix_aio_pipe_fini(&tsp->aiop);
+ if (tsp->pd != NULL) {
+ nni_posix_pipedesc_fini(tsp->pd);
+ }
NNI_FREE_STRUCT(tsp);
}
@@ -387,7 +389,7 @@ nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr,
(void) close(fd);
return (rv);
}
- if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+ if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) {
(void) close(fd);
return (rv);
}
@@ -421,11 +423,10 @@ nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server)
nni_plat_tcp_setopts(fd);
- if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+ if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) {
close(fd);
return (rv);
}
-
tsp->fd = fd;
return (0);
}
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
new file mode 100644
index 00000000..a6024db0
--- /dev/null
+++ b/src/platform/posix/posix_poll.c
@@ -0,0 +1,575 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
+
+#ifdef NNG_USE_POSIX_AIOPOLL
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// POSIX AIO using poll(). We use a single poll thread to perform
+// I/O operations for the entire system. This isn't entirely scalable,
+// and it might be a good idea to create a few threads and group the
+// I/O operations into separate pollers to limit the amount of work each
+// thread does, and to scale across multiple cores. For now we don't
+// worry about it.
+
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+// nni_posix_pipedesc is a descriptor kept for each transport pipe (i.e. an
+// open file descriptor for a TCP socket, etc.) It contains the lists of
+// pending aios for that underlying socket, as well as the socket itself.
+struct nni_posix_pipedesc {
+ int fd;
+ int index;
+ nni_list readq;
+ nni_list writeq;
+ nni_list_node node;
+ nni_posix_pollq * pq;
+};
+
+// nni_posix_pollq is a work structure used by the poller thread that keeps
+// track of all the underlying pipe handles and so forth being used by poll().
+struct nni_posix_pollq {
+ nni_mtx mtx;
+ struct pollfd * fds;
+ struct pollfd * newfds;
+ int nfds;
+ int nnewfds;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ int close; // request for worker to exit
+ int started;
+ nni_thr thr; // worker thread
+ nni_list pds; // linked list of nni_posix_pipedescs.
+ int npds; // number of pipe descriptors
+};
+
+static nni_posix_pollq nni_posix_global_pollq;
+
+static void
+nni_posix_poll_finish(nni_aio *aio, int rv)
+{
+ nni_posix_pipedesc *pd;
+
+ pd = aio->a_prov_data;
+ if (nni_list_active(&pd->readq, aio)) {
+ nni_list_remove(&pd->readq, aio);
+ }
+ aio->a_prov_data = NULL;
+ aio->a_prov_cancel = NULL;
+ nni_aio_finish(aio, rv, aio->a_count);
+}
+
+
+static void
+nni_posix_poll_write(nni_posix_pipedesc *pd)
+{
+ int n;
+ int rv;
+ int i;
+ struct iovec iovec[4];
+ struct iovec *iovp;
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+ for (i = 0; i < aio->a_niov; i++) {
+ iovec[i].iov_len = aio->a_iov[i].iov_len;
+ iovec[i].iov_base = aio->a_iov[i].iov_buf;
+ }
+ iovp = &iovec[0];
+ rv = 0;
+
+ n = writev(pd->fd, iovp, aio->a_niov);
+ if (n < 0) {
+ if ((errno == EAGAIN) || (errno == EINTR)) {
+ // Can't write more right now. We're done
+ // on this fd for now.
+ return;
+ }
+ rv = nni_plat_errno(errno);
+ nni_list_remove(&pd->writeq, aio);
+
+ nni_posix_poll_finish(aio, rv);
+ return;
+ }
+
+ aio->a_count += n;
+
+ while (n > 0) {
+ // If we didn't write the first full iov,
+ // then we're done for now. Record progress
+ // and return to caller.
+ if (n < aio->a_iov[0].iov_len) {
+ aio->a_iov[0].iov_buf += n;
+ aio->a_iov[0].iov_len -= n;
+ return;
+ }
+
+ // We consumed the full iovec, so just move the
+ // remaining ones up, and decrement the count handled.
+ n -= aio->a_iov[0].iov_len;
+ for (i = 1; i < aio->a_niov; i++) {
+ aio->a_iov[i-1] = aio->a_iov[i];
+ }
+ NNI_ASSERT(aio->a_niov > 0);
+ aio->a_niov--;
+ }
+
+ // We completed the entire operation on this aio.
+ nni_list_remove(&pd->writeq, aio);
+ nni_posix_poll_finish(aio, 0);
+
+ // Go back to the start of the loop to see if there is
+ // another aio ready for us to process.
+ }
+}
+
+
+static void
+nni_posix_poll_read(nni_posix_pipedesc *pd)
+{
+ int n;
+ int rv;
+ int i;
+ struct iovec iovec[4];
+ struct iovec *iovp;
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&pd->readq)) != NULL) {
+ for (i = 0; i < aio->a_niov; i++) {
+ iovec[i].iov_len = aio->a_iov[i].iov_len;
+ iovec[i].iov_base = aio->a_iov[i].iov_buf;
+ }
+ iovp = &iovec[0];
+ rv = 0;
+
+ n = readv(pd->fd, iovp, aio->a_niov);
+ if (n < 0) {
+ if ((errno == EAGAIN) || (errno == EINTR)) {
+ // Can't read more right now. We're done
+ // on this fd for now.
+ return;
+ }
+ rv = nni_plat_errno(errno);
+
+ nni_posix_poll_finish(aio, rv);
+ return;
+ }
+
+ if (n == 0) {
+ // No bytes indicates a closed descriptor.
+ nni_posix_poll_finish(aio, NNG_ECLOSED);
+ return;
+ }
+
+ aio->a_count += n;
+
+ while (n > 0) {
+ // If we didn't read the first full iov,
+ // then we're done for now. Record progress
+ // and return to caller.
+ if (n < aio->a_iov[0].iov_len) {
+ aio->a_iov[0].iov_buf += n;
+ aio->a_iov[0].iov_len -= n;
+ return;
+ }
+
+ // We consumed the full iovec, so just move the
+ // remaining ones up, and decrement the count handled.
+ n -= aio->a_iov[0].iov_len;
+ for (i = 1; i < aio->a_niov; i++) {
+ aio->a_iov[i-1] = aio->a_iov[i];
+ }
+ NNI_ASSERT(aio->a_niov > 0);
+ aio->a_niov--;
+ }
+
+ // We completed the entire operation on this aio.
+ nni_posix_poll_finish(aio, 0);
+
+ // Go back to the start of the loop to see if there is
+ // another aio ready for us to process.
+ }
+}
+
+
+static void
+nni_posix_poll_close(nni_posix_pipedesc *pd)
+{
+ nni_aio *aio;
+
+ while ((aio = nni_list_first(&pd->readq)) != NULL) {
+ nni_posix_poll_finish(aio, NNG_ECLOSED);
+ }
+ while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+ nni_posix_poll_finish(aio, NNG_ECLOSED);
+ }
+}
+
+
+static void
+nni_posix_poll_thr(void *arg)
+{
+ nni_posix_pollq *pollq = arg;
+ nni_posix_pipedesc *pd, *nextpd;
+
+ nni_mtx_lock(&pollq->mtx);
+ for (;;) {
+ int rv;
+ int nfds;
+ struct pollfd *fds;
+
+ if (pollq->close) {
+ break;
+ }
+
+ if (pollq->newfds != NULL) {
+ // We have "grown" by the caller. Free up the old
+ // space, and start using the new.
+ nni_free(pollq->fds,
+ pollq->nfds * sizeof (struct pollfd));
+ pollq->fds = pollq->newfds;
+ pollq->nfds = pollq->nnewfds;
+ pollq->newfds = NULL;
+ }
+ fds = pollq->fds;
+ nfds = 0;
+
+ // The waker pipe is set up so that we will be woken
+ // when it is written (this allows us to be signaled).
+ fds[nfds].fd = pollq->wakerfd;
+ fds[nfds].events = POLLIN;
+ fds[nfds].revents = 0;
+ nfds++;
+
+ // Set up the poll list.
+ NNI_LIST_FOREACH (&pollq->pds, pd) {
+ fds[nfds].fd = pd->fd;
+ fds[nfds].events = 0;
+ fds[nfds].revents = 0;
+ if (nni_list_first(&pd->readq) != NULL) {
+ fds[nfds].events |= POLLIN;
+ }
+ if (nni_list_first(&pd->writeq) != NULL) {
+ fds[nfds].events |= POLLOUT;
+ }
+ pd->index = nfds;
+ nfds++;
+ }
+
+
+ // Now poll it. We block indefinitely, since we use separate
+ // timeouts to wake and remove the elements from the list.
+ nni_mtx_unlock(&pollq->mtx);
+ rv = poll(fds, nfds, -1);
+ nni_mtx_lock(&pollq->mtx);
+
+ if (rv < 0) {
+ // This shouldn't happen really. If it does, we
+ // just try again. (EINTR is probably the only
+ // reasonable failure here, unless internal memory
+ // allocations fail in the kernel, leading to EAGAIN.)
+ continue;
+ }
+
+
+ // If the waker pipe was signaled, read from it.
+ if (fds[0].revents & POLLIN) {
+ NNI_ASSERT(fds[0].fd == pollq->wakerfd);
+ nni_plat_pipe_clear(pollq->wakerfd);
+ }
+
+ // Now we iterate through all the pipedescs. Note that one
+ // may have been added or removed. New pipedescs will have
+ // their index set to 0. Removed ones will just be absent.
+ // Note that we may remove the pipedesc from the list, so we
+ // have to use a custom iterator.
+ nextpd = nni_list_first(&pollq->pds);
+ while ((pd = nextpd) != NULL) {
+ int index;
+
+ // Save the nextpd for our next iteration. This
+ // way we can remove the PD from the list without
+ // breaking the iteration.
+
+ nextpd = nni_list_next(&pollq->pds, pd);
+ if ((index = pd->index) < 1) {
+ continue;
+ }
+ pd->index = 0;
+ if (fds[index].revents & POLLIN) {
+ // process the read q.
+ nni_posix_poll_read(pd);
+ }
+ if (fds[index].revents & POLLOUT) {
+ // process the write q.
+ nni_posix_poll_write(pd);
+ }
+ if (fds[index].revents & (POLLHUP|POLLERR|POLLNVAL)) {
+ // the pipe is closed. wake all the
+ // aios with NNG_ECLOSED.
+ nni_posix_poll_close(pd);
+ }
+
+ // If we have completed all the AIOs outstanding,
+ // then remove this pipedesc from the pollq.
+ if ((nni_list_first(&pd->readq) == NULL) &&
+ (nni_list_first(&pd->writeq) == NULL)) {
+ nni_list_remove(&pollq->pds, pd);
+ }
+ }
+ }
+ nni_mtx_unlock(&pollq->mtx);
+}
+
+
+static void
+nni_posix_pipedesc_cancel(nni_aio *aio)
+{
+ nni_posix_pipedesc *pd;
+ nni_posix_pollq *pq;
+
+ pd = aio->a_prov_data;
+ pq = pd->pq;
+
+ nni_mtx_lock(&pq->mtx);
+ // This will remove the aio from either the read or the write
+ // list; it doesn't matter which.
+ if (nni_list_active(&pd->readq, aio)) {
+ nni_list_remove(&pd->readq, aio);
+ }
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
+static int
+nni_posix_poll_grow(nni_posix_pollq *pq)
+{
+ int grow = pq->npds + 2; // one for us, one for waker
+ int noldfds;
+ struct pollfd *oldfds;
+ struct pollfd *newfds;
+
+ if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
+ return (0);
+ }
+
+ grow = grow + 128;
+
+ // Maybe we are adding a *lot* of pipes at once, and have to grow
+ // multiple times before the poller gets scheduled. In that case
+ // toss the old array before we finish.
+ oldfds = pq->newfds;
+ noldfds = pq->nnewfds;
+
+ if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+
+ pq->newfds = newfds;
+ pq->nnewfds = grow;
+
+ if (noldfds != 0) {
+ nni_free(oldfds, noldfds * sizeof (struct pollfd));
+ }
+ return (0);
+}
+
+
+static void
+nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
+{
+ int wake;
+ int rv;
+ nni_posix_pollq *pq = pd->pq;
+
+ (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
+
+ nni_mtx_lock(&pq->mtx);
+ if (!nni_list_active(&pq->pds, pd)) {
+ if ((rv = nni_posix_poll_grow(pq)) != 0) {
+ nni_aio_finish(aio, rv, aio->a_count);
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
+
+ nni_list_append(&pq->pds, pd);
+ }
+ NNI_ASSERT(!nni_list_active(l, aio));
+ aio->a_prov_data = pd;
+ aio->a_prov_cancel = nni_posix_pipedesc_cancel;
+ // Only wake if we aren't already waiting for this type of I/O on
+ // this descriptor.
+ wake = nni_list_first(l) == NULL ? 1 : 0;
+ nni_list_append(l, aio);
+
+ if (wake) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
+{
+ nni_posix_pipedesc *pd;
+
+
+ if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ // We could randomly choose a different pollq, or for efficiency's
+ // sake we could take a modulo of the file descriptor number to choose
+ // one. For now we just have a global pollq. Note that by tying
+ // the pd to a single pollq we may get some kind of cache warmth.
+
+ pd->pq = &nni_posix_global_pollq;
+ pd->fd = fd;
+ pd->index = 0;
+
+ NNI_LIST_INIT(&pd->readq, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&pd->writeq, nni_aio, a_prov_node);
+
+ *pdp = pd;
+ return (0);
+}
+
+
+void
+nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
+{
+ nni_aio *aio;
+ nni_posix_pollq *pq = pd->pq;
+
+ nni_mtx_lock(&pq->mtx);
+
+ // This removes any aios from our list.
+ nni_posix_poll_close(pd);
+
+ if (nni_list_active(&pq->pds, pd)) {
+ nni_list_remove(&pq->pds, pd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+
+ NNI_FREE_STRUCT(pd);
+}
+
+
+static void
+nni_posix_pollq_fini(nni_posix_pollq *pq)
+{
+ if (pq->started) {
+ nni_mtx_lock(&pq->mtx);
+ pq->close = 1;
+ pq->started = 0;
+ nni_plat_pipe_raise(pq->wakewfd);
+
+ // All pipes should have been closed before this is called.
+ NNI_ASSERT(nni_list_first(&pq->pds) == NULL);
+ nni_mtx_unlock(&pq->mtx);
+ }
+
+ nni_thr_fini(&pq->thr);
+ if (pq->wakewfd >= 0) {
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ pq->wakewfd = pq->wakerfd = -1;
+ }
+ nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd));
+ nni_free(pq->fds, pq->nfds * sizeof (struct pollfd));
+ nni_mtx_fini(&pq->mtx);
+}
+
+
+static int
+nni_posix_pollq_init(nni_posix_pollq *pq)
+{
+ int rv;
+
+ NNI_LIST_INIT(&pq->pds, nni_posix_pipedesc, node);
+ pq->wakewfd = -1;
+ pq->wakerfd = -1;
+ pq->close = 0;
+
+ if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
+ ((rv = nni_posix_poll_grow(pq)) != 0) ||
+ ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
+ ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
+ nni_posix_pollq_fini(pq);
+ return (rv);
+ }
+ pq->started = 1;
+ nni_thr_run(&pq->thr);
+ return (0);
+}
+
+
+int
+nni_posix_pipedesc_sysinit(void)
+{
+ int rv;
+
+ rv = nni_posix_pollq_init(&nni_posix_global_pollq);
+ return (rv);
+}
+
+
+void
+nni_posix_pipedesc_sysfini(void)
+{
+ nni_posix_pollq_fini(&nni_posix_global_pollq);
+}
+
+
+// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
+// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
+
+int
+nni_posix_pipedesc_read(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+ aio->a_count = 0;
+
+ nni_posix_pipedesc_submit(pd, &pd->readq, aio);
+ return (0);
+}
+
+
+int
+nni_posix_pipedesc_write(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+ aio->a_count = 0;
+ nni_posix_pipedesc_submit(pd, &pd->writeq, aio);
+ return (0);
+}
+
+
+// extern int nni_posix_aio_connect();
+// extern int nni_posix_aio_accept();
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_poll_not_used = 0;
+
+#endif // NNG_USE_POSIX_AIOPOLL
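The commit message calls out epoll, kqueue, and event ports as better primitives where available. As a hedged illustration of why (not part of this commit, and epoll_loop is an invented name), here is the same wait loop expressed with Linux epoll(7): interest is registered with the kernel once, so there is no per-wakeup rebuild of the fd list.

#include <sys/epoll.h>

static void
epoll_loop(int epfd, int wakerfd)
{
	struct epoll_event ev, events[64];
	int i, n;

	// Register the waker's read side once, up front.
	ev.events = EPOLLIN;
	ev.data.fd = wakerfd;
	(void) epoll_ctl(epfd, EPOLL_CTL_ADD, wakerfd, &ev);

	for (;;) {
		if ((n = epoll_wait(epfd, events, 64, -1)) < 0) {
			continue;  // EINTR: just retry
		}
		for (i = 0; i < n; i++) {
			if (events[i].data.fd == wakerfd) {
				// Drain the waker and pick up any new
				// registrations made via epoll_ctl().
				continue;
			}
			// ... dispatch read/write for events[i] ...
		}
	}
}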
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index de491c01..b052ed53 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -299,8 +299,18 @@ nni_plat_init(int (*helper)(void))
// as scalable / thrifty with our use of VM.
(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+ if ((rv = nni_posix_pipedesc_sysinit()) != 0) {
+ pthread_mutex_unlock(&nni_plat_lock);
+ (void) close(nni_plat_devnull);
+ pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_condattr_destroy(&nni_cvattr);
+ pthread_attr_destroy(&nni_pthread_attr);
+ return (rv);
+ }
+
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
pthread_mutex_unlock(&nni_plat_lock);
+ nni_posix_pipedesc_sysfini();
(void) close(devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
@@ -322,6 +332,7 @@ nni_plat_fini(void)
{
pthread_mutex_lock(&nni_plat_lock);
if (nni_plat_inited) {
+ nni_posix_pipedesc_sysfini();
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
pthread_attr_destroy(&nni_pthread_attr);