diff options
Diffstat (limited to 'src/platform/posix/posix_pollq_kqueue.c')
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 432 |
1 files changed, 194 insertions, 238 deletions
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index 0f312170..36ced3ff 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -12,196 +12,203 @@ #ifdef NNG_HAVE_KQUEUE #include <errno.h> +#include <fcntl.h> #include <stdbool.h> #include <stdio.h> #include <string.h> /* for strerror() */ #include <sys/event.h> +#include <sys/socket.h> #include <unistd.h> #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" -// TODO: can this be feature detected in cmake, -// rather than relying on platform? -#if defined NNG_PLATFORM_NETBSD -#define kevent_udata_t intptr_t -#else -#define kevent_udata_t void * -#endif - -#define NNI_MAX_KQUEUE_EVENTS 64 - -// user event id used to shutdown the polling thread -#define NNI_KQ_EV_EXIT_ID 0xF +typedef struct nni_posix_pollq nni_posix_pollq; // nni_posix_pollq is a work structure that manages state for the kqueue-based // pollq implementation struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - int kq; // kqueue handle - bool close; // request for worker to exit - bool started; - nni_thr thr; // worker thread - nni_posix_pollq_node *wait; // cancel waiting on this - nni_posix_pollq_node *active; // active node (in callback) + nni_mtx mtx; + int kq; // kqueue handle + nni_thr thr; // worker thread + nni_list reapq; // items to reap }; +struct nni_posix_pfd { + nni_list_node node; // linkage into the reap list + nni_posix_pollq *pq; // associated pollq + int fd; // file descriptor to poll + void * data; // user data + nni_posix_pfd_cb cb; // user callback on event + nni_cv cv; // signaled when poller has unregistered + nni_mtx mtx; + int events; + bool closing; + bool closed; +}; + +#define NNI_MAX_KQUEUE_EVENTS 64 + +// single global instance for now +static nni_posix_pollq nni_posix_global_pollq; + int -nni_posix_pollq_add(nni_posix_pollq_node *node) +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) { + nni_posix_pfd * pf; nni_posix_pollq *pq; - struct kevent kevents[2]; + struct kevent ev[2]; + unsigned flags = EV_ADD | EV_DISABLE; + + // Set this is as soon as possible (narrow the close-exec race as + // much as we can; better options are system calls that suppress + // this behavior from descriptor creation.) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); +#ifdef SO_NOSIGPIPE + // Darwin lacks MSG_NOSIGNAL, but has a socket option. + int one = 1; + (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); +#endif - pq = nni_posix_pollq_get(node->fd); - if (pq == NULL) { - return (NNG_EINVAL); - } + pq = &nni_posix_global_pollq; - // ensure node was not previously associated with a pollq - if (node->pq != NULL) { - return (NNG_ESTATE); + if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) { + return (NNG_ENOMEM); } - nni_mtx_lock(&pq->mtx); - if (pq->close) { - // This shouldn't happen! - nni_mtx_unlock(&pq->mtx); - return (NNG_ECLOSED); - } - - node->pq = pq; - node->events = 0; - - EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, - EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node); + // Create entries in the kevent queue, without enabling them. + EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf); + EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf); - EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, - EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node); - - if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) { - nni_mtx_unlock(&pq->mtx); + // We update the kqueue list, without polling for events. + if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) { + NNI_FREE_STRUCT(pf); return (nni_plat_errno(errno)); } + pf->fd = fd; + pf->cb = NULL; + pf->pq = pq; + nni_mtx_init(&pf->mtx); + nni_cv_init(&pf->cv, &pq->mtx); + NNI_LIST_NODE_INIT(&pf->node); + *pfdp = pf; - nni_mtx_unlock(&pq->mtx); return (0); } -// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini() -// called while pq's lock is held -static void -nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node) +void +nni_posix_pfd_close(nni_posix_pfd *pf) { - struct kevent kevents[2]; - - node->events = 0; - node->pq = NULL; + nni_posix_pollq *pq = pf->pq; - EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0, - (kevent_udata_t) node); - - EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0, - 0, (kevent_udata_t) node); - - // So it turns out that we can get EBADF, ENOENT, and apparently - // also EINPROGRESS (new on macOS Sierra). Frankly, we're deleting - // an event, and its harmless if the event removal fails (worst - // case would be a spurious wakeup), so lets ignore it. - (void) kevent(pq->kq, kevents, 2, NULL, 0, NULL); + nni_mtx_lock(&pq->mtx); + if (!pf->closing) { + struct kevent ev[2]; + pf->closing = true; + EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); + EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); + (void) shutdown(pf->fd, SHUT_RDWR); + // This should never fail -- no allocations, just deletion. + (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); + } + nni_mtx_unlock(&pq->mtx); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +nni_posix_pfd_fini(nni_posix_pfd *pf) { - nni_posix_pollq *pq = node->pq; + nni_posix_pollq *pq; - if (pq == NULL) { - return; - } + pq = pf->pq; - nni_mtx_lock(&pq->mtx); - nni_posix_pollq_remove_helper(pq, node); + nni_posix_pfd_close(pf); - if (pq->close) { - nni_cv_wake(&pq->cv); + if (!nni_thr_is_self(&pq->thr)) { + struct kevent ev; + nni_mtx_lock(&pq->mtx); + nni_list_append(&pq->reapq, pf); + EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL); + + // If this fails, the cleanup will stall. That should + // only occur in a memory pressure situation, and it + // will self-heal when the next event comes in. + (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); + while (!pf->closed) { + nni_cv_wait(&pf->cv); + } + nni_mtx_unlock(&pq->mtx); } - nni_mtx_unlock(&pq->mtx); + + (void) close(pf->fd); + nni_cv_fini(&pf->cv); + nni_mtx_fini(&pf->mtx); + NNI_FREE_STRUCT(pf); } -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. int -nni_posix_pollq_init(nni_posix_pollq_node *node) +nni_posix_pfd_fd(nni_posix_pfd *pf) { - NNI_ARG_UNUSED(node); - return (0); + return (pf->fd); } -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the callback is not active, so that the node -// may be deallocated. This function must not be called in a callback. void -nni_posix_pollq_fini(nni_posix_pollq_node *node) +nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - while (pq->active == node) { - pq->wait = node; - nni_cv_wait(&pq->cv); - } - - nni_posix_pollq_remove_helper(pq, node); - - if (pq->close) { - nni_cv_wake(&pq->cv); - } - nni_mtx_unlock(&pq->mtx); + NNI_ASSERT(cb != NULL); // must not be null when established. + nni_mtx_lock(&pf->mtx); + pf->cb = cb; + pf->data = arg; + nni_mtx_unlock(&pf->mtx); } -void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +int +nni_posix_pfd_arm(nni_posix_pfd *pf, int events) { - nni_posix_pollq *pq = node->pq; - struct kevent kevents[2]; - int nevents = 0; + struct kevent ev[2]; + int nev = 0; + unsigned flags = EV_ENABLE | EV_DISPATCH; + nni_posix_pollq *pq = pf->pq; + + nni_mtx_lock(&pf->mtx); + if (pf->closing) { + events = 0; + } else { + pf->events |= events; + events = pf->events; + } + nni_mtx_unlock(&pf->mtx); - NNI_ASSERT(pq != NULL); if (events == 0) { - return; + // No events, and kqueue is oneshot, so nothing to do. + return (0); } - nni_mtx_lock(&pq->mtx); - - if (!(node->events & POLLIN) && (events & POLLIN)) { - EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ, - EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node); + if (events & POLLIN) { + EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf); } - - if (!(node->events & POLLOUT) && (events & POLLOUT)) { - EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE, - EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node); + if (events & POLLOUT) { + EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf); } - - if (nevents > 0) { - // This call should never fail, really. The only possible - // legitimate failure would be ENOMEM, but in that case - // lots of other things are going to be failing, or ENOENT - // or ESRCH, indicating we already lost interest; the - // only consequence of ignoring these errors is that a given - // descriptor might appear "stuck". This beats the alternative - // of just blithely crashing the application with an assertion. - (void) kevent(pq->kq, kevents, nevents, NULL, 0, NULL); - node->events |= events; + while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) { + if (errno == EINTR) { + continue; + } + return (nni_plat_errno(errno)); } + return (0); +} +static void +nni_posix_pollq_reap(nni_posix_pollq *pq) +{ + nni_posix_pfd *pf; + nni_mtx_lock(&pq->mtx); + while ((pf = nni_list_first(&pq->reapq)) != NULL) { + nni_list_remove(&pq->reapq, pf); + pf->closed = true; + nni_cv_wake(&pf->cv); + } nni_mtx_unlock(&pq->mtx); } @@ -209,117 +216,71 @@ static void nni_posix_poll_thr(void *arg) { nni_posix_pollq *pq = arg; - struct kevent kevents[NNI_MAX_KQUEUE_EVENTS]; - - nni_mtx_lock(&pq->mtx); - while (!pq->close) { - int i; - int nevents; - - // block indefinitely, timers are handled separately - nni_mtx_unlock(&pq->mtx); - nevents = kevent( - pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL); - nni_mtx_lock(&pq->mtx); - - if (nevents < 0) { - continue; + for (;;) { + int n; + struct kevent evs[NNI_MAX_KQUEUE_EVENTS]; + nni_posix_pfd * pf; + nni_posix_pfd_cb cb; + void * cbarg; + int revents; + bool reap = false; + + n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); + if (n < 0) { + if (errno == EBADF) { + nni_posix_pollq_reap(pq); + return; + } + reap = true; } - // dispatch events - for (i = 0; i < nevents; ++i) { - struct kevent ev_disable; - const struct kevent * ev; - nni_posix_pollq_node *node; + for (int i = 0; i < n; i++) { + struct kevent *ev = &evs[i]; - ev = &kevents[i]; - if (ev->filter == EVFILT_USER && - ev->ident == NNI_KQ_EV_EXIT_ID) { - // we've woken up to exit the polling thread + switch (ev->filter) { + case EVFILT_READ: + revents = POLLIN; break; - } - - node = (nni_posix_pollq_node *) ev->udata; - if (node->pq == NULL) { - // node was removed while we were blocking + case EVFILT_WRITE: + revents = POLLOUT; + break; + case EVFILT_USER: + default: + reap = true; continue; } - node->revents = 0; - + pf = (void *) ev->udata; if (ev->flags & (EV_ERROR | EV_EOF)) { - node->revents |= POLLHUP; - } - if (ev->filter == EVFILT_WRITE) { - node->revents |= POLLOUT; - } else if (ev->filter == EVFILT_READ) { - node->revents |= POLLIN; - } else { - NNI_ASSERT(false); // unhandled filter - break; + revents |= POLLHUP; } - // explicitly disable this event. we'd ideally rely on - // the behavior of EV_DISPATCH to do this, - // but that only happens once we've acknowledged the - // event by reading/or writing the fd. because there - // can currently be some latency between the time we - // receive this event and the time we read/write in - // response, disable the event in the meantime to avoid - // needless wakeups. - // revisit if we're able to reduce/remove this latency. - EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter, - EV_DISABLE, 0, 0, NULL); - // this will only fail if the fd is already - // closed/invalid which we don't mind anyway, - // so ignore return value. - (void) kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL); - - // mark events as cleared - node->events &= ~node->revents; - - // Save the active node; we can notice this way - // when it is busy, and avoid freeing it until - // we are sure that it is not in use. - pq->active = node; - - // Execute the callback with lock released - nni_mtx_unlock(&pq->mtx); - node->cb(node->data); - nni_mtx_lock(&pq->mtx); - - // We finished with this node. If something - // was blocked waiting for that, wake it up. - pq->active = NULL; - if (pq->wait == node) { - pq->wait = NULL; - nni_cv_wake(&pq->cv); + nni_mtx_lock(&pf->mtx); + cb = pf->cb; + cbarg = pf->data; + pf->events &= ~(revents); + nni_mtx_unlock(&pf->mtx); + + if (cb != NULL) { + cb(pf, revents, cbarg); } } + if (reap) { + nni_posix_pollq_reap(pq); + } } - - nni_mtx_unlock(&pq->mtx); } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - if (pq->started) { - struct kevent ev; - EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE, - NOTE_TRIGGER, 0, NULL); - nni_mtx_lock(&pq->mtx); - pq->close = true; - pq->started = false; - (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); - nni_mtx_unlock(&pq->mtx); - } - nni_thr_fini(&pq->thr); - if (pq->kq >= 0) { close(pq->kq); pq->kq = -1; } + nni_thr_fini(&pq->thr); + + nni_posix_pollq_reap(pq); nni_mtx_fini(&pq->mtx); } @@ -327,10 +288,17 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq) static int nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq) { - // add user event so we can wake ourself on exit + int rv; struct kevent ev; - EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL); - return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL))); + + EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL); + while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { + if (errno == EINTR) { + continue; + } + return (nni_plat_errno(errno)); + } + return (0); } static int @@ -342,10 +310,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (nni_plat_errno(errno)); } - pq->close = false; - nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, &pq->mtx); + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) { @@ -353,24 +319,14 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (rv); } - pq->started = true; nni_thr_run(&pq->thr); return (0); } -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) -{ - NNI_ARG_UNUSED(fd); - return (&nni_posix_global_pollq); -} - int nni_posix_pollq_sysinit(void) { + return (nni_posix_pollq_create(&nni_posix_global_pollq)); } |
