aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_pollq_kqueue.c
diff options
context:
space:
mode:
authorLiam Staskawicz <liam@stask.net>2018-01-15 14:38:13 -0800
committerGarrett D'Amore <garrett@damore.org>2018-03-02 08:58:27 -0800
commit7849fd286533560f10eff202f24007ea6b3dd787 (patch)
treed9693bcaf99b3b1bdc12c784c40da8a91e46eba9 /src/platform/posix/posix_pollq_kqueue.c
parent91089a2a60d2a74334fc67757fd23ee1f3ae56d5 (diff)
downloadnng-7849fd286533560f10eff202f24007ea6b3dd787.tar.gz
nng-7849fd286533560f10eff202f24007ea6b3dd787.tar.bz2
nng-7849fd286533560f10eff202f24007ea6b3dd787.zip
kqueue: add kqueue-based pollq implementation
Diffstat (limited to 'src/platform/posix/posix_pollq_kqueue.c')
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c402
1 files changed, 402 insertions, 0 deletions
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
new file mode 100644
index 00000000..d9a89472
--- /dev/null
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -0,0 +1,402 @@
+#ifdef NNG_HAVE_KQUEUE
+
+#include <errno.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h> /* for strerror() */
+#include <sys/event.h>
+#include <unistd.h>
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
+
+// TODO: can this be feature detected in cmake,
+// rather than relying on platform?
+#if defined NNG_PLATFORM_NETBSD
+#define kevent_udata_t intptr_t
+#else
+#define kevent_udata_t void *
+#endif
+
+#define NNI_MAX_KQUEUE_EVENTS 64
+
+// user event id used to shutdown the polling thread
+#define NNI_KQ_EV_EXIT_ID 0xF
+
+// nni_posix_pollq is a work structure that manages state for the kqueue-based
+// pollq implementation
+struct nni_posix_pollq {
+ nni_mtx mtx;
+ nni_cv cv;
+ int kq; // kqueue handle
+ bool close; // request for worker to exit
+ bool started;
+ nni_thr thr; // worker thread
+ nni_posix_pollq_node *wait; // cancel waiting on this
+ nni_posix_pollq_node *active; // active node (in callback)
+};
+
+int
+nni_posix_pollq_add(nni_posix_pollq_node *node)
+{
+ nni_posix_pollq *pq;
+ struct kevent kevents[2];
+
+ pq = nni_posix_pollq_get(node->fd);
+ if (pq == NULL) {
+ return (NNG_EINVAL);
+ }
+
+ // ensure node was not previously associated with a pollq
+ if (node->pq != NULL) {
+ return (NNG_ESTATE);
+ }
+
+ nni_mtx_lock(&pq->mtx);
+ if (pq->close) {
+ // This shouldn't happen!
+ nni_mtx_unlock(&pq->mtx);
+ return (NNG_ECLOSED);
+ }
+
+ node->pq = pq;
+ node->events = 0;
+
+ EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ,
+ EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
+
+ EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE,
+ EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
+
+ if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) {
+ nni_mtx_unlock(&pq->mtx);
+ return (nni_plat_errno(errno));
+ }
+
+ nni_mtx_unlock(&pq->mtx);
+ return (0);
+}
+
+// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
+// called while pq's lock is held
+static void
+nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+{
+ int rv;
+ struct kevent kevents[2];
+
+ node->events = 0;
+ node->pq = NULL;
+
+ EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0,
+ (kevent_udata_t) node);
+
+ EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0,
+ 0, (kevent_udata_t) node);
+
+ rv = kevent(pq->kq, kevents, 2, NULL, 0, NULL);
+ // allow errnos that indicate the fd has already been removed
+ if (rv < 0 && errno != EBADF && errno != ENOENT) {
+ NNI_ASSERT(false);
+ }
+}
+
+// nni_posix_pollq_remove removes the node from the pollq, but
+// does not ensure that the pollq node is safe to destroy. In particular,
+// this function can be called from a callback (the callback may be active).
+void
+nni_posix_pollq_remove(nni_posix_pollq_node *node)
+{
+ nni_posix_pollq *pq = node->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_mtx_lock(&pq->mtx);
+ nni_posix_pollq_remove_helper(pq, node);
+
+ if (pq->close) {
+ nni_cv_wake(&pq->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+// nni_posix_pollq_init merely ensures that the node is ready for use.
+// It does not register the node with any pollq in particular.
+int
+nni_posix_pollq_init(nni_posix_pollq_node *node)
+{
+ NNI_ARG_UNUSED(node);
+ return (0);
+}
+
+// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
+// but it also ensures that the callback is not active, so that the node
+// may be deallocated. This function must not be called in a callback.
+void
+nni_posix_pollq_fini(nni_posix_pollq_node *node)
+{
+ nni_posix_pollq *pq = node->pq;
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_mtx_lock(&pq->mtx);
+ while (pq->active == node) {
+ pq->wait = node;
+ nni_cv_wait(&pq->cv);
+ }
+
+ nni_posix_pollq_remove_helper(pq, node);
+
+ if (pq->close) {
+ nni_cv_wake(&pq->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+{
+ nni_posix_pollq *pq = node->pq;
+ struct kevent kevents[2];
+ int nevents = 0;
+
+ NNI_ASSERT(pq != NULL);
+ if (events == 0) {
+ return;
+ }
+
+ nni_mtx_lock(&pq->mtx);
+
+ if (!(node->events & POLLIN) && (events & POLLIN)) {
+ EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ,
+ EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ }
+
+ if (!(node->events & POLLOUT) && (events & POLLOUT)) {
+ EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE,
+ EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ }
+
+ if (nevents > 0) {
+ int rv;
+ rv = kevent(pq->kq, kevents, nevents, NULL, 0, NULL);
+ NNI_ASSERT(rv >= 0);
+ node->events |= events;
+ }
+
+ nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events)
+{
+ struct kevent kevents[2];
+ int nevents = 0;
+
+ nni_posix_pollq *pq = node->pq;
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_mtx_lock(&pq->mtx);
+
+ if ((node->events & POLLIN) && (events & POLLIN)) {
+ EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ,
+ EV_DISABLE, 0, 0, (kevent_udata_t) node);
+ }
+
+ if ((node->events & POLLOUT) && (events & POLLOUT)) {
+ EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE,
+ EV_DISABLE, 0, 0, (kevent_udata_t) node);
+ }
+
+ if (nevents > 0) {
+ int rv = kevent(pq->kq, kevents, nevents, NULL, 0, NULL);
+ if (rv < 0 && errno != ENOENT && errno != EBADF) {
+ NNI_ASSERT(false);
+ }
+ node->events &= ~events;
+ }
+
+ nni_mtx_unlock(&pq->mtx);
+}
+
+static void
+nni_posix_poll_thr(void *arg)
+{
+ nni_posix_pollq *pq = arg;
+ struct kevent kevents[NNI_MAX_KQUEUE_EVENTS];
+
+ nni_mtx_lock(&pq->mtx);
+
+ while (!pq->close) {
+ int i;
+ int nevents;
+
+ // block indefinitely, timers are handled separately
+ nni_mtx_unlock(&pq->mtx);
+ nevents = kevent(
+ pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL);
+ nni_mtx_lock(&pq->mtx);
+
+ if (nevents < 0) {
+ continue;
+ }
+
+ // dispatch events
+ for (i = 0; i < nevents; ++i) {
+ struct kevent ev_disable;
+ const struct kevent * ev;
+ nni_posix_pollq_node *node;
+
+ ev = &kevents[i];
+ if (ev->filter == EVFILT_USER &&
+ ev->ident == NNI_KQ_EV_EXIT_ID) {
+ // we've woken up to exit the polling thread
+ break;
+ }
+
+ node = (nni_posix_pollq_node *) ev->udata;
+ if (node->pq == NULL) {
+ // node was removed while we were blocking
+ continue;
+ }
+ node->revents = 0;
+
+ if (ev->flags & (EV_ERROR | EV_EOF)) {
+ node->revents |= POLLHUP;
+ }
+ if (ev->filter == EVFILT_WRITE) {
+ node->revents |= POLLOUT;
+ } else if (ev->filter == EVFILT_READ) {
+ node->revents |= POLLIN;
+ } else {
+ NNI_ASSERT(false); // unhandled filter
+ break;
+ }
+
+ // explicitly disable this event. we'd ideally rely on
+ // the behavior of EV_DISPATCH to do this,
+ // but that only happens once we've acknowledged the
+ // event by reading/or writing the fd. because there
+ // can currently be some latency between the time we
+ // receive this event and the time we read/write in
+ // response, disable the event in the meantime to avoid
+ // needless wakeups.
+ // revisit if we're able to reduce/remove this latency.
+ EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter,
+ EV_DISABLE, 0, 0, NULL);
+ // this will only fail if the fd is already
+ // closed/invalid which we don't mind anyway,
+ // so ignore return value.
+ kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL);
+
+ // mark events as cleared
+ node->events &= ~node->revents;
+
+ // Save the active node; we can notice this way
+ // when it is busy, and avoid freeing it until
+ // we are sure that it is not in use.
+ pq->active = node;
+
+ // Execute the callback with lock released
+ nni_mtx_unlock(&pq->mtx);
+ node->cb(node->data);
+ nni_mtx_lock(&pq->mtx);
+
+ // We finished with this node. If something
+ // was blocked waiting for that, wake it up.
+ pq->active = NULL;
+ if (pq->wait == node) {
+ pq->wait = NULL;
+ nni_cv_wake(&pq->cv);
+ }
+ }
+ }
+
+ nni_mtx_unlock(&pq->mtx);
+}
+
+static void
+nni_posix_pollq_destroy(nni_posix_pollq *pq)
+{
+ if (pq->started) {
+ struct kevent ev;
+ EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE,
+ NOTE_TRIGGER, 0, NULL);
+ nni_mtx_lock(&pq->mtx);
+ pq->close = true;
+ pq->started = false;
+ kevent(pq->kq, &ev, 1, NULL, 0, NULL);
+ nni_mtx_unlock(&pq->mtx);
+ }
+ nni_thr_fini(&pq->thr);
+
+ if (pq->kq >= 0) {
+ close(pq->kq);
+ pq->kq = -1;
+ }
+
+ nni_mtx_fini(&pq->mtx);
+}
+
+static int
+nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq)
+{
+ // add user event so we can wake ourself on exit
+ struct kevent ev;
+ EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL);
+ return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL)));
+}
+
+static int
+nni_posix_pollq_create(nni_posix_pollq *pq)
+{
+ int rv;
+
+ if ((pq->kq = kqueue()) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ pq->close = false;
+
+ nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, &pq->mtx);
+
+ if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
+ (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) {
+ nni_posix_pollq_destroy(pq);
+ return (rv);
+ }
+
+ pq->started = true;
+ nni_thr_run(&pq->thr);
+ return (0);
+}
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
+nni_posix_pollq *
+nni_posix_pollq_get(int fd)
+{
+ NNI_ARG_UNUSED(fd);
+ return (&nni_posix_global_pollq);
+}
+
+int
+nni_posix_pollq_sysinit(void)
+{
+ return (nni_posix_pollq_create(&nni_posix_global_pollq));
+}
+
+void
+nni_posix_pollq_sysfini(void)
+{
+ nni_posix_pollq_destroy(&nni_posix_global_pollq);
+}
+
+#endif // NNG_HAVE_KQUEUE