about summary refs log tree commit diff
path: root/src/platform/posix/posix_pollq_kqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix/posix_pollq_kqueue.c')
-rw-r--r-- src/platform/posix/posix_pollq_kqueue.c | 432
1 file changed, 194 insertions(+), 238 deletions(-)
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index 0f312170..36ced3ff 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -12,196 +12,203 @@
#ifdef NNG_HAVE_KQUEUE
#include <errno.h>
+#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
#include <sys/event.h>
+#include <sys/socket.h>
#include <unistd.h>
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
-// TODO: can this be feature detected in cmake,
-// rather than relying on platform?
-#if defined NNG_PLATFORM_NETBSD
-#define kevent_udata_t intptr_t
-#else
-#define kevent_udata_t void *
-#endif
-
-#define NNI_MAX_KQUEUE_EVENTS 64
-
-// user event id used to shutdown the polling thread
-#define NNI_KQ_EV_EXIT_ID 0xF
+typedef struct nni_posix_pollq nni_posix_pollq;
// nni_posix_pollq is a work structure that manages state for the kqueue-based
// pollq implementation
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- int kq; // kqueue handle
- bool close; // request for worker to exit
- bool started;
- nni_thr thr; // worker thread
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int kq; // kqueue handle
+ nni_thr thr; // worker thread
+ nni_list reapq; // items to reap
};
+struct nni_posix_pfd {
+ nni_list_node node; // linkage into the reap list
+ nni_posix_pollq *pq; // associated pollq
+ int fd; // file descriptor to poll
+ void * data; // user data
+ nni_posix_pfd_cb cb; // user callback on event
+ nni_cv cv; // signaled when poller has unregistered
+ nni_mtx mtx;
+ int events;
+ bool closing;
+ bool closed;
+};
+
+#define NNI_MAX_KQUEUE_EVENTS 64
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
+ nni_posix_pfd * pf;
nni_posix_pollq *pq;
- struct kevent kevents[2];
+ struct kevent ev[2];
+ unsigned flags = EV_ADD | EV_DISABLE;
+
+ // Set this is as soon as possible (narrow the close-exec race as
+ // much as we can; better options are system calls that suppress
+ // this behavior from descriptor creation.)
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+#ifdef SO_NOSIGPIPE
+ // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+ int one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ pq = &nni_posix_global_pollq;
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- return (NNG_ESTATE);
+ if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) {
+ return (NNG_ENOMEM);
}
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
-
- node->pq = pq;
- node->events = 0;
-
- EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ,
- EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
+ // Create entries in the kevent queue, without enabling them.
+ EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
+ EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
- EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
-
- if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) {
- nni_mtx_unlock(&pq->mtx);
+ // We update the kqueue list, without polling for events.
+ if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
+ NNI_FREE_STRUCT(pf);
return (nni_plat_errno(errno));
}
+ pf->fd = fd;
+ pf->cb = NULL;
+ pf->pq = pq;
+ nni_mtx_init(&pf->mtx);
+ nni_cv_init(&pf->cv, &pq->mtx);
+ NNI_LIST_NODE_INIT(&pf->node);
+ *pfdp = pf;
- nni_mtx_unlock(&pq->mtx);
return (0);
}
-// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
-// called while pq's lock is held
-static void
-nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+void
+nni_posix_pfd_close(nni_posix_pfd *pf)
{
- struct kevent kevents[2];
-
- node->events = 0;
- node->pq = NULL;
+ nni_posix_pollq *pq = pf->pq;
- EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0,
- (kevent_udata_t) node);
-
- EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0,
- 0, (kevent_udata_t) node);
-
- // So it turns out that we can get EBADF, ENOENT, and apparently
- // also EINPROGRESS (new on macOS Sierra). Frankly, we're deleting
- // an event, and its harmless if the event removal fails (worst
- // case would be a spurious wakeup), so lets ignore it.
- (void) kevent(pq->kq, kevents, 2, NULL, 0, NULL);
+ nni_mtx_lock(&pq->mtx);
+ if (!pf->closing) {
+ struct kevent ev[2];
+ pf->closing = true;
+ EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
+ EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
+ (void) shutdown(pf->fd, SHUT_RDWR);
+ // This should never fail -- no allocations, just deletion.
+ (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+nni_posix_pfd_fini(nni_posix_pfd *pf)
{
- nni_posix_pollq *pq = node->pq;
+ nni_posix_pollq *pq;
- if (pq == NULL) {
- return;
- }
+ pq = pf->pq;
- nni_mtx_lock(&pq->mtx);
- nni_posix_pollq_remove_helper(pq, node);
+ nni_posix_pfd_close(pf);
- if (pq->close) {
- nni_cv_wake(&pq->cv);
+ if (!nni_thr_is_self(&pq->thr)) {
+ struct kevent ev;
+ nni_mtx_lock(&pq->mtx);
+ nni_list_append(&pq->reapq, pf);
+ EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL);
+
+ // If this fails, the cleanup will stall. That should
+ // only occur in a memory pressure situation, and it
+ // will self-heal when the next event comes in.
+ (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
+ while (!pf->closed) {
+ nni_cv_wait(&pf->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
- nni_mtx_unlock(&pq->mtx);
+
+ (void) close(pf->fd);
+ nni_cv_fini(&pf->cv);
+ nni_mtx_fini(&pf->mtx);
+ NNI_FREE_STRUCT(pf);
}
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
+nni_posix_pfd_fd(nni_posix_pfd *pf)
{
- NNI_ARG_UNUSED(node);
- return (0);
+ return (pf->fd);
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
-
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
+ NNI_ASSERT(cb != NULL); // must not be null when established.
+ nni_mtx_lock(&pf->mtx);
+ pf->cb = cb;
+ pf->data = arg;
+ nni_mtx_unlock(&pf->mtx);
}
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pf, int events)
{
- nni_posix_pollq *pq = node->pq;
- struct kevent kevents[2];
- int nevents = 0;
+ struct kevent ev[2];
+ int nev = 0;
+ unsigned flags = EV_ENABLE | EV_DISPATCH;
+ nni_posix_pollq *pq = pf->pq;
+
+ nni_mtx_lock(&pf->mtx);
+ if (pf->closing) {
+ events = 0;
+ } else {
+ pf->events |= events;
+ events = pf->events;
+ }
+ nni_mtx_unlock(&pf->mtx);
- NNI_ASSERT(pq != NULL);
if (events == 0) {
- return;
+ // No events, and kqueue is oneshot, so nothing to do.
+ return (0);
}
- nni_mtx_lock(&pq->mtx);
-
- if (!(node->events & POLLIN) && (events & POLLIN)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ,
- EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ if (events & POLLIN) {
+ EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
-
- if (!(node->events & POLLOUT) && (events & POLLOUT)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ if (events & POLLOUT) {
+ EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf);
}
-
- if (nevents > 0) {
- // This call should never fail, really. The only possible
- // legitimate failure would be ENOMEM, but in that case
- // lots of other things are going to be failing, or ENOENT
- // or ESRCH, indicating we already lost interest; the
- // only consequence of ignoring these errors is that a given
- // descriptor might appear "stuck". This beats the alternative
- // of just blithely crashing the application with an assertion.
- (void) kevent(pq->kq, kevents, nevents, NULL, 0, NULL);
- node->events |= events;
+ while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
}
+ return (0);
+}
+static void
+nni_posix_pollq_reap(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pf;
+ nni_mtx_lock(&pq->mtx);
+ while ((pf = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pf);
+ pf->closed = true;
+ nni_cv_wake(&pf->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
@@ -209,117 +216,71 @@ static void
nni_posix_poll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
- struct kevent kevents[NNI_MAX_KQUEUE_EVENTS];
-
- nni_mtx_lock(&pq->mtx);
- while (!pq->close) {
- int i;
- int nevents;
-
- // block indefinitely, timers are handled separately
- nni_mtx_unlock(&pq->mtx);
- nevents = kevent(
- pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL);
- nni_mtx_lock(&pq->mtx);
-
- if (nevents < 0) {
- continue;
+ for (;;) {
+ int n;
+ struct kevent evs[NNI_MAX_KQUEUE_EVENTS];
+ nni_posix_pfd * pf;
+ nni_posix_pfd_cb cb;
+ void * cbarg;
+ int revents;
+ bool reap = false;
+
+ n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
+ if (n < 0) {
+ if (errno == EBADF) {
+ nni_posix_pollq_reap(pq);
+ return;
+ }
+ reap = true;
}
- // dispatch events
- for (i = 0; i < nevents; ++i) {
- struct kevent ev_disable;
- const struct kevent * ev;
- nni_posix_pollq_node *node;
+ for (int i = 0; i < n; i++) {
+ struct kevent *ev = &evs[i];
- ev = &kevents[i];
- if (ev->filter == EVFILT_USER &&
- ev->ident == NNI_KQ_EV_EXIT_ID) {
- // we've woken up to exit the polling thread
+ switch (ev->filter) {
+ case EVFILT_READ:
+ revents = POLLIN;
break;
- }
-
- node = (nni_posix_pollq_node *) ev->udata;
- if (node->pq == NULL) {
- // node was removed while we were blocking
+ case EVFILT_WRITE:
+ revents = POLLOUT;
+ break;
+ case EVFILT_USER:
+ default:
+ reap = true;
continue;
}
- node->revents = 0;
-
+ pf = (void *) ev->udata;
if (ev->flags & (EV_ERROR | EV_EOF)) {
- node->revents |= POLLHUP;
- }
- if (ev->filter == EVFILT_WRITE) {
- node->revents |= POLLOUT;
- } else if (ev->filter == EVFILT_READ) {
- node->revents |= POLLIN;
- } else {
- NNI_ASSERT(false); // unhandled filter
- break;
+ revents |= POLLHUP;
}
- // explicitly disable this event. we'd ideally rely on
- // the behavior of EV_DISPATCH to do this,
- // but that only happens once we've acknowledged the
- // event by reading/or writing the fd. because there
- // can currently be some latency between the time we
- // receive this event and the time we read/write in
- // response, disable the event in the meantime to avoid
- // needless wakeups.
- // revisit if we're able to reduce/remove this latency.
- EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter,
- EV_DISABLE, 0, 0, NULL);
- // this will only fail if the fd is already
- // closed/invalid which we don't mind anyway,
- // so ignore return value.
- (void) kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL);
-
- // mark events as cleared
- node->events &= ~node->revents;
-
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pq->active = node;
-
- // Execute the callback with lock released
- nni_mtx_unlock(&pq->mtx);
- node->cb(node->data);
- nni_mtx_lock(&pq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pq->active = NULL;
- if (pq->wait == node) {
- pq->wait = NULL;
- nni_cv_wake(&pq->cv);
+ nni_mtx_lock(&pf->mtx);
+ cb = pf->cb;
+ cbarg = pf->data;
+ pf->events &= ~(revents);
+ nni_mtx_unlock(&pf->mtx);
+
+ if (cb != NULL) {
+ cb(pf, revents, cbarg);
}
}
+ if (reap) {
+ nni_posix_pollq_reap(pq);
+ }
}
-
- nni_mtx_unlock(&pq->mtx);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- if (pq->started) {
- struct kevent ev;
- EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE,
- NOTE_TRIGGER, 0, NULL);
- nni_mtx_lock(&pq->mtx);
- pq->close = true;
- pq->started = false;
- (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
- nni_mtx_unlock(&pq->mtx);
- }
- nni_thr_fini(&pq->thr);
-
if (pq->kq >= 0) {
close(pq->kq);
pq->kq = -1;
}
+ nni_thr_fini(&pq->thr);
+
+ nni_posix_pollq_reap(pq);
nni_mtx_fini(&pq->mtx);
}
@@ -327,10 +288,17 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq)
static int
nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq)
{
- // add user event so we can wake ourself on exit
+ int rv;
struct kevent ev;
- EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL);
- return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL)));
+
+ EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL);
+ while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ }
+ return (0);
}
static int
@@ -342,10 +310,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (nni_plat_errno(errno));
}
- pq->close = false;
-
nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
(rv = nni_posix_pollq_add_wake_evt(pq)) != 0) {
@@ -353,24 +319,14 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (rv);
}
- pq->started = true;
nni_thr_run(&pq->thr);
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
+
return (nni_posix_pollq_create(&nni_posix_global_pollq));
}