diff options
| -rw-r--r-- | src/platform/posix/posix_pollq_epoll.c | 94 |
1 files changed, 66 insertions, 28 deletions
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index 90d10926..49c93627 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -40,19 +40,21 @@ typedef struct nni_posix_pollq { int epfd; // epoll handle int evfd; // event fd (to wake us for other stuff) bool close; // request for worker to exit - nni_thr thr; // worker thread + bool init; + nni_thr thr; // worker thread nni_list reapq; } nni_posix_pollq; // single global instance for now. -static nni_posix_pollq nni_posix_global_pollq; +static nni_posix_pollq *nni_epoll_pqs; +static int nni_epoll_npq; void nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) { nni_posix_pollq *pq; - pq = &nni_posix_global_pollq; + pq = &nni_epoll_pqs[fd % nni_epoll_npq]; (void) fcntl(fd, F_SETFD, FD_CLOEXEC); (void) fcntl(fd, F_SETFL, O_NONBLOCK); @@ -191,7 +193,7 @@ nni_posix_pollq_reap(nni_posix_pollq *pq) } static void -nni_posix_poll_thr(void *arg) +nni_epoll_thr(void *arg) { nni_posix_pollq *pq = arg; struct epoll_event events[NNI_MAX_EPOLL_EVENTS]; @@ -245,30 +247,32 @@ nni_posix_poll_thr(void *arg) } static void -nni_posix_pollq_destroy(nni_posix_pollq *pq) +nni_epoll_pq_destroy(nni_posix_pollq *pq) { uint64_t one = 1; - nni_mtx_lock(&pq->mtx); - pq->close = true; + if (pq->init) { + nni_mtx_lock(&pq->mtx); + pq->close = true; - if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) { - // This should never occur, and if it does it could - // lead to a hang. - nni_panic("BUG! unable to write to evfd!"); - } - nni_mtx_unlock(&pq->mtx); + if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) { + // This should never occur, and if it does it could + // lead to a hang. + nni_panic("BUG! unable to write to evfd!"); + } + nni_mtx_unlock(&pq->mtx); - nni_thr_fini(&pq->thr); + nni_thr_fini(&pq->thr); - close(pq->evfd); - close(pq->epfd); + close(pq->evfd); + close(pq->epfd); - nni_mtx_fini(&pq->mtx); + nni_mtx_fini(&pq->mtx); + } } static int -nni_posix_pollq_add_eventfd(nni_posix_pollq *pq) +nni_epoll_pq_add_eventfd(nni_posix_pollq *pq) { // add event fd so we can wake ourself on exit struct epoll_event ev; @@ -295,10 +299,16 @@ nni_posix_pollq_add_eventfd(nni_posix_pollq *pq) } static int -nni_posix_pollq_create(nni_posix_pollq *pq) +nni_epoll_pq_create(nni_posix_pollq *pq) { int rv; + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); + nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, &pq->mtx); + pq->epfd = -1; + pq->init = true; + #if NNG_HAVE_EPOLL_CREATE1 if ((pq->epfd = epoll_create1(EPOLL_CLOEXEC)) < 0) { return (nni_plat_errno(errno)); @@ -314,16 +324,12 @@ nni_posix_pollq_create(nni_posix_pollq *pq) pq->close = false; - NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); - nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, &pq->mtx); - - if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) { + if ((rv = nni_epoll_pq_add_eventfd(pq)) != 0) { (void) close(pq->epfd); nni_mtx_fini(&pq->mtx); return (rv); } - if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + if ((rv = nni_thr_init(&pq->thr, nni_epoll_thr, pq)) != 0) { (void) close(pq->epfd); (void) close(pq->evfd); nni_mtx_fini(&pq->mtx); @@ -337,14 +343,46 @@ nni_posix_pollq_create(nni_posix_pollq *pq) int nni_posix_pollq_sysinit(nng_init_params *params) { - NNI_ARG_UNUSED(params); - return (nni_posix_pollq_create(&nni_posix_global_pollq)); + int16_t num_thr; + int16_t max_thr; + + max_thr = params->max_poller_threads; + num_thr = params->num_poller_threads; + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 1) { + num_thr = 1; + } + params->num_poller_threads = num_thr; + if ((nni_epoll_pqs = NNI_ALLOC_STRUCTS(nni_epoll_pqs, num_thr)) == + NULL) { + return (NNG_ENOMEM); + } + + nni_epoll_npq = num_thr; + for (int i = 0; i < num_thr; i++) { + int rv; + if ((rv = nni_epoll_pq_create(&nni_epoll_pqs[i])) != 0) { + nni_posix_pollq_sysfini(); + return (rv); + } + } + return (0); } void nni_posix_pollq_sysfini(void) { - nni_posix_pollq_destroy(&nni_posix_global_pollq); + if (nni_epoll_npq > 0) { + for (int i = 0; i < nni_epoll_npq; i++) { + nni_epoll_pq_destroy(&nni_epoll_pqs[i]); + } + NNI_FREE_STRUCTS(nni_epoll_pqs, nni_epoll_npq); + nni_epoll_pqs = NULL; + nni_epoll_npq = 0; + } } #endif // NNG_HAVE_EPOLL |
