aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/platform/posix/posix_pollq_epoll.c5
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c146
2 files changed, 78 insertions, 73 deletions
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index 63cdaeb5..d7a2c70f 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -61,8 +61,8 @@ struct nni_posix_pollq {
};
struct nni_posix_pfd {
+ nni_list_node node;
nni_posix_pollq *pq;
- nni_list_node node;
int fd;
nni_posix_pfd_cb cb;
void * arg;
@@ -96,7 +96,6 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pq->mtx);
- nni_mtx_lock(&pfd->mtx);
pfd->pq = pq;
pfd->fd = fd;
pfd->cb = NULL;
@@ -106,7 +105,6 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pfd->closed = false;
NNI_LIST_NODE_INIT(&pfd->node);
- nni_mtx_unlock(&pfd->mtx);
// notifications disabled to begin with
ev.events = 0;
@@ -115,6 +113,7 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
rv = nni_plat_errno(errno);
nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
NNI_FREE_STRUCT(pfd);
return (rv);
}
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index 299479ab..7d827d7d 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -29,22 +29,24 @@ typedef struct nni_posix_pollq nni_posix_pollq;
// pollq implementation
struct nni_posix_pollq {
nni_mtx mtx;
- int kq; // kqueue handle
- nni_thr thr; // worker thread
- nni_list reapq; // items to reap
+ int wake_wfd; // write side of wake pipe
+ int wake_rfd; // read side of wake pipe
+ bool closed; // request for worker to exit
+ int kq; // kqueue handle
+ nni_thr thr; // worker thread
+ nni_list reapq; // items to reap
};
struct nni_posix_pfd {
nni_list_node node; // linkage into the reap list
nni_posix_pollq *pq; // associated pollq
int fd; // file descriptor to poll
- void * data; // user data
+ void * arg; // user data
nni_posix_pfd_cb cb; // user callback on event
- nni_cv cv; // signaled when poller has unregistered
- nni_mtx mtx;
- unsigned events;
- bool closing;
bool closed;
+ unsigned events;
+ nni_cv cv; // signaled when poller has unregistered
+ nni_mtx mtx;
};
#define NNI_MAX_KQUEUE_EVENTS 64
@@ -77,22 +79,31 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
return (NNG_ENOMEM);
}
+ nni_mtx_init(&pf->mtx);
+ nni_cv_init(&pf->cv, &pq->mtx);
+
+ pf->pq = pq;
+ pf->fd = fd;
+ pf->cb = NULL;
+ pf->arg = NULL;
+ pf->events = 0;
+ pf->closed = false;
+
+ NNI_LIST_NODE_INIT(&pf->node);
+ *pfdp = pf;
// Create entries in the kevent queue, without enabling them.
EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
// We update the kqueue list, without polling for events.
if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
+ int rv;
+ rv = nni_plat_errno(errno);
+ nni_cv_fini(&pf->cv);
+ nni_mtx_fini(&pf->mtx);
NNI_FREE_STRUCT(pf);
- return (nni_plat_errno(errno));
+ return (rv);
}
- pf->fd = fd;
- pf->cb = NULL;
- pf->pq = pq;
- nni_mtx_init(&pf->mtx);
- nni_cv_init(&pf->cv, &pq->mtx);
- NNI_LIST_NODE_INIT(&pf->node);
- *pfdp = pf;
return (0);
}
@@ -103,9 +114,9 @@ nni_posix_pfd_close(nni_posix_pfd *pf)
nni_posix_pollq *pq = pf->pq;
nni_mtx_lock(&pq->mtx);
- if (!pf->closing) {
+ if (!pf->closed) {
struct kevent ev[2];
- pf->closing = true;
+ pf->closed = true;
EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
(void) shutdown(pf->fd, SHUT_RDWR);
@@ -128,18 +139,17 @@ nni_posix_pfd_fini(nni_posix_pfd *pf)
// unless they are synchronous on user threads.
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
- struct kevent ev;
nni_mtx_lock(&pq->mtx);
- nni_list_append(&pq->reapq, pf);
- EV_SET(
- &ev, 0, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER, 0, NULL);
-
- // If this fails, the cleanup will stall. That should
- // only occur in a memory pressure situation, and it
- // will self-heal when the next event comes in.
- (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
- while (!pf->closed) {
- nni_cv_wait(&pf->cv);
+
+ // If we're running on the callback, then don't bother to kick
+ // the pollq again. This is necessary because we cannot modify
+ // the poller while it is polling.
+ if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ nni_list_append(&pq->reapq, pf);
+ nni_plat_pipe_raise(pq->wake_wfd);
+ while (nni_list_active(&pq->reapq, pf)) {
+ nni_cv_wait(&pf->cv);
+ }
}
nni_mtx_unlock(&pq->mtx);
@@ -160,8 +170,8 @@ nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
{
NNI_ASSERT(cb != NULL); // must not be null when established.
nni_mtx_lock(&pf->mtx);
- pf->cb = cb;
- pf->data = arg;
+ pf->cb = cb;
+ pf->arg = arg;
nni_mtx_unlock(&pf->mtx);
}
@@ -174,7 +184,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
nni_posix_pollq *pq = pf->pq;
nni_mtx_lock(&pf->mtx);
- if (pf->closing) {
+ if (pf->closed) {
events = 0;
} else {
pf->events |= events;
@@ -209,7 +219,6 @@ nni_posix_pollq_reap(nni_posix_pollq *pq)
nni_mtx_lock(&pq->mtx);
while ((pf = nni_list_first(&pq->reapq)) != NULL) {
nni_list_remove(&pq->reapq, pf);
- pf->closed = true;
nni_cv_wake(&pf->cv);
}
nni_mtx_unlock(&pq->mtx);
@@ -227,16 +236,15 @@ nni_posix_poll_thr(void *arg)
nni_posix_pfd_cb cb;
void * cbarg;
unsigned revents;
- bool reap = false;
- n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
- if (n < 0) {
- if (errno == EBADF) {
- nni_posix_pollq_reap(pq);
- return;
- }
- reap = true;
+ nni_mtx_lock(&pq->mtx);
+ if (pq->closed) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_posix_pollq_reap(pq);
+ return;
}
+ nni_mtx_unlock(&pq->mtx);
+ n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
for (int i = 0; i < n; i++) {
struct kevent *ev = &evs[i];
@@ -248,9 +256,10 @@ nni_posix_poll_thr(void *arg)
case EVFILT_WRITE:
revents = POLLOUT;
break;
- case EVFILT_USER:
- default:
- reap = true;
+ }
+ if (ev->udata == NULL) {
+ nni_plat_pipe_clear(pq->wake_rfd);
+ nni_posix_pollq_reap(pq);
continue;
}
pf = (void *) ev->udata;
@@ -260,7 +269,7 @@ nni_posix_poll_thr(void *arg)
nni_mtx_lock(&pf->mtx);
cb = pf->cb;
- cbarg = pf->data;
+ cbarg = pf->arg;
pf->events &= ~(revents);
nni_mtx_unlock(&pf->mtx);
@@ -268,47 +277,33 @@ nni_posix_poll_thr(void *arg)
cb(pf, revents, cbarg);
}
}
- if (reap) {
- nni_posix_pollq_reap(pq);
- }
}
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
+ nni_mtx_lock(&pq->mtx);
+ pq->closed = true;
+ nni_mtx_unlock(&pq->mtx);
+ nni_plat_pipe_raise(pq->wake_wfd);
+
+ nni_thr_fini(&pq->thr);
+ nni_plat_pipe_close(pq->wake_wfd, pq->wake_rfd);
+
if (pq->kq >= 0) {
close(pq->kq);
+ pq->kq = -1;
}
- nni_thr_fini(&pq->thr);
- pq->kq = -1;
-
- nni_posix_pollq_reap(pq);
-
nni_mtx_fini(&pq->mtx);
}
static int
-nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq)
+nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
struct kevent ev;
- EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL);
- while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
- if (errno == EINTR) {
- continue;
- }
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-static int
-nni_posix_pollq_create(nni_posix_pollq *pq)
-{
- int rv;
-
if ((pq->kq = kqueue()) < 0) {
return (nni_plat_errno(errno));
}
@@ -317,7 +312,18 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
- (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) {
+ ((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) {
+ nni_posix_pollq_destroy(pq);
+ return (rv);
+ }
+
+ // Register the wake pipe. We use this to synchronize closing
+ // file descriptors.
+ EV_SET(&ev, (uintptr_t) pq->wake_rfd, EVFILT_READ, EV_ADD | EV_CLEAR,
+ 0, 0, NULL);
+
+ if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
+ rv = nni_plat_errno(rv);
nni_posix_pollq_destroy(pq);
return (rv);
}