diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-08-08 10:04:17 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-08-08 11:25:30 -0700 |
| commit | bbc04b889523c137a1556917571a4ca9ee8a324e (patch) | |
| tree | 9297bcc64597d9751f27cbf75849f3d2ccaaf9e9 /src/platform/posix/posix_pollq_kqueue.c | |
| parent | 47c66c37d1849d49f9e79b14a5463e550c31c9c8 (diff) | |
| download | nng-bbc04b889523c137a1556917571a4ca9ee8a324e.tar.gz nng-bbc04b889523c137a1556917571a4ca9ee8a324e.tar.bz2 nng-bbc04b889523c137a1556917571a4ca9ee8a324e.zip | |
fixes #1275 Test timeouts on FreeBSD
This was responsible for hangs in close on FreeBSD. Apparently
our use of EVFILT_USER was incorrect, and rather than fix it, we
have switched to using a notification pipe for synchronizing
closing pipes. In addition to fixing this problem, it should
significantly improve things for NetBSD and OpenBSD, which will
now be able tbenefit from kqueue(), since we no longer depend on
EVFILT_USER.
Diffstat (limited to 'src/platform/posix/posix_pollq_kqueue.c')
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 146 |
1 files changed, 76 insertions, 70 deletions
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index 299479ab..7d827d7d 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -29,22 +29,24 @@ typedef struct nni_posix_pollq nni_posix_pollq; // pollq implementation struct nni_posix_pollq { nni_mtx mtx; - int kq; // kqueue handle - nni_thr thr; // worker thread - nni_list reapq; // items to reap + int wake_wfd; // write side of wake pipe + int wake_rfd; // read side of wake pipe + bool closed; // request for worker to exit + int kq; // kqueue handle + nni_thr thr; // worker thread + nni_list reapq; // items to reap }; struct nni_posix_pfd { nni_list_node node; // linkage into the reap list nni_posix_pollq *pq; // associated pollq int fd; // file descriptor to poll - void * data; // user data + void * arg; // user data nni_posix_pfd_cb cb; // user callback on event - nni_cv cv; // signaled when poller has unregistered - nni_mtx mtx; - unsigned events; - bool closing; bool closed; + unsigned events; + nni_cv cv; // signaled when poller has unregistered + nni_mtx mtx; }; #define NNI_MAX_KQUEUE_EVENTS 64 @@ -77,22 +79,31 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) return (NNG_ENOMEM); } + nni_mtx_init(&pf->mtx); + nni_cv_init(&pf->cv, &pq->mtx); + + pf->pq = pq; + pf->fd = fd; + pf->cb = NULL; + pf->arg = NULL; + pf->events = 0; + pf->closed = false; + + NNI_LIST_NODE_INIT(&pf->node); + *pfdp = pf; // Create entries in the kevent queue, without enabling them. EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf); EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf); // We update the kqueue list, without polling for events. if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) { + int rv; + rv = nni_plat_errno(errno); + nni_cv_fini(&pf->cv); + nni_mtx_fini(&pf->mtx); NNI_FREE_STRUCT(pf); - return (nni_plat_errno(errno)); + return (rv); } - pf->fd = fd; - pf->cb = NULL; - pf->pq = pq; - nni_mtx_init(&pf->mtx); - nni_cv_init(&pf->cv, &pq->mtx); - NNI_LIST_NODE_INIT(&pf->node); - *pfdp = pf; return (0); } @@ -103,9 +114,9 @@ nni_posix_pfd_close(nni_posix_pfd *pf) nni_posix_pollq *pq = pf->pq; nni_mtx_lock(&pq->mtx); - if (!pf->closing) { + if (!pf->closed) { struct kevent ev[2]; - pf->closing = true; + pf->closed = true; EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); (void) shutdown(pf->fd, SHUT_RDWR); @@ -128,18 +139,17 @@ nni_posix_pfd_fini(nni_posix_pfd *pf) // unless they are synchronous on user threads. NNI_ASSERT(!nni_thr_is_self(&pq->thr)); - struct kevent ev; nni_mtx_lock(&pq->mtx); - nni_list_append(&pq->reapq, pf); - EV_SET( - &ev, 0, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER, 0, NULL); - - // If this fails, the cleanup will stall. That should - // only occur in a memory pressure situation, and it - // will self-heal when the next event comes in. - (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); - while (!pf->closed) { - nni_cv_wait(&pf->cv); + + // If we're running on the callback, then don't bother to kick + // the pollq again. This is necessary because we cannot modify + // the poller while it is polling. + if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + nni_list_append(&pq->reapq, pf); + nni_plat_pipe_raise(pq->wake_wfd); + while (nni_list_active(&pq->reapq, pf)) { + nni_cv_wait(&pf->cv); + } } nni_mtx_unlock(&pq->mtx); @@ -160,8 +170,8 @@ nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg) { NNI_ASSERT(cb != NULL); // must not be null when established. nni_mtx_lock(&pf->mtx); - pf->cb = cb; - pf->data = arg; + pf->cb = cb; + pf->arg = arg; nni_mtx_unlock(&pf->mtx); } @@ -174,7 +184,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) nni_posix_pollq *pq = pf->pq; nni_mtx_lock(&pf->mtx); - if (pf->closing) { + if (pf->closed) { events = 0; } else { pf->events |= events; @@ -209,7 +219,6 @@ nni_posix_pollq_reap(nni_posix_pollq *pq) nni_mtx_lock(&pq->mtx); while ((pf = nni_list_first(&pq->reapq)) != NULL) { nni_list_remove(&pq->reapq, pf); - pf->closed = true; nni_cv_wake(&pf->cv); } nni_mtx_unlock(&pq->mtx); @@ -227,16 +236,15 @@ nni_posix_poll_thr(void *arg) nni_posix_pfd_cb cb; void * cbarg; unsigned revents; - bool reap = false; - n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); - if (n < 0) { - if (errno == EBADF) { - nni_posix_pollq_reap(pq); - return; - } - reap = true; + nni_mtx_lock(&pq->mtx); + if (pq->closed) { + nni_mtx_unlock(&pq->mtx); + nni_posix_pollq_reap(pq); + return; } + nni_mtx_unlock(&pq->mtx); + n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); for (int i = 0; i < n; i++) { struct kevent *ev = &evs[i]; @@ -248,9 +256,10 @@ nni_posix_poll_thr(void *arg) case EVFILT_WRITE: revents = POLLOUT; break; - case EVFILT_USER: - default: - reap = true; + } + if (ev->udata == NULL) { + nni_plat_pipe_clear(pq->wake_rfd); + nni_posix_pollq_reap(pq); continue; } pf = (void *) ev->udata; @@ -260,7 +269,7 @@ nni_posix_poll_thr(void *arg) nni_mtx_lock(&pf->mtx); cb = pf->cb; - cbarg = pf->data; + cbarg = pf->arg; pf->events &= ~(revents); nni_mtx_unlock(&pf->mtx); @@ -268,47 +277,33 @@ nni_posix_poll_thr(void *arg) cb(pf, revents, cbarg); } } - if (reap) { - nni_posix_pollq_reap(pq); - } } } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { + nni_mtx_lock(&pq->mtx); + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + nni_plat_pipe_raise(pq->wake_wfd); + + nni_thr_fini(&pq->thr); + nni_plat_pipe_close(pq->wake_wfd, pq->wake_rfd); + if (pq->kq >= 0) { close(pq->kq); + pq->kq = -1; } - nni_thr_fini(&pq->thr); - pq->kq = -1; - - nni_posix_pollq_reap(pq); - nni_mtx_fini(&pq->mtx); } static int -nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq) +nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; struct kevent ev; - EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL); - while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { - if (errno == EINTR) { - continue; - } - return (nni_plat_errno(errno)); - } - return (0); -} - -static int -nni_posix_pollq_create(nni_posix_pollq *pq) -{ - int rv; - if ((pq->kq = kqueue()) < 0) { return (nni_plat_errno(errno)); } @@ -317,7 +312,18 @@ nni_posix_pollq_create(nni_posix_pollq *pq) NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || - (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) { + ((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) { + nni_posix_pollq_destroy(pq); + return (rv); + } + + // Register the wake pipe. We use this to synchronize closing + // file descriptors. + EV_SET(&ev, (uintptr_t) pq->wake_rfd, EVFILT_READ, EV_ADD | EV_CLEAR, + 0, 0, NULL); + + if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { + rv = nni_plat_errno(rv); nni_posix_pollq_destroy(pq); return (rv); } |
