diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq.h | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_epoll.c | 30 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 34 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 22 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_port.c | 268 |
7 files changed, 280 insertions, 89 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a3e36ee8..96747bc3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -106,7 +106,11 @@ if (NNG_PLATFORM_POSIX) ) endif() -if (NNG_HAVE_KQUEUE) +if (NNG_HAVE_PORT_CREATE) + set (NNG_SOURCES ${NNG_SOURCES} + platform/posix/posix_pollq_port.c + ) +elseif (NNG_HAVE_KQUEUE) set (NNG_SOURCES ${NNG_SOURCES} platform/posix/posix_pollq_kqueue.c ) diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index 3745f11f..e7225395 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -242,7 +242,7 @@ nni_posix_pipedesc_cb(void *arg) void nni_posix_pipedesc_close(nni_posix_pipedesc *pd) { - nni_posix_pollq_disarm(&pd->node, POLLIN | POLLOUT); + nni_posix_pollq_remove(&pd->node); nni_mtx_lock(&pd->mtx); nni_posix_pipedesc_doclose(pd); diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index bb441c7b..2c855da1 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -35,6 +35,8 @@ struct nni_posix_pollq_node { int revents; // events received void * data; // user data nni_cb cb; // user callback on event + nni_mtx mx; + nni_cv cv; }; extern nni_posix_pollq *nni_posix_pollq_get(int); @@ -46,7 +48,6 @@ extern void nni_posix_pollq_fini(nni_posix_pollq_node *); extern int nni_posix_pollq_add(nni_posix_pollq_node *); extern void nni_posix_pollq_remove(nni_posix_pollq_node *); extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int); -extern void nni_posix_pollq_disarm(nni_posix_pollq_node *, int); #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index fe831ec1..0f6867da 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -84,6 +84,7 @@ nni_posix_pollq_add(nni_posix_pollq_node *node) rv = nni_plat_errno(errno); nni_idhash_remove(pq->nodes, id); node->index = 0; + node->pq = NULL; } nni_mtx_unlock(&pq->mtx); @@ -105,7 +106,7 @@ nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node) ev.data.u64 = (uint64_t) node->index; if (node->index != 0) { - // This dereegisters the node. If the poller was blocked + // This deregisters the node. If the poller was blocked // then this keeps it from coming back in to find us. nni_idhash_remove(pq->nodes, (uint64_t) node->index); } @@ -197,33 +198,6 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) nni_mtx_unlock(&pq->mtx); } -void -nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events) -{ - struct epoll_event ev; - - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - - node->events &= ~events; - if (node->events == 0) { - ev.events = 0; - } else { - ev.events = node->events | NNI_EPOLL_FLAGS; - } - ev.data.u64 = (uint64_t) node->index; - - if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, node->fd, &ev) != 0) { - NNI_ASSERT(errno == EBADF || errno == ENOENT); - } - - nni_mtx_unlock(&pq->mtx); -} - static void nni_posix_poll_thr(void *arg) { diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index cff0dcf4..0f312170 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -205,40 +205,6 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) nni_mtx_unlock(&pq->mtx); } -void -nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events) -{ - struct kevent kevents[2]; - int nevents = 0; - - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - - if ((node->events & POLLIN) && (events & POLLIN)) { - EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ, - EV_DISABLE, 0, 0, (kevent_udata_t) node); - } - - if ((node->events & POLLOUT) && (events & POLLOUT)) { - EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE, - EV_DISABLE, 0, 0, (kevent_udata_t) node); - } - - if (nevents > 0) { - int rv = kevent(pq->kq, kevents, nevents, NULL, 0, NULL); - if (rv < 0 && errno != ENOENT && errno != EBADF) { - NNI_ASSERT(false); - } - node->events &= ~events; - } - - nni_mtx_unlock(&pq->mtx); -} - static void nni_posix_poll_thr(void *arg) { diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 8e3bc741..efc9ff48 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -305,28 +305,6 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) nni_mtx_unlock(&pq->mtx); } -void -nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events) -{ - nni_posix_pollq *pq = node->pq; - int oevents; - - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - oevents = node->events; - node->events &= ~events; - if ((node->events == 0) && (oevents != 0)) { - nni_list_node_remove(&node->node); - nni_list_append(&pq->idle, node); - } - // No need to wake anything, we might get a spurious wake up but - // that's harmless. - nni_mtx_unlock(&pq->mtx); -} - static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c new file mode 100644 index 00000000..2afeb651 --- /dev/null +++ b/src/platform/posix/posix_pollq_port.c @@ -0,0 +1,268 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Liam Staskawicz <liam@stask.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifdef NNG_HAVE_PORT_CREATE + +#include <errno.h> +#include <port.h> +#include <stdbool.h> +#include <stdio.h> +#include <string.h> /* for strerror() */ +#include <unistd.h> + +#include "core/nng_impl.h" +#include "platform/posix/posix_pollq.h" + +// nni_posix_pollq is a work structure that manages state for the port-event +// based pollq implementation. We only really need to keep track of the +// single thread, and the associated port itself. +struct nni_posix_pollq { + int port; // port id (from port_create) + nni_thr thr; // worker thread +}; + +int +nni_posix_pollq_add(nni_posix_pollq_node *node) +{ + nni_posix_pollq *pq; + + pq = nni_posix_pollq_get(node->fd); + if (pq == NULL) { + return (NNG_EINVAL); + } + + nni_mtx_lock(&node->mx); + // ensure node was not previously associated with a pollq + if (node->pq != NULL) { + nni_mtx_unlock(&node->mx); + return (NNG_ESTATE); + } + + node->pq = pq; + node->events = 0; + node->armed = false; + nni_mtx_unlock(&node->mx); + + return (0); +} + +// nni_posix_pollq_remove removes the node from the pollq, but +// does not ensure that the pollq node is safe to destroy. In particular, +// this function can be called from a callback (the callback may be active). +void +nni_posix_pollq_remove(nni_posix_pollq_node *node) +{ + nni_posix_pollq *pq = node->pq; + + if (pq == NULL) { + return; + } + + nni_mtx_lock(&node->mx); + node->events = 0; + if (node->armed) { + // Failure modes that can occur are uninteresting. + (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd); + node->armed = false; + } + nni_mtx_unlock(&node->mx); +} + +// nni_posix_pollq_init merely ensures that the node is ready for use. +// It does not register the node with any pollq in particular. +int +nni_posix_pollq_init(nni_posix_pollq_node *node) +{ + nni_mtx_init(&node->mx); + nni_cv_init(&node->cv, &node->mx); + node->pq = NULL; + node->armed = false; + NNI_LIST_NODE_INIT(&node->node); + return (0); +} + +// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, +// but it also ensures that the node is removed properly. +void +nni_posix_pollq_fini(nni_posix_pollq_node *node) +{ + nni_posix_pollq *pq = node->pq; + + nni_mtx_lock(&node->mx); + if ((pq = node->pq) != NULL) { + // Dissociate the port; if it isn't already associated we + // don't care. (An extra syscall, but it should not matter.) + (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd); + node->armed = false; + + for (;;) { + if (port_send(pq->port, 0, node) == 0) { + break; + } + switch (errno) { + case EAGAIN: + case ENOMEM: + // Resource exhaustion. + // Best bet in these cases is to sleep it off. + // This may appear like a total application + // hang, but by sleeping here maybe we give + // a chance for things to clear up. + nni_mtx_unlock(&node->mx); + nni_msleep(5000); + nni_mtx_unlock(&node->mx); + continue; + case EBADFD: + case EBADF: + // Most likely these indicate that the pollq + // itself has been closed. That's ok. + break; + } + } + // Wait for the pollq thread to tell us with certainty that + // they are done. This is needed to ensure that the pollq + // thread isn't executing (or about to execute) the callback + // before we destroy it. + while (node->pq != NULL) { + nni_cv_wait(&node->cv); + } + } + nni_mtx_unlock(&node->mx); + nni_cv_fini(&node->cv); + nni_mtx_fini(&node->mx); +} + +void +nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +{ + nni_posix_pollq *pq = node->pq; + + NNI_ASSERT(pq != NULL); + if (events == 0) { + return; + } + + nni_mtx_lock(&node->mx); + node->events |= events; + node->armed = true; + (void) port_associate( + pq->port, PORT_SOURCE_FD, node->fd, node->events, node); + + // Possible errors here are: + // + // EBADF -- programming error on our part + // EBADFD -- programming error on our part + // ENOMEM -- not much we can do here + // EAGAIN -- too many port events registered (65K!!) + // + // For now we ignore them all. (We need to be able to return + // errors to our caller.) Effect on the application will appear + // to be a stalled file descriptor (no notifications). + nni_mtx_unlock(&node->mx); +} + +static void +nni_posix_poll_thr(void *arg) +{ + + for (;;) { + nni_posix_pollq * pq = arg; + port_event_t ev; + nni_posix_pollq_node *node; + + if (port_get(pq->port, &ev, NULL) != 0) { + if (errno == EINTR) { + continue; + } + return; + } + + switch (ev.portev_source) { + case PORT_SOURCE_ALERT: + return; + + case PORT_SOURCE_FD: + node = ev.portev_user; + + nni_mtx_lock(&node->mx); + node->revents = ev.portev_events; + // mark events as cleared + node->events &= ~node->revents; + node->armed = false; + nni_mtx_unlock(&node->mx); + + node->cb(node->data); + break; + + case PORT_SOURCE_USER: + // User event telling us to stop doing things. + // We signal back to use this as a coordination event + // between the pollq and the thread handler. + // NOTE: It is absolutely critical that there is only + // a single thread per pollq. Otherwise we cannot + // be sure that we are blocked completely, + node = ev.portev_user; + nni_mtx_lock(&node->mx); + node->pq = NULL; + nni_cv_wake(&node->cv); + nni_mtx_unlock(&node->mx); + } + } +} + +static void +nni_posix_pollq_destroy(nni_posix_pollq *pq) +{ + port_alert(pq->port, PORT_ALERT_SET, 1, NULL); + (void) close(pq->port); + nni_thr_fini(&pq->thr); +} + +static int +nni_posix_pollq_create(nni_posix_pollq *pq) +{ + int rv; + + if ((pq->port = port_create()) < 0) { + return (nni_plat_errno(errno)); + } + + if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + nni_posix_pollq_destroy(pq); + return (rv); + } + + nni_thr_run(&pq->thr); + return (0); +} + +// single global instance for now +static nni_posix_pollq nni_posix_global_pollq; + +nni_posix_pollq * +nni_posix_pollq_get(int fd) +{ + NNI_ARG_UNUSED(fd); + return (&nni_posix_global_pollq); +} + +int +nni_posix_pollq_sysinit(void) +{ + return (nni_posix_pollq_create(&nni_posix_global_pollq)); +} + +void +nni_posix_pollq_sysfini(void) +{ + nni_posix_pollq_destroy(&nni_posix_global_pollq); +} + +#endif // NNG_HAVE_PORT_CREATE |
