diff options
| -rw-r--r-- | CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_epoll.c | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 146 |
3 files changed, 79 insertions, 76 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 46df35d9..0aecbd8e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -333,9 +333,7 @@ else () nng_check_sym(backtrace_symbols_fd execinfo.h NNG_HAVE_BACKTRACE) nng_check_struct_member(msghdr msg_control sys/socket.h NNG_HAVE_MSG_CONTROL) nng_check_sym(eventfd sys/eventfd.h NNG_HAVE_EVENTFD) - # While some systems (NetBSD) have kqueue, our use depends on EVFILT_USER - # This means that NetBSD and OpenBSD will be stuck with poll(). - nng_check_sym(EVFILT_USER sys/event.h NNG_HAVE_KQUEUE) + nng_check_sym(kqueue sys/event.h NNG_HAVE_KQUEUE) nng_check_sym(port_create port.h NNG_HAVE_PORT_CREATE) nng_check_sym(epoll_create sys/epoll.h NNG_HAVE_EPOLL) nng_check_sym(epoll_create1 sys/epoll.h NNG_HAVE_EPOLL_CREATE1) diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index 63cdaeb5..d7a2c70f 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -61,8 +61,8 @@ struct nni_posix_pollq { }; struct nni_posix_pfd { + nni_list_node node; nni_posix_pollq *pq; - nni_list_node node; int fd; nni_posix_pfd_cb cb; void * arg; @@ -96,7 +96,6 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) nni_mtx_init(&pfd->mtx); nni_cv_init(&pfd->cv, &pq->mtx); - nni_mtx_lock(&pfd->mtx); pfd->pq = pq; pfd->fd = fd; pfd->cb = NULL; @@ -106,7 +105,6 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) pfd->closed = false; NNI_LIST_NODE_INIT(&pfd->node); - nni_mtx_unlock(&pfd->mtx); // notifications disabled to begin with ev.events = 0; @@ -115,6 +113,7 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { rv = nni_plat_errno(errno); nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); NNI_FREE_STRUCT(pfd); return (rv); } diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index 299479ab..7d827d7d 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -29,22 +29,24 @@ typedef struct nni_posix_pollq nni_posix_pollq; // pollq implementation struct nni_posix_pollq { nni_mtx mtx; - int kq; // kqueue handle - nni_thr thr; // worker thread - nni_list reapq; // items to reap + int wake_wfd; // write side of wake pipe + int wake_rfd; // read side of wake pipe + bool closed; // request for worker to exit + int kq; // kqueue handle + nni_thr thr; // worker thread + nni_list reapq; // items to reap }; struct nni_posix_pfd { nni_list_node node; // linkage into the reap list nni_posix_pollq *pq; // associated pollq int fd; // file descriptor to poll - void * data; // user data + void * arg; // user data nni_posix_pfd_cb cb; // user callback on event - nni_cv cv; // signaled when poller has unregistered - nni_mtx mtx; - unsigned events; - bool closing; bool closed; + unsigned events; + nni_cv cv; // signaled when poller has unregistered + nni_mtx mtx; }; #define NNI_MAX_KQUEUE_EVENTS 64 @@ -77,22 +79,31 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) return (NNG_ENOMEM); } + nni_mtx_init(&pf->mtx); + nni_cv_init(&pf->cv, &pq->mtx); + + pf->pq = pq; + pf->fd = fd; + pf->cb = NULL; + pf->arg = NULL; + pf->events = 0; + pf->closed = false; + + NNI_LIST_NODE_INIT(&pf->node); + *pfdp = pf; // Create entries in the kevent queue, without enabling them. EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf); EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf); // We update the kqueue list, without polling for events. if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) { + int rv; + rv = nni_plat_errno(errno); + nni_cv_fini(&pf->cv); + nni_mtx_fini(&pf->mtx); NNI_FREE_STRUCT(pf); - return (nni_plat_errno(errno)); + return (rv); } - pf->fd = fd; - pf->cb = NULL; - pf->pq = pq; - nni_mtx_init(&pf->mtx); - nni_cv_init(&pf->cv, &pq->mtx); - NNI_LIST_NODE_INIT(&pf->node); - *pfdp = pf; return (0); } @@ -103,9 +114,9 @@ nni_posix_pfd_close(nni_posix_pfd *pf) nni_posix_pollq *pq = pf->pq; nni_mtx_lock(&pq->mtx); - if (!pf->closing) { + if (!pf->closed) { struct kevent ev[2]; - pf->closing = true; + pf->closed = true; EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); (void) shutdown(pf->fd, SHUT_RDWR); @@ -128,18 +139,17 @@ nni_posix_pfd_fini(nni_posix_pfd *pf) // unless they are synchronous on user threads. NNI_ASSERT(!nni_thr_is_self(&pq->thr)); - struct kevent ev; nni_mtx_lock(&pq->mtx); - nni_list_append(&pq->reapq, pf); - EV_SET( - &ev, 0, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER, 0, NULL); - - // If this fails, the cleanup will stall. That should - // only occur in a memory pressure situation, and it - // will self-heal when the next event comes in. - (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); - while (!pf->closed) { - nni_cv_wait(&pf->cv); + + // If we're running on the callback, then don't bother to kick + // the pollq again. This is necessary because we cannot modify + // the poller while it is polling. + if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + nni_list_append(&pq->reapq, pf); + nni_plat_pipe_raise(pq->wake_wfd); + while (nni_list_active(&pq->reapq, pf)) { + nni_cv_wait(&pf->cv); + } } nni_mtx_unlock(&pq->mtx); @@ -160,8 +170,8 @@ nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg) { NNI_ASSERT(cb != NULL); // must not be null when established. nni_mtx_lock(&pf->mtx); - pf->cb = cb; - pf->data = arg; + pf->cb = cb; + pf->arg = arg; nni_mtx_unlock(&pf->mtx); } @@ -174,7 +184,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) nni_posix_pollq *pq = pf->pq; nni_mtx_lock(&pf->mtx); - if (pf->closing) { + if (pf->closed) { events = 0; } else { pf->events |= events; @@ -209,7 +219,6 @@ nni_posix_pollq_reap(nni_posix_pollq *pq) nni_mtx_lock(&pq->mtx); while ((pf = nni_list_first(&pq->reapq)) != NULL) { nni_list_remove(&pq->reapq, pf); - pf->closed = true; nni_cv_wake(&pf->cv); } nni_mtx_unlock(&pq->mtx); @@ -227,16 +236,15 @@ nni_posix_poll_thr(void *arg) nni_posix_pfd_cb cb; void * cbarg; unsigned revents; - bool reap = false; - n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); - if (n < 0) { - if (errno == EBADF) { - nni_posix_pollq_reap(pq); - return; - } - reap = true; + nni_mtx_lock(&pq->mtx); + if (pq->closed) { + nni_mtx_unlock(&pq->mtx); + nni_posix_pollq_reap(pq); + return; } + nni_mtx_unlock(&pq->mtx); + n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); for (int i = 0; i < n; i++) { struct kevent *ev = &evs[i]; @@ -248,9 +256,10 @@ nni_posix_poll_thr(void *arg) case EVFILT_WRITE: revents = POLLOUT; break; - case EVFILT_USER: - default: - reap = true; + } + if (ev->udata == NULL) { + nni_plat_pipe_clear(pq->wake_rfd); + nni_posix_pollq_reap(pq); continue; } pf = (void *) ev->udata; @@ -260,7 +269,7 @@ nni_posix_poll_thr(void *arg) nni_mtx_lock(&pf->mtx); cb = pf->cb; - cbarg = pf->data; + cbarg = pf->arg; pf->events &= ~(revents); nni_mtx_unlock(&pf->mtx); @@ -268,47 +277,33 @@ nni_posix_poll_thr(void *arg) cb(pf, revents, cbarg); } } - if (reap) { - nni_posix_pollq_reap(pq); - } } } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { + nni_mtx_lock(&pq->mtx); + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + nni_plat_pipe_raise(pq->wake_wfd); + + nni_thr_fini(&pq->thr); + nni_plat_pipe_close(pq->wake_wfd, pq->wake_rfd); + if (pq->kq >= 0) { close(pq->kq); + pq->kq = -1; } - nni_thr_fini(&pq->thr); - pq->kq = -1; - - nni_posix_pollq_reap(pq); - nni_mtx_fini(&pq->mtx); } static int -nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq) +nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; struct kevent ev; - EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL); - while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { - if (errno == EINTR) { - continue; - } - return (nni_plat_errno(errno)); - } - return (0); -} - -static int -nni_posix_pollq_create(nni_posix_pollq *pq) -{ - int rv; - if ((pq->kq = kqueue()) < 0) { return (nni_plat_errno(errno)); } @@ -317,7 +312,18 @@ nni_posix_pollq_create(nni_posix_pollq *pq) NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || - (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) { + ((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) { + nni_posix_pollq_destroy(pq); + return (rv); + } + + // Register the wake pipe. We use this to synchronize closing + // file descriptors. + EV_SET(&ev, (uintptr_t) pq->wake_rfd, EVFILT_READ, EV_ADD | EV_CLEAR, + 0, 0, NULL); + + if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { + rv = nni_plat_errno(rv); nni_posix_pollq_destroy(pq); return (rv); } |
