summaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-08-08 10:04:17 -0700
committerGarrett D'Amore <garrett@damore.org>2020-08-08 11:25:30 -0700
commitbbc04b889523c137a1556917571a4ca9ee8a324e (patch)
tree9297bcc64597d9751f27cbf75849f3d2ccaaf9e9 /src/platform
parent47c66c37d1849d49f9e79b14a5463e550c31c9c8 (diff)
downloadnng-bbc04b889523c137a1556917571a4ca9ee8a324e.tar.gz
nng-bbc04b889523c137a1556917571a4ca9ee8a324e.tar.bz2
nng-bbc04b889523c137a1556917571a4ca9ee8a324e.zip
fixes #1275 Test timeouts on FreeBSD
This was responsible for hangs in close on FreeBSD. Apparently our use of EVFILT_USER was incorrect, and rather than fix it, we have switched to using a notification pipe for synchronizing closing pipes. In addition to fixing this problem, it should significantly improve things for NetBSD and OpenBSD, which will now be able tbenefit from kqueue(), since we no longer depend on EVFILT_USER.
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_pollq_epoll.c5
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c146
2 files changed, 78 insertions, 73 deletions
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index 63cdaeb5..d7a2c70f 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -61,8 +61,8 @@ struct nni_posix_pollq {
};
struct nni_posix_pfd {
+ nni_list_node node;
nni_posix_pollq *pq;
- nni_list_node node;
int fd;
nni_posix_pfd_cb cb;
void * arg;
@@ -96,7 +96,6 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pq->mtx);
- nni_mtx_lock(&pfd->mtx);
pfd->pq = pq;
pfd->fd = fd;
pfd->cb = NULL;
@@ -106,7 +105,6 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pfd->closed = false;
NNI_LIST_NODE_INIT(&pfd->node);
- nni_mtx_unlock(&pfd->mtx);
// notifications disabled to begin with
ev.events = 0;
@@ -115,6 +113,7 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
rv = nni_plat_errno(errno);
nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
NNI_FREE_STRUCT(pfd);
return (rv);
}
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index 299479ab..7d827d7d 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -29,22 +29,24 @@ typedef struct nni_posix_pollq nni_posix_pollq;
// pollq implementation
struct nni_posix_pollq {
nni_mtx mtx;
- int kq; // kqueue handle
- nni_thr thr; // worker thread
- nni_list reapq; // items to reap
+ int wake_wfd; // write side of wake pipe
+ int wake_rfd; // read side of wake pipe
+ bool closed; // request for worker to exit
+ int kq; // kqueue handle
+ nni_thr thr; // worker thread
+ nni_list reapq; // items to reap
};
struct nni_posix_pfd {
nni_list_node node; // linkage into the reap list
nni_posix_pollq *pq; // associated pollq
int fd; // file descriptor to poll
- void * data; // user data
+ void * arg; // user data
nni_posix_pfd_cb cb; // user callback on event
- nni_cv cv; // signaled when poller has unregistered
- nni_mtx mtx;
- unsigned events;
- bool closing;
bool closed;
+ unsigned events;
+ nni_cv cv; // signaled when poller has unregistered
+ nni_mtx mtx;
};
#define NNI_MAX_KQUEUE_EVENTS 64
@@ -77,22 +79,31 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
return (NNG_ENOMEM);
}
+ nni_mtx_init(&pf->mtx);
+ nni_cv_init(&pf->cv, &pq->mtx);
+
+ pf->pq = pq;
+ pf->fd = fd;
+ pf->cb = NULL;
+ pf->arg = NULL;
+ pf->events = 0;
+ pf->closed = false;
+
+ NNI_LIST_NODE_INIT(&pf->node);
+ *pfdp = pf;
// Create entries in the kevent queue, without enabling them.
EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
// We update the kqueue list, without polling for events.
if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
+ int rv;
+ rv = nni_plat_errno(errno);
+ nni_cv_fini(&pf->cv);
+ nni_mtx_fini(&pf->mtx);
NNI_FREE_STRUCT(pf);
- return (nni_plat_errno(errno));
+ return (rv);
}
- pf->fd = fd;
- pf->cb = NULL;
- pf->pq = pq;
- nni_mtx_init(&pf->mtx);
- nni_cv_init(&pf->cv, &pq->mtx);
- NNI_LIST_NODE_INIT(&pf->node);
- *pfdp = pf;
return (0);
}
@@ -103,9 +114,9 @@ nni_posix_pfd_close(nni_posix_pfd *pf)
nni_posix_pollq *pq = pf->pq;
nni_mtx_lock(&pq->mtx);
- if (!pf->closing) {
+ if (!pf->closed) {
struct kevent ev[2];
- pf->closing = true;
+ pf->closed = true;
EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
(void) shutdown(pf->fd, SHUT_RDWR);
@@ -128,18 +139,17 @@ nni_posix_pfd_fini(nni_posix_pfd *pf)
// unless they are synchronous on user threads.
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
- struct kevent ev;
nni_mtx_lock(&pq->mtx);
- nni_list_append(&pq->reapq, pf);
- EV_SET(
- &ev, 0, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER, 0, NULL);
-
- // If this fails, the cleanup will stall. That should
- // only occur in a memory pressure situation, and it
- // will self-heal when the next event comes in.
- (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
- while (!pf->closed) {
- nni_cv_wait(&pf->cv);
+
+ // If we're running on the callback, then don't bother to kick
+ // the pollq again. This is necessary because we cannot modify
+ // the poller while it is polling.
+ if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ nni_list_append(&pq->reapq, pf);
+ nni_plat_pipe_raise(pq->wake_wfd);
+ while (nni_list_active(&pq->reapq, pf)) {
+ nni_cv_wait(&pf->cv);
+ }
}
nni_mtx_unlock(&pq->mtx);
@@ -160,8 +170,8 @@ nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
{
NNI_ASSERT(cb != NULL); // must not be null when established.
nni_mtx_lock(&pf->mtx);
- pf->cb = cb;
- pf->data = arg;
+ pf->cb = cb;
+ pf->arg = arg;
nni_mtx_unlock(&pf->mtx);
}
@@ -174,7 +184,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
nni_posix_pollq *pq = pf->pq;
nni_mtx_lock(&pf->mtx);
- if (pf->closing) {
+ if (pf->closed) {
events = 0;
} else {
pf->events |= events;
@@ -209,7 +219,6 @@ nni_posix_pollq_reap(nni_posix_pollq *pq)
nni_mtx_lock(&pq->mtx);
while ((pf = nni_list_first(&pq->reapq)) != NULL) {
nni_list_remove(&pq->reapq, pf);
- pf->closed = true;
nni_cv_wake(&pf->cv);
}
nni_mtx_unlock(&pq->mtx);
@@ -227,16 +236,15 @@ nni_posix_poll_thr(void *arg)
nni_posix_pfd_cb cb;
void * cbarg;
unsigned revents;
- bool reap = false;
- n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
- if (n < 0) {
- if (errno == EBADF) {
- nni_posix_pollq_reap(pq);
- return;
- }
- reap = true;
+ nni_mtx_lock(&pq->mtx);
+ if (pq->closed) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_posix_pollq_reap(pq);
+ return;
}
+ nni_mtx_unlock(&pq->mtx);
+ n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
for (int i = 0; i < n; i++) {
struct kevent *ev = &evs[i];
@@ -248,9 +256,10 @@ nni_posix_poll_thr(void *arg)
case EVFILT_WRITE:
revents = POLLOUT;
break;
- case EVFILT_USER:
- default:
- reap = true;
+ }
+ if (ev->udata == NULL) {
+ nni_plat_pipe_clear(pq->wake_rfd);
+ nni_posix_pollq_reap(pq);
continue;
}
pf = (void *) ev->udata;
@@ -260,7 +269,7 @@ nni_posix_poll_thr(void *arg)
nni_mtx_lock(&pf->mtx);
cb = pf->cb;
- cbarg = pf->data;
+ cbarg = pf->arg;
pf->events &= ~(revents);
nni_mtx_unlock(&pf->mtx);
@@ -268,47 +277,33 @@ nni_posix_poll_thr(void *arg)
cb(pf, revents, cbarg);
}
}
- if (reap) {
- nni_posix_pollq_reap(pq);
- }
}
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
+ nni_mtx_lock(&pq->mtx);
+ pq->closed = true;
+ nni_mtx_unlock(&pq->mtx);
+ nni_plat_pipe_raise(pq->wake_wfd);
+
+ nni_thr_fini(&pq->thr);
+ nni_plat_pipe_close(pq->wake_wfd, pq->wake_rfd);
+
if (pq->kq >= 0) {
close(pq->kq);
+ pq->kq = -1;
}
- nni_thr_fini(&pq->thr);
- pq->kq = -1;
-
- nni_posix_pollq_reap(pq);
-
nni_mtx_fini(&pq->mtx);
}
static int
-nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq)
+nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
struct kevent ev;
- EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL);
- while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
- if (errno == EINTR) {
- continue;
- }
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-static int
-nni_posix_pollq_create(nni_posix_pollq *pq)
-{
- int rv;
-
if ((pq->kq = kqueue()) < 0) {
return (nni_plat_errno(errno));
}
@@ -317,7 +312,18 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
- (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) {
+ ((rv = nni_plat_pipe_open(&pq->wake_wfd, &pq->wake_rfd)) != 0)) {
+ nni_posix_pollq_destroy(pq);
+ return (rv);
+ }
+
+ // Register the wake pipe. We use this to synchronize closing
+ // file descriptors.
+ EV_SET(&ev, (uintptr_t) pq->wake_rfd, EVFILT_READ, EV_ADD | EV_CLEAR,
+ 0, 0, NULL);
+
+ if ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
+ rv = nni_plat_errno(rv);
nni_posix_pollq_destroy(pq);
return (rv);
}