| author | Garrett D'Amore <garrett@damore.org> | 2017-06-27 20:06:42 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-27 20:06:42 -0700 |
| commit | e2d17be2a7081888aaafea150d587a7ef9517e17 | (patch) |
| tree | d438d8c0e401dd70d81a003846c53568f1730e3a | /src/platform/posix |
| parent | ac80ef7c3b1caa2f1fe3b093bef825363675bcb3 | (diff) |
| download | nng-e2d17be2a7081888aaafea150d587a7ef9517e17.tar.gz nng-e2d17be2a7081888aaafea150d587a7ef9517e17.tar.bz2 nng-e2d17be2a7081888aaafea150d587a7ef9517e17.zip | |
Convert to POSIX polled I/O for async; start of cancelable aio.
This eliminates the two threads per pipe that were being used to provide
basic I/O handling, replacing them with a single global thread (for now)
that uses poll() and nonblocking I/O. This should lead to much greater
scalability. The infrastructure is in place to expand easily to multiple
polling worker threads, though some thought needs to be given to how to
scale this across multiple CPUs. Horizontal scaling would also shorten
the poll() lists, easing the C10K problem.
We should look into better solutions than poll() for platforms that have
them (epoll on Linux, kqueue on BSD, and event ports on illumos).
Note that, for now, the file descriptors start out in blocking mode, and
are only placed into non-blocking mode later. This is because the
negotiation phase is not yet callback-driven, and so needs to be
synchronous.
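
The essential shape of the design described above — a single thread that
multiplexes nonblocking descriptors with poll(), plus a "waker" pipe that
other threads write to when they queue new work — can be sketched
independently of nng. The sketch below is illustrative only: the names
(`wakepipe`, `poller`, `wake_poller`) are invented here, and a real poller
(like the one this commit adds) maintains per-descriptor read/write queues
rather than servicing a single fd.

```c
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int wakepipe[2];	// [0] is polled; writing [1] wakes the poller
static int iofd = -1;	// the nonblocking descriptor being serviced

// Wake a blocked poll() by making the waker descriptor readable.
static void
wake_poller(void)
{
	(void) write(wakepipe[1], "", 1);
}

static void *
poller(void *arg)
{
	struct pollfd fds[2];
	char buf[128];

	(void) arg;
	fds[0].fd = wakepipe[0];	// waker comes first, like wakerfd
	fds[0].events = POLLIN;
	fds[1].fd = iofd;
	fds[1].events = POLLIN;

	for (;;) {
		if (poll(fds, 2, -1) < 0) {
			if (errno == EINTR) {
				continue;	// benign; just retry
			}
			break;
		}
		if (fds[0].revents & POLLIN) {
			// Drain the waker.  A real poller would rescan
			// its work lists here; this demo just exits.
			(void) read(wakepipe[0], buf, sizeof (buf));
			break;
		}
		if (fds[1].revents & (POLLHUP | POLLERR)) {
			break;	// peer went away
		}
		if (fds[1].revents & POLLIN) {
			ssize_t n = read(iofd, buf, sizeof (buf));
			if (n > 0) {
				printf("poller read %zd bytes\n", n);
			} else if ((n == 0) || (errno != EAGAIN)) {
				break;	// EOF or hard error
			}
		}
	}
	return (NULL);
}

int
main(void)
{
	int io[2];
	pthread_t thr;

	(void) pipe(wakepipe);
	(void) pipe(io);	// stands in for a connected socket
	iofd = io[0];
	(void) fcntl(iofd, F_SETFL, fcntl(iofd, F_GETFL, 0) | O_NONBLOCK);

	pthread_create(&thr, NULL, poller, NULL);
	(void) write(io[1], "hello", 5);	// poller reports this read
	sleep(1);
	wake_poller();	// ask the poller to shut down
	pthread_join(thr, NULL);
	return (0);
}
```

In the commit itself, nni_plat_pipe_raise() plays the role of
wake_poller(), and nni_posix_poll_thr() is the loop.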
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_aio.h | 44 |
| -rw-r--r-- | src/platform/posix/posix_aiothr.c | 323 |
| -rw-r--r-- | src/platform/posix/posix_config.h | 6 |
| -rw-r--r-- | src/platform/posix/posix_impl.h | 5 |
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 14 |
| -rw-r--r-- | src/platform/posix/posix_net.c | 15 |
| -rw-r--r-- | src/platform/posix/posix_poll.c | 575 |
| -rw-r--r-- | src/platform/posix/posix_thread.c | 11 |
8 files changed, 616 insertions, 377 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 797f9e43..9ab322a0 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -18,43 +18,13 @@
 #include "core/nng_impl.h"
 
-typedef struct nni_posix_aioq nni_posix_aioq;
-typedef struct nni_posix_aiof nni_posix_aiof;
-typedef struct nni_posix_aio_pipe nni_posix_aio_pipe;
-typedef struct nni_posix_aio_ep nni_posix_aio_ep;
-// Head structure representing file operations for read/write.  We process
-// the list of aios serially, and each file has its own thread for now.
-struct nni_posix_aioq {
-	nni_list	aq_aios;
-	int		aq_fd;
-	nni_mtx		aq_lk;
-	nni_cv		aq_cv;
-#ifdef NNG_USE_POSIX_AIOTHR
-	nni_thr		aq_thr;
-#endif
-};
-
-struct nni_posix_aio_pipe {
-	int		ap_fd;
-	nni_posix_aioq	ap_readq;
-	nni_posix_aioq	ap_writeq;
-};
-
-struct nni_posix_aio_ep {
-	int		ap_fd;
-	nni_posix_aioq	ap_q;
-};
-
-extern int nni_posix_aio_pipe_init(nni_posix_aio_pipe *, int);
-extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *);
-
-// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
-// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-extern int nni_posix_aio_read(nni_posix_aio_pipe *, nni_aio *);
-extern int nni_posix_aio_write(nni_posix_aio_pipe *, nni_aio *);
-
-// extern int nni_posix_aio_connect();
-// extern int nni_posix_aio_accept();
+typedef struct nni_posix_pipedesc nni_posix_pipedesc;
+extern int nni_posix_pipedesc_sysinit(void);
+extern void nni_posix_pipedesc_sysfini(void);
+extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
+extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
+extern int nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *);
+extern int nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *);
 
 #endif  // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c
deleted file mode 100644
index a01fa194..00000000
--- a/src/platform/posix/posix_aiothr.c
+++ /dev/null
@@ -1,323 +0,0 @@
-//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt).  A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-#include "platform/posix/posix_aio.h"
-
-#ifdef NNG_USE_POSIX_AIOTHR
-
-#include <errno.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/uio.h>
-#include <fcntl.h>
-#include <unistd.h>
-
-// POSIX AIO using threads.  This allows us to use normal synchronous AIO,
-// along with underlying threads, to simulate asynchronous I/O.  This will be
-// unscalable for systems where threads are a finite resource, but it should
-// be sufficient for systems where threads are efficient, and cheap, or for
-// applications that do not need excessive amounts of open files.  It also
-// serves as a model upon which we can build more scalable forms of asynch
-// I/O, using non-blocking I/O and pollers.
-
-
-// nni_plat_aiothr_write is used to attempt a write, sending
-// as much as it can.  On success, it returns 0, otherwise an errno.  It will
-// retry if EINTR is received.
-static int
-nni_plat_aiothr_write(int fd, nni_aio *aio)
-{
-	int n;
-	int rv;
-	int i;
-	struct iovec iovec[4];
-	struct iovec *iovp;
-	int niov = aio->a_niov;
-	int progress = 0;
-
-	for (i = 0; i < niov; i++) {
-		iovec[i].iov_len = aio->a_iov[i].iov_len;
-		iovec[i].iov_base = aio->a_iov[i].iov_buf;
-	}
-	iovp = &iovec[0];
-	rv = 0;
-
-	while (niov != 0) {
-		n = writev(fd, iovp, niov);
-		if (n < 0) {
-			if (errno == EINTR) {
-				continue;
-			}
-			rv = nni_plat_errno(errno);
-			break;
-		}
-
-		aio->a_count += n;
-		progress += n;
-		while (n) {
-			// If we didn't finish it yet, try again.
-			if (n < iovp->iov_len) {
-				iovp->iov_len -= n;
-				iovp->iov_base += n;
-				break;
-			}
-
-			n -= iovp->iov_len;
-			iovp++;
-			niov--;
-		}
-	}
-
-	// Either we got it all, or we didn't.
-	if ((rv != 0) && (progress != 0)) {
-		for (i = 0; i < niov; i++) {
-			aio->a_iov[i].iov_len = iovp[i].iov_len;
-			aio->a_iov[i].iov_buf = iovp[i].iov_base;
-		}
-		aio->a_niov = niov;
-	}
-
-	return (rv);
-}
-
-
-// nni_plat_aiothr_read is used to attempt a read, sending as much as it can
-// (limited by the requested read).  On success, it returns 0, otherwise an
-// errno.  It will retry if EINTR is received.
-static int
-nni_plat_aiothr_read(int fd, nni_aio *aio)
-{
-	int n;
-	int rv;
-	int i;
-	struct iovec iovec[4];
-	struct iovec *iovp;
-	int niov = aio->a_niov;
-	int progress = 0;
-
-	for (i = 0; i < niov; i++) {
-		iovec[i].iov_len = aio->a_iov[i].iov_len;
-		iovec[i].iov_base = aio->a_iov[i].iov_buf;
-	}
-	iovp = &iovec[0];
-	rv = 0;
-
-	while (niov != 0) {
-		n = readv(fd, iovp, niov);
-		if (n < 0) {
-			if (errno == EINTR) {
-				continue;
-			}
-			rv = nni_plat_errno(errno);
-			break;
-		}
-		if (n == 0) {
-			rv = NNG_ECLOSED;
-			break;
-		}
-
-		aio->a_count += n;
-		progress += n;
-		while (n) {
-			// If we didn't finish it yet, try again.
-			if (n < iovp->iov_len) {
-				iovp->iov_len -= n;
-				iovp->iov_base += n;
-				break;
-			}
-
-			n -= iovp->iov_len;
-			iovp++;
-			niov--;
-		}
-	}
-
-	// Either we got it all, or we didn't.
-	if ((rv != 0) && (progress != 0)) {
-		for (i = 0; i < niov; i++) {
-			aio->a_iov[i].iov_len = iovp[i].iov_len;
-			aio->a_iov[i].iov_buf = iovp[i].iov_base;
-		}
-		aio->a_niov = niov;
-	}
-
-	return (rv);
-}
-
-
-static void
-nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *))
-{
-	nni_aio *aio;
-	int rv;
-
-	nni_mtx_lock(&q->aq_lk);
-	for (;;) {
-		if (q->aq_fd < 0) {
-			break;
-		}
-		if ((aio = nni_list_first(&q->aq_aios)) == NULL) {
-			nni_cv_wait(&q->aq_cv);
-			continue;
-		}
-		rv = fn(q->aq_fd, aio);
-		if (rv == NNG_EAGAIN) {
-			continue;
-		}
-		if (rv == NNG_ECLOSED) {
-			break;
-		}
-
-		nni_list_remove(&q->aq_aios, aio);
-
-		// Call the callback.
-		nni_aio_finish(aio, rv, aio->a_count);
-	}
-
-	while ((aio = nni_list_first(&q->aq_aios)) != NULL) {
-		nni_list_remove(&q->aq_aios, aio);
-		nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
-	}
-
-	nni_mtx_unlock(&q->aq_lk);
-}
-
-
-static void
-nni_plat_aiothr_readthr(void *arg)
-{
-	nni_plat_aiothr_dothr(arg, nni_plat_aiothr_read);
-}
-
-
-static void
-nni_plat_aiothr_writethr(void *arg)
-{
-	nni_plat_aiothr_dothr(arg, nni_plat_aiothr_write);
-}
-
-
-static int
-nni_posix_aioq_init(nni_posix_aioq *q, int fd, nni_cb cb)
-{
-	int rv;
-
-	NNI_LIST_INIT(&q->aq_aios, nni_aio, a_prov_node);
-	if ((rv = nni_mtx_init(&q->aq_lk)) != 0) {
-		return (rv);
-	}
-	if ((rv = nni_cv_init(&q->aq_cv, &q->aq_lk)) != 0) {
-		nni_mtx_fini(&q->aq_lk);
-		return (rv);
-	}
-	if ((rv = nni_thr_init(&q->aq_thr, cb, q)) != 0) {
-		nni_cv_fini(&q->aq_cv);
-		nni_mtx_fini(&q->aq_lk);
-		return (rv);
-	}
-	q->aq_fd = fd;
-	return (0);
-}
-
-
-static void
-nni_posix_aioq_start(nni_posix_aioq *q)
-{
-	nni_thr_run(&q->aq_thr);
-}
-
-
-static void
-nni_posix_aioq_fini(nni_posix_aioq *q)
-{
-	if (q->aq_fd > 0) {
-		nni_mtx_lock(&q->aq_lk);
-		q->aq_fd = -1;
-		nni_cv_wake(&q->aq_cv);
-		nni_mtx_unlock(&q->aq_lk);
-
-		nni_thr_fini(&q->aq_thr);
-		nni_cv_fini(&q->aq_cv);
-		nni_mtx_fini(&q->aq_lk);
-	}
-}
-
-
-int
-nni_posix_aio_pipe_init(nni_posix_aio_pipe *p, int fd)
-{
-	int rv;
-
-	rv = nni_posix_aioq_init(&p->ap_readq, fd, nni_plat_aiothr_readthr);
-	if (rv != 0) {
-		return (rv);
-	}
-	rv = nni_posix_aioq_init(&p->ap_writeq, fd, nni_plat_aiothr_writethr);
-	if (rv != 0) {
-		nni_posix_aioq_fini(&p->ap_readq);
-		return (rv);
-	}
-	nni_posix_aioq_start(&p->ap_readq);
-	nni_posix_aioq_start(&p->ap_writeq);
-	return (0);
-}
-
-
-void
-nni_posix_aio_pipe_fini(nni_posix_aio_pipe *p)
-{
-	nni_posix_aioq_fini(&p->ap_readq);
-	nni_posix_aioq_fini(&p->ap_writeq);
-}
-
-
-// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
-// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-
-static int
-nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio)
-{
-	nni_mtx_lock(&q->aq_lk);
-	if (q->aq_fd < 0) {
-		nni_mtx_unlock(&q->aq_lk);
-		return (NNG_ECLOSED);
-	}
-	nni_list_append(&q->aq_aios, aio);
-	nni_cv_wake(&q->aq_cv);
-	nni_mtx_unlock(&q->aq_lk);
-	return (0);
-}
-
-
-int
-nni_posix_aio_read(nni_posix_aio_pipe *p, nni_aio *aio)
-{
-	return (nni_posix_aio_submit(&p->ap_readq, aio));
-}
-
-
-int
-nni_posix_aio_write(nni_posix_aio_pipe *p, nni_aio *aio)
-{
-	return (nni_posix_aio_submit(&p->ap_writeq, aio));
-}
-
-
-// extern int nni_posix_aio_connect();
-// extern int nni_posix_aio_accept();
-
-#else
-
-// Suppress empty symbols warnings in ranlib.
-int nni_posix_aiothr_not_used = 0;
-
-#endif  // NNG_USE_POSIX_AIOTHR
diff --git a/src/platform/posix/posix_config.h b/src/platform/posix/posix_config.h
index 34ab2c6d..bf121243 100644
--- a/src/platform/posix/posix_config.h
+++ b/src/platform/posix/posix_config.h
@@ -53,9 +53,9 @@
 #ifndef CLOCK_REALTIME
 #define NNG_USE_GETTIMEOFDAY
 #elif !defined(NNG_USE_CLOCKID)
-#define NNG_USE_CLOCKID CLOCK_MONOTONIC
+#define NNG_USE_CLOCKID	CLOCK_MONOTONIC
 #else
-#define NNG_USE_CLOCKID CLOCK_REALTIME
+#define NNG_USE_CLOCKID	CLOCK_REALTIME
 #endif  // CLOCK_REALTIME
 
-#define NNG_USE_POSIX_AIOTHR 1
+#define NNG_USE_POSIX_AIOPOLL 1
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 1030b6de..9c8d00dc 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -34,7 +34,6 @@ extern int nni_plat_errno(int);
 
 #endif
 
-
 // Define types that this platform uses.
 
 #ifdef PLATFORM_POSIX_THREAD
@@ -64,4 +63,8 @@ struct nni_plat_cv {
 
 #endif
 
+
+extern int nni_posix_pipedesc_sysinit(void);
+extern void nni_posix_pipedesc_sysfini(void);
+
 #endif  // PLATFORM_POSIX_IMPL_H
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index e75edeca..ccf19fed 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -34,7 +34,7 @@ struct nni_plat_ipcsock {
 	int			fd;
 	int			devnull;	// for shutting down accept()
 	char *			unlink;		// path to unlink at fini
-	nni_posix_aio_pipe	aiop;
+	nni_posix_pipedesc *	pd;
 };
 
 #ifdef SOCK_CLOEXEC
@@ -69,14 +69,14 @@ nni_plat_ipc_path_to_sockaddr(struct sockaddr_un *sun, const char *path)
 int
 nni_plat_ipc_aio_send(nni_plat_ipcsock *isp, nni_aio *aio)
 {
-	return (nni_posix_aio_write(&isp->aiop, aio));
+	return (nni_posix_pipedesc_write(isp->pd, aio));
 }
 
 
 int
 nni_plat_ipc_aio_recv(nni_plat_ipcsock *isp, nni_aio *aio)
 {
-	return (nni_posix_aio_read(&isp->aiop, aio));
+	return (nni_posix_pipedesc_read(isp->pd, aio));
 }
 
 
@@ -225,7 +225,9 @@ nni_plat_ipc_fini(nni_plat_ipcsock *isp)
 		nni_free(isp->unlink, strlen(isp->unlink) + 1);
 	}
-	nni_posix_aio_pipe_fini(&isp->aiop);
+	if (isp->pd != NULL) {
+		nni_posix_pipedesc_fini(isp->pd);
+	}
 	NNI_FREE_STRUCT(isp);
 }
 
@@ -338,7 +340,7 @@ nni_plat_ipc_connect(nni_plat_ipcsock *isp, const char *path)
 		return (rv);
 	}
 
-	if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) {
+	if ((rv = nni_posix_pipedesc_init(&isp->pd, fd)) != 0) {
 		(void) close(fd);
 		return (rv);
 	}
@@ -380,7 +382,7 @@ nni_plat_ipc_accept(nni_plat_ipcsock *isp, nni_plat_ipcsock *server)
 
 	nni_plat_ipc_setopts(fd);
 
-	if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) {
+	if ((rv = nni_posix_pipedesc_init(&isp->pd, fd)) != 0) {
 		(void) close(fd);
 		return (rv);
 	}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 94dc2667..c8b7766e 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -34,7 +34,7 @@
 struct nni_plat_tcpsock {
 	int			fd;
 	int			devnull;	// for shutting down accept()
-	nni_posix_aio_pipe	aiop;
+	nni_posix_pipedesc *	pd;
 };
 
 static int
@@ -173,14 +173,14 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
 int
 nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
 {
-	return (nni_posix_aio_write(&s->aiop, aio));
+	return (nni_posix_pipedesc_write(s->pd, aio));
 }
 
 
 int
 nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio)
 {
-	return (nni_posix_aio_read(&s->aiop, aio));
+	return (nni_posix_pipedesc_read(s->pd, aio));
 }
 
 
@@ -282,7 +282,9 @@ nni_plat_tcp_fini(nni_plat_tcpsock *tsp)
 		(void) close(tsp->fd);
 		tsp->fd = -1;
 	}
-	nni_posix_aio_pipe_fini(&tsp->aiop);
+	if (tsp->pd != NULL) {
+		nni_posix_pipedesc_fini(tsp->pd);
+	}
 	NNI_FREE_STRUCT(tsp);
 }
 
@@ -387,7 +389,7 @@ nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr,
 		(void) close(fd);
 		return (rv);
 	}
-	if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+	if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) {
 		(void) close(fd);
 		return (rv);
 	}
@@ -421,11 +423,10 @@ nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server)
 
 	nni_plat_tcp_setopts(fd);
 
-	if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+	if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) {
 		close(fd);
 		return (rv);
 	}
-
 	tsp->fd = fd;
 	return (0);
 }
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
new file mode 100644
index 00000000..a6024db0
--- /dev/null
+++ b/src/platform/posix/posix_poll.c
@@ -0,0 +1,575 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt).  A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
+
+#ifdef NNG_USE_POSIX_AIOPOLL
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// POSIX AIO using poll().  We use a single poll thread to perform
+// I/O operations for the entire system.  This isn't entirely scalable,
+// and it might be a good idea to create a few threads and group the
+// I/O operations into separate pollers to limit the amount of work each
+// thread does, and to scale across multiple cores.  For now we don't
+// worry about it.
+
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open
+// file descriptor for TCP socket, etc.)  This contains the list of pending
+// aios for that underlying socket, as well as the socket itself.
+struct nni_posix_pipedesc {
+	int			fd;
+	int			index;
+	nni_list		readq;
+	nni_list		writeq;
+	nni_list_node		node;
+	nni_posix_pollq *	pq;
+};
+
+// nni_posix_pollq is a work structure used by the poller thread, that keeps
+// track of all the underlying pipe handles and so forth being used by poll().
+struct nni_posix_pollq {
+	nni_mtx			mtx;
+	struct pollfd *		fds;
+	struct pollfd *		newfds;
+	int			nfds;
+	int			nnewfds;
+	int			wakewfd;	// write side of waker pipe
+	int			wakerfd;	// read side of waker pipe
+	int			close;		// request for worker to exit
+	int			started;
+	nni_thr			thr;		// worker thread
+	nni_list		pds;		// linked list of nni_posix_pipedescs
+	int			npds;		// number of pipe descriptors
+};
+
+static nni_posix_pollq nni_posix_global_pollq;
+
+static void
+nni_posix_poll_finish(nni_aio *aio, int rv)
+{
+	nni_posix_pipedesc *pd;
+
+	pd = aio->a_prov_data;
+	if (nni_list_active(&pd->readq, aio)) {
+		nni_list_remove(&pd->readq, aio);
+	}
+	aio->a_prov_data = NULL;
+	aio->a_prov_cancel = NULL;
+	nni_aio_finish(aio, rv, aio->a_count);
+}
+
+
+static void
+nni_posix_poll_write(nni_posix_pipedesc *pd)
+{
+	int n;
+	int rv;
+	int i;
+	struct iovec iovec[4];
+	struct iovec *iovp;
+	nni_aio *aio;
+
+	while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+		for (i = 0; i < aio->a_niov; i++) {
+			iovec[i].iov_len = aio->a_iov[i].iov_len;
+			iovec[i].iov_base = aio->a_iov[i].iov_buf;
+		}
+		iovp = &iovec[0];
+		rv = 0;
+
+		n = writev(pd->fd, iovp, aio->a_niov);
+		if (n < 0) {
+			if ((errno == EAGAIN) || (errno == EINTR)) {
+				// Can't write more right now.  We're done
+				// on this fd for now.
+				return;
+			}
+			rv = nni_plat_errno(errno);
+			nni_list_remove(&pd->writeq, aio);
+
+			nni_posix_poll_finish(aio, rv);
+			return;
+		}
+
+		aio->a_count += n;
+
+		while (n > 0) {
+			// If we didn't write the first full iov,
+			// then we're done for now.  Record progress
+			// and return to caller.
+			if (n < aio->a_iov[0].iov_len) {
+				aio->a_iov[0].iov_buf += n;
+				aio->a_iov[0].iov_len -= n;
+				return;
+			}
+
+			// We consumed the full iovec, so just move the
+			// remaining ones up, and decrement count handled.
+			n -= aio->a_iov[0].iov_len;
+			for (i = 1; i < aio->a_niov; i++) {
+				aio->a_iov[i-1] = aio->a_iov[i];
+			}
+			NNI_ASSERT(aio->a_niov > 0);
+			aio->a_niov--;
+		}
+
+		// We completed the entire operation on this aio.
+		nni_list_remove(&pd->writeq, aio);
+		nni_posix_poll_finish(aio, 0);
+
+		// Go back to start of loop to see if there is another
+		// aio ready for us to process.
+	}
+}
+
+
+static void
+nni_posix_poll_read(nni_posix_pipedesc *pd)
+{
+	int n;
+	int rv;
+	int i;
+	struct iovec iovec[4];
+	struct iovec *iovp;
+	nni_aio *aio;
+
+	while ((aio = nni_list_first(&pd->readq)) != NULL) {
+		for (i = 0; i < aio->a_niov; i++) {
+			iovec[i].iov_len = aio->a_iov[i].iov_len;
+			iovec[i].iov_base = aio->a_iov[i].iov_buf;
+		}
+		iovp = &iovec[0];
+		rv = 0;
+
+		n = readv(pd->fd, iovp, aio->a_niov);
+		if (n < 0) {
+			if ((errno == EAGAIN) || (errno == EINTR)) {
+				// Can't read more right now.  We're done
+				// on this fd for now.
+				return;
+			}
+			rv = nni_plat_errno(errno);
+
+			nni_posix_poll_finish(aio, rv);
+			return;
+		}
+
+		if (n == 0) {
+			// No bytes indicates a closed descriptor.
+			nni_posix_poll_finish(aio, NNG_ECLOSED);
+			return;
+		}
+
+		aio->a_count += n;
+
+		while (n > 0) {
+			// If we didn't fill the first full iov,
+			// then we're done for now.  Record progress
+			// and return to caller.
+			if (n < aio->a_iov[0].iov_len) {
+				aio->a_iov[0].iov_buf += n;
+				aio->a_iov[0].iov_len -= n;
+				return;
+			}
+
+			// We consumed the full iovec, so just move the
+			// remaining ones up, and decrement count handled.
+			n -= aio->a_iov[0].iov_len;
+			for (i = 1; i < aio->a_niov; i++) {
+				aio->a_iov[i-1] = aio->a_iov[i];
+			}
+			NNI_ASSERT(aio->a_niov > 0);
+			aio->a_niov--;
+		}
+
+		// We completed the entire operation on this aio.
+		nni_posix_poll_finish(aio, 0);
+
+		// Go back to start of loop to see if there is another
+		// aio ready for us to process.
+	}
+}
+
+
+static void
+nni_posix_poll_close(nni_posix_pipedesc *pd)
+{
+	nni_aio *aio;
+
+	while ((aio = nni_list_first(&pd->readq)) != NULL) {
+		nni_posix_poll_finish(aio, NNG_ECLOSED);
+	}
+	while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+		nni_posix_poll_finish(aio, NNG_ECLOSED);
+	}
+}
+
+
+static void
+nni_posix_poll_thr(void *arg)
+{
+	nni_posix_pollq *pollq = arg;
+	nni_posix_pipedesc *pd, *nextpd;
+
+	nni_mtx_lock(&pollq->mtx);
+	for (;;) {
+		int rv;
+		int nfds;
+		struct pollfd *fds;
+
+		if (pollq->close) {
+			break;
+		}
+
+		if (pollq->newfds != NULL) {
+			// We have been "grown" by the caller.  Free up the
+			// old space, and start using the new.
+			nni_free(pollq->fds,
+			    pollq->nfds * sizeof (struct pollfd));
+			pollq->fds = pollq->newfds;
+			pollq->nfds = pollq->nnewfds;
+			pollq->newfds = NULL;
+		}
+		fds = pollq->fds;
+		nfds = 0;
+
+		// The waker pipe is set up so that we will be woken
+		// when it is written (this allows us to be signaled).
+		fds[nfds].fd = pollq->wakerfd;
+		fds[nfds].events = POLLIN;
+		fds[nfds].revents = 0;
+		nfds++;
+
+		// Set up the poll list.
+		NNI_LIST_FOREACH (&pollq->pds, pd) {
+			fds[nfds].fd = pd->fd;
+			fds[nfds].events = 0;
+			fds[nfds].revents = 0;
+			if (nni_list_first(&pd->readq) != NULL) {
+				fds[nfds].events |= POLLIN;
+			}
+			if (nni_list_first(&pd->writeq) != NULL) {
+				fds[nfds].events |= POLLOUT;
+			}
+			pd->index = nfds;
+			nfds++;
+		}
+
+
+		// Now poll it.  We block indefinitely, since we use separate
+		// timeouts to wake and remove the elements from the list.
+		nni_mtx_unlock(&pollq->mtx);
+		rv = poll(fds, nfds, -1);
+		nni_mtx_lock(&pollq->mtx);
+
+		if (rv < 0) {
+			// This shouldn't really happen.  If it does, we
+			// just try again.  (EINTR is probably the only
+			// reasonable failure here, unless internal memory
+			// allocations fail in the kernel, leading to EAGAIN.)
+			continue;
+		}
+
+
+		// If the waker pipe was signaled, read from it.
+		if (fds[0].revents & POLLIN) {
+			NNI_ASSERT(fds[0].fd == pollq->wakerfd);
+			nni_plat_pipe_clear(pollq->wakerfd);
+		}
+
+		// Now we iterate through all the pipedescs.  Note that one
+		// may have been added or removed.  New pipedescs will have
+		// their index set to -1.  Removed ones will just be absent.
+		// Note that we may remove the pipedesc from the list, so we
+		// have to use a custom iterator.
+		nextpd = nni_list_first(&pollq->pds);
+		while ((pd = nextpd) != NULL) {
+			int index;
+
+			// Save the nextpd for our next iteration.  This
+			// way we can remove the PD from the list without
+			// breaking the iteration.
+
+			nextpd = nni_list_next(&pollq->pds, pd);
+			if ((index = pd->index) < 1) {
+				continue;
+			}
+			pd->index = 0;
+			if (fds[index].revents & POLLIN) {
+				// Process the read queue.
+				nni_posix_poll_read(pd);
+			}
+			if (fds[index].revents & POLLOUT) {
+				// Process the write queue.
+				nni_posix_poll_write(pd);
+			}
+			if (fds[index].revents & (POLLHUP|POLLERR|POLLNVAL)) {
+				// The pipe is closed.  Wake all the
+				// aios with NNG_ECLOSED.
+				nni_posix_poll_close(pd);
+			}
+
+			// If we have completed all the AIOs outstanding,
+			// then remove this pipedesc from the pollq.
+			if ((nni_list_first(&pd->readq) == NULL) &&
+			    (nni_list_first(&pd->writeq) == NULL)) {
+				nni_list_remove(&pollq->pds, pd);
+			}
+		}
+	}
+	nni_mtx_unlock(&pollq->mtx);
+}
+
+
+static void
+nni_posix_pipedesc_cancel(nni_aio *aio)
+{
+	nni_posix_pipedesc *pd;
+	nni_posix_pollq *pq;
+
+	pd = aio->a_prov_data;
+	pq = pd->pq;
+
+	nni_mtx_lock(&pq->mtx);
+	// This will remove the aio from either the read or the write
+	// list; it doesn't matter which.
+	if (nni_list_active(&pd->readq, aio)) {
+		nni_list_remove(&pd->readq, aio);
+	}
+	aio->a_prov_cancel = NULL;
+	aio->a_prov_data = NULL;
+	nni_mtx_unlock(&pq->mtx);
+}
+
+
+static int
+nni_posix_poll_grow(nni_posix_pollq *pq)
+{
+	int grow = pq->npds + 2;	// one for us, one for waker
+	int noldfds;
+	struct pollfd *oldfds;
+	struct pollfd *newfds;
+
+	if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
+		return (0);
+	}
+
+	grow = grow + 128;
+
+	// Maybe we are adding a *lot* of pipes at once, and have to grow
+	// multiple times before the poller gets scheduled.  In that case
+	// toss the old array before we finish.
+	oldfds = pq->newfds;
+	noldfds = pq->nnewfds;
+
+	if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
+		return (NNG_ENOMEM);
+	}
+
+
+	pq->newfds = newfds;
+	pq->nnewfds = grow;
+
+	if (noldfds != 0) {
+		nni_free(oldfds, noldfds * sizeof (struct pollfd));
+	}
+	return (0);
+}
+
+
+static void
+nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
+{
+	int wake;
+	int rv;
+	nni_posix_pollq *pq = pd->pq;
+
+	(void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
+
+	nni_mtx_lock(&pq->mtx);
+	if (!nni_list_active(&pq->pds, pd)) {
+		if ((rv = nni_posix_poll_grow(pq)) != 0) {
+			nni_aio_finish(aio, rv, aio->a_count);
+			nni_mtx_unlock(&pq->mtx);
+			return;
+		}
+
+		nni_list_append(&pq->pds, pd);
+	}
+	NNI_ASSERT(!nni_list_active(l, aio));
+	aio->a_prov_data = pd;
+	aio->a_prov_cancel = nni_posix_pipedesc_cancel;
+
+	// Only wake if we aren't already waiting for this type of I/O on
+	// this descriptor.
+	wake = nni_list_first(l) == NULL ? 1 : 0;
+	nni_list_append(l, aio);
+
+	if (wake) {
+		nni_plat_pipe_raise(pq->wakewfd);
+	}
+	nni_mtx_unlock(&pq->mtx);
+}
+
+
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
+{
+	nni_posix_pipedesc *pd;
+
+
+	if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+		return (NNG_ENOMEM);
+	}
+
+	// We could randomly choose a different pollq, or for efficiency's
+	// sake we could take a modulo of the file desc number to choose
+	// one.  For now we just have a global pollq.  Note that by tying
+	// the pd to a single pollq we may get some kind of cache warmth.
+
+	pd->pq = &nni_posix_global_pollq;
+	pd->fd = fd;
+	pd->index = 0;
+
+	NNI_LIST_INIT(&pd->readq, nni_aio, a_prov_node);
+	NNI_LIST_INIT(&pd->writeq, nni_aio, a_prov_node);
+
+	*pdp = pd;
+	return (0);
+}
+
+
+void
+nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
+{
+	nni_aio *aio;
+	nni_posix_pollq *pq = pd->pq;
+
+	nni_mtx_lock(&pq->mtx);
+
+	// This removes any aios from our list.
+	nni_posix_poll_close(pd);
+
+	if (nni_list_active(&pq->pds, pd)) {
+		nni_list_remove(&pq->pds, pd);
+	}
+	nni_mtx_unlock(&pq->mtx);
+
+	NNI_FREE_STRUCT(pd);
+}
+
+
+static void
+nni_posix_pollq_fini(nni_posix_pollq *pq)
+{
+	if (pq->started) {
+		nni_mtx_lock(&pq->mtx);
+		pq->close = 1;
+		pq->started = 0;
+		nni_plat_pipe_raise(pq->wakewfd);
+
+		// All pipes should have been closed before this is called.
+		NNI_ASSERT(nni_list_first(&pq->pds) == NULL);
+		nni_mtx_unlock(&pq->mtx);
+	}
+
+	nni_thr_fini(&pq->thr);
+	if (pq->wakewfd >= 0) {
+		nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+		pq->wakewfd = pq->wakerfd = -1;
+	}
+	nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd));
+	nni_free(pq->fds, pq->nfds * sizeof (struct pollfd));
+	nni_mtx_fini(&pq->mtx);
+}
+
+
+static int
+nni_posix_pollq_init(nni_posix_pollq *pq)
+{
+	int rv;
+
+	NNI_LIST_INIT(&pq->pds, nni_posix_pipedesc, node);
+	pq->wakewfd = -1;
+	pq->wakerfd = -1;
+	pq->close = 0;
+
+	if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
+	    ((rv = nni_posix_poll_grow(pq)) != 0) ||
+	    ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
+	    ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
+		nni_posix_pollq_fini(pq);
+		return (rv);
+	}
+	pq->started = 1;
+	nni_thr_run(&pq->thr);
+	return (0);
+}
+
+
+int
+nni_posix_pipedesc_sysinit(void)
+{
+	int rv;
+
+	rv = nni_posix_pollq_init(&nni_posix_global_pollq);
+	return (rv);
+}
+
+
+void
+nni_posix_pipedesc_sysfini(void)
+{
+	nni_posix_pollq_fini(&nni_posix_global_pollq);
+}
+
+
+// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
+// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
+
+int
+nni_posix_pipedesc_read(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+	aio->a_count = 0;
+
+	nni_posix_pipedesc_submit(pd, &pd->readq, aio);
+	return (0);
+}
+
+
+int
+nni_posix_pipedesc_write(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+	aio->a_count = 0;
+	nni_posix_pipedesc_submit(pd, &pd->writeq, aio);
+	return (0);
+}
+
+
+// extern int nni_posix_aio_connect();
+// extern int nni_posix_aio_accept();
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_poll_not_used = 0;
+
+#endif  // NNG_USE_POSIX_AIOPOLL
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index de491c01..b052ed53 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -299,8 +299,18 @@ nni_plat_init(int (*helper)(void))
 	// as scalable / thrifty with our use of VM.
 	(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
 
+	if ((rv = nni_posix_pipedesc_sysinit()) != 0) {
+		pthread_mutex_unlock(&nni_plat_lock);
+		(void) close(nni_plat_devnull);
+		pthread_mutexattr_destroy(&nni_mxattr);
+		pthread_condattr_destroy(&nni_cvattr);
+		pthread_attr_destroy(&nni_pthread_attr);
+		return (rv);
+	}
+
 	if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
 		pthread_mutex_unlock(&nni_plat_lock);
+		nni_posix_pipedesc_sysfini();
 		(void) close(devnull);
 		pthread_mutexattr_destroy(&nni_mxattr);
 		pthread_condattr_destroy(&nni_cvattr);
@@ -322,6 +332,7 @@ nni_plat_fini(void)
 {
 	pthread_mutex_lock(&nni_plat_lock);
 	if (nni_plat_inited) {
+		nni_posix_pipedesc_sysfini();
 		pthread_mutexattr_destroy(&nni_mxattr);
 		pthread_condattr_destroy(&nni_cvattr);
 		pthread_attr_destroy(&nni_pthread_attr);
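
One detail worth calling out from the diff above: both the removed
thread-based code and the new poller must advance their iovec bookkeeping
after a short readv()/writev(), and both open-code that logic inline.
Factored out, with an invented name (this helper does not exist in the
commit), the bookkeeping looks like this:

```c
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>

// Advance an iovec array in place after "n" bytes were consumed by a
// short readv()/writev().  Returns how many iovecs remain outstanding.
// This helper is invented for illustration; the commit open-codes the
// same shifting directly on aio->a_iov.
static int
iov_advance(struct iovec *iov, int niov, size_t n)
{
	while ((n > 0) && (niov > 0)) {
		if (n < iov[0].iov_len) {
			// Partial first iovec: bump base, shrink len.
			iov[0].iov_base = (char *) iov[0].iov_base + n;
			iov[0].iov_len -= n;
			return (niov);
		}
		// Whole first iovec consumed: shift the rest down.
		n -= iov[0].iov_len;
		memmove(&iov[0], &iov[1], (niov - 1) * sizeof (iov[0]));
		niov--;
	}
	return (niov);
}

int
main(void)
{
	char a[4], b[4];
	struct iovec iov[2] = {
		{ .iov_base = a, .iov_len = sizeof (a) },
		{ .iov_base = b, .iov_len = sizeof (b) },
	};

	// Pretend writev() reported 6 of the 8 bytes written.
	int left = iov_advance(iov, 2, 6);
	printf("%d iovec left, %zu bytes in it\n", left, iov[0].iov_len);
	return (0);
}
```

The inline versions in nni_posix_poll_read() and nni_posix_poll_write()
above do the same shifting directly, returning early when the first iovec
is only partially consumed.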
