diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/aio.h | 1 | ||||
| -rw-r--r-- | src/core/timer.c | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_aio.h | 44 | ||||
| -rw-r--r-- | src/platform/posix/posix_aiothr.c | 323 | ||||
| -rw-r--r-- | src/platform/posix/posix_config.h | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 14 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 15 | ||||
| -rw-r--r-- | src/platform/posix/posix_poll.c | 575 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 11 |
11 files changed, 619 insertions, 378 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ac45fd68..4c32d5ce 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -82,12 +82,12 @@ set (NNG_SOURCES platform/posix/posix_aio.h platform/posix/posix_alloc.c - platform/posix/posix_aiothr.c platform/posix/posix_clock.c platform/posix/posix_debug.c platform/posix/posix_ipc.c platform/posix/posix_net.c platform/posix/posix_pipe.c + platform/posix/posix_poll.c platform/posix/posix_rand.c platform/posix/posix_thread.c diff --git a/src/core/aio.h b/src/core/aio.h index c377ca93..a5f78b3f 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -41,6 +41,7 @@ struct nni_aio { // TBD: Resolver operations. // Provider-use fields. + void (*a_prov_cancel)(nni_aio *); void * a_prov_data; nni_list_node a_prov_node; }; diff --git a/src/core/timer.c b/src/core/timer.c index 0224ce75..085a297a 100644 --- a/src/core/timer.c +++ b/src/core/timer.c @@ -14,6 +14,7 @@ static void nni_timer_loop(void *); +// XXX: replace this timer list with a minHeap based priority queue. struct nni_timer { nni_mtx t_mx; nni_cv t_cv; diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 797f9e43..9ab322a0 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -18,43 +18,13 @@ #include "core/nng_impl.h" -typedef struct nni_posix_aioq nni_posix_aioq; -typedef struct nni_posix_aiof nni_posix_aiof; -typedef struct nni_posix_aio_pipe nni_posix_aio_pipe; -typedef struct nni_posix_aio_ep nni_posix_aio_ep; -// Head structure representing file operations for read/write. We process -// the list of aios serially, and each file has its own thread for now. -struct nni_posix_aioq { - nni_list aq_aios; - int aq_fd; - nni_mtx aq_lk; - nni_cv aq_cv; -#ifdef NNG_USE_POSIX_AIOTHR - nni_thr aq_thr; -#endif -}; - -struct nni_posix_aio_pipe { - int ap_fd; - nni_posix_aioq ap_readq; - nni_posix_aioq ap_writeq; -}; - -struct nni_posix_aio_ep { - int ap_fd; - nni_posix_aioq ap_q; -}; - -extern int nni_posix_aio_pipe_init(nni_posix_aio_pipe *, int); -extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *); - -// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int); -// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *); -extern int nni_posix_aio_read(nni_posix_aio_pipe *, nni_aio *); -extern int nni_posix_aio_write(nni_posix_aio_pipe *, nni_aio *); - -// extern int nni_posix_aio_connect(); -// extern int nni_posix_aio_accept(); +typedef struct nni_posix_pipedesc nni_posix_pipedesc; +extern int nni_posix_pipedesc_sysinit(void); +extern void nni_posix_pipedesc_sysfini(void); +extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int); +extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *); +extern int nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *); +extern int nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *); #endif // PLATFORM_POSIX_AIO_H diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c deleted file mode 100644 index a01fa194..00000000 --- a/src/platform/posix/posix_aiothr.c +++ /dev/null @@ -1,323 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" -#include "platform/posix/posix_aio.h" - -#ifdef NNG_USE_POSIX_AIOTHR - -#include <errno.h> -#include <stdlib.h> -#include <string.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/uio.h> -#include <fcntl.h> -#include <unistd.h> - -// POSIX AIO using threads. This allows us to use normal synchronous AIO, -// along with underlying threads, to simulate asynchronous I/O. This will be -// unscalable for systems where threads are a finite resource, but it should -// be sufficient for systems where threads are efficient, and cheap, or for -// applications that do not need excessive amounts of open files. It also -// serves as a model upon which we can build more scalable forms of asynch -// I/O, using non-blocking I/O and pollers. - - -// nni_plat_aiothr_write is used to attempt a write, sending -// as much as it can. On success, it returns 0, otherwise an errno. It will -// retry if EINTR is received. -static int -nni_plat_aiothr_write(int fd, nni_aio *aio) -{ - int n; - int rv; - int i; - struct iovec iovec[4]; - struct iovec *iovp; - int niov = aio->a_niov; - int progress = 0; - - for (i = 0; i < niov; i++) { - iovec[i].iov_len = aio->a_iov[i].iov_len; - iovec[i].iov_base = aio->a_iov[i].iov_buf; - } - iovp = &iovec[0]; - rv = 0; - - while (niov != 0) { - n = writev(fd, iovp, niov); - if (n < 0) { - if (errno == EINTR) { - continue; - } - rv = nni_plat_errno(errno); - break; - } - - aio->a_count += n; - progress += n; - while (n) { - // If we didn't finish it yet, try again. - if (n < iovp->iov_len) { - iovp->iov_len -= n; - iovp->iov_base += n; - break; - } - - n -= iovp->iov_len; - iovp++; - niov--; - } - } - - // Either we got it all, or we didn't. - if ((rv != 0) && (progress != 0)) { - for (i = 0; i < niov; i++) { - aio->a_iov[i].iov_len = iovp[i].iov_len; - aio->a_iov[i].iov_buf = iovp[i].iov_base; - } - aio->a_niov = niov; - } - - return (rv); -} - - -// nni_plat_aiothr_read is used to attempt a read, sending as much as it can -// (limited by the requested read). On success, it returns 0, otherwise an -// errno. It will retry if EINTR is received. -static int -nni_plat_aiothr_read(int fd, nni_aio *aio) -{ - int n; - int rv; - int i; - struct iovec iovec[4]; - struct iovec *iovp; - int niov = aio->a_niov; - int progress = 0; - - for (i = 0; i < niov; i++) { - iovec[i].iov_len = aio->a_iov[i].iov_len; - iovec[i].iov_base = aio->a_iov[i].iov_buf; - } - iovp = &iovec[0]; - rv = 0; - - while (niov != 0) { - n = readv(fd, iovp, niov); - if (n < 0) { - if (errno == EINTR) { - continue; - } - rv = nni_plat_errno(errno); - break; - } - if (n == 0) { - rv = NNG_ECLOSED; - break; - } - - aio->a_count += n; - progress += n; - while (n) { - // If we didn't finish it yet, try again. - if (n < iovp->iov_len) { - iovp->iov_len -= n; - iovp->iov_base += n; - break; - } - - n -= iovp->iov_len; - iovp++; - niov--; - } - } - - // Either we got it all, or we didn't. - if ((rv != 0) && (progress != 0)) { - for (i = 0; i < niov; i++) { - aio->a_iov[i].iov_len = iovp[i].iov_len; - aio->a_iov[i].iov_buf = iovp[i].iov_base; - } - aio->a_niov = niov; - } - - return (rv); -} - - -static void -nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *)) -{ - nni_aio *aio; - int rv; - - nni_mtx_lock(&q->aq_lk); - for (;;) { - if (q->aq_fd < 0) { - break; - } - if ((aio = nni_list_first(&q->aq_aios)) == NULL) { - nni_cv_wait(&q->aq_cv); - continue; - } - rv = fn(q->aq_fd, aio); - if (rv == NNG_EAGAIN) { - continue; - } - if (rv == NNG_ECLOSED) { - break; - } - - nni_list_remove(&q->aq_aios, aio); - - // Call the callback. - nni_aio_finish(aio, rv, aio->a_count); - } - - while ((aio = nni_list_first(&q->aq_aios)) != NULL) { - nni_list_remove(&q->aq_aios, aio); - nni_aio_finish(aio, NNG_ECLOSED, aio->a_count); - } - - nni_mtx_unlock(&q->aq_lk); -} - - -static void -nni_plat_aiothr_readthr(void *arg) -{ - nni_plat_aiothr_dothr(arg, nni_plat_aiothr_read); -} - - -static void -nni_plat_aiothr_writethr(void *arg) -{ - nni_plat_aiothr_dothr(arg, nni_plat_aiothr_write); -} - - -static int -nni_posix_aioq_init(nni_posix_aioq *q, int fd, nni_cb cb) -{ - int rv; - - NNI_LIST_INIT(&q->aq_aios, nni_aio, a_prov_node); - if ((rv = nni_mtx_init(&q->aq_lk)) != 0) { - return (rv); - } - if ((rv = nni_cv_init(&q->aq_cv, &q->aq_lk)) != 0) { - nni_mtx_fini(&q->aq_lk); - return (rv); - } - if ((rv = nni_thr_init(&q->aq_thr, cb, q)) != 0) { - nni_cv_fini(&q->aq_cv); - nni_mtx_fini(&q->aq_lk); - return (rv); - } - q->aq_fd = fd; - return (0); -} - - -static void -nni_posix_aioq_start(nni_posix_aioq *q) -{ - nni_thr_run(&q->aq_thr); -} - - -static void -nni_posix_aioq_fini(nni_posix_aioq *q) -{ - if (q->aq_fd > 0) { - nni_mtx_lock(&q->aq_lk); - q->aq_fd = -1; - nni_cv_wake(&q->aq_cv); - nni_mtx_unlock(&q->aq_lk); - - nni_thr_fini(&q->aq_thr); - nni_cv_fini(&q->aq_cv); - nni_mtx_fini(&q->aq_lk); - } -} - - -int -nni_posix_aio_pipe_init(nni_posix_aio_pipe *p, int fd) -{ - int rv; - - rv = nni_posix_aioq_init(&p->ap_readq, fd, nni_plat_aiothr_readthr); - if (rv != 0) { - return (rv); - } - rv = nni_posix_aioq_init(&p->ap_writeq, fd, nni_plat_aiothr_writethr); - if (rv != 0) { - nni_posix_aioq_fini(&p->ap_readq); - return (rv); - } - nni_posix_aioq_start(&p->ap_readq); - nni_posix_aioq_start(&p->ap_writeq); - return (0); -} - - -void -nni_posix_aio_pipe_fini(nni_posix_aio_pipe *p) -{ - nni_posix_aioq_fini(&p->ap_readq); - nni_posix_aioq_fini(&p->ap_writeq); -} - - -// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int); -// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *); - -static int -nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio) -{ - nni_mtx_lock(&q->aq_lk); - if (q->aq_fd < 0) { - nni_mtx_unlock(&q->aq_lk); - return (NNG_ECLOSED); - } - nni_list_append(&q->aq_aios, aio); - nni_cv_wake(&q->aq_cv); - nni_mtx_unlock(&q->aq_lk); - return (0); -} - - -int -nni_posix_aio_read(nni_posix_aio_pipe *p, nni_aio *aio) -{ - return (nni_posix_aio_submit(&p->ap_readq, aio)); -} - - -int -nni_posix_aio_write(nni_posix_aio_pipe *p, nni_aio *aio) -{ - return (nni_posix_aio_submit(&p->ap_writeq, aio)); -} - - -// extern int nni_posix_aio_connect(); -// extern int nni_posix_aio_accept(); - -#else - -// Suppress empty symbols warnings in ranlib. -int nni_posix_aiothr_not_used = 0; - -#endif // NNG_USE_POSIX_AIOTHR diff --git a/src/platform/posix/posix_config.h b/src/platform/posix/posix_config.h index 34ab2c6d..bf121243 100644 --- a/src/platform/posix/posix_config.h +++ b/src/platform/posix/posix_config.h @@ -53,9 +53,9 @@ #ifndef CLOCK_REALTIME #define NNG_USE_GETTIMEOFDAY #elif !defined(NNG_USE_CLOCKID) -#define NNG_USE_CLOCKID CLOCK_MONOTONIC +#define NNG_USE_CLOCKID CLOCK_MONOTONIC #else -#define NNG_USE_CLOCKID CLOCK_REALTIME +#define NNG_USE_CLOCKID CLOCK_REALTIME #endif // CLOCK_REALTIME -#define NNG_USE_POSIX_AIOTHR 1 +#define NNG_USE_POSIX_AIOPOLL 1 diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 1030b6de..9c8d00dc 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -34,7 +34,6 @@ extern int nni_plat_errno(int); #endif - // Define types that this platform uses. #ifdef PLATFORM_POSIX_THREAD @@ -64,4 +63,8 @@ struct nni_plat_cv { #endif + +extern int nni_posix_pipedesc_sysinit(void); +extern void nni_posix_pipedesc_sysfini(void); + #endif // PLATFORM_POSIX_IMPL_H diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index e75edeca..ccf19fed 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -34,7 +34,7 @@ struct nni_plat_ipcsock { int fd; int devnull; // for shutting down accept() char * unlink; // path to unlink at fini - nni_posix_aio_pipe aiop; + nni_posix_pipedesc * pd; }; #ifdef SOCK_CLOEXEC @@ -69,14 +69,14 @@ nni_plat_ipc_path_to_sockaddr(struct sockaddr_un *sun, const char *path) int nni_plat_ipc_aio_send(nni_plat_ipcsock *isp, nni_aio *aio) { - return (nni_posix_aio_write(&isp->aiop, aio)); + return (nni_posix_pipedesc_write(isp->pd, aio)); } int nni_plat_ipc_aio_recv(nni_plat_ipcsock *isp, nni_aio *aio) { - return (nni_posix_aio_read(&isp->aiop, aio)); + return (nni_posix_pipedesc_read(isp->pd, aio)); } @@ -225,7 +225,9 @@ nni_plat_ipc_fini(nni_plat_ipcsock *isp) nni_free(isp->unlink, strlen(isp->unlink) + 1); } - nni_posix_aio_pipe_fini(&isp->aiop); + if (isp->pd != NULL) { + nni_posix_pipedesc_fini(isp->pd); + } NNI_FREE_STRUCT(isp); } @@ -338,7 +340,7 @@ nni_plat_ipc_connect(nni_plat_ipcsock *isp, const char *path) return (rv); } - if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) { + if ((rv = nni_posix_pipedesc_init(&isp->pd, fd)) != 0) { (void) close(fd); return (rv); } @@ -380,7 +382,7 @@ nni_plat_ipc_accept(nni_plat_ipcsock *isp, nni_plat_ipcsock *server) nni_plat_ipc_setopts(fd); - if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) { + if ((rv = nni_posix_pipedesc_init(&isp->pd, fd)) != 0) { (void) close(fd); return (rv); } diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 94dc2667..c8b7766e 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -34,7 +34,7 @@ struct nni_plat_tcpsock { int fd; int devnull; // for shutting down accept() - nni_posix_aio_pipe aiop; + nni_posix_pipedesc * pd; }; static int @@ -173,14 +173,14 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) int nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio) { - return (nni_posix_aio_write(&s->aiop, aio)); + return (nni_posix_pipedesc_write(s->pd, aio)); } int nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio) { - return (nni_posix_aio_read(&s->aiop, aio)); + return (nni_posix_pipedesc_read(s->pd, aio)); } @@ -282,7 +282,9 @@ nni_plat_tcp_fini(nni_plat_tcpsock *tsp) (void) close(tsp->fd); tsp->fd = -1; } - nni_posix_aio_pipe_fini(&tsp->aiop); + if (tsp->pd != NULL) { + nni_posix_pipedesc_fini(tsp->pd); + } NNI_FREE_STRUCT(tsp); } @@ -387,7 +389,7 @@ nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr, (void) close(fd); return (rv); } - if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) { + if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) { (void) close(fd); return (rv); } @@ -421,11 +423,10 @@ nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server) nni_plat_tcp_setopts(fd); - if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) { + if ((rv = nni_posix_pipedesc_init(&tsp->pd, fd)) != 0) { close(fd); return (rv); } - tsp->fd = fd; return (0); } diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c new file mode 100644 index 00000000..a6024db0 --- /dev/null +++ b/src/platform/posix/posix_poll.c @@ -0,0 +1,575 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" +#include "platform/posix/posix_aio.h" + +#ifdef NNG_USE_POSIX_AIOPOLL + +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/uio.h> +#include <fcntl.h> +#include <unistd.h> +#include <poll.h> + + +// POSIX AIO using poll(). We use a single poll thread to perform +// I/O operations for the entire system. This isn't entirely scalable, +// and it might be a good idea to create a few threads and group the +// I/O operations into separate pollers to limit the amount of work each +// thread does, and to scale across multiple cores. For now we don't +// worry about it. + +typedef struct nni_posix_pollq nni_posix_pollq; + +// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open +// file descriptor for TCP socket, etc.) This contains the list of pending +// aios for that underlying socket, as well as the socket itself. +struct nni_posix_pipedesc { + int fd; + int index; + nni_list readq; + nni_list writeq; + nni_list_node node; + nni_posix_pollq * pq; +}; + +// nni_posix_pollq is a work structure used by the poller thread, that keeps +// track of all the underlying pipe handles and so forth being used by poll(). +struct nni_posix_pollq { + nni_mtx mtx; + struct pollfd * fds; + struct pollfd * newfds; + int nfds; + int nnewfds; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + int close; // request for worker to exit + int started; + nni_thr thr; // worker thread + nni_list pds; // linked list of nni_posix_pipedescs. + int npds; // number of pipe descriptors +}; + +static nni_posix_pollq nni_posix_global_pollq; + +static void +nni_posix_poll_finish(nni_aio *aio, int rv) +{ + nni_posix_pipedesc *pd; + + pd = aio->a_prov_data; + if (nni_list_active(&pd->readq, aio)) { + nni_list_remove(&pd->readq, aio); + } + aio->a_prov_data = NULL; + aio->a_prov_cancel = NULL; + nni_aio_finish(aio, rv, aio->a_count); +} + + +static void +nni_posix_poll_write(nni_posix_pipedesc *pd) +{ + int n; + int rv; + int i; + struct iovec iovec[4]; + struct iovec *iovp; + nni_aio *aio; + + while ((aio = nni_list_first(&pd->writeq)) != NULL) { + for (i = 0; i < aio->a_niov; i++) { + iovec[i].iov_len = aio->a_iov[i].iov_len; + iovec[i].iov_base = aio->a_iov[i].iov_buf; + } + iovp = &iovec[0]; + rv = 0; + + n = writev(pd->fd, iovp, aio->a_niov); + if (n < 0) { + if ((errno == EAGAIN) || (errno == EINTR)) { + // Can't write more right now. We're done + // on this fd for now. + return; + } + rv = nni_plat_errno(errno); + nni_list_remove(&pd->writeq, aio); + + nni_posix_poll_finish(aio, rv); + return; + } + + aio->a_count += n; + + while (n > 0) { + // If we didn't write the first full iov, + // then we're done for now. Record progress + // and return to caller. + if (n < aio->a_iov[0].iov_len) { + aio->a_iov[0].iov_buf += n; + aio->a_iov[0].iov_len -= n; + return; + } + + // We consumed the full iovec, so just move the + // remaininng ones up, and decrement count handled. + n -= aio->a_iov[0].iov_len; + for (i = 1; i < aio->a_niov; i++) { + aio->a_iov[i-1] = aio->a_iov[i]; + } + NNI_ASSERT(aio->a_niov > 0); + aio->a_niov--; + } + + // We completed the entire operation on this aioq. + nni_list_remove(&pd->writeq, aio); + nni_posix_poll_finish(aio, 0); + + // Go back to start of loop to see if there is another + // aioq ready for us to process. + } +} + + +static void +nni_posix_poll_read(nni_posix_pipedesc *pd) +{ + int n; + int rv; + int i; + struct iovec iovec[4]; + struct iovec *iovp; + nni_aio *aio; + + while ((aio = nni_list_first(&pd->readq)) != NULL) { + for (i = 0; i < aio->a_niov; i++) { + iovec[i].iov_len = aio->a_iov[i].iov_len; + iovec[i].iov_base = aio->a_iov[i].iov_buf; + } + iovp = &iovec[0]; + rv = 0; + + n = readv(pd->fd, iovp, aio->a_niov); + if (n < 0) { + if ((errno == EAGAIN) || (errno == EINTR)) { + // Can't write more right now. We're done + // on this fd for now. + return; + } + rv = nni_plat_errno(errno); + + nni_posix_poll_finish(aio, rv); + return; + } + + if (n == 0) { + // No bytes indicates a closed descriptor. + nni_posix_poll_finish(aio, NNG_ECLOSED); + return; + } + + aio->a_count += n; + + while (n > 0) { + // If we didn't write the first full iov, + // then we're done for now. Record progress + // and return to caller. + if (n < aio->a_iov[0].iov_len) { + aio->a_iov[0].iov_buf += n; + aio->a_iov[0].iov_len -= n; + return; + } + + // We consumed the full iovec, so just move the + // remaininng ones up, and decrement count handled. + n -= aio->a_iov[0].iov_len; + for (i = 1; i < aio->a_niov; i++) { + aio->a_iov[i-1] = aio->a_iov[i]; + } + NNI_ASSERT(aio->a_niov > 0); + aio->a_niov--; + } + + // We completed the entire operation on this aioq. + nni_posix_poll_finish(aio, 0); + + // Go back to start of loop to see if there is another + // aioq ready for us to process. + } +} + + +static void +nni_posix_poll_close(nni_posix_pipedesc *pd) +{ + nni_aio *aio; + + while ((aio = nni_list_first(&pd->readq)) != NULL) { + nni_posix_poll_finish(aio, NNG_ECLOSED); + } + while ((aio = nni_list_first(&pd->writeq)) != NULL) { + nni_posix_poll_finish(aio, NNG_ECLOSED); + } +} + + +static void +nni_posix_poll_thr(void *arg) +{ + nni_posix_pollq *pollq = arg; + nni_posix_pipedesc *pd, *nextpd; + + nni_mtx_lock(&pollq->mtx); + for (;;) { + int rv; + int nfds; + struct pollfd *fds; + + if (pollq->close) { + break; + } + + if (pollq->newfds != NULL) { + // We have "grown" by the caller. Free up the old + // space, and start using the new. + nni_free(pollq->fds, + pollq->nfds * sizeof (struct pollfd)); + pollq->fds = pollq->newfds; + pollq->nfds = pollq->nnewfds; + pollq->newfds = NULL; + } + fds = pollq->fds; + nfds = 0; + + // The waker pipe is set up so that we will be woken + // when it is written (this allows us to be signaled). + fds[nfds].fd = pollq->wakerfd; + fds[nfds].events = POLLIN; + fds[nfds].revents = 0; + nfds++; + + // Set up the poll list. + NNI_LIST_FOREACH (&pollq->pds, pd) { + fds[nfds].fd = pd->fd; + fds[nfds].events = 0; + fds[nfds].revents = 0; + if (nni_list_first(&pd->readq) != NULL) { + fds[nfds].events |= POLLIN; + } + if (nni_list_first(&pd->writeq) != NULL) { + fds[nfds].events |= POLLOUT; + } + pd->index = nfds; + nfds++; + } + + + // Now poll it. We block indefinitely, since we use separate + // timeouts to wake and remove the elements from the list. + nni_mtx_unlock(&pollq->mtx); + rv = poll(fds, nfds, -1); + nni_mtx_lock(&pollq->mtx); + + if (rv < 0) { + // This shouldn't happen really. If it does, we + // just try again. (EINTR is probably the only + // reasonable failure here, unless internal memory + // allocations fail in the kernel, leading to EAGAIN.) + continue; + } + + + // If the waker pipe was signaled, read from it. + if (fds[0].revents & POLLIN) { + NNI_ASSERT(fds[0].fd == pollq->wakerfd); + nni_plat_pipe_clear(pollq->wakerfd); + } + + // Now we iterate through all the pipedescs. Note that one + // may have been added or removed. New pipedescs will have + // their index set to -1. Removed ones will just be absent. + // Note that we may remove the pipedesc from the list, so we + // have to use a custom iterator. + nextpd = nni_list_first(&pollq->pds); + while ((pd = nextpd) != NULL) { + int index; + + // Save the nextpd for our next iteration. This + // way we can remove the PD from the list without + // breaking the iteration. + + nextpd = nni_list_next(&pollq->pds, pd); + if ((index = pd->index) < 1) { + continue; + } + pd->index = 0; + if (fds[index].revents & POLLIN) { + // process the read q. + nni_posix_poll_read(pd); + } + if (fds[index].revents & POLLOUT) { + // process the write q. + nni_posix_poll_write(pd); + } + if (fds[index].revents & (POLLHUP|POLLERR|POLLNVAL)) { + // the pipe is closed. wake all the + // aios with NNG_ECLOSED. + nni_posix_poll_close(pd); + } + + // If we have completed all the AIOs outstanding, + // then remove this pipedesc from the pollq. + if ((nni_list_first(&pd->readq) == NULL) && + (nni_list_first(&pd->writeq) == NULL)) { + nni_list_remove(&pollq->pds, pd); + } + } + } + nni_mtx_unlock(&pollq->mtx); +} + + +static void +nni_posix_pipedesc_cancel(nni_aio *aio) +{ + nni_posix_pipedesc *pd; + nni_posix_pollq *pq; + + pd = aio->a_prov_data; + pq = pd->pq; + + nni_mtx_lock(&pq->mtx); + // This will remove the aio from either the read or the write + // list; it doesn't matter which. + if (nni_list_active(&pd->readq, aio)) { + nni_list_remove(&pd->readq, aio); + } + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; + nni_mtx_unlock(&pq->mtx); +} + + +static int +nni_posix_poll_grow(nni_posix_pollq *pq) +{ + int grow = pq->npds + 2; // one for us, one for waker + int noldfds; + struct pollfd *oldfds; + struct pollfd *newfds; + + if ((grow < pq->nfds) || (grow < pq->nnewfds)) { + return (0); + } + + grow = grow + 128; + + // Maybe we are adding a *lot* of pipes at once, and have to grow + // multiple times before the poller gets scheduled. In that case + // toss the old array before we finish. + oldfds = pq->newfds; + noldfds = pq->nnewfds; + + if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) { + return (NNG_ENOMEM); + } + + + pq->newfds = newfds; + pq->nnewfds = grow; + + if (noldfds != 0) { + nni_free(oldfds, noldfds * sizeof (struct pollfd)); + } + return (0); +} + + +static void +nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio) +{ + int wake; + int rv; + nni_posix_pollq *pq = pd->pq; + + (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK); + + nni_mtx_lock(&pq->mtx); + if (!nni_list_active(&pq->pds, pd)) { + if ((rv = nni_posix_poll_grow(pq)) != 0) { + nni_aio_finish(aio, rv, aio->a_count); + nni_mtx_unlock(&pq->mtx); + return; + } + + nni_list_append(&pq->pds, pd); + } + NNI_ASSERT(!nni_list_active(l, aio)); + aio->a_prov_data = pd; + aio->a_prov_cancel = nni_posix_pipedesc_cancel; + // Only wake if we aren't already waiting for this type of I/O on + // this descriptor. + wake = nni_list_first(l) == NULL ? 1 : 0; + nni_list_append(l, aio); + + if (wake) { + nni_plat_pipe_raise(pq->wakewfd); + } + nni_mtx_unlock(&pq->mtx); +} + + +int +nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) +{ + nni_posix_pipedesc *pd; + + + if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { + return (NNG_ENOMEM); + } + + // We could randomly choose a different pollq, or for efficiencies + // sake we could take a modulo of the file desc number to choose + // one. For now we just have a global pollq. Note that by tying + // the pd to a single pollq we may get some kind of cache warmth. + + pd->pq = &nni_posix_global_pollq; + pd->fd = fd; + pd->index = 0; + + NNI_LIST_INIT(&pd->readq, nni_aio, a_prov_node); + NNI_LIST_INIT(&pd->writeq, nni_aio, a_prov_node); + + *pdp = pd; + return (0); +} + + +void +nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) +{ + nni_aio *aio; + nni_posix_pollq *pq = pd->pq; + + nni_mtx_lock(&pq->mtx); + + // This removes any aios from our list. + nni_posix_poll_close(pd); + + if (nni_list_active(&pq->pds, pd)) { + nni_list_remove(&pq->pds, pd); + } + nni_mtx_unlock(&pq->mtx); + + NNI_FREE_STRUCT(pd); +} + + +static void +nni_posix_pollq_fini(nni_posix_pollq *pq) +{ + if (pq->started) { + nni_mtx_lock(&pq->mtx); + pq->close = 1; + pq->started = 0; + nni_plat_pipe_raise(pq->wakewfd); + + // All pipes should have been closed before this is called. + NNI_ASSERT(nni_list_first(&pq->pds) == NULL); + nni_mtx_unlock(&pq->mtx); + } + + nni_thr_fini(&pq->thr); + if (pq->wakewfd >= 0) { + nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); + pq->wakewfd = pq->wakerfd = -1; + } + nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd)); + nni_free(pq->fds, pq->nfds * sizeof (struct pollfd)); + nni_mtx_fini(&pq->mtx); +} + + +static int +nni_posix_pollq_init(nni_posix_pollq *pq) +{ + int rv; + + NNI_LIST_INIT(&pq->pds, nni_posix_pipedesc, node); + pq->wakewfd = -1; + pq->wakerfd = -1; + pq->close = 0; + + if (((rv = nni_mtx_init(&pq->mtx)) != 0) || + ((rv = nni_posix_poll_grow(pq)) != 0) || + ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) || + ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) { + nni_posix_pollq_fini(pq); + return (rv); + } + pq->started = 1; + nni_thr_run(&pq->thr); + return (0); +} + + +int +nni_posix_pipedesc_sysinit(void) +{ + int rv; + + rv = nni_posix_pollq_init(&nni_posix_global_pollq); + return (rv); +} + + +void +nni_posix_pipedesc_sysfini(void) +{ + nni_posix_pollq_fini(&nni_posix_global_pollq); +} + + +// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int); +// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *); + +int +nni_posix_pipedesc_read(nni_posix_pipedesc *pd, nni_aio *aio) +{ + aio->a_count = 0; + + nni_posix_pipedesc_submit(pd, &pd->readq, aio); + return (0); +} + + +int +nni_posix_pipedesc_write(nni_posix_pipedesc *pd, nni_aio *aio) +{ + aio->a_count = 0; + nni_posix_pipedesc_submit(pd, &pd->writeq, aio); + return (0); +} + + +// extern int nni_posix_aio_connect(); +// extern int nni_posix_aio_accept(); + +#else + +// Suppress empty symbols warnings in ranlib. +int nni_posix_poll_not_used = 0; + +#endif // NNG_USE_POSIX_AIOPOLL diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index de491c01..b052ed53 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -299,8 +299,18 @@ nni_plat_init(int (*helper)(void)) // as scalable / thrifty with our use of VM. (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + if ((rv = nni_posix_pipedesc_sysinit()) != 0) { + pthread_mutex_unlock(&nni_plat_lock); + (void) close(nni_plat_devnull); + pthread_mutexattr_destroy(&nni_mxattr); + pthread_condattr_destroy(&nni_cvattr); + pthread_attr_destroy(&nni_pthread_attr); + return (rv); + } + if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { pthread_mutex_unlock(&nni_plat_lock); + nni_posix_pipedesc_sysfini(); (void) close(devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); @@ -322,6 +332,7 @@ nni_plat_fini(void) { pthread_mutex_lock(&nni_plat_lock); if (nni_plat_inited) { + nni_posix_pipedesc_sysfini(); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); pthread_attr_destroy(&nni_pthread_attr); |
