diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-19 23:50:13 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-20 08:12:42 -0800 |
| commit | 20e625137a38232c87871661c684953ef2cfc5f8 (patch) | |
| tree | af75f9c134ea89160a1ccbe80d17b0f3582c8239 /src/platform/posix/posix_pollq_kqueue.c | |
| parent | 60f63557d87528497fe1392fa6a676b2a51efb16 (diff) | |
| download | nng-20e625137a38232c87871661c684953ef2cfc5f8.tar.gz nng-20e625137a38232c87871661c684953ef2cfc5f8.tar.bz2 nng-20e625137a38232c87871661c684953ef2cfc5f8.zip | |
posix pollers: inline the pfd and make callbacks constant
This change moves the posix pollers to inline the PFD and makes
the callbacks constant, so that we can dispense with tests, failures,
and locks. It is anticipated that this will reduce lock based
pressure on the bus and increase performance modestly.
Diffstat (limited to 'src/platform/posix/posix_pollq_kqueue.c')
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 128 |
1 files changed, 58 insertions, 70 deletions
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index e3727ed3..c754535a 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -40,10 +40,9 @@ typedef struct nni_posix_pollq { // single global instance for now static nni_posix_pollq nni_posix_global_pollq; -int -nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +void +nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pfd *pf; nni_posix_pollq *pq; struct kevent ev[2]; unsigned flags = EV_ADD | EV_DISABLE | EV_CLEAR; @@ -61,88 +60,97 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) pq = &nni_posix_global_pollq; - if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&pf->mtx); nni_cv_init(&pf->cv, &pq->mtx); pf->pq = pq; pf->fd = fd; - pf->cb = NULL; - pf->arg = NULL; + pf->cb = cb; + pf->arg = arg; pf->events = 0; - pf->closed = false; + + nni_atomic_flag_reset(&pf->closed); + nni_atomic_flag_reset(&pf->stopped); NNI_LIST_NODE_INIT(&pf->node); - *pfdp = pf; // Create entries in the kevent queue, without enabling them. EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf); EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf); - // We update the kqueue list, without polling for events. - if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) { - int rv; - rv = nni_plat_errno(errno); - nni_cv_fini(&pf->cv); - nni_mtx_fini(&pf->mtx); - NNI_FREE_STRUCT(pf); - return (rv); - } - - return (0); + // This may fail, but if it does, we get another try with + // ARM. It's an attempt to preallocate anyway. + (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); } void nni_posix_pfd_close(nni_posix_pfd *pf) { nni_posix_pollq *pq = pf->pq; + struct kevent ev[2]; + if (pq == NULL) { + return; + } - nni_mtx_lock(&pq->mtx); - if (!pf->closed) { - struct kevent ev[2]; - pf->closed = true; - EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); - EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); - (void) shutdown(pf->fd, SHUT_RDWR); - // This should never fail -- no allocations, just deletion. - (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); + if (nni_atomic_flag_test_and_set(&pf->closed)) { + return; } + + nni_mtx_lock(&pq->mtx); + EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); + EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); + (void) shutdown(pf->fd, SHUT_RDWR); + // This should never fail -- no allocations, just deletion. + (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); nni_mtx_unlock(&pq->mtx); } void -nni_posix_pfd_fini(nni_posix_pfd *pf) +nni_posix_pfd_stop(nni_posix_pfd *pf) { - nni_posix_pollq *pq; - - pq = pf->pq; + nni_posix_pollq *pq = pf->pq; - nni_posix_pfd_close(pf); + if (pq == NULL) { + return; + } // All consumers take care to move finalization to the reap thread, // unless they are synchronous on user threads. NNI_ASSERT(!nni_thr_is_self(&pq->thr)); - nni_mtx_lock(&pq->mtx); + if (nni_atomic_flag_test_and_set(&pf->stopped)) { + return; + } - // If we're running on the callback, then don't bother to kick - // the pollq again. This is necessary because we cannot modify - // the poller while it is polling. - if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + nni_posix_pfd_close(pf); + nni_mtx_lock(&pq->mtx); + if (!pq->closed) { nni_list_append(&pq->reapq, pf); nni_plat_pipe_raise(pq->wake_wfd); - while (nni_list_active(&pq->reapq, pf)) { + while (nni_list_node_active(&pf->node)) { nni_cv_wait(&pf->cv); } } nni_mtx_unlock(&pq->mtx); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pf) +{ + nni_posix_pollq *pq = pf->pq; + + if (pq == NULL) { + return; + } + + // All consumers take care to move finalization to the reap thread, + // unless they are synchronous on user threads. + NNI_ASSERT(!nni_thr_is_self(&pq->thr)); + + nni_posix_pfd_stop(pf); (void) close(pf->fd); nni_cv_fini(&pf->cv); nni_mtx_fini(&pf->mtx); - NNI_FREE_STRUCT(pf); } int @@ -151,16 +159,6 @@ nni_posix_pfd_fd(nni_posix_pfd *pf) return (pf->fd); } -void -nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg) -{ - NNI_ASSERT(cb != NULL); // must not be null when established. - nni_mtx_lock(&pf->mtx); - pf->cb = cb; - pf->arg = arg; - nni_mtx_unlock(&pf->mtx); -} - int nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) { @@ -170,12 +168,8 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) nni_posix_pollq *pq = pf->pq; nni_mtx_lock(&pf->mtx); - if (pf->closed) { - events = 0; - } else { - pf->events |= events; - events = pf->events; - } + pf->events |= events; + events = pf->events; nni_mtx_unlock(&pf->mtx); if (events == 0) { @@ -218,12 +212,10 @@ nni_posix_poll_thr(void *arg) nni_thr_set_name(NULL, "nng:poll:kqueue"); for (;;) { - int n; - struct kevent evs[NNI_MAX_KQUEUE_EVENTS]; - nni_posix_pfd *pf; - nni_posix_pfd_cb cb; - void *cbarg; - unsigned revents; + int n; + struct kevent evs[NNI_MAX_KQUEUE_EVENTS]; + nni_posix_pfd *pf; + unsigned revents; n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); @@ -253,14 +245,10 @@ nni_posix_poll_thr(void *arg) } nni_mtx_lock(&pf->mtx); - cb = pf->cb; - cbarg = pf->arg; pf->events &= ~(revents); nni_mtx_unlock(&pf->mtx); - if (cb != NULL) { - cb(pf, revents, cbarg); - } + pf->cb(pf->arg, revents); } } } |
