aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_pollq_port.c66
1 files changed, 52 insertions, 14 deletions
diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c
index 2e3debad..58691613 100644
--- a/src/platform/posix/posix_pollq_port.c
+++ b/src/platform/posix/posix_pollq_port.c
@@ -32,17 +32,18 @@ typedef struct nni_posix_pollq {
nni_thr thr; // worker thread
nni_mtx mtx;
nni_cv cv;
+ bool init;
} nni_posix_pollq;
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
+static nni_posix_pollq *nni_port_pqs;
+static int nni_port_npq;
void
nni_posix_pfd_init(nni_posix_pfd *pfdp, int fd, nni_posix_pfd_cb cb, void *arg)
{
nni_posix_pollq *pq;
- pq = &nni_posix_global_pollq;
+ pq = &nni_port_pqs[fd % nni_port_npq];
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
@@ -135,7 +136,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
}
static void
-nni_posix_poll_thr(void *arg)
+nni_port_thr(void *arg)
{
for (;;) {
nni_posix_pollq *pq = arg;
@@ -193,10 +194,12 @@ nni_posix_poll_thr(void *arg)
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- (void) close(pq->port);
- nni_cv_destroy(&pq->cv);
- nni_mtx_fini(&pq->mtx);
- nni_thr_fini(&pq->thr);
+ if (pq->init) {
+ (void) close(pq->port);
+ nni_cv_destroy(&pq->cv);
+ nni_mtx_fini(&pq->mtx);
+ nni_thr_fini(&pq->thr);
+ }
}
static int
@@ -204,18 +207,21 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
+ nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, pq->mtx);
+ pq->init = true;
+ pq->port = -1;
+
if ((pq->port = port_create()) < 0) {
return (nni_plat_errno(errno));
}
- if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ if ((rv = nni_thr_init(&pq->thr, nni_port_thr, pq)) != 0) {
nni_posix_pollq_destroy(pq);
return (rv);
}
nni_thr_set_name(&pq->thr, "nng:poll:port");
- nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, pq->mtx);
nni_thr_run(&pq->thr);
return (0);
}
@@ -223,14 +229,46 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
int
nni_posix_pollq_sysinit(nng_init_params *params)
{
- NNI_ARG_UNUSED(params);
- return (nni_posix_pollq_create(&nni_posix_global_pollq));
+ int16_t num_thr;
+ int16_t max_thr;
+
+ max_thr = params->max_poller_threads;
+ num_thr = params->num_poller_threads;
+
+ if ((max_thr > 0) && (num_thr > max_thr)) {
+ num_thr = max_thr;
+ }
+ if (num_thr < 1) {
+ num_thr = 1;
+ }
+ params->num_poller_threads = num_thr;
+ if ((nni_port_pqs = NNI_ALLOC_STRUCTS(nni_port_pqs, num_thr)) ==
+ NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_port_npq = num_thr;
+ for (int i = 0; i < num_thr; i++) {
+ int rv;
+ if ((rv = nni_port_pq_create(&nni_port_pqs[i])) != 0) {
+ nni_posix_pollq_sysfini();
+ return (rv);
+ }
+ }
+ return (0);
}
void
nni_posix_pollq_sysfini(void)
{
- nni_posix_pollq_destroy(&nni_posix_global_pollq);
+ if (nni_port_npq > 0) {
+ for (int i = 0; i < nni_port_npq; i++) {
+ nni_port_pq_destroy(&nni_port_pqs[i]);
+ }
+ NNI_FREE_STRUCTS(nni_port_pqs, nni_port_npq);
+ nni_port_pqs = NULL;
+ nni_port_npq = 0;
+ }
}
#endif // NNG_HAVE_PORT_CREATE