| author | Garrett D'Amore <garrett@damore.org> | 2018-05-09 17:21:27 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-14 17:09:20 -0700 |
| commit | 16b4c4019c7b7904de171c588ed8c72ca732d2cf (patch) | |
| tree | 9e5a8416470631cfb48f5a6ebdd4b16e4b1be3d6 /src/platform/posix/posix_pollq_kqueue.c | |
| parent | e0beb13b066d27ce32347a1c18c9d441828dc553 (diff) | |
fixes #352 aio lock is burning hot
fixes #326 consider nni_taskq_exec_synch()
fixes #410 kqueue implementation could be smarter
fixes #411 epoll implementation could be smarter
fixes #426 synchronous completion can lead to panic
fixes #421 pipe close race condition/duplicate destroy
This is a major refactoring of two significant parts of the code base,
which are closely interrelated.
First, the aio and taskq frameworks have undergone a number of simplifications
and improvements. We have ditched a few parts of the internal API that weren't
terribly useful but added a lot of complexity (for example, tasks no longer
support cancellation), and aio_schedule now checks for cancellation and other
"premature" completions. The aio framework now uses tasks more tightly, so
that waiting on an aio can devolve into just nni_task_wait(). We did have to
add a "task_prep()" step to prevent race conditions; the intended ordering is
sketched below.
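The race this guards against is a completion (possibly synchronous) running before the submitter has finished setting the task up. A minimal, hypothetical sketch of the ordering, using raw pthreads and stand-in names (task_prep and task_wait echo the commit message, but the types and signatures here are simplified stand-ins, not nng's actual internal API):

```c
#include <pthread.h>
#include <stdbool.h>

// Simplified stand-in for nng's internal task object.
typedef struct {
	pthread_mutex_t mtx;
	pthread_cond_t  cv;
	bool            busy; // set by prep, cleared when the callback finishes
} task;

// Mark the task busy *before* it is scheduled, so that a completion racing
// ahead of the submitter still finds the task in a consistent state.
static void
task_prep(task *t)
{
	pthread_mutex_lock(&t->mtx);
	t->busy = true;
	pthread_mutex_unlock(&t->mtx);
}

// Run by whatever thread executes the task, after its callback returns.
static void
task_done(task *t)
{
	pthread_mutex_lock(&t->mtx);
	t->busy = false;
	pthread_cond_broadcast(&t->cv);
	pthread_mutex_unlock(&t->mtx);
}

// With prep in place, "waiting for the aio" devolves into waiting for the
// task, which is the nni_task_wait() mentioned above.
static void
task_wait(task *t)
{
	pthread_mutex_lock(&t->mtx);
	while (t->busy) {
		pthread_cond_wait(&t->cv, &t->mtx);
	}
	pthread_mutex_unlock(&t->mtx);
}
```

Because busy is set before dispatch, a completion that runs immediately still has a consistent flag to clear, and task_wait() cannot return early.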
Second, the entire POSIX poller framework has been simplified and made more
robust and more scalable. There were some fairly inherent race conditions
around the shutdown/close code, where we *thought* we were synchronizing
against the other thread but weren't doing so adequately.
With a cleaner design, we've been able to tighten up the implementation
to remove these race conditions, while substantially reducing the chance
of lock contention, thereby improving scalability. The illumos poller
also got a performance boost from polling for multiple events at once.
In highly "busy" systems, we expect to see vast reductions in lock
contention, and therefore greater scalability, in addition to improved
overall reliability.
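The replacement for the old "active node" bookkeeping can be seen in the kqueue diff below (nni_posix_pfd_fini and nni_posix_pollq_reap). Distilled into a standalone sketch (simplified stand-in types, raw pthreads in place of nng's nni_mtx/nni_cv wrappers), the close handshake looks like this:

```c
#include <pthread.h>
#include <stdbool.h>

struct pfd {
	struct pfd     *next;   // reap-list linkage
	bool            closed; // set only by the poller thread
	pthread_cond_t  cv;     // signaled once the poller is done with us
};

struct pollq {
	pthread_mutex_t mtx;
	struct pfd     *reapq;
};

// Finalizer side (must not run on the poller thread): queue the pfd for
// reaping, then sleep until the poller confirms the unregistration.
void
pfd_fini(struct pollq *pq, struct pfd *pf)
{
	pthread_mutex_lock(&pq->mtx);
	pf->next  = pq->reapq; // enqueue for the poller to reap
	pq->reapq = pf;
	// ... wake the poller here (the diff uses an EVFILT_USER event) ...
	while (!pf->closed) {
		pthread_cond_wait(&pf->cv, &pq->mtx);
	}
	pthread_mutex_unlock(&pq->mtx);
	// No callback can be in flight now; freeing pf is safe.
}

// Poller side, run between kevent() waits.
void
pollq_reap(struct pollq *pq)
{
	struct pfd *pf;
	pthread_mutex_lock(&pq->mtx);
	while ((pf = pq->reapq) != NULL) {
		pq->reapq  = pf->next;
		pf->closed = true;
		pthread_cond_signal(&pf->cv);
	}
	pthread_mutex_unlock(&pq->mtx);
}
```

Because the poller thread itself marks the descriptor closed and signals the waiter, there is no window in which a callback can still be running when the memory is freed.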
One area where we can still do better is that only a single poller thread
is run. Scaling this out has to be done differently for each poller, and
carefully, to ensure that close conditions remain safe on every poller and
that there is no chance of deadlock or livelock while waiting for pfd
finalizers.
Diffstat (limited to 'src/platform/posix/posix_pollq_kqueue.c')
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 432 |
1 file changed, 194 insertions, 238 deletions
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index 0f312170..36ced3ff 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -12,196 +12,203 @@
 #ifdef NNG_HAVE_KQUEUE
 
 #include <errno.h>
+#include <fcntl.h>
 #include <stdbool.h>
 #include <stdio.h>
 #include <string.h> /* for strerror() */
 #include <sys/event.h>
+#include <sys/socket.h>
 #include <unistd.h>
 
 #include "core/nng_impl.h"
 #include "platform/posix/posix_pollq.h"
 
-// TODO: can this be feature detected in cmake,
-// rather than relying on platform?
-#if defined NNG_PLATFORM_NETBSD
-#define kevent_udata_t intptr_t
-#else
-#define kevent_udata_t void *
-#endif
-
-#define NNI_MAX_KQUEUE_EVENTS 64
-
-// user event id used to shutdown the polling thread
-#define NNI_KQ_EV_EXIT_ID 0xF
+typedef struct nni_posix_pollq nni_posix_pollq;
 
 // nni_posix_pollq is a work structure that manages state for the kqueue-based
 // pollq implementation
 struct nni_posix_pollq {
-    nni_mtx               mtx;
-    nni_cv                cv;
-    int                   kq;      // kqueue handle
-    bool                  close;   // request for worker to exit
-    bool                  started;
-    nni_thr               thr;     // worker thread
-    nni_posix_pollq_node *wait;    // cancel waiting on this
-    nni_posix_pollq_node *active;  // active node (in callback)
+    nni_mtx  mtx;
+    int      kq;    // kqueue handle
+    nni_thr  thr;   // worker thread
+    nni_list reapq; // items to reap
 };
 
+struct nni_posix_pfd {
+    nni_list_node    node; // linkage into the reap list
+    nni_posix_pollq *pq;   // associated pollq
+    int              fd;   // file descriptor to poll
+    void *           data; // user data
+    nni_posix_pfd_cb cb;   // user callback on event
+    nni_cv           cv;   // signaled when poller has unregistered
+    nni_mtx          mtx;
+    int              events;
+    bool             closing;
+    bool             closed;
+};
+
+#define NNI_MAX_KQUEUE_EVENTS 64
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
 int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
 {
+    nni_posix_pfd *  pf;
     nni_posix_pollq *pq;
-    struct kevent    kevents[2];
+    struct kevent    ev[2];
+    unsigned         flags = EV_ADD | EV_DISABLE;
+
+    // Set this is as soon as possible (narrow the close-exec race as
+    // much as we can; better options are system calls that suppress
+    // this behavior from descriptor creation.)
+    (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+    (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+#ifdef SO_NOSIGPIPE
+    // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+    int one = 1;
+    (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
 
-    pq = nni_posix_pollq_get(node->fd);
-    if (pq == NULL) {
-        return (NNG_EINVAL);
-    }
+    pq = &nni_posix_global_pollq;
 
-    // ensure node was not previously associated with a pollq
-    if (node->pq != NULL) {
-        return (NNG_ESTATE);
+    if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) {
+        return (NNG_ENOMEM);
    }
 
-    nni_mtx_lock(&pq->mtx);
-    if (pq->close) {
-        // This shouldn't happen!
-        nni_mtx_unlock(&pq->mtx);
-        return (NNG_ECLOSED);
-    }
-
-    node->pq     = pq;
-    node->events = 0;
-
-    EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ,
-        EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
+    // Create entries in the kevent queue, without enabling them.
+    EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
+    EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
 
-    EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE,
-        EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
-
-    if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) {
-        nni_mtx_unlock(&pq->mtx);
+    // We update the kqueue list, without polling for events.
+    if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
+        NNI_FREE_STRUCT(pf);
        return (nni_plat_errno(errno));
    }
+    pf->fd = fd;
+    pf->cb = NULL;
+    pf->pq = pq;
+    nni_mtx_init(&pf->mtx);
+    nni_cv_init(&pf->cv, &pq->mtx);
+    NNI_LIST_NODE_INIT(&pf->node);
+    *pfdp = pf;
 
-    nni_mtx_unlock(&pq->mtx);
    return (0);
 }
 
-// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
-// called while pq's lock is held
-static void
-nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+void
+nni_posix_pfd_close(nni_posix_pfd *pf)
 {
-    struct kevent kevents[2];
-
-    node->events = 0;
-    node->pq     = NULL;
+    nni_posix_pollq *pq = pf->pq;
 
-    EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0,
-        (kevent_udata_t) node);
-
-    EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0,
-        0, (kevent_udata_t) node);
-
-    // So it turns out that we can get EBADF, ENOENT, and apparently
-    // also EINPROGRESS (new on macOS Sierra). Frankly, we're deleting
-    // an event, and its harmless if the event removal fails (worst
-    // case would be a spurious wakeup), so lets ignore it.
-    (void) kevent(pq->kq, kevents, 2, NULL, 0, NULL);
+    nni_mtx_lock(&pq->mtx);
+    if (!pf->closing) {
+        struct kevent ev[2];
+        pf->closing = true;
+        EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
+        EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
+        (void) shutdown(pf->fd, SHUT_RDWR);
+        // This should never fail -- no allocations, just deletion.
+        (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
+    }
+    nni_mtx_unlock(&pq->mtx);
 }
 
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
 void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+nni_posix_pfd_fini(nni_posix_pfd *pf)
 {
-    nni_posix_pollq *pq = node->pq;
+    nni_posix_pollq *pq;
 
-    if (pq == NULL) {
-        return;
-    }
+    pq = pf->pq;
 
-    nni_mtx_lock(&pq->mtx);
-    nni_posix_pollq_remove_helper(pq, node);
+    nni_posix_pfd_close(pf);
 
-    if (pq->close) {
-        nni_cv_wake(&pq->cv);
+    if (!nni_thr_is_self(&pq->thr)) {
+        struct kevent ev;
+        nni_mtx_lock(&pq->mtx);
+        nni_list_append(&pq->reapq, pf);
+        EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL);
+
+        // If this fails, the cleanup will stall. That should
+        // only occur in a memory pressure situation, and it
+        // will self-heal when the next event comes in.
+        (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
+        while (!pf->closed) {
+            nni_cv_wait(&pf->cv);
+        }
+        nni_mtx_unlock(&pq->mtx);
    }
-    nni_mtx_unlock(&pq->mtx);
+
+    (void) close(pf->fd);
+    nni_cv_fini(&pf->cv);
+    nni_mtx_fini(&pf->mtx);
+    NNI_FREE_STRUCT(pf);
 }
 
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
 int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
+nni_posix_pfd_fd(nni_posix_pfd *pf)
 {
-    NNI_ARG_UNUSED(node);
-    return (0);
+    return (pf->fd);
 }
 
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
 void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
 {
-    nni_posix_pollq *pq = node->pq;
-    if (pq == NULL) {
-        return;
-    }
-
-    nni_mtx_lock(&pq->mtx);
-    while (pq->active == node) {
-        pq->wait = node;
-        nni_cv_wait(&pq->cv);
-    }
-
-    nni_posix_pollq_remove_helper(pq, node);
-
-    if (pq->close) {
-        nni_cv_wake(&pq->cv);
-    }
-    nni_mtx_unlock(&pq->mtx);
+    NNI_ASSERT(cb != NULL); // must not be null when established.
+    nni_mtx_lock(&pf->mtx);
+    pf->cb   = cb;
+    pf->data = arg;
+    nni_mtx_unlock(&pf->mtx);
 }
 
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pf, int events)
 {
-    nni_posix_pollq *pq = node->pq;
-    struct kevent    kevents[2];
-    int              nevents = 0;
+    struct kevent    ev[2];
+    int              nev   = 0;
+    unsigned         flags = EV_ENABLE | EV_DISPATCH;
+    nni_posix_pollq *pq    = pf->pq;
+
+    nni_mtx_lock(&pf->mtx);
+    if (pf->closing) {
+        events = 0;
+    } else {
+        pf->events |= events;
+        events = pf->events;
+    }
+    nni_mtx_unlock(&pf->mtx);
 
-    NNI_ASSERT(pq != NULL);
    if (events == 0) {
-        return;
+        // No events, and kqueue is oneshot, so nothing to do.
+        return (0);
    }
 
-    nni_mtx_lock(&pq->mtx);
-
-    if (!(node->events & POLLIN) && (events & POLLIN)) {
-        EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ,
-            EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+    if (events & POLLIN) {
+        EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
    }
-
-    if (!(node->events & POLLOUT) && (events & POLLOUT)) {
-        EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE,
-            EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+    if (events & POLLOUT) {
+        EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf);
    }
-
-    if (nevents > 0) {
-        // This call should never fail, really. The only possible
-        // legitimate failure would be ENOMEM, but in that case
-        // lots of other things are going to be failing, or ENOENT
-        // or ESRCH, indicating we already lost interest; the
-        // only consequence of ignoring these errors is that a given
-        // descriptor might appear "stuck". This beats the alternative
-        // of just blithely crashing the application with an assertion.
-        (void) kevent(pq->kq, kevents, nevents, NULL, 0, NULL);
-        node->events |= events;
+    while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) {
+        if (errno == EINTR) {
+            continue;
+        }
+        return (nni_plat_errno(errno));
    }
+    return (0);
+}
 
+static void
+nni_posix_pollq_reap(nni_posix_pollq *pq)
+{
+    nni_posix_pfd *pf;
+    nni_mtx_lock(&pq->mtx);
+    while ((pf = nni_list_first(&pq->reapq)) != NULL) {
+        nni_list_remove(&pq->reapq, pf);
+        pf->closed = true;
+        nni_cv_wake(&pf->cv);
+    }
    nni_mtx_unlock(&pq->mtx);
 }
 
@@ -209,117 +216,71 @@ static void
 nni_posix_poll_thr(void *arg)
 {
    nni_posix_pollq *pq = arg;
-    struct kevent    kevents[NNI_MAX_KQUEUE_EVENTS];
-
-    nni_mtx_lock(&pq->mtx);
-    while (!pq->close) {
-        int i;
-        int nevents;
-
-        // block indefinitely, timers are handled separately
-        nni_mtx_unlock(&pq->mtx);
-        nevents = kevent(
-            pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL);
-        nni_mtx_lock(&pq->mtx);
-
-        if (nevents < 0) {
-            continue;
+    for (;;) {
+        int              n;
+        struct kevent    evs[NNI_MAX_KQUEUE_EVENTS];
+        nni_posix_pfd *  pf;
+        nni_posix_pfd_cb cb;
+        void *           cbarg;
+        int              revents;
+        bool             reap = false;
+
+        n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
+        if (n < 0) {
+            if (errno == EBADF) {
+                nni_posix_pollq_reap(pq);
+                return;
+            }
+            reap = true;
        }
 
-        // dispatch events
-        for (i = 0; i < nevents; ++i) {
-            struct kevent         ev_disable;
-            const struct kevent * ev;
-            nni_posix_pollq_node *node;
+        for (int i = 0; i < n; i++) {
+            struct kevent *ev = &evs[i];
 
-            ev = &kevents[i];
-            if (ev->filter == EVFILT_USER &&
-                ev->ident == NNI_KQ_EV_EXIT_ID) {
-                // we've woken up to exit the polling thread
+            switch (ev->filter) {
+            case EVFILT_READ:
+                revents = POLLIN;
                break;
-            }
-
-            node = (nni_posix_pollq_node *) ev->udata;
-            if (node->pq == NULL) {
-                // node was removed while we were blocking
+            case EVFILT_WRITE:
+                revents = POLLOUT;
+                break;
+            case EVFILT_USER:
+            default:
+                reap = true;
                continue;
            }
 
-            node->revents = 0;
-
+            pf = (void *) ev->udata;
            if (ev->flags & (EV_ERROR | EV_EOF)) {
-                node->revents |= POLLHUP;
-            }
-            if (ev->filter == EVFILT_WRITE) {
-                node->revents |= POLLOUT;
-            } else if (ev->filter == EVFILT_READ) {
-                node->revents |= POLLIN;
-            } else {
-                NNI_ASSERT(false); // unhandled filter
-                break;
+                revents |= POLLHUP;
            }
 
-            // explicitly disable this event. we'd ideally rely on
-            // the behavior of EV_DISPATCH to do this,
-            // but that only happens once we've acknowledged the
-            // event by reading/or writing the fd. because there
-            // can currently be some latency between the time we
-            // receive this event and the time we read/write in
-            // response, disable the event in the meantime to avoid
-            // needless wakeups.
-            // revisit if we're able to reduce/remove this latency.
-            EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter,
-                EV_DISABLE, 0, 0, NULL);
-            // this will only fail if the fd is already
-            // closed/invalid which we don't mind anyway,
-            // so ignore return value.
-            (void) kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL);
-
-            // mark events as cleared
-            node->events &= ~node->revents;
-
-            // Save the active node; we can notice this way
-            // when it is busy, and avoid freeing it until
-            // we are sure that it is not in use.
-            pq->active = node;
-
-            // Execute the callback with lock released
-            nni_mtx_unlock(&pq->mtx);
-            node->cb(node->data);
-            nni_mtx_lock(&pq->mtx);
-
-            // We finished with this node. If something
-            // was blocked waiting for that, wake it up.
-            pq->active = NULL;
-            if (pq->wait == node) {
-                pq->wait = NULL;
-                nni_cv_wake(&pq->cv);
+            nni_mtx_lock(&pf->mtx);
+            cb    = pf->cb;
+            cbarg = pf->data;
+            pf->events &= ~(revents);
+            nni_mtx_unlock(&pf->mtx);
+
+            if (cb != NULL) {
+                cb(pf, revents, cbarg);
            }
        }
+
+        if (reap) {
+            nni_posix_pollq_reap(pq);
+        }
    }
-
-    nni_mtx_unlock(&pq->mtx);
 }
 
 static void
 nni_posix_pollq_destroy(nni_posix_pollq *pq)
 {
-    if (pq->started) {
-        struct kevent ev;
-        EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE,
-            NOTE_TRIGGER, 0, NULL);
-        nni_mtx_lock(&pq->mtx);
-        pq->close   = true;
-        pq->started = false;
-        (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
-        nni_mtx_unlock(&pq->mtx);
-    }
-    nni_thr_fini(&pq->thr);
-
    if (pq->kq >= 0) {
        close(pq->kq);
        pq->kq = -1;
    }
+    nni_thr_fini(&pq->thr);
+
+    nni_posix_pollq_reap(pq);
    nni_mtx_fini(&pq->mtx);
 }
 
@@ -327,10 +288,17 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq)
 static int
 nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq)
 {
-    // add user event so we can wake ourself on exit
+    int           rv;
    struct kevent ev;
-    EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL);
-    return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL)));
+
+    EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL);
+    while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
+        if (errno == EINTR) {
+            continue;
+        }
+        return (nni_plat_errno(errno));
+    }
+    return (0);
 }
 
 static int
@@ -342,10 +310,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
        return (nni_plat_errno(errno));
    }
 
-    pq->close = false;
-
    nni_mtx_init(&pq->mtx);
-    nni_cv_init(&pq->cv, &pq->mtx);
+    NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
 
    if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
        (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) {
@@ -353,24 +319,14 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
        return (rv);
    }
 
-    pq->started = true;
    nni_thr_run(&pq->thr);
    return (0);
 }
 
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
-    NNI_ARG_UNUSED(fd);
-    return (&nni_posix_global_pollq);
-}
-
 int
 nni_posix_pollq_sysinit(void)
 {
+    return (nni_posix_pollq_create(&nni_posix_global_pollq));
 }
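For orientation, here is how a consumer might drive the new pfd API introduced in this diff. This is an illustrative sketch assembled from the functions shown above (nni_posix_pfd_init, nni_posix_pfd_set_cb, nni_posix_pfd_arm, nni_posix_pfd_fd); tcp_cb and tcp_start are hypothetical names, not code from this commit:

```c
#include <poll.h>

#include "platform/posix/posix_pollq.h" // nng tree header declaring the pfd API

// Hypothetical event callback; the signature matches nni_posix_pfd_set_cb
// and the cb(pf, revents, cbarg) invocation in the poller thread above.
static void
tcp_cb(nni_posix_pfd *pf, int revents, void *arg)
{
	(void) arg;
	if (revents & POLLHUP) {
		// peer went away (EV_EOF/EV_ERROR); begin teardown
		return;
	}
	if (revents & POLLIN) {
		// ... read from nni_posix_pfd_fd(pf) until EAGAIN ...
	}
	// EV_DISPATCH makes events oneshot: after the callback fires,
	// interest must be re-armed before another wakeup can occur.
	(void) nni_posix_pfd_arm(pf, POLLIN);
}

// Hypothetical setup path for a connected descriptor.
static int
tcp_start(int fd, nni_posix_pfd **pfp)
{
	int rv;

	// Registers fd with the global kqueue (disabled until armed),
	// and sets O_NONBLOCK/FD_CLOEXEC as a side effect.
	if ((rv = nni_posix_pfd_init(pfp, fd)) != 0) {
		return (rv);
	}
	nni_posix_pfd_set_cb(*pfp, tcp_cb, NULL);
	return (nni_posix_pfd_arm(*pfp, POLLIN));
}
```

Note that teardown (nni_posix_pfd_close() followed by nni_posix_pfd_fini()) is safe even from the poller thread itself: the fini path checks nni_thr_is_self() and skips the reap-queue wait in that case, avoiding self-deadlock.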
