aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c99
1 files changed, 72 insertions, 27 deletions
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index f1f742db..a1479d25 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -30,15 +30,16 @@ typedef struct nni_posix_pollq {
int wake_wfd; // write side of wake pipe
int wake_rfd; // read side of wake pipe
bool closed; // request for worker to exit
- int kq; // kqueue handle
- nni_thr thr; // worker thread
- nni_list reapq; // items to reap
+ bool init;
+ int kq; // kqueue handle
+ nni_thr thr; // worker thread
+ nni_list reapq; // items to reap
} nni_posix_pollq;
#define NNI_MAX_KQUEUE_EVENTS 64
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
+static nni_posix_pollq *nni_kqueue_pqs;
+static int nni_kqueue_npq;
void
nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg)
@@ -58,7 +59,8 @@ nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg)
(void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
#endif
- pq = &nni_posix_global_pollq;
+ // hopefully FDs are distributed somewhat
+ pq = &nni_kqueue_pqs[fd % nni_kqueue_npq];
nni_atomic_init(&pf->events);
nni_cv_init(&pf->cv, &pq->mtx);
@@ -247,40 +249,45 @@ nni_posix_poll_thr(void *arg)
}
static void
-nni_posix_pollq_destroy(nni_posix_pollq *pq)
+nni_kqueue_pollq_destroy(nni_posix_pollq *pq)
{
- nni_mtx_lock(&pq->mtx);
- pq->closed = true;
- nni_mtx_unlock(&pq->mtx);
- nni_plat_pipe_raise(pq->wake_wfd);
+ if (pq->init) {
+ nni_mtx_lock(&pq->mtx);
+ pq->closed = true;
+ nni_mtx_unlock(&pq->mtx);
+ nni_plat_pipe_raise(pq->wake_wfd);
- (void) close(pq->wake_wfd);
- nni_thr_fini(&pq->thr);
- (void) close(pq->wake_rfd);
+ (void) close(pq->wake_wfd);
+ nni_thr_fini(&pq->thr);
+ (void) close(pq->wake_rfd);
- if (pq->kq >= 0) {
- close(pq->kq);
- pq->kq = -1;
+ if (pq->kq >= 0) {
+ close(pq->kq);
+ pq->kq = -1;
+ }
+ nni_mtx_fini(&pq->mtx);
+ pq->init = false;
}
- nni_mtx_fini(&pq->mtx);
}
static int
-nni_posix_pollq_create(nni_posix_pollq *pq)
+nni_kqueue_pollq_create(nni_posix_pollq *pq)
{
int rv;
struct kevent ev;
+ nni_mtx_init(&pq->mtx);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
+ pq->kq = -1;
+ pq->init = true;
+
if ((pq->kq = kqueue()) < 0) {
return (nni_plat_errno(errno));
}
- nni_mtx_init(&pq->mtx);
- NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
-
if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) {
- nni_posix_pollq_destroy(pq);
+ nni_kqueue_pollq_destroy(pq);
return (rv);
}
@@ -291,7 +298,7 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
rv = nni_plat_errno(rv);
- nni_posix_pollq_destroy(pq);
+ nni_kqueue_pollq_destroy(pq);
return (rv);
}
@@ -299,17 +306,55 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (0);
}
+// Our concurrency model is simple. We assign PFDs to one of pollq threads.
+// The PFD will be bound to that pollq for its entire lifetime. The way to get
+// good balance is to have lots of PFDs. If you don't have lots of PFDs, you
+// do not need this scalability anyway.
+
int
nni_posix_pollq_sysinit(nng_init_params *params)
{
- NNI_ARG_UNUSED(params);
- return (nni_posix_pollq_create(&nni_posix_global_pollq));
+ int16_t num_thr;
+ int16_t max_thr;
+
+ max_thr = params->max_poller_threads;
+ num_thr = params->num_poller_threads;
+
+ if ((max_thr > 0) && (num_thr > max_thr)) {
+ num_thr = max_thr;
+ }
+ if (num_thr < 1) {
+ num_thr = 1;
+ }
+ params->num_poller_threads = num_thr;
+ if ((nni_kqueue_pqs = NNI_ALLOC_STRUCTS(nni_kqueue_pqs, num_thr)) ==
+ NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_kqueue_npq = num_thr;
+ for (int i = 0; i < num_thr; i++) {
+ int rv;
+ if ((rv = nni_kqueue_pollq_create(&nni_kqueue_pqs[i])) != 0) {
+ nni_posix_pollq_sysfini();
+ return (rv);
+ }
+ }
+
+ return (0);
}
void
nni_posix_pollq_sysfini(void)
{
- nni_posix_pollq_destroy(&nni_posix_global_pollq);
+ if (nni_kqueue_npq > 0) {
+ for (int i = 0; i < nni_kqueue_npq; i++) {
+ nni_kqueue_pollq_destroy(&nni_kqueue_pqs[i]);
+ }
+ NNI_FREE_STRUCTS(nni_kqueue_pqs, nni_kqueue_npq);
+ nni_kqueue_pqs = NULL;
+ nni_kqueue_npq = 0;
+ }
}
#endif // NNG_HAVE_KQUEUE