diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-17 22:56:41 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-17 22:56:41 -0800 |
| commit | 6c949de1cb46182303a85864bad753c12142fa97 (patch) | |
| tree | 9b66786e0a22d4d38089d8c269d6e096ef0a8617 /src/platform | |
| parent | d83f5aea789f896c90208567a9e56599a439e90a (diff) | |
| download | nng-6c949de1cb46182303a85864bad753c12142fa97.tar.gz nng-6c949de1cb46182303a85864bad753c12142fa97.tar.bz2 nng-6c949de1cb46182303a85864bad753c12142fa97.zip | |
POSIX poller: add support for select, and for choosing the poller
Some platforms or configurations may not have more modern options
like kqueue or epoll, or may be constrained by policy.
Diffstat (limited to 'src/platform')
| -rw-r--r-- | src/platform/posix/CMakeLists.txt | 44 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipcconn.c | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq.h | 9 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 10 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_select.c | 331 | ||||
| -rw-r--r-- | src/platform/posix/posix_sockfd.c | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 16 |
8 files changed, 399 insertions, 23 deletions
diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt index 6f3dfcaf..aeedddb9 100644 --- a/src/platform/posix/CMakeLists.txt +++ b/src/platform/posix/CMakeLists.txt @@ -60,6 +60,8 @@ if (NNG_PLATFORM_POSIX) nng_check_sym(port_create port.h NNG_HAVE_PORT_CREATE) nng_check_sym(epoll_create sys/epoll.h NNG_HAVE_EPOLL) nng_check_sym(epoll_create1 sys/epoll.h NNG_HAVE_EPOLL_CREATE1) + nng_check_sym(poll poll.h NNG_HAVE_POLL) + nng_check_sym(select sys/select.h NNG_HAVE_SELECT) nng_check_sym(getpeereid unistd.h NNG_HAVE_GETPEEREID) nng_check_sym(SO_PEERCRED sys/socket.h NNG_HAVE_SOPEERCRED) nng_check_struct_member(sockpeercred uid sys/socket.h NNG_HAVE_SOCKPEERCRED) @@ -101,14 +103,48 @@ if (NNG_PLATFORM_POSIX) posix_udp.c ) - if (NNG_HAVE_PORT_CREATE) - nng_sources(posix_pollq_port.c) + set(NNG_POLLQ_POLLER "auto" CACHE STRING "Poller used for multiplexing I/O") + set_property(CACHE NNG_POLLQ_POLLER PROPERTY STRINGS auto ports kqueue epoll poll select) + mark_as_advanced(NNG_POLLQ_POLLER) + if (NNG_POLLQ_POLLER STREQUAL "ports") + nng_defines(NNG_POLLQ_PORTS) + elseif (NNG_POLLQ_POLLER STREQUAL "kqueue") + nng_defines(NNG_POLLQ_KQUEUE) + elseif (NNG_POLLQ_POLLER STREQUAL "epoll") + nng_defines(NNG_POLLQ_EPOLL) + elseif (NNG_POLLQ_POLLER STREQUAL "poll") + nng_defines(NNG_POLLQ_POLL) + elseif (NNG_POLLQ_POLLER STREQUAL "select") + set(NNG_POLLQ_SELECT ON) + elseif (NNG_HAVE_PORT_CREATE) + set(NNG_POLLQ_PORTS ON) elseif (NNG_HAVE_KQUEUE) - nng_sources(posix_pollq_kqueue.c) + set(NNG_POLLQ_KQUEUE ON) elseif (NNG_HAVE_EPOLL AND NNG_HAVE_EVENTFD) + set(NNG_POLLQ_EPOLL ON) + elseif (NNG_HAVE_POLL) + set(NNG_POLLQ_POLL ON) + elseif (NNG_HAVE_SELECT) + set(NNG_POLLQ_SELECT TRUE) + endif() + + if (NNG_POLLQ_PORTS) + message(STATUS "Using port events for multiplexing I/O.") + nng_sources(posix_pollq_port.c) + elseif (NNG_POLLQ_KQUEUE) + message(STATUS "Using kqueue for multiplexing I/O.") + nng_sources(posix_pollq_kqueue.c) + elseif (NNG_POLLQ_EPOLL) + message(DEBUG "Using epoll for multiplexing I/O.") nng_sources(posix_pollq_epoll.c) - else () + elseif (NNG_POLLQ_POLL) + message(STATUS "Using poll for multiplexing I/O.") nng_sources(posix_pollq_poll.c) + elseif (NNG_POLLQ_SELECT) + message(STATUS "Using select for multiplexing I/O.") + nng_sources(posix_pollq_select.c) + else() + message(FATAL_ERROR "No suitable poller found for multiplexing I/O.") endif () if (NNG_HAVE_ARC4RANDOM) diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 8b11b64f..a198b87f 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -264,7 +264,7 @@ ipc_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLOUT); + nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -298,7 +298,7 @@ ipc_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLIN); + nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index 76105acc..8dc92fb1 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -32,11 +32,20 @@ extern int nni_posix_pfd_fd(nni_posix_pfd *); extern void nni_posix_pfd_close(nni_posix_pfd *); extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *); +#ifdef POLLIN #define NNI_POLL_IN ((unsigned) POLLIN) #define NNI_POLL_OUT ((unsigned) POLLOUT) #define NNI_POLL_HUP ((unsigned) POLLHUP) #define NNI_POLL_ERR ((unsigned) POLLERR) #define NNI_POLL_INVAL ((unsigned) POLLNVAL) +#else +// maybe using select +#define NNI_POLL_IN (0x0001) +#define NNI_POLL_OUT (0x0010) +#define NNI_POLL_HUP (0x0004) +#define NNI_POLL_ERR (0x0008) +#define NNI_POLL_INVAL (0x0020) +#endif // POLLIN #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index ae1b2f47..562c888e 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -198,10 +198,10 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) return (0); } - if (events & POLLIN) { + if (events & NNI_POLL_IN) { EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf); } - if (events & POLLOUT) { + if (events & NNI_POLL_OUT) { EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf); } while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) { @@ -254,10 +254,10 @@ nni_posix_poll_thr(void *arg) switch (ev->filter) { case EVFILT_READ: - revents = POLLIN; + revents = NNI_POLL_IN; break; case EVFILT_WRITE: - revents = POLLOUT; + revents = NNI_POLL_OUT; break; } if (ev->udata == NULL) { @@ -267,7 +267,7 @@ nni_posix_poll_thr(void *arg) } pf = (void *) ev->udata; if (ev->flags & EV_ERROR) { - revents |= POLLHUP; + revents |= NNI_POLL_HUP; } nni_mtx_lock(&pf->mtx); diff --git a/src/platform/posix/posix_pollq_select.c b/src/platform/posix/posix_pollq_select.c new file mode 100644 index 00000000..3213aa11 --- /dev/null +++ b/src/platform/posix/posix_pollq_select.c @@ -0,0 +1,331 @@ +// +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/defs.h" +#include "core/nng_impl.h" +#include "platform/posix/posix_pollq.h" + +#include <errno.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <sys/select.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +// POSIX AIO using select(). We use a single poll thread to perform +// I/O operations for the entire system. This is the worst form of +// I/O multiplexing, but short of using threads or spin-polling from +// a single thread, this is our only reasonable solution. +// +// Note that select() is not scalable, and we will be limited to a small +// number of open files/sockets. As such it is is not suitable for use +// on large servers. However, this may be enough for use in constrained +// systems that are not likely to have many open files anyway. +// + +typedef struct nni_posix_pollq nni_posix_pollq; + +struct nni_posix_pollq { + nni_mtx mtx; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + bool closing; // request for worker to exit + bool closed; + nni_thr thr; // worker thread + int maxfd; + struct nni_posix_pfd *pfds[FD_SETSIZE]; +}; + +struct nni_posix_pfd { + nni_posix_pollq *pq; + int fd; + nni_cv cv; + nni_mtx mtx; + unsigned events; + nni_posix_pfd_cb cb; + void *arg; + bool reap; +}; + +static nni_posix_pollq nni_posix_global_pollq; + +int +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +{ + nni_posix_pfd *pfd; + nni_posix_pollq *pq = &nni_posix_global_pollq; + + // Set this is as soon as possible (narrow the close-exec race as + // much as we can; better options are system calls that suppress + // this behavior from descriptor creation.) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + + if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { + return (NNG_ENOMEM); + } + if (fd >= FD_SETSIZE) { + return (NNG_EINVAL); + } + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pq->mtx); + pfd->fd = fd; + pfd->events = 0; + pfd->cb = NULL; + pfd->arg = NULL; + pfd->pq = pq; + nni_mtx_lock(&pq->mtx); + if (pq->closing) { + nni_mtx_unlock(&pq->mtx); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); + return (NNG_ECLOSED); + } + pq->pfds[fd] = pfd; + if (fd > pq->maxfd) { + pq->maxfd = fd; + } + nni_mtx_unlock(&pq->mtx); + *pfdp = pfd; + return (0); +} + +void +nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) +{ + nni_mtx_lock(&pfd->mtx); + pfd->cb = cb; + pfd->arg = arg; + nni_mtx_unlock(&pfd->mtx); +} + +int +nni_posix_pfd_fd(nni_posix_pfd *pfd) +{ + return (pfd->fd); +} + +void +nni_posix_pfd_close(nni_posix_pfd *pfd) +{ + (void) shutdown(pfd->fd, SHUT_RDWR); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; + int fd = pfd->fd; + + nni_posix_pfd_close(pfd); + + nni_mtx_lock(&pq->mtx); + if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + pfd->reap = true; + nni_plat_pipe_raise(pq->wakewfd); + while (pfd->reap) { + nni_cv_wait(&pfd->cv); + } + } else { + pq->pfds[fd] = NULL; + } + nni_mtx_unlock(&pq->mtx); + + // We're exclusive now. + (void) close(fd); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); +} + +int +nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) +{ + nni_posix_pollq *pq = pfd->pq; + + nni_mtx_lock(&pq->mtx); + pfd->events |= events; + nni_mtx_unlock(&pq->mtx); + + // If we're running on the callback, then don't bother to kick + // the pollq again. This is necessary because we cannot modify + // the poller while it is polling. + if (!nni_thr_is_self(&pq->thr)) { + nni_plat_pipe_raise(pq->wakewfd); + } + return (0); +} + +static void +nni_posix_poll_thr(void *arg) +{ + nni_posix_pollq *pq = arg; + fd_set rfds; + fd_set wfds; + fd_set efds; + int maxfd; + + for (;;) { + unsigned events; + nni_posix_pfd *pfd; + + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + + // The waker pipe is set up so that we will be woken + // when it is written (this allows us to be signaled). + FD_SET(pq->wakerfd, &rfds); + FD_SET(pq->wakerfd, &efds); + + nni_plat_pipe_clear(pq->wakerfd); + nni_mtx_lock(&pq->mtx); + + // If we're closing down, bail now. This is done *after* we + // have ensured that the reapq is empty. Anything still in + // the pollq is not going to receive further callbacks. + if (pq->closing) { + for (int fd = 0; fd <= pq->maxfd; fd++) { + if ((pfd = pq->pfds[fd]) != NULL) { + pq->pfds[fd] = NULL; + pfd->reap = false; + nni_cv_wake(&pfd->cv); + } + } + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + break; + } + + // Set up the poll list. + maxfd = pq->wakerfd; + for (int fd = 0; fd <= pq->maxfd; fd++) { + if ((pfd = pq->pfds[fd]) == NULL) { + continue; + } + NNI_ASSERT(pfd->fd == fd); + if (pfd->reap) { + pq->pfds[fd] = NULL; + pfd->reap = false; + nni_cv_wake(&pfd->cv); + continue; + } + events = pfd->events; + + if (events != 0) { + if (events & NNI_POLL_IN) { + FD_SET(fd, &rfds); + } + if (events & NNI_POLL_OUT) { + FD_SET(fd, &wfds); + } + FD_SET(fd, &efds); + if (maxfd < fd) { + maxfd = fd; + } + } + } + while (pq->maxfd > 0 && (pq->pfds[pq->maxfd] == NULL)) { + pq->maxfd--; + } + nni_mtx_unlock(&pq->mtx); + + // We could get the result from poll, and avoid iterating + // over the entire set of pollfds, but since on average we + // will be walking half the list, doubling the work we do + // (the condition with a potential pipeline stall) seems like + // adding complexity with no real benefit. It also makes the + // worst case even worse. + (void) select(maxfd + 1, &rfds, &wfds, &efds, NULL); + + nni_mtx_lock(&pq->mtx); + for (int fd = 0; fd <= maxfd; fd++) { + events = 0; + if (FD_ISSET(fd, &rfds)) { + events |= NNI_POLL_IN; + } + if (FD_ISSET(fd, &wfds)) { + events |= NNI_POLL_OUT; + } + if (FD_ISSET(fd, &efds)) { + events |= NNI_POLL_HUP; + } + if (events != 0) { + nni_posix_pfd_cb cb = NULL; + void *arg; + if ((pfd = pq->pfds[fd]) != NULL) { + cb = pfd->cb; + arg = pfd->arg; + pfd->events &= ~events; + } + + if (cb) { + nni_mtx_unlock(&pq->mtx); + cb(pfd, events, arg); + nni_mtx_lock(&pq->mtx); + } + } + } + nni_mtx_unlock(&pq->mtx); + } +} + +static void +nni_posix_pollq_destroy(nni_posix_pollq *pq) +{ + nni_mtx_lock(&pq->mtx); + pq->closing = true; + nni_mtx_unlock(&pq->mtx); + + nni_plat_pipe_raise(pq->wakewfd); + + close(pq->wakewfd); + nni_thr_fini(&pq->thr); + close(pq->wakerfd); + // nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); + nni_mtx_fini(&pq->mtx); +} + +static int +nni_posix_pollq_create(nni_posix_pollq *pq) +{ + int rv; + + pq->closing = false; + pq->closed = false; + + if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) { + return (rv); + } + if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); + return (rv); + } + nni_thr_set_name(&pq->thr, "nng:poll:select"); + nni_mtx_init(&pq->mtx); + nni_thr_run(&pq->thr); + return (0); +} + +int +nni_posix_pollq_sysinit(nng_init_params *params) +{ + NNI_ARG_UNUSED(params); + return (nni_posix_pollq_create(&nni_posix_global_pollq)); +} + +void +nni_posix_pollq_sysfini(void) +{ + nni_posix_pollq_destroy(&nni_posix_global_pollq); +} diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c index 997feae6..8ccca66c 100644 --- a/src/platform/posix/posix_sockfd.c +++ b/src/platform/posix/posix_sockfd.c @@ -302,7 +302,7 @@ sfd_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLOUT); + nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -336,7 +336,7 @@ sfd_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLIN); + nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index ce5243b0..d49fc838 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -310,7 +310,7 @@ tcp_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLOUT); + nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -344,7 +344,7 @@ tcp_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLIN); + nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index a7eb7144..8d0d4a42 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -181,22 +181,21 @@ nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) NNI_ARG_UNUSED(pfd); nni_mtx_lock(&udp->udp_mtx); - if (events & (unsigned) POLLIN) { + if (events & NNI_POLL_IN) { nni_posix_udp_dorecv(udp); } - if (events & (unsigned) POLLOUT) { + if (events & NNI_POLL_OUT) { nni_posix_udp_dosend(udp); } - if (events & - ((unsigned) POLLHUP | (unsigned) POLLERR | (unsigned) POLLNVAL)) { + if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) { nni_posix_udp_doclose(udp); } else { events = 0; if (!nni_list_empty(&udp->udp_sendq)) { - events |= (unsigned) POLLOUT; + events |= NNI_POLL_OUT; } if (!nni_list_empty(&udp->udp_recvq)) { - events |= (unsigned) POLLIN; + events |= NNI_POLL_IN; } if (events) { int rv; @@ -299,7 +298,7 @@ nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) } nni_list_append(&udp->udp_recvq, aio); if (nni_list_first(&udp->udp_recvq) == aio) { - if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) { + if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_IN)) != 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } @@ -322,7 +321,8 @@ nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) } nni_list_append(&udp->udp_sendq, aio); if (nni_list_first(&udp->udp_sendq) == aio) { - if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) { + if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_OUT)) != + 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } |
