diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 99 |
1 files changed, 72 insertions, 27 deletions
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index f1f742db..a1479d25 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -30,15 +30,16 @@ typedef struct nni_posix_pollq { int wake_wfd; // write side of wake pipe int wake_rfd; // read side of wake pipe bool closed; // request for worker to exit - int kq; // kqueue handle - nni_thr thr; // worker thread - nni_list reapq; // items to reap + bool init; + int kq; // kqueue handle + nni_thr thr; // worker thread + nni_list reapq; // items to reap } nni_posix_pollq; #define NNI_MAX_KQUEUE_EVENTS 64 -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; +static nni_posix_pollq *nni_kqueue_pqs; +static int nni_kqueue_npq; void nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg) @@ -58,7 +59,8 @@ nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg) (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); #endif - pq = &nni_posix_global_pollq; + // hopefully FDs are distributed somewhat + pq = &nni_kqueue_pqs[fd % nni_kqueue_npq]; nni_atomic_init(&pf->events); nni_cv_init(&pf->cv, &pq->mtx); @@ -247,40 +249,45 @@ nni_posix_poll_thr(void *arg) } static void -nni_posix_pollq_destroy(nni_posix_pollq *pq) +nni_kqueue_pollq_destroy(nni_posix_pollq *pq) { - nni_mtx_lock(&pq->mtx); - pq->closed = true; - nni_mtx_unlock(&pq->mtx); - nni_plat_pipe_raise(pq->wake_wfd); + if (pq->init) { + nni_mtx_lock(&pq->mtx); + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + nni_plat_pipe_raise(pq->wake_wfd); - (void) close(pq->wake_wfd); - nni_thr_fini(&pq->thr); - (void) close(pq->wake_rfd); + (void) close(pq->wake_wfd); + nni_thr_fini(&pq->thr); + (void) close(pq->wake_rfd); - if (pq->kq >= 0) { - close(pq->kq); - pq->kq = -1; + if (pq->kq >= 0) { + close(pq->kq); + pq->kq = -1; + } + nni_mtx_fini(&pq->mtx); + pq->init = false; } - nni_mtx_fini(&pq->mtx); } static int -nni_posix_pollq_create(nni_posix_pollq *pq) +nni_kqueue_pollq_create(nni_posix_pollq *pq) { int rv; struct kevent ev; + nni_mtx_init(&pq->mtx); + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); + pq->kq = -1; + pq->init = true; + if ((pq->kq = kqueue()) < 0) { return (nni_plat_errno(errno)); } - nni_mtx_init(&pq->mtx); - NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); - if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || ((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) { - nni_posix_pollq_destroy(pq); + nni_kqueue_pollq_destroy(pq); return (rv); } @@ -291,7 +298,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq) if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { rv = nni_plat_errno(rv); - nni_posix_pollq_destroy(pq); + nni_kqueue_pollq_destroy(pq); return (rv); } @@ -299,17 +306,55 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (0); } +// Our concurrency model is simple. We assign PFDs to one of pollq threads. +// The PFD will be bound to that pollq for its entire lifetime. The way to get +// good balance is to have lots of PFDs. If you don't have lots of PFDs, you +// do not need this scalability anyway. + int nni_posix_pollq_sysinit(nng_init_params *params) { - NNI_ARG_UNUSED(params); - return (nni_posix_pollq_create(&nni_posix_global_pollq)); + int16_t num_thr; + int16_t max_thr; + + max_thr = params->max_poller_threads; + num_thr = params->num_poller_threads; + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 1) { + num_thr = 1; + } + params->num_poller_threads = num_thr; + if ((nni_kqueue_pqs = NNI_ALLOC_STRUCTS(nni_kqueue_pqs, num_thr)) == + NULL) { + return (NNG_ENOMEM); + } + + nni_kqueue_npq = num_thr; + for (int i = 0; i < num_thr; i++) { + int rv; + if ((rv = nni_kqueue_pollq_create(&nni_kqueue_pqs[i])) != 0) { + nni_posix_pollq_sysfini(); + return (rv); + } + } + + return (0); } void nni_posix_pollq_sysfini(void) { - nni_posix_pollq_destroy(&nni_posix_global_pollq); + if (nni_kqueue_npq > 0) { + for (int i = 0; i < nni_kqueue_npq; i++) { + nni_kqueue_pollq_destroy(&nni_kqueue_pqs[i]); + } + NNI_FREE_STRUCTS(nni_kqueue_pqs, nni_kqueue_npq); + nni_kqueue_pqs = NULL; + nni_kqueue_npq = 0; + } } #endif // NNG_HAVE_KQUEUE |
