diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 82 |
1 files changed, 59 insertions, 23 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 76a9d3bc..83a1126c 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -46,14 +46,16 @@ typedef struct nni_posix_pollq { nni_posix_pfd **pfds; unsigned nalloc; bool closed; + bool init; } nni_posix_pollq; -static nni_posix_pollq nni_posix_global_pollq; +static nni_posix_pollq *nni_poll_pqs; +static int nni_poll_npq; void nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pollq *pq = &nni_posix_global_pollq; + nni_posix_pollq *pq = &nni_poll_pqs[fd % nni_poll_npq]; // Set this is as soon as possible (narrow the close-exec race as // much as we can; better options are system calls that suppress @@ -155,7 +157,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) } static void -nni_posix_poll_thr(void *arg) +nni_poll_thr(void *arg) { nni_posix_pollq *pq = arg; struct pollfd *fds = pq->fds; @@ -262,27 +264,29 @@ nni_posix_poll_thr(void *arg) } static void -nni_posix_pollq_destroy(nni_posix_pollq *pq) +nni_poll_pq_destroy(nni_posix_pollq *pq) { - nni_plat_pipe_raise(pq->wakewfd); + if (pq->init) { + nni_plat_pipe_raise(pq->wakewfd); - (void) close(pq->wakewfd); - nni_thr_fini(&pq->thr); - (void) close(pq->wakerfd); - nni_cv_fini(&pq->cv); - nni_mtx_fini(&pq->mtx); - if (pq->fds != NULL) { - NNI_FREE_STRUCTS(pq->fds, pq->nalloc); - pq->fds = NULL; - } - if (pq->pfds != NULL) { - NNI_FREE_STRUCTS(pq->pfds, pq->nalloc); - pq->pfds = NULL; + (void) close(pq->wakewfd); + nni_thr_fini(&pq->thr); + (void) close(pq->wakerfd); + nni_cv_fini(&pq->cv); + nni_mtx_fini(&pq->mtx); + if (pq->fds != NULL) { + NNI_FREE_STRUCTS(pq->fds, pq->nalloc); + pq->fds = NULL; + } + if (pq->pfds != NULL) { + NNI_FREE_STRUCTS(pq->pfds, pq->nalloc); + pq->pfds = NULL; + } } } static int -nni_posix_pollq_create(nni_posix_pollq *pq) +nni_poll_pq_create(nni_posix_pollq *pq) { int rv; @@ -291,7 +295,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq) NNI_LIST_INIT(&pq->addq, nni_posix_pfd, reap); nni_mtx_init(&pq->mtx); nni_cv_init(&pq->cv, &pq->mtx); - + pq->init = true; pq->closed = false; #if NNG_MAX_OPEN pq->nalloc = NNG_MAX_OPEN; @@ -325,7 +329,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq) if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) { return (rv); } - if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + if ((rv = nni_thr_init(&pq->thr, nni_poll_thr, pq)) != 0) { nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); return (rv); } @@ -337,12 +341,44 @@ nni_posix_pollq_create(nni_posix_pollq *pq) int nni_posix_pollq_sysinit(nng_init_params *params) { - NNI_ARG_UNUSED(params); - return (nni_posix_pollq_create(&nni_posix_global_pollq)); + int16_t num_thr; + int16_t max_thr; + + max_thr = params->max_poller_threads; + num_thr = params->num_poller_threads; + + if ((max_thr > 0) && (num_thr > max_thr)) { + num_thr = max_thr; + } + if (num_thr < 1) { + num_thr = 1; + } + params->num_poller_threads = num_thr; + if ((nni_poll_pqs = NNI_ALLOC_STRUCTS(nni_poll_pqs, num_thr)) == + NULL) { + return (NNG_ENOMEM); + } + + nni_poll_npq = num_thr; + for (int i = 0; i < num_thr; i++) { + int rv; + if ((rv = nni_poll_pq_create(&nni_poll_pqs[i])) != 0) { + nni_posix_pollq_sysfini(); + return (rv); + } + } + return (0); } void nni_posix_pollq_sysfini(void) { - nni_posix_pollq_destroy(&nni_posix_global_pollq); + if (nni_poll_npq > 0) { + for (int i = 0; i < nni_poll_npq; i++) { + nni_poll_pq_destroy(&nni_poll_pqs[i]); + } + NNI_FREE_STRUCTS(nni_poll_pqs, nni_poll_npq); + nni_poll_pqs = NULL; + nni_poll_npq = 0; + } } |
