From bbc04b889523c137a1556917571a4ca9ee8a324e Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 8 Aug 2020 10:04:17 -0700 Subject: fixes #1275 Test timeouts on FreeBSD This was responsible for hangs in close on FreeBSD. Apparently our use of EVFILT_USER was incorrect, and rather than fix it, we have switched to using a notification pipe for synchronizing closing pipes. In addition to fixing this problem, it should significantly improve things for NetBSD and OpenBSD, which will now be able tbenefit from kqueue(), since we no longer depend on EVFILT_USER. --- src/platform/posix/posix_pollq_epoll.c | 5 +- src/platform/posix/posix_pollq_kqueue.c | 146 +++++++++++++++++--------------- 2 files changed, 78 insertions(+), 73 deletions(-) (limited to 'src/platform') diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index 63cdaeb5..d7a2c70f 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -61,8 +61,8 @@ struct nni_posix_pollq { }; struct nni_posix_pfd { + nni_list_node node; nni_posix_pollq *pq; - nni_list_node node; int fd; nni_posix_pfd_cb cb; void * arg; @@ -96,7 +96,6 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) nni_mtx_init(&pfd->mtx); nni_cv_init(&pfd->cv, &pq->mtx); - nni_mtx_lock(&pfd->mtx); pfd->pq = pq; pfd->fd = fd; pfd->cb = NULL; @@ -106,7 +105,6 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) pfd->closed = false; NNI_LIST_NODE_INIT(&pfd->node); - nni_mtx_unlock(&pfd->mtx); // notifications disabled to begin with ev.events = 0; @@ -115,6 +113,7 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { rv = nni_plat_errno(errno); nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); NNI_FREE_STRUCT(pfd); return (rv); } diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index 299479ab..7d827d7d 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -29,22 +29,24 @@ typedef struct nni_posix_pollq nni_posix_pollq; // pollq implementation struct nni_posix_pollq { nni_mtx mtx; - int kq; // kqueue handle - nni_thr thr; // worker thread - nni_list reapq; // items to reap + int wake_wfd; // write side of wake pipe + int wake_rfd; // read side of wake pipe + bool closed; // request for worker to exit + int kq; // kqueue handle + nni_thr thr; // worker thread + nni_list reapq; // items to reap }; struct nni_posix_pfd { nni_list_node node; // linkage into the reap list nni_posix_pollq *pq; // associated pollq int fd; // file descriptor to poll - void * data; // user data + void * arg; // user data nni_posix_pfd_cb cb; // user callback on event - nni_cv cv; // signaled when poller has unregistered - nni_mtx mtx; - unsigned events; - bool closing; bool closed; + unsigned events; + nni_cv cv; // signaled when poller has unregistered + nni_mtx mtx; }; #define NNI_MAX_KQUEUE_EVENTS 64 @@ -77,22 +79,31 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) return (NNG_ENOMEM); } + nni_mtx_init(&pf->mtx); + nni_cv_init(&pf->cv, &pq->mtx); + + pf->pq = pq; + pf->fd = fd; + pf->cb = NULL; + pf->arg = NULL; + pf->events = 0; + pf->closed = false; + + NNI_LIST_NODE_INIT(&pf->node); + *pfdp = pf; // Create entries in the kevent queue, without enabling them. EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf); EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf); // We update the kqueue list, without polling for events. if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) { + int rv; + rv = nni_plat_errno(errno); + nni_cv_fini(&pf->cv); + nni_mtx_fini(&pf->mtx); NNI_FREE_STRUCT(pf); - return (nni_plat_errno(errno)); + return (rv); } - pf->fd = fd; - pf->cb = NULL; - pf->pq = pq; - nni_mtx_init(&pf->mtx); - nni_cv_init(&pf->cv, &pq->mtx); - NNI_LIST_NODE_INIT(&pf->node); - *pfdp = pf; return (0); } @@ -103,9 +114,9 @@ nni_posix_pfd_close(nni_posix_pfd *pf) nni_posix_pollq *pq = pf->pq; nni_mtx_lock(&pq->mtx); - if (!pf->closing) { + if (!pf->closed) { struct kevent ev[2]; - pf->closing = true; + pf->closed = true; EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); (void) shutdown(pf->fd, SHUT_RDWR); @@ -128,18 +139,17 @@ nni_posix_pfd_fini(nni_posix_pfd *pf) // unless they are synchronous on user threads. NNI_ASSERT(!nni_thr_is_self(&pq->thr)); - struct kevent ev; nni_mtx_lock(&pq->mtx); - nni_list_append(&pq->reapq, pf); - EV_SET( - &ev, 0, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER, 0, NULL); - - // If this fails, the cleanup will stall. That should - // only occur in a memory pressure situation, and it - // will self-heal when the next event comes in. - (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); - while (!pf->closed) { - nni_cv_wait(&pf->cv); + + // If we're running on the callback, then don't bother to kick + // the pollq again. This is necessary because we cannot modify + // the poller while it is polling. + if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + nni_list_append(&pq->reapq, pf); + nni_plat_pipe_raise(pq->wake_wfd); + while (nni_list_active(&pq->reapq, pf)) { + nni_cv_wait(&pf->cv); + } } nni_mtx_unlock(&pq->mtx); @@ -160,8 +170,8 @@ nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg) { NNI_ASSERT(cb != NULL); // must not be null when established. nni_mtx_lock(&pf->mtx); - pf->cb = cb; - pf->data = arg; + pf->cb = cb; + pf->arg = arg; nni_mtx_unlock(&pf->mtx); } @@ -174,7 +184,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) nni_posix_pollq *pq = pf->pq; nni_mtx_lock(&pf->mtx); - if (pf->closing) { + if (pf->closed) { events = 0; } else { pf->events |= events; @@ -209,7 +219,6 @@ nni_posix_pollq_reap(nni_posix_pollq *pq) nni_mtx_lock(&pq->mtx); while ((pf = nni_list_first(&pq->reapq)) != NULL) { nni_list_remove(&pq->reapq, pf); - pf->closed = true; nni_cv_wake(&pf->cv); } nni_mtx_unlock(&pq->mtx); @@ -227,16 +236,15 @@ nni_posix_poll_thr(void *arg) nni_posix_pfd_cb cb; void * cbarg; unsigned revents; - bool reap = false; - n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); - if (n < 0) { - if (errno == EBADF) { - nni_posix_pollq_reap(pq); - return; - } - reap = true; + nni_mtx_lock(&pq->mtx); + if (pq->closed) { + nni_mtx_unlock(&pq->mtx); + nni_posix_pollq_reap(pq); + return; } + nni_mtx_unlock(&pq->mtx); + n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); for (int i = 0; i < n; i++) { struct kevent *ev = &evs[i]; @@ -248,9 +256,10 @@ nni_posix_poll_thr(void *arg) case EVFILT_WRITE: revents = POLLOUT; break; - case EVFILT_USER: - default: - reap = true; + } + if (ev->udata == NULL) { + nni_plat_pipe_clear(pq->wake_rfd); + nni_posix_pollq_reap(pq); continue; } pf = (void *) ev->udata; @@ -260,7 +269,7 @@ nni_posix_poll_thr(void *arg) nni_mtx_lock(&pf->mtx); cb = pf->cb; - cbarg = pf->data; + cbarg = pf->arg; pf->events &= ~(revents); nni_mtx_unlock(&pf->mtx); @@ -268,47 +277,33 @@ nni_posix_poll_thr(void *arg) cb(pf, revents, cbarg); } } - if (reap) { - nni_posix_pollq_reap(pq); - } } } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { + nni_mtx_lock(&pq->mtx); + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + nni_plat_pipe_raise(pq->wake_wfd); + + nni_thr_fini(&pq->thr); + nni_plat_pipe_close(pq->wake_wfd, pq->wake_rfd); + if (pq->kq >= 0) { close(pq->kq); + pq->kq = -1; } - nni_thr_fini(&pq->thr); - pq->kq = -1; - - nni_posix_pollq_reap(pq); - nni_mtx_fini(&pq->mtx); } static int -nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq) +nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; struct kevent ev; - EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL); - while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { - if (errno == EINTR) { - continue; - } - return (nni_plat_errno(errno)); - } - return (0); -} - -static int -nni_posix_pollq_create(nni_posix_pollq *pq) -{ - int rv; - if ((pq->kq = kqueue()) < 0) { return (nni_plat_errno(errno)); } @@ -317,7 +312,18 @@ nni_posix_pollq_create(nni_posix_pollq *pq) NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || - (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) { + ((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) { + nni_posix_pollq_destroy(pq); + return (rv); + } + + // Register the wake pipe. We use this to synchronize closing + // file descriptors. + EV_SET(&ev, (uintptr_t) pq->wake_rfd, EVFILT_READ, EV_ADD | EV_CLEAR, + 0, 0, NULL); + + if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { + rv = nni_plat_errno(rv); nni_posix_pollq_destroy(pq); return (rv); } -- cgit v1.2.3-70-g09d2