| Field     | Value | Date |
|-----------|-------|------|
| author    | Garrett D'Amore <garrett@damore.org> | 2017-07-04 16:15:02 -0700 |
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-04 16:15:02 -0700 |
| commit    | 16a43040ef29f77375d226f669770e64a42d278c | |
| tree      | 1fd80d0d761fbdf812c257817a3b23ae1dc6519c | |
| parent    | 58c5fbb731f50a952864bc500a8efd3b7077ee65 | |
| download  | nng-16a43040ef29f77375d226f669770e64a42d278c.tar.gz, nng-16a43040ef29f77375d226f669770e64a42d278c.tar.bz2, nng-16a43040ef29f77375d226f669770e64a42d278c.zip | |
Separate out poller/pollq from basic socket operations.
| Mode       | File                                  | Changes |
|------------|---------------------------------------|---------|
| -rw-r--r-- | src/CMakeLists.txt                    | 4   |
| -rw-r--r-- | src/platform/posix/posix_aio.h        | 4   |
| -rw-r--r-- | src/platform/posix/posix_config.h     | 2   |
| -rw-r--r-- | src/platform/posix/posix_epdesc.c     | 347 |
| -rw-r--r-- | src/platform/posix/posix_impl.h       | 6   |
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c   | 382 |
| -rw-r--r-- | src/platform/posix/posix_pollq.h      | 43  |
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 367 |
| -rw-r--r-- | src/platform/posix/posix_thread.c     | 6   |

9 files changed, 1153 insertions, 8 deletions
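The poller abstraction introduced by this commit is callback driven: a consumer embeds an `nni_posix_pollq_node` in its own descriptor, fills in `fd`, `events`, `cb`, and `data`, and submits the node with `nni_posix_pollq_submit()`; the poll thread later runs the callback with `revents` filled in. The sketch below is not part of the commit — the `my_desc` type and functions are hypothetical — and only uses the declarations added in `posix_pollq.h` to illustrate that pattern:

```c
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
#include <poll.h>

// Hypothetical consumer, analogous to nni_posix_pipedesc.
typedef struct my_desc {
    nni_posix_pollq_node node; // embedded poller node
    nni_posix_pollq *    pq;   // pollq serving this fd
} my_desc;

// Invoked by the poller thread once the fd has events pending.
static void
my_desc_cb(void *arg)
{
    my_desc *d = arg;

    if (d->node.revents & POLLIN) {
        // ... perform non-blocking reads and complete pending aios ...
    }
    // Nodes are effectively one-shot: clear state and resubmit if
    // more work remains.
    d->node.revents = 0;
    d->node.events  = 0;
}

// Register read interest for an already non-blocking fd.
static int
my_desc_start(my_desc *d, int fd)
{
    d->pq          = nni_posix_pollq_get(fd); // currently the single global pollq
    d->node.fd     = fd;
    d->node.cb     = my_desc_cb;
    d->node.data   = d;
    d->node.events = POLLIN;
    return (nni_posix_pollq_submit(d->pq, &d->node));
}
```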
```diff
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index cfe66dd5..4a8aa8ef 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -80,15 +80,19 @@ set (NNG_SOURCES
     platform/posix/posix_impl.h
     platform/posix/posix_config.h
     platform/posix/posix_aio.h
+    platform/posix/posix_pollq.h
     platform/posix/posix_socket.h
     platform/posix/posix_alloc.c
     platform/posix/posix_clock.c
     platform/posix/posix_debug.c
+    platform/posix/posix_epdesc.c
     platform/posix/posix_ipc.c
     platform/posix/posix_net.c
     platform/posix/posix_pipe.c
+    platform/posix/posix_pipedesc.c
     platform/posix/posix_poll.c
+    platform/posix/posix_pollq_poll.c
     platform/posix/posix_rand.c
     platform/posix/posix_socket.c
     platform/posix/posix_thread.c
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 01d042f8..186f586f 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -18,12 +18,12 @@
 
 #include "core/nng_impl.h"
 
+typedef struct nni_posix_pollq nni_posix_pollq;
+
 typedef struct nni_posix_pipedesc nni_posix_pipedesc;
 typedef struct nni_posix_epdesc nni_posix_epdesc;
 
-extern int nni_posix_pipedesc_sysinit(void);
-extern void nni_posix_pipedesc_sysfini(void);
 extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
 extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
 extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *);
diff --git a/src/platform/posix/posix_config.h b/src/platform/posix/posix_config.h
index bf121243..7fc2f5d5 100644
--- a/src/platform/posix/posix_config.h
+++ b/src/platform/posix/posix_config.h
@@ -58,4 +58,4 @@
 #define NNG_USE_CLOCKID CLOCK_REALTIME
 #endif // CLOCK_REALTIME
 
-#define NNG_USE_POSIX_AIOPOLL 1
+#define NNG_USE_POSIX_POLLQ_POLL 1
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
new file mode 100644
index 00000000..9d7cf538
--- /dev/null
+++ b/src/platform/posix/posix_epdesc.c
@@ -0,0 +1,347 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_pollq.h"
+
+#ifdef PLATFORM_POSIX_EPDESC
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+struct nni_posix_epdesc {
+    int                     fd;
+    int                     index;
+    nni_list                connectq;
+    nni_list                acceptq;
+    nni_posix_pollq_node    node;
+    nni_posix_pollq *       pq;
+    struct sockaddr_storage locaddr;
+    struct sockaddr_storage remaddr;
+    socklen_t               loclen;
+    socklen_t               remlen;
+    nni_mtx                 mtx;
+};
+
+
+#if 0
+static void
+nni_posix_epdesc_cancel(nni_aio *aio)
+{
+    nni_posix_epdesc *ed;
+    nni_posix_pollq *pq;
+
+    ed = aio->a_prov_data;
+    pq = ed->pq;
+
+    nni_mtx_lock(&pq->mtx);
+    nni_list_node_remove(&aio->a_prov_node);
+    nni_mtx_unlock(&pq->mtx);
+}
+
+
+static void
+nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
+{
+    nni_posix_epdesc *ed;
+    nni_posix_pipedesc *pd;
+
+    ed = aio->a_prov_data;
+
+    // acceptq or connectq.
+    if (nni_list_active(&ed->connectq, aio)) {
+        nni_list_remove(&ed->connectq, aio);
+    }
+
+    if (rv == 0) {
+        rv = nni_posix_pipedesc_init(&pd, newfd);
+        if (rv != 0) {
+            (void) close(newfd);
+        } else {
+            aio->a_pipe = pipe;
+        }
+    }
+    // Abuse the count to hold our new fd.  This is only for accept.
+    nni_aio_finish(aio, rv, 0);
+}
+
+
+static void
+nni_posix_poll_connect(nni_posix_epdesc *ed)
+{
+    nni_aio *aio;
+    socklen_t sz;
+    int rv;
+
+    // Note that normally there will only be a single connect AIO...
+    // A socket that is here will have *initiated* with a connect()
+    // call, which returned EINPROGRESS.  When the connection attempt
+    // is done, either way, the descriptor will be noted as writable.
+    // getsockopt() with SOL_SOCKET, SO_ERROR to determine the actual
+    // status of the connection attempt...
+    while ((aio = nni_list_first(&ed->connectq)) != NULL) {
+        rv = -1;
+        sz = sizeof (rv);
+        if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+            rv = errno;
+        }
+        switch (rv) {
+        case 0:
+            // Success!
+            nni_posix_epdesc_finish(aio, 0, ed->fd);
+            continue;
+
+        case EINPROGRESS:
+            // Still in progress... keep trying
+            return;
+
+        default:
+            nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
+            continue;
+        }
+    }
+}
+
+
+static void
+nni_posix_poll_accept(nni_posix_epdesc *ed)
+{
+    nni_aio *aio;
+    int newfd;
+    struct sockaddr_storage ss;
+    socklen_t slen;
+
+    while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
+        // We could argue that knowing the remote peer address would
+        // be nice.  But frankly if someone wants it, they can just
+        // do getpeername().
+
+#ifdef NNG_USE_ACCEPT4
+        newfd = accept4(ed->fd, NULL, NULL, SOCK_CLOEXEC);
+        if ((newfd < 0) &&
+            ((errno == ENOSYS) || (errno == ENOTSUP))) {
+            newfd = accept(ed->fd, NULL, NULL);
+        }
+#else
+        newfd = accept(ed->fd, NULL, NULL);
+#endif
+
+        if (newfd >= 0) {
+            // successful connection request!
+            nni_posix_epdesc_finish(aio, 0, newfd);
+            continue;
+        }
+
+        if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) {
+            // Well, let's try later.  Note that EWOULDBLOCK
+            // is required by standards, but some platforms may
+            // use EAGAIN.  The values may be the same, so we
+            // can't use switch.
+            return;
+        }
+
+        if (errno == ECONNABORTED) {
+            // Let's just eat this one.  Perhaps it may be
+            // better to report it to the application, but we
+            // think most applications don't want to see this.
+            // Only someone with a packet trace is going to
+            // notice this.
+            continue;
+        }
+
+        nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+    }
+}
+
+
+static void
+nni_posix_poll_epclose(nni_posix_epdesc *ed)
+{
+    nni_aio *aio;
+
+    while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
+        nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+    }
+    while ((aio = nni_list_first(&ed->connectq)) != NULL) {
+        nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+    }
+}
+
+
+static int
+nni_posix_epdesc_add(nni_posix_pollq *pq, nni_posix_epdesc *ed)
+{
+    int rv;
+
+    // Add epdesc to the pollq if it isn't already there.
+    if (!nni_list_active(&pq->eds, ed)) {
+        if ((rv = nni_posix_poll_grow(pq)) != 0) {
+            return (rv);
+        }
+        nni_list_append(&pq->eds, ed);
+        pq->neds++;
+    }
+    return (0);
+}
+
+
+void
+nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+{
+    // NB: We assume that the FD is already set to nonblocking mode.
+    int rv;
+    nni_posix_pollq *pq = ed->pq;
+    int wake;
+
+    nni_mtx_lock(&pq->mtx);
+    // If we can't start, it means that the AIO was stopped.
+    if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
+        nni_mtx_unlock(&pq->mtx);
+        return;
+    }
+    if (ed->fd < 0) {
+        nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+        nni_mtx_unlock(&pq->mtx);
+        return;
+    }
+    rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen);
+    if (rv == 0) {
+        // Immediate connect, cool!  This probably only happens on
+        // loopback, and probably not on every platform.
+        nni_posix_epdesc_finish(aio, 0, 0);
+        nni_mtx_unlock(&pq->mtx);
+        return;
+    }
+    if (errno != EINPROGRESS) {
+        // Some immediate failure occurred.
+        nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+        nni_mtx_unlock(&pq->mtx);
+        return;
+    }
+
+    // We have to submit to the pollq, because the connection is pending.
+    if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
+        nni_posix_epdesc_finish(aio, rv, 0);
+        nni_mtx_unlock(&pq->mtx);
+        return;
+    }
+
+    NNI_ASSERT(!nni_list_active(&ed->connectq, aio));
+    wake = nni_list_empty(&ed->connectq);
+    nni_aio_list_append(&ed->connectq, aio);
+    if (wake) {
+        nni_plat_pipe_raise(pq->wakewfd);
+    }
+    nni_mtx_unlock(&pq->mtx);
+}
+
+
+void
+nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
+{
+    // NB: We assume that the FD is already set to nonblocking mode.
+    int rv;
+    int wake;
+    nni_posix_pollq *pq = ed->pq;
+
+    // Accept is simpler than the connect case.  With accept we just
+    // need to wait for the socket to be readable to indicate an incoming
+    // connection is ready for us.  There isn't anything else for us to
+    // do really, as that will have been done in listen.
+    nni_mtx_lock(&pq->mtx);
+    // If we can't start, it means that the AIO was stopped.
+    if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
+        nni_mtx_unlock(&pq->mtx);
+        return;
+    }
+
+    if (ed->fd < 0) {
+        nni_mtx_unlock(&pq->mtx);
+        nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+        return;
+    }
+
+    // We have to submit to the pollq, because the connection is pending.
+    if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
+        nni_posix_epdesc_finish(aio, rv, 0);
+        nni_mtx_lock(&pq->mtx);
+    }
+    NNI_ASSERT(!nni_list_active(&ed->acceptq, aio));
+    wake = nni_list_empty(&ed->acceptq);
+    nni_aio_list_append(&ed->acceptq, aio);
+    if (wake) {
+        nni_plat_pipe_raise(pq->wakewfd);
+    }
+    nni_mtx_unlock(&pq->mtx);
+}
+
+
+#endif
+
+int
+nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
+{
+    nni_posix_epdesc *ed;
+    int rv;
+
+    if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
+        return (NNG_ENOMEM);
+    }
+
+    if ((rv = nni_mtx_init(&ed->mtx)) != 0) {
+        NNI_FREE_STRUCT(ed);
+        return (rv);
+    }
+
+    // We could randomly choose a different pollq, or for efficiencies
+    // sake we could take a modulo of the file desc number to choose
+    // one.  For now we just have a global pollq.  Note that by tying
+    // the ed to a single pollq we may get some kind of cache warmth.
+
+    ed->pq = nni_posix_pollq_get(fd);
+    ed->fd = fd;
+    ed->node.index = 0;
+    ed->node.cb = NULL; // XXXX:
+    ed->node.data = ed;
+
+    // Ensure we are in non-blocking mode.
+    (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+    nni_aio_list_init(&ed->connectq);
+    nni_aio_list_init(&ed->acceptq);
+
+    *edp = ed;
+    return (0);
+}
+
+
+void
+nni_posix_epdesc_fini(nni_posix_epdesc *ed)
+{
+    // XXX: MORE WORK HERE.
+    nni_mtx_fini(&ed->mtx);
+    NNI_FREE_STRUCT(ed);
+}
+
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_epdesc_not_used = 0;
+
+#endif // PLATFORM_POSIX_EPDESC
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 83b4914e..3a2e29e1 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -26,6 +26,8 @@
 #define PLATFORM_POSIX_RANDOM
 #define PLATFORM_POSIX_SOCKET
 #define PLATFORM_POSIX_THREAD
+#define PLATFORM_POSIX_PIPEDESC
+#define PLATFORM_POSIX_EPDESC
 
 #include "platform/posix/posix_config.h"
 #endif
@@ -65,7 +67,7 @@
 
 #endif
 
-extern int nni_posix_pipedesc_sysinit(void);
-extern void nni_posix_pipedesc_sysfini(void);
+extern int nni_posix_pollq_sysinit(void);
+extern void nni_posix_pollq_sysfini(void);
 
 #endif // PLATFORM_POSIX_IMPL_H
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
new file mode 100644
index 00000000..14a5c8ab
--- /dev/null
+++ b/src/platform/posix/posix_pipedesc.c
@@ -0,0 +1,382 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_pollq.h"
+
+#ifdef PLATFORM_POSIX_PIPEDESC
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open
+// file descriptor for TCP socket, etc.)  This contains the list of pending
+// aios for that underlying socket, as well as the socket itself.
+struct nni_posix_pipedesc {
+    nni_posix_pollq *    pq;
+    int                  fd;
+    nni_list             readq;
+    nni_list             writeq;
+    nni_posix_pollq_node node;
+    nni_mtx              mtx;
+};
+
+
+static void
+nni_posix_pipedesc_finish(nni_aio *aio, int rv)
+{
+    nni_aio_list_remove(aio);
+    nni_aio_finish(aio, rv, aio->a_count);
+}
+
+
+static void
+nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
+{
+    int n;
+    int rv;
+    int i;
+    struct iovec iovec[4];
+    struct iovec *iovp;
+    nni_aio *aio;
+
+    while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+        for (i = 0; i < aio->a_niov; i++) {
+            iovec[i].iov_len = aio->a_iov[i].iov_len;
+            iovec[i].iov_base = aio->a_iov[i].iov_buf;
+        }
+        iovp = &iovec[0];
+        rv = 0;
+
+        n = writev(pd->fd, iovp, aio->a_niov);
+        if (n < 0) {
+            if ((errno == EAGAIN) || (errno == EINTR)) {
+                // Can't write more right now.  We're done
+                // on this fd for now.
+                return;
+            }
+            rv = nni_plat_errno(errno);
+
+            nni_posix_pipedesc_finish(aio, rv);
+            return;
+        }
+
+        aio->a_count += n;
+
+        while (n > 0) {
+            // If we didn't write the first full iov,
+            // then we're done for now.  Record progress
+            // and return to caller.
+            if (n < aio->a_iov[0].iov_len) {
+                aio->a_iov[0].iov_buf += n;
+                aio->a_iov[0].iov_len -= n;
+                return;
+            }
+
+            // We consumed the full iovec, so just move the
+            // remaininng ones up, and decrement count handled.
+            n -= aio->a_iov[0].iov_len;
+            for (i = 1; i < aio->a_niov; i++) {
+                aio->a_iov[i-1] = aio->a_iov[i];
+            }
+            NNI_ASSERT(aio->a_niov > 0);
+            aio->a_niov--;
+        }
+
+        // We completed the entire operation on this aioq.
+        nni_list_remove(&pd->writeq, aio);
+        nni_posix_pipedesc_finish(aio, 0);
+
+        // Go back to start of loop to see if there is another
+        // aioq ready for us to process.
+    }
+}
+
+
+static void
+nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
+{
+    int n;
+    int rv;
+    int i;
+    struct iovec iovec[4];
+    struct iovec *iovp;
+    nni_aio *aio;
+
+    while ((aio = nni_list_first(&pd->readq)) != NULL) {
+        for (i = 0; i < aio->a_niov; i++) {
+            iovec[i].iov_len = aio->a_iov[i].iov_len;
+            iovec[i].iov_base = aio->a_iov[i].iov_buf;
+        }
+        iovp = &iovec[0];
+        rv = 0;
+
+        n = readv(pd->fd, iovp, aio->a_niov);
+        if (n < 0) {
+            if ((errno == EAGAIN) || (errno == EINTR)) {
+                // Can't write more right now.  We're done
+                // on this fd for now.
+                return;
+            }
+            rv = nni_plat_errno(errno);
+
+            nni_posix_pipedesc_finish(aio, rv);
+            return;
+        }
+
+        if (n == 0) {
+            // No bytes indicates a closed descriptor.
+            nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+            return;
+        }
+
+        aio->a_count += n;
+
+        while (n > 0) {
+            // If we didn't write the first full iov,
+            // then we're done for now.  Record progress
+            // and return to caller.
+            if (n < aio->a_iov[0].iov_len) {
+                aio->a_iov[0].iov_buf += n;
+                aio->a_iov[0].iov_len -= n;
+                return;
+            }
+
+            // We consumed the full iovec, so just move the
+            // remaininng ones up, and decrement count handled.
+            n -= aio->a_iov[0].iov_len;
+            for (i = 1; i < aio->a_niov; i++) {
+                aio->a_iov[i-1] = aio->a_iov[i];
+            }
+            NNI_ASSERT(aio->a_niov > 0);
+            aio->a_niov--;
+        }
+
+        // We completed the entire operation on this aioq.
+        nni_posix_pipedesc_finish(aio, 0);
+
+        // Go back to start of loop to see if there is another
+        // aioq ready for us to process.
+    }
+}
+
+
+static void
+nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
+{
+    nni_aio *aio;
+
+    if (pd->fd != -1) {
+        // Let any peer know we are closing.
+        (void) shutdown(pd->fd, SHUT_RDWR);
+    }
+    while ((aio = nni_list_first(&pd->readq)) != NULL) {
+        nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+    }
+    while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+        nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+    }
+}
+
+
+static void
+nni_posix_pipedesc_cb(void *arg)
+{
+    nni_posix_pipedesc *pd = arg;
+
+    nni_mtx_lock(&pd->mtx);
+    if (pd->node.revents & POLLIN) {
+        nni_posix_pipedesc_doread(pd);
+    }
+    if (pd->node.revents & POLLOUT) {
+        nni_posix_pipedesc_dowrite(pd);
+    }
+    if (pd->node.revents & (POLLHUP|POLLERR|POLLNVAL)) {
+        nni_posix_pipedesc_doclose(pd);
+    }
+
+    pd->node.revents = 0;
+    pd->node.events = 0;
+
+    if (!nni_list_empty(&pd->writeq)) {
+        pd->node.events |= POLLOUT;
+    }
+    if (!nni_list_empty(&pd->readq)) {
+        pd->node.events |= POLLIN;
+    }
+
+    // If we still have uncompleted operations, resubmit us.
+    if (pd->node.events != 0) {
+        nni_posix_pollq_submit(pd->pq, &pd->node);
+    }
+    nni_mtx_unlock(&pd->mtx);
+}
+
+
+void
+nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
+{
+    nni_posix_pollq *pq;
+    nni_aio *aio;
+
+    pq = pd->pq;
+
+    nni_posix_pollq_cancel(pq, &pd->node);
+
+    nni_mtx_lock(&pd->mtx);
+    if (pd->fd != -1) {
+        // Let any peer know we are closing.
+        shutdown(pd->fd, SHUT_RDWR);
+    }
+    while ((aio = nni_list_first(&pd->readq)) != NULL) {
+        nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+    }
+    while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+        nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+    }
+    nni_mtx_unlock(&pd->mtx);
+}
+
+
+static void
+nni_posix_pipedesc_cancel(nni_aio *aio)
+{
+    nni_posix_pipedesc *pd;
+
+    pd = aio->a_prov_data;
+
+    nni_mtx_lock(&pd->mtx);
+    nni_aio_list_remove(aio);
+    nni_mtx_unlock(&pd->mtx);
+}
+
+
+void
+nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+    int rv;
+
+    nni_mtx_lock(&pd->mtx);
+    if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+        nni_mtx_unlock(&pd->mtx);
+        return;
+    }
+    if (pd->fd < 0) {
+        nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+        nni_mtx_unlock(&pd->mtx);
+        return;
+    }
+
+    nni_aio_list_append(&pd->readq, aio);
+    if ((pd->node.events & POLLIN) == 0) {
+        pd->node.events |= POLLIN;
+        rv = nni_posix_pollq_submit(pd->pq, &pd->node);
+        if (rv != 0) {
+            nni_posix_pipedesc_finish(aio, rv);
+            nni_mtx_unlock(&pd->mtx);
+            return;
+        }
+    }
+    nni_mtx_unlock(&pd->mtx);
+}
+
+
+void
+nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+    int rv;
+
+    nni_mtx_lock(&pd->mtx);
+    if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+        nni_mtx_unlock(&pd->mtx);
+        return;
+    }
+    if (pd->fd < 0) {
+        nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+        nni_mtx_unlock(&pd->mtx);
+        return;
+    }
+
+    nni_aio_list_append(&pd->writeq, aio);
+    if ((pd->node.events & POLLOUT) == 0) {
+        pd->node.events |= POLLOUT;
+        rv = nni_posix_pollq_submit(pd->pq, &pd->node);
+        if (rv != 0) {
+            nni_posix_pipedesc_finish(aio, rv);
+            nni_mtx_unlock(&pd->mtx);
+            return;
+        }
+    }
+    nni_mtx_unlock(&pd->mtx);
+}
+
+
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
+{
+    nni_posix_pipedesc *pd;
+    int rv;
+
+    if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+        return (NNG_ENOMEM);
+    }
+    memset(pd, 0, sizeof (*pd));
+
+    // We could randomly choose a different pollq, or for efficiencies
+    // sake we could take a modulo of the file desc number to choose
+    // one.  For now we just have a global pollq.  Note that by tying
+    // the pd to a single pollq we may get some kind of cache warmth.
+
+    if ((rv = nni_mtx_init(&pd->mtx)) != 0) {
+        NNI_FREE_STRUCT(pd);
+        return (rv);
+    }
+    pd->pq = nni_posix_pollq_get(fd);
+    pd->fd = fd;
+    pd->node.fd = fd;
+    pd->node.cb = nni_posix_pipedesc_cb;
+    pd->node.data = pd;
+
+    (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
+
+    nni_aio_list_init(&pd->readq);
+    nni_aio_list_init(&pd->writeq);
+
+    *pdp = pd;
+    return (0);
+}
+
+
+void
+nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
+{
+    // Make sure no other polling activity is pending.
+    nni_posix_pipedesc_close(pd);
+
+    nni_mtx_fini(&pd->mtx);
+
+    NNI_FREE_STRUCT(pd);
+}
+
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_pipedesc_not_used = 0;
+
+#endif // PLATFORM_POSIX_PIPEDESC
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
new file mode 100644
index 00000000..74b39704
--- /dev/null
+++ b/src/platform/posix/posix_pollq.h
@@ -0,0 +1,43 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_POSIX_POLLQ_H
+#define PLATFORM_POSIX_POLLQ_H
+
+// This file defines structures we will use for emulating asynchronous I/O
+// on POSIX.  POSIX lacks the support for callback based asynchronous I/O
+// that we have on Windows, although it has a non-widely support aio layer
+// that is not very performant on many systems.  So we emulate this using
+// one of several possible different backends.
+
+#include "core/nng_impl.h"
+#include <poll.h>
+
+typedef struct nni_posix_pollq_node nni_posix_pollq_node;
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+struct nni_posix_pollq_node {
+    nni_posix_pollq *pq;      // associated pollq
+    nni_list_node    node;    // linkage into the pollq list
+    int              index;   // used by the poller impl
+    int              armed;   // used by the poller impl
+    int              fd;      // file descriptor to poll
+    int              events;  // events to watch for
+    int              revents; // events received
+    void *           data;    // user data
+    nni_cb           cb;      // user callback on event
+};
+
+extern nni_posix_pollq *nni_posix_pollq_get(int);
+extern int nni_posix_pollq_submit(nni_posix_pollq *, nni_posix_pollq_node *);
+extern void nni_posix_pollq_cancel(nni_posix_pollq *, nni_posix_pollq_node *);
+extern int nni_posix_pollq_sysinit(void);
+extern void nni_posix_pollq_sysfini(void);
+
+#endif // PLATFORM_POSIX_POLLQ_H
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
new file mode 100644
index 00000000..a70c902d
--- /dev/null
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -0,0 +1,367 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
+
+#ifdef NNG_USE_POSIX_POLLQ_POLL
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// POSIX AIO using poll().  We use a single poll thread to perform
+// I/O operations for the entire system.  This isn't entirely scalable,
+// and it might be a good idea to create a few threads and group the
+// I/O operations into separate pollers to limit the amount of work each
+// thread does, and to scale across multiple cores.  For now we don't
+// worry about it.
+
+// nni_posix_pollq is a work structure used by the poller thread, that keeps
+// track of all the underlying pipe handles and so forth being used by poll().
+struct nni_posix_pollq {
+    nni_mtx              mtx;
+    nni_cv               cv;
+    struct pollfd *      fds;
+    struct pollfd *      newfds;
+    int                  nfds;
+    int                  nnewfds;
+    int                  wakewfd;  // write side of waker pipe
+    int                  wakerfd;  // read side of waker pipe
+    int                  close;    // request for worker to exit
+    int                  started;
+    nni_thr              thr;      // worker thread
+    nni_list             nodes;    // poll list
+    nni_list             notify;   // notify list
+    int                  nnodes;   // num of nodes in nodes list
+    int                  cancel;   // waiters for cancellation
+    int                  inpoll;   // poller asleep in poll
+
+    nni_posix_pollq_node *active;  // active node (in callback)
+};
+
+
+static int
+nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
+{
+    int grow = pq->nnodes + 2; // one for us, one for waker
+    int noldfds;
+    struct pollfd *oldfds;
+    struct pollfd *newfds;
+
+    if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
+        return (0);
+    }
+
+    grow = grow + 128;
+
+    // Maybe we are adding a *lot* of pipes at once, and have to grow
+    // multiple times before the poller gets scheduled.  In that case
+    // toss the old array before we finish.
+    oldfds = pq->newfds;
+    noldfds = pq->nnewfds;
+
+    if ((newfds = nni_alloc(grow * sizeof (struct pollfd))) == NULL) {
+        return (NNG_ENOMEM);
+    }
+
+    pq->newfds = newfds;
+    pq->nnewfds = grow;
+
+    if (noldfds != 0) {
+        nni_free(oldfds, noldfds * sizeof (struct pollfd));
+    }
+    return (0);
+}
+
+
+static void
+nni_posix_poll_thr(void *arg)
+{
+    nni_posix_pollq *pollq = arg;
+    nni_posix_pollq_node *node, *nextnode;
+
+    nni_mtx_lock(&pollq->mtx);
+    for (;;) {
+        int rv;
+        int nfds;
+        struct pollfd *fds;
+
+        if (pollq->close) {
+            break;
+        }
+
+        if (pollq->newfds != NULL) {
+            // We have "grown" by the caller.  Free up the old
+            // space, and start using the new.
+            nni_free(pollq->fds,
+                pollq->nfds * sizeof (struct pollfd));
+            pollq->fds = pollq->newfds;
+            pollq->nfds = pollq->nnewfds;
+            pollq->newfds = NULL;
+        }
+        fds = pollq->fds;
+        nfds = 0;
+
+        // The waker pipe is set up so that we will be woken
+        // when it is written (this allows us to be signaled).
+        fds[nfds].fd = pollq->wakerfd;
+        fds[nfds].events = POLLIN;
+        fds[nfds].revents = 0;
+        nfds++;
+
+        // Set up the poll list.
+        NNI_LIST_FOREACH (&pollq->nodes, node) {
+            fds[nfds].fd = node->fd;
+            fds[nfds].events = node->armed;
+            fds[nfds].revents = 0;
+            node->index = nfds;
+            nfds++;
+        }
+
+        // Now poll it.  We block indefinitely, since we use separate
+        // timeouts to wake and remove the elements from the list.
+        pollq->inpoll = 1;
+        nni_mtx_unlock(&pollq->mtx);
+        rv = poll(fds, nfds, -1);
+        nni_mtx_lock(&pollq->mtx);
+        pollq->inpoll = 0;
+
+        if (rv < 0) {
+            // This shouldn't happen really.  If it does, we
+            // just try again.  (EINTR is probably the only
+            // reasonable failure here, unless internal memory
+            // allocations fail in the kernel, leading to EAGAIN.)
+            continue;
+        }
+
+        // If the waker pipe was signaled, read from it.
+        if (fds[0].revents & POLLIN) {
+            NNI_ASSERT(fds[0].fd == pollq->wakerfd);
+            nni_plat_pipe_clear(pollq->wakerfd);
+        }
+
+        // Now we iterate through all the nodes.  Note that one
+        // may have been added or removed.  New pipedescs will have
+        // their index set to -1.  Removed ones will just be absent.
+        // Note that we may remove the pipedesc from the list, so we
+        // have to use a custom iterator.
+        nextnode = nni_list_first(&pollq->nodes);
+        while ((node = nextnode) != NULL) {
+            int index;
+
+            // Save the nextpd for our next iteration.  This
+            // way we can remove the PD from the list without
+            // breaking the iteration.
+            nextnode = nni_list_next(&pollq->nodes, node);
+
+            // If index is less than 1, then we have just added
+            // this and there is no FD for it in the pollfds.
+            if ((index = node->index) < 1) {
+                continue;
+            }
+
+            // Clear the index for the next time around.
+            node->index = 0;
+
+            node->revents = fds[index].revents;
+
+            // Now we move this node to the callback list.
+            node->armed = 0;
+            nni_list_remove(&pollq->nodes, node);
+            nni_list_append(&pollq->notify, node);
+            pollq->nnodes--;
+        }
+
+        // Finally we can call the callbacks.  We record the
+        // active callback so any attempt to cancel blocks until
+        // the callback is finished.
+        while ((node = nni_list_first(&pollq->notify)) != NULL) {
+            nni_list_remove(&pollq->notify, node);
+            if (node->cb != NULL) {
+                pollq->active = node;
+                nni_mtx_unlock(&pollq->mtx);
+                node->cb(node->data);
+                nni_mtx_lock(&pollq->mtx);
+                pollq->active = NULL;
+            }
+        }
+
+        // Wake any cancelers.
+        if (pollq->cancel != 0) {
+            pollq->cancel = 0;
+            nni_cv_wake(&pollq->cv);
+        }
+    }
+    nni_mtx_unlock(&pollq->mtx);
+}
+
+
+void
+nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+{
+    nni_mtx_lock(&pq->mtx);
+    while (pq->active == node) {
+        pq->cancel++;
+        nni_cv_wait(&pq->cv);
+    }
+    if (nni_list_active(&pq->nodes, node)) {
+        node->armed = 0;
+        nni_list_remove(&pq->nodes, node);
+    }
+    // Since we're not removing the fd from the outstanding poll, we
+    // may get an event.  In that case, we'll wake and rebuild the
+    // pollset without it, with no further action.  Otherwise having the
+    // poll present does no harm beyond the "spurious" wake of the poller
+    // thread.  (If we had port_disassociate or somesuch, this would be
+    // a great time for that.)
+    nni_mtx_unlock(&pq->mtx);
+}
+
+
+int
+nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+{
+    int wake;
+    int rv;
+    int evs;
+
+    nni_mtx_lock(&pq->mtx);
+
+    if (node->events == 0) {
+        // Nothing to schedule?
+        nni_mtx_unlock(&pq->mtx);
+        return (0);
+    }
+
+    if (node->armed == 0) {
+        NNI_ASSERT(!nni_list_active(&pq->nodes, node));
+
+        rv = nni_posix_pollq_poll_grow(pq);
+        if (rv != 0) {
+            nni_mtx_unlock(&pq->mtx);
+            return (rv);
+        }
+
+        nni_list_append(&pq->nodes, node);
+        pq->nnodes++;
+    }
+
+    node->armed |= node->events;
+
+    // Wake up the poller since we're adding a new poll, but only bother
+    // if it's already asleep.  (Frequently it will *not* be.)
+    if (pq->inpoll) {
+        nni_plat_pipe_raise(pq->wakewfd);
+    }
+    nni_mtx_unlock(&pq->mtx);
+    return (0);
+}
+
+
+static void
+nni_posix_pollq_fini(nni_posix_pollq *pq)
+{
+    if (pq->started) {
+        nni_mtx_lock(&pq->mtx);
+        pq->close = 1;
+        pq->started = 0;
+        nni_plat_pipe_raise(pq->wakewfd);
+
+        // All pipes should have been closed before this is called.
+        NNI_ASSERT(nni_list_empty(&pq->nodes));
+        nni_mtx_unlock(&pq->mtx);
+    }
+
+    nni_thr_fini(&pq->thr);
+    if (pq->wakewfd >= 0) {
+        nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+        pq->wakewfd = pq->wakerfd = -1;
+    }
+    nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd));
+    nni_free(pq->fds, pq->nfds * sizeof (struct pollfd));
+    nni_mtx_fini(&pq->mtx);
+}
+
+
+static int
+nni_posix_pollq_init(nni_posix_pollq *pq)
+{
+    int rv;
+
+    NNI_LIST_INIT(&pq->nodes, nni_posix_pollq_node, node);
+    NNI_LIST_INIT(&pq->notify, nni_posix_pollq_node, node);
+    pq->wakewfd = -1;
+    pq->wakerfd = -1;
+    pq->close = 0;
+
+    if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
+        ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) ||
+        ((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
+        ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
+        ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
+        nni_posix_pollq_fini(pq);
+        return (rv);
+    }
+    pq->started = 1;
+    nni_thr_run(&pq->thr);
+    return (0);
+}
+
+
+// We use a single pollq for the entire system, which means only a single
+// thread is polling.  This may be somewhat less than optimally efficient,
+// and it may be worth investigating having multiple threads to improve
+// efficiency and scalability.  (This would shorten the linked lists,
+// improving C10K scalability, and also allow us to engage multiple cores.)
+// It's not entirely clear how many threads are "optimal".
+static nni_posix_pollq nni_posix_global_pollq;
+
+nni_posix_pollq *
+nni_posix_pollq_get(int fd)
+{
+    // This is the point where we could choose a pollq based on FD.
+    return (&nni_posix_global_pollq);
+}
+
+
+int
+nni_posix_pollq_sysinit(void)
+{
+    int rv;
+
+    rv = nni_posix_pollq_init(&nni_posix_global_pollq);
+    return (rv);
+}
+
+
+void
+nni_posix_pollq_sysfini(void)
+{
+    nni_posix_pollq_fini(&nni_posix_global_pollq);
+}
+
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_pollq_poll_used = 0;
+
+#endif // NNG_USE_POSIX_POLLQ_POLL
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index b052ed53..171a87b9 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -299,7 +299,7 @@ nni_plat_init(int (*helper)(void))
     // as scalable / thrifty with our use of VM.
     (void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
 
-    if ((rv = nni_posix_pipedesc_sysinit()) != 0) {
+    if ((rv = nni_posix_pollq_sysinit()) != 0) {
         pthread_mutex_unlock(&nni_plat_lock);
         (void) close(nni_plat_devnull);
         pthread_mutexattr_destroy(&nni_mxattr);
@@ -310,7 +310,7 @@ nni_plat_init(int (*helper)(void))
 
     if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
         pthread_mutex_unlock(&nni_plat_lock);
-        nni_posix_pipedesc_sysfini();
+        nni_posix_pollq_sysfini();
         (void) close(devnull);
         pthread_mutexattr_destroy(&nni_mxattr);
         pthread_condattr_destroy(&nni_cvattr);
@@ -332,7 +332,7 @@ nni_plat_fini(void)
 {
     pthread_mutex_lock(&nni_plat_lock);
     if (nni_plat_inited) {
-        nni_posix_pipedesc_sysfini();
+        nni_posix_pollq_sysfini();
         pthread_mutexattr_destroy(&nni_mxattr);
         pthread_condattr_destroy(&nni_cvattr);
         pthread_attr_destroy(&nni_pthread_attr);
```
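As the `posix_thread.c` hunks show, platform init and fini now bring the pollq subsystem up and down (`nni_posix_pollq_sysinit()` / `nni_posix_pollq_sysfini()`) in place of the old pipedesc-specific hooks. A minimal lifecycle sketch, assuming the declarations from `posix_aio.h` and `posix_impl.h`; the `example_lifecycle` wrapper is illustrative only, since in the real tree the sysinit/sysfini calls are made by `nni_plat_init()` and `nni_plat_fini()`:

```c
#include "core/nng_impl.h"
#include "platform/posix/posix_impl.h"
#include "platform/posix/posix_aio.h"

// Illustrative only: shows the ordering implied by this commit, where the
// pollq outlives every descriptor that registers with it.
static int
example_lifecycle(int fd)
{
    nni_posix_pipedesc *pd;
    int rv;

    if ((rv = nni_posix_pollq_sysinit()) != 0) {        // done in nni_plat_init()
        return (rv);
    }
    if ((rv = nni_posix_pipedesc_init(&pd, fd)) != 0) { // attaches to the global pollq
        nni_posix_pollq_sysfini();
        return (rv);
    }

    // ... submit nni_aio operations via nni_posix_pipedesc_recv()/_send() ...

    nni_posix_pipedesc_fini(pd);   // cancels the descriptor's pollq node first
    nni_posix_pollq_sysfini();     // done in nni_plat_fini()
    return (0);
}
```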
