aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/platform/posix/posix_pollq_poll.c82
1 files changed, 59 insertions, 23 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index 76a9d3bc..83a1126c 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -46,14 +46,16 @@ typedef struct nni_posix_pollq {
nni_posix_pfd **pfds;
unsigned nalloc;
bool closed;
+ bool init;
} nni_posix_pollq;
-static nni_posix_pollq nni_posix_global_pollq;
+static nni_posix_pollq *nni_poll_pqs;
+static int nni_poll_npq;
void
nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pollq *pq = &nni_posix_global_pollq;
+ nni_posix_pollq *pq = &nni_poll_pqs[fd % nni_poll_npq];
// Set this is as soon as possible (narrow the close-exec race as
// much as we can; better options are system calls that suppress
@@ -155,7 +157,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
}
static void
-nni_posix_poll_thr(void *arg)
+nni_poll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
struct pollfd *fds = pq->fds;
@@ -262,27 +264,29 @@ nni_posix_poll_thr(void *arg)
}
static void
-nni_posix_pollq_destroy(nni_posix_pollq *pq)
+nni_poll_pq_destroy(nni_posix_pollq *pq)
{
- nni_plat_pipe_raise(pq->wakewfd);
+ if (pq->init) {
+ nni_plat_pipe_raise(pq->wakewfd);
- (void) close(pq->wakewfd);
- nni_thr_fini(&pq->thr);
- (void) close(pq->wakerfd);
- nni_cv_fini(&pq->cv);
- nni_mtx_fini(&pq->mtx);
- if (pq->fds != NULL) {
- NNI_FREE_STRUCTS(pq->fds, pq->nalloc);
- pq->fds = NULL;
- }
- if (pq->pfds != NULL) {
- NNI_FREE_STRUCTS(pq->pfds, pq->nalloc);
- pq->pfds = NULL;
+ (void) close(pq->wakewfd);
+ nni_thr_fini(&pq->thr);
+ (void) close(pq->wakerfd);
+ nni_cv_fini(&pq->cv);
+ nni_mtx_fini(&pq->mtx);
+ if (pq->fds != NULL) {
+ NNI_FREE_STRUCTS(pq->fds, pq->nalloc);
+ pq->fds = NULL;
+ }
+ if (pq->pfds != NULL) {
+ NNI_FREE_STRUCTS(pq->pfds, pq->nalloc);
+ pq->pfds = NULL;
+ }
}
}
static int
-nni_posix_pollq_create(nni_posix_pollq *pq)
+nni_poll_pq_create(nni_posix_pollq *pq)
{
int rv;
@@ -291,7 +295,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
NNI_LIST_INIT(&pq->addq, nni_posix_pfd, reap);
nni_mtx_init(&pq->mtx);
nni_cv_init(&pq->cv, &pq->mtx);
-
+ pq->init = true;
pq->closed = false;
#if NNG_MAX_OPEN
pq->nalloc = NNG_MAX_OPEN;
@@ -325,7 +329,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) {
return (rv);
}
- if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ if ((rv = nni_thr_init(&pq->thr, nni_poll_thr, pq)) != 0) {
nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
return (rv);
}
@@ -337,12 +341,44 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
int
nni_posix_pollq_sysinit(nng_init_params *params)
{
- NNI_ARG_UNUSED(params);
- return (nni_posix_pollq_create(&nni_posix_global_pollq));
+ int16_t num_thr;
+ int16_t max_thr;
+
+ max_thr = params->max_poller_threads;
+ num_thr = params->num_poller_threads;
+
+ if ((max_thr > 0) && (num_thr > max_thr)) {
+ num_thr = max_thr;
+ }
+ if (num_thr < 1) {
+ num_thr = 1;
+ }
+ params->num_poller_threads = num_thr;
+ if ((nni_poll_pqs = NNI_ALLOC_STRUCTS(nni_poll_pqs, num_thr)) ==
+ NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_poll_npq = num_thr;
+ for (int i = 0; i < num_thr; i++) {
+ int rv;
+ if ((rv = nni_poll_pq_create(&nni_poll_pqs[i])) != 0) {
+ nni_posix_pollq_sysfini();
+ return (rv);
+ }
+ }
+ return (0);
}
void
nni_posix_pollq_sysfini(void)
{
- nni_posix_pollq_destroy(&nni_posix_global_pollq);
+ if (nni_poll_npq > 0) {
+ for (int i = 0; i < nni_poll_npq; i++) {
+ nni_poll_pq_destroy(&nni_poll_pqs[i]);
+ }
+ NNI_FREE_STRUCTS(nni_poll_pqs, nni_poll_npq);
+ nni_poll_pqs = NULL;
+ nni_poll_npq = 0;
+ }
}