Diffstat (limited to 'src/platform')
-rw-r--r--  src/platform/posix/posix_aio.h             3
-rw-r--r--  src/platform/posix/posix_epdesc.c        476
-rw-r--r--  src/platform/posix/posix_pipedesc.c      212
-rw-r--r--  src/platform/posix/posix_pollq.h          38
-rw-r--r--  src/platform/posix/posix_pollq_epoll.c   445
-rw-r--r--  src/platform/posix/posix_pollq_kqueue.c  432
-rw-r--r--  src/platform/posix/posix_pollq_poll.c    532
-rw-r--r--  src/platform/posix/posix_pollq_port.c    288
-rw-r--r--  src/platform/posix/posix_resolv_gai.c    136
-rw-r--r--  src/platform/posix/posix_thread.c          6
-rw-r--r--  src/platform/posix/posix_udp.c            87
-rw-r--r--  src/platform/windows/win_impl.h            1
-rw-r--r--  src/platform/windows/win_iocp.c            7
-rw-r--r--  src/platform/windows/win_ipc.c             7
-rw-r--r--  src/platform/windows/win_resolv.c        138
-rw-r--r--  src/platform/windows/win_thread.c          7
16 files changed, 1425 insertions, 1390 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 15d91db2..08ae99ca 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -18,11 +18,12 @@
// one of several possible different backends.
#include "core/nng_impl.h"
+#include "posix_pollq.h"
typedef struct nni_posix_pipedesc nni_posix_pipedesc;
typedef struct nni_posix_epdesc nni_posix_epdesc;
-extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
+extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, nni_posix_pfd *);
extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *);
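
A minimal sketch (not part of the commit) of the calling convention this header change implies: the caller now wraps the connected socket in an nni_posix_pfd first, and the pipedesc owns that pfd afterwards. Only functions declared in this diff are used; the helper name is hypothetical.

// Hypothetical helper: raw fd -> nni_posix_pfd -> nni_posix_pipedesc.
static int
example_make_pipedesc(int fd, nni_posix_pipedesc **pdp)
{
	nni_posix_pfd *pfd;
	int            rv;

	if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
		(void) close(fd); // no pfd was created; fd is still ours
		return (rv);
	}
	if ((rv = nni_posix_pipedesc_init(pdp, pfd)) != 0) {
		nni_posix_pfd_fini(pfd); // unregisters and closes the fd
		return (rv);
	}
	// From here on, nni_posix_pipedesc_fini(*pdp) tears down the pfd too.
	return (0);
}
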
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 7431dedf..0065806d 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -39,7 +39,7 @@
#endif
struct nni_posix_epdesc {
- nni_posix_pollq_node node;
+ nni_posix_pfd * pfd;
nni_list connectq;
nni_list acceptq;
bool closed;
@@ -53,10 +53,14 @@ struct nni_posix_epdesc {
nni_mtx mtx;
};
+static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *);
+static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *);
+
static void
-nni_posix_epdesc_cancel(nni_aio *aio, int rv)
+nni_epdesc_cancel(nni_aio *aio, int rv)
{
- nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
+ nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
+ nni_posix_pfd * pfd = NULL;
NNI_ASSERT(rv != 0);
nni_mtx_lock(&ed->mtx);
@@ -64,200 +68,136 @@ nni_posix_epdesc_cancel(nni_aio *aio, int rv)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
+ if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) &&
+ ((pfd = ed->pfd) != NULL)) {
+ nni_posix_pfd_close(pfd);
+ }
nni_mtx_unlock(&ed->mtx);
}
static void
-nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
+nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd)
{
nni_posix_pipedesc *pd = NULL;
// acceptq or connectq.
nni_aio_list_remove(aio);
- if (rv == 0) {
- if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) {
- (void) close(newfd);
- }
- }
if (rv != 0) {
+ NNI_ASSERT(newpfd == NULL);
nni_aio_finish_error(aio, rv);
- } else {
- nni_aio_set_output(aio, 0, pd);
- nni_aio_finish(aio, 0, 0);
+ return;
}
-}
-
-static void
-nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
-{
- nni_aio * aio;
- socklen_t sz;
- int rv;
-
- // Note that normally there will only be a single connect AIO...
- // A socket that is here will have *initiated* with a connect()
- // call, which returned EINPROGRESS. When the connection attempt
- // is done, either way, the descriptor will be noted as writable.
- // getsockopt() with SOL_SOCKET, SO_ERROR to determine the actual
- // status of the connection attempt...
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- rv = -1;
- sz = sizeof(rv);
- if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) <
- 0) {
- rv = errno;
- }
- switch (rv) {
- case 0:
- // Success!
- nni_posix_pollq_remove(&ed->node);
- nni_posix_epdesc_finish(aio, 0, ed->node.fd);
- ed->node.fd = -1;
- continue;
-
- case EINPROGRESS:
- // Still in progress... keep trying
- return;
- default:
- if (rv == ENOENT) {
- rv = ECONNREFUSED;
- }
- nni_posix_pollq_remove(&ed->node);
- nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
- (void) close(ed->node.fd);
- ed->node.fd = -1;
- continue;
- }
+ NNI_ASSERT(newpfd != NULL);
+ if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) {
+ nni_posix_pfd_fini(newpfd);
+ nni_aio_finish_error(aio, rv);
+ return;
}
+ nni_aio_set_output(aio, 0, pd);
+ nni_aio_finish(aio, 0, 0);
}
static void
-nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
+nni_epdesc_doaccept(nni_posix_epdesc *ed)
{
nni_aio *aio;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- int newfd;
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+
+ fd = nni_posix_pfd_fd(ed->pfd);
#ifdef NNG_USE_ACCEPT4
- newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC);
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(ed->node.fd, NULL, NULL);
+ newfd = accept(fd, NULL, NULL);
}
#else
- newfd = accept(ed->node.fd, NULL, NULL);
+ newfd = accept(fd, NULL, NULL);
#endif
-
- if (newfd >= 0) {
- // successful connection request!
- nni_posix_epdesc_finish(aio, 0, newfd);
- continue;
- }
-
- if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) {
- // Well, let's try later. Note that EWOULDBLOCK
- // is required by standards, but some platforms may
- // use EAGAIN. The values may be the same, so we
- // can't use switch.
- return;
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(ed->pfd, POLLIN);
+ if (rv != 0) {
+ nni_epdesc_finish(aio, rv, NULL);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ nni_epdesc_finish(aio, rv, NULL);
+ continue;
+ }
}
- if ((errno == ECONNABORTED) || (errno == ECONNRESET)) {
- // Let's just eat this one. Perhaps it may be
- // better to report it to the application, but we
- // think most applications don't want to see this.
- // Only someone with a packet trace is going to
- // notice this.
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_epdesc_finish(aio, rv, NULL);
continue;
}
- nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
- }
-}
-
-static void
-nni_posix_epdesc_doerror(nni_posix_epdesc *ed)
-{
- nni_aio * aio;
- int rv = 1;
- socklen_t sz = sizeof(rv);
-
- if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- rv = errno;
- }
- if (rv == 0) {
- return;
- }
- rv = nni_plat_errno(rv);
-
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_posix_epdesc_finish(aio, rv, 0);
- }
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_posix_epdesc_finish(aio, rv, 0);
+ nni_epdesc_finish(aio, 0, pfd);
}
}
static void
-nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
+nni_epdesc_doclose(nni_posix_epdesc *ed)
{
nni_aio *aio;
- int fd;
ed->closed = true;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_epdesc_finish(aio, NNG_ECLOSED, 0);
}
while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_epdesc_finish(aio, NNG_ECLOSED, 0);
}
- nni_posix_pollq_remove(&ed->node);
+ if (ed->pfd != NULL) {
- if ((fd = ed->node.fd) != -1) {
+ nni_posix_pfd_close(ed->pfd);
+ }
+
+ // clean up stale UNIX socket when closing the server.
+ if ((ed->mode == NNI_EP_MODE_LISTEN) && (ed->loclen != 0) &&
+ (ed->locaddr.ss_family == AF_UNIX)) {
struct sockaddr_un *sun = (void *) &ed->locaddr;
- ed->node.fd = -1;
- (void) shutdown(fd, SHUT_RDWR);
- (void) close(fd);
- if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
- (void) unlink(sun->sun_path);
- }
+ (void) unlink(sun->sun_path);
}
}
static void
-nni_posix_epdesc_cb(void *arg)
+nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_epdesc *ed = arg;
- int events;
nni_mtx_lock(&ed->mtx);
-
- if (ed->node.revents & POLLIN) {
- nni_posix_epdesc_doaccept(ed);
- }
- if (ed->node.revents & POLLOUT) {
- nni_posix_epdesc_doconnect(ed);
- }
- if (ed->node.revents & (POLLERR | POLLHUP)) {
- nni_posix_epdesc_doerror(ed);
- }
- if (ed->node.revents & POLLNVAL) {
- nni_posix_epdesc_doclose(ed);
+ if (events & POLLNVAL) {
+ nni_epdesc_doclose(ed);
+ nni_mtx_unlock(&ed->mtx);
+ return;
}
+ NNI_ASSERT(pfd == ed->pfd);
- events = 0;
- if (!nni_list_empty(&ed->connectq)) {
- events |= POLLOUT;
- }
- if (!nni_list_empty(&ed->acceptq)) {
- events |= POLLIN;
- }
- if ((!ed->closed) && (events != 0)) {
- nni_posix_pollq_arm(&ed->node, events);
- }
+ // Anything else will turn up in accept.
+ nni_epdesc_doaccept(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -265,7 +205,7 @@ void
nni_posix_epdesc_close(nni_posix_epdesc *ed)
{
nni_mtx_lock(&ed->mtx);
- nni_posix_epdesc_doclose(ed);
+ nni_epdesc_doclose(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -276,9 +216,23 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
struct sockaddr_storage *ss;
int rv;
int fd;
+ nni_posix_pfd * pfd;
nni_mtx_lock(&ed->mtx);
+ if (ed->started) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_ESTATE);
+ }
+ if (ed->closed) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_ECLOSED);
+ }
+ if ((len = ed->loclen) == 0) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_EADDRINVAL);
+ }
+
ss = &ed->locaddr;
len = ed->loclen;
@@ -286,18 +240,17 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
nni_mtx_unlock(&ed->mtx);
return (nni_plat_errno(errno));
}
- (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
-#ifdef SO_NOSIGPIPE
- // Darwin lacks MSG_NOSIGNAL, but has a socket option.
- int one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
-#endif
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
if (bind(fd, (struct sockaddr *) ss, len) < 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
@@ -314,7 +267,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
if ((rv = chmod(sun->sun_path, perms)) != 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
}
@@ -324,27 +277,24 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
if (listen(fd, 128) != 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed);
- ed->node.fd = fd;
- if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- (void) close(fd);
- ed->node.fd = -1;
- nni_mtx_unlock(&ed->mtx);
- return (rv);
- }
+ ed->pfd = pfd;
ed->started = true;
nni_mtx_unlock(&ed->mtx);
+
return (0);
}
void
nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
{
+ int rv;
+
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
@@ -354,14 +304,25 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
}
nni_mtx_lock(&ed->mtx);
+ if (!ed->started) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
if (ed->closed) {
nni_mtx_unlock(&ed->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&ed->acceptq, aio);
- nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
- nni_posix_pollq_arm(&ed->node, POLLIN);
+ if (nni_list_first(&ed->acceptq) == aio) {
+ nni_epdesc_doaccept(ed);
+ }
nni_mtx_unlock(&ed->mtx);
}
@@ -370,79 +331,171 @@ nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = -1;
+
+ nni_mtx_lock(&ed->mtx);
+ if (ed->pfd != NULL) {
+ fd = nni_posix_pfd_fd(ed->pfd);
+ }
+ nni_mtx_unlock(&ed->mtx);
- if (getsockname(ed->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
}
-void
-nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+static void
+nni_epdesc_connect_start(nni_posix_epdesc *ed)
{
- // NB: We assume that the FD is already set to nonblocking mode.
- int rv;
- int fd;
+ nni_posix_pfd *pfd;
+ int fd;
+ int rv;
+ nni_aio * aio;
- if (nni_aio_begin(aio) != 0) {
+loop:
+ if ((aio = nni_list_first(&ed->connectq)) == NULL) {
return;
}
- nni_mtx_lock(&ed->mtx);
+
+ NNI_ASSERT(ed->pfd == NULL);
+ if (ed->closed) {
+ nni_epdesc_finish(aio, NNG_ECLOSED, NULL);
+ goto loop;
+ }
+ ed->started = true;
if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
+ nni_epdesc_finish(aio, rv, NULL);
+ goto loop;
}
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_epdesc_finish(aio, rv, NULL);
+ goto loop;
+ }
// Possibly bind.
if ((ed->loclen != 0) &&
(bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
- nni_aio_finish_error(aio, rv);
- return;
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
}
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
-
if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
- ed->started = true;
- nni_posix_epdesc_finish(aio, 0, fd);
- nni_mtx_unlock(&ed->mtx);
- return;
+ nni_epdesc_finish(aio, 0, pfd);
+ goto loop;
}
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
if (errno == ENOENT) { // For UNIX domain sockets
- errno = ECONNREFUSED;
+ rv = NNG_ECONNREFUSED;
+ } else {
+ rv = nni_plat_errno(errno);
}
- rv = nni_plat_errno(errno);
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
+ }
+ nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed);
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
+ }
+ ed->pfd = pfd;
+ // all done... wait for this to signal via callback
+}
+
+void
+nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ if (ed->closed) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
nni_aio_finish_error(aio, rv);
return;
}
- // We have to submit to the pollq, because the connection is pending.
- ed->node.fd = fd;
ed->started = true;
- if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- ed->node.fd = -1;
+ nni_list_append(&ed->connectq, aio);
+ if (nni_list_first(&ed->connectq) == aio) {
+ // If there was a stale pfd (probably from an aborted or
+ // canceled connect attempt), discard it so we start fresh.
+ if (ed->pfd != NULL) {
+ nni_posix_pfd_fini(ed->pfd);
+ ed->pfd = NULL;
+ }
+ nni_epdesc_connect_start(ed);
+ }
+ nni_mtx_unlock(&ed->mtx);
+}
+
+static void
+nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_posix_epdesc *ed = arg;
+ nni_aio * aio;
+ socklen_t sz;
+ int rv;
+ int fd;
+
+ nni_mtx_lock(&ed->mtx);
+ if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) ||
+ (pfd != ed->pfd)) {
+ // Spurious completion. Ignore it, but discard the PFD.
+ if (ed->pfd == pfd) {
+ ed->pfd = NULL;
+ }
+ nni_posix_pfd_fini(pfd);
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+
+ fd = nni_posix_pfd_fd(pfd);
+ sz = sizeof(rv);
+
+ if ((events & POLLNVAL) != 0) {
+ rv = EBADF;
+
+ } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+
+ switch (rv) {
+ case 0:
+ // Good connect!
+ ed->pfd = NULL;
+ nni_epdesc_finish(aio, 0, pfd);
+ break;
+ case EINPROGRESS: // still connecting... come back later
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
- nni_aio_finish_error(aio, rv);
return;
+ default:
+ ed->pfd = NULL;
+ nni_epdesc_finish(aio, nni_plat_errno(rv), NULL);
+ nni_posix_pfd_fini(pfd);
+ break;
}
- nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
- nni_aio_list_append(&ed->connectq, aio);
- nni_posix_pollq_arm(&ed->node, POLLOUT);
+ // Start another connect running, if any is waiting.
+ nni_epdesc_connect_start(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -450,7 +503,6 @@ int
nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
{
nni_posix_epdesc *ed;
- int rv;
if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
return (NNG_ENOMEM);
@@ -458,28 +510,14 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
nni_mtx_init(&ed->mtx);
- // We could randomly choose a different pollq, or for efficiencies
- // sake we could take a modulo of the file desc number to choose
- // one. For now we just have a global pollq. Note that by tying
- // the ed to a single pollq we may get some kind of cache warmth.
-
- ed->node.index = 0;
- ed->node.cb = nni_posix_epdesc_cb;
- ed->node.data = ed;
- ed->node.fd = -1;
- ed->closed = false;
- ed->started = false;
- ed->perms = 0; // zero means use default (no change)
- ed->mode = mode;
+ ed->pfd = NULL;
+ ed->closed = false;
+ ed->started = false;
+ ed->perms = 0; // zero means use default (no change)
+ ed->mode = mode;
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
-
- if ((rv = nni_posix_pollq_init(&ed->node)) != 0) {
- nni_mtx_fini(&ed->mtx);
- NNI_FREE_STRUCT(ed);
- return (rv);
- }
*edp = ed;
return (0);
}
@@ -532,14 +570,16 @@ nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode)
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
- int fd;
+ nni_posix_pfd *pfd;
+
nni_mtx_lock(&ed->mtx);
- if ((fd = ed->node.fd) != -1) {
- (void) close(ed->node.fd);
- nni_posix_epdesc_doclose(ed);
- }
+ nni_epdesc_doclose(ed);
+ pfd = ed->pfd;
nni_mtx_unlock(&ed->mtx);
- nni_posix_pollq_fini(&ed->node);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
}
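
The rewritten dialer above follows the standard non-blocking connect sequence: connect() on an O_NONBLOCK socket, treat EINPROGRESS as pending, wait for writability, then read SO_ERROR for the real outcome (mapping ENOENT to "connection refused" for UNIX domain sockets). A self-contained sketch of that sequence, written synchronously with poll(2) for clarity; the code above instead does the wait via the pollq callback.

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Sketch only: returns 0 on success or an errno value. The asynchronous
// code above performs the same steps, but the POLLOUT wait happens in
// nni_epdesc_connect_cb rather than a blocking poll().
static int
connect_nb(int fd, const struct sockaddr *sa, socklen_t len)
{
	struct pollfd pfd;
	int           err;
	socklen_t     sz = sizeof(err);

	(void) fcntl(fd, F_SETFL, O_NONBLOCK);
	if (connect(fd, sa, len) == 0) {
		return (0); // immediate connect (typically loopback)
	}
	if (errno != EINPROGRESS) {
		// UNIX domain sockets report a missing path as ENOENT.
		return (errno == ENOENT ? ECONNREFUSED : errno);
	}
	pfd.fd     = fd;
	pfd.events = POLLOUT;
	if (poll(&pfd, 1, -1) < 0) {
		return (errno);
	}
	if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &sz) < 0) {
		return (errno);
	}
	return (err); // 0 means the connection completed
}
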
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index e7225395..b11036ea 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -40,11 +40,11 @@
// file descriptor for TCP socket, etc.) This contains the list of pending
// aios for that underlying socket, as well as the socket itself.
struct nni_posix_pipedesc {
- nni_posix_pollq_node node;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
+ nni_posix_pfd *pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
};
static void
@@ -66,16 +66,19 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
- if (pd->node.fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->node.fd, SHUT_RDWR);
- }
+ nni_posix_pfd_close(pd->pfd);
}
static void
nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
unsigned i;
@@ -122,20 +125,28 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
hdr.msg_iovlen = niov;
hdr.msg_iov = iovec;
- n = sendmsg(pd->node.fd, &hdr, MSG_NOSIGNAL);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
nni_aio_bump_count(aio, n);
// We completed the entire operation on this aioq.
+ // (Sendmsg never returns a partial result.)
nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
@@ -147,6 +158,12 @@ static void
nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->readq)) != NULL) {
unsigned i;
@@ -181,16 +198,18 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
}
- n = readv(pd->node.fd, iovec, niov);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
if (n == 0) {
@@ -211,21 +230,21 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
static void
-nni_posix_pipedesc_cb(void *arg)
+nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_pipedesc *pd = arg;
nni_mtx_lock(&pd->mtx);
- if (pd->node.revents & POLLIN) {
+ if (events & POLLIN) {
nni_posix_pipedesc_doread(pd);
}
- if (pd->node.revents & POLLOUT) {
+ if (events & POLLOUT) {
nni_posix_pipedesc_dowrite(pd);
}
- if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_pipedesc_doclose(pd);
} else {
- int events = 0;
+ events = 0;
if (!nni_list_empty(&pd->writeq)) {
events |= POLLOUT;
}
@@ -233,7 +252,7 @@ nni_posix_pipedesc_cb(void *arg)
events |= POLLIN;
}
if ((!pd->closed) && (events != 0)) {
- nni_posix_pollq_arm(&pd->node, events);
+ nni_posix_pfd_arm(pfd, events);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -242,8 +261,7 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq_remove(&pd->node);
-
+ // NB: Events may still occur.
nni_mtx_lock(&pd->mtx);
nni_posix_pipedesc_doclose(pd);
nni_mtx_unlock(&pd->mtx);
@@ -265,6 +283,8 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -276,18 +296,24 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->readq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
- // If we are only job on the list, go ahead and try to do an immediate
- // transfer. This allows for faster completions in many cases. We
- // also need not arm a list if it was already armed.
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
if (nni_list_first(&pd->readq) == aio) {
nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLIN);
+ nni_posix_pfd_arm(pd->pfd, POLLIN);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -296,6 +322,8 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -307,15 +335,20 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->writeq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLOUT);
+ nni_posix_pfd_arm(pd->pfd, POLLOUT);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -326,8 +359,9 @@ nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getpeername(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getpeername(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -338,8 +372,9 @@ nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getsockname(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -349,9 +384,9 @@ int
nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
{
int val = nodelay ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, IPPROTO_TCP, TCP_NODELAY, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
@@ -361,61 +396,19 @@ int
nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
{
int val = keep ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, SOL_SOCKET, SO_KEEPALIVE, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
}
int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
-{
- nni_posix_pipedesc *pd;
- int rv;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // We could randomly choose a different pollq, or for efficiencies
- // sake we could take a modulo of the file desc number to choose
- // one. For now we just have a global pollq. Note that by tying
- // the pd to a single pollq we may get some kind of cache warmth.
-
- pd->closed = false;
- pd->node.fd = fd;
- pd->node.cb = nni_posix_pipedesc_cb;
- pd->node.data = pd;
-
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
-
-#ifdef SO_NOSIGPIPE
- // Darwin lacks MSG_NOSIGNAL, but has a socket option.
- int one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
-#endif
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- if (((rv = nni_posix_pollq_init(&pd->node)) != 0) ||
- ((rv = nni_posix_pollq_add(&pd->node)) != 0)) {
- nni_mtx_fini(&pd->mtx);
- NNI_FREE_STRUCT(pd);
- return (rv);
- }
- *pdp = pd;
- return (0);
-}
-
-int
nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
uint64_t *egid, uint64_t *prid, uint64_t *znid)
{
- int fd = pd->node.fd;
+ int fd = nni_posix_pfd_fd(pd->pfd);
#if defined(NNG_HAVE_GETPEEREID)
uid_t uid;
gid_t gid;
@@ -458,7 +451,8 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
}
*euid = xu.cr_uid;
*egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented LOCAL_PEERPID...
+ *prid = (uint64_t) -1; // XXX: macOS has undocumented
+ // LOCAL_PEERPID...
*znid = (uint64_t) -1;
return (0);
#else
@@ -473,16 +467,34 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
#endif
}
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
+{
+ nni_posix_pipedesc *pd;
+
+ if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ pd->closed = false;
+ pd->pfd = pfd;
+
+ nni_mtx_init(&pd->mtx);
+ nni_aio_list_init(&pd->readq);
+ nni_aio_list_init(&pd->writeq);
+
+ nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
+
+ *pdp = pd;
+ return (0);
+}
+
void
nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
{
- // Make sure no other polling activity is pending.
nni_posix_pipedesc_close(pd);
- nni_posix_pollq_fini(&pd->node);
- if (pd->node.fd >= 0) {
- (void) close(pd->node.fd);
- }
-
+ nni_posix_pfd_fini(pd->pfd);
+ pd->pfd = NULL;
nni_mtx_fini(&pd->mtx);
NNI_FREE_STRUCT(pd);
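
The sendmsg()/readv() error handling above switches on errno, which is why EWOULDBLOCK appears as a case label only when it differs from EAGAIN: on most platforms the two are the same value, and duplicate case labels will not compile. A standalone restatement of that guard (the function name is illustrative, not part of the change):

#include <errno.h>

// Illustrative only: the same preprocessor guard used in
// nni_posix_pipedesc_dowrite above. The extra label is emitted only on
// platforms where EWOULDBLOCK is a distinct value from EAGAIN.
static int
is_retryable(int err)
{
	switch (err) {
	case EINTR:  // interrupted; retry the call immediately
	case EAGAIN: // nothing to do right now; wait for the poller
#ifdef EWOULDBLOCK
#if EWOULDBLOCK != EAGAIN
	case EWOULDBLOCK:
#endif
#endif
		return (1);
	default:
		return (0);
	}
}
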
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index 2c855da1..b9786330 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -22,32 +22,18 @@
#include "core/nng_impl.h"
#include <poll.h>
-typedef struct nni_posix_pollq_node nni_posix_pollq_node;
-typedef struct nni_posix_pollq nni_posix_pollq;
-
-struct nni_posix_pollq_node {
- nni_list_node node; // linkage into the pollq list
- nni_posix_pollq *pq; // associated pollq
- int index; // used by the poller impl
- int armed; // used by the poller impl
- int fd; // file descriptor to poll
- int events; // events to watch for
- int revents; // events received
- void * data; // user data
- nni_cb cb; // user callback on event
- nni_mtx mx;
- nni_cv cv;
-};
-
-extern nni_posix_pollq *nni_posix_pollq_get(int);
-extern int nni_posix_pollq_sysinit(void);
-extern void nni_posix_pollq_sysfini(void);
-
-extern int nni_posix_pollq_init(nni_posix_pollq_node *);
-extern void nni_posix_pollq_fini(nni_posix_pollq_node *);
-extern int nni_posix_pollq_add(nni_posix_pollq_node *);
-extern void nni_posix_pollq_remove(nni_posix_pollq_node *);
-extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int);
+typedef struct nni_posix_pfd nni_posix_pfd;
+typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, int, void *);
+
+extern int nni_posix_pollq_sysinit(void);
+extern void nni_posix_pollq_sysfini(void);
+
+extern int nni_posix_pfd_init(nni_posix_pfd **, int);
+extern void nni_posix_pfd_fini(nni_posix_pfd *);
+extern int nni_posix_pfd_arm(nni_posix_pfd *, int);
+extern int nni_posix_pfd_fd(nni_posix_pfd *);
+extern void nni_posix_pfd_close(nni_posix_pfd *);
+extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);
#endif // NNG_PLATFORM_POSIX
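
Taken together, the interface above replaces the old per-node pollq registration with an opaque nni_posix_pfd that carries its own callback. A sketch of the intended lifecycle, based on how the epdesc/pipedesc code and the backends in this diff use it (the example_* names are illustrative). Arming is one-shot, so the callback or the next submit path must re-arm.

#include <poll.h>

// Illustrative consumer of the nni_posix_pfd API declared above.
static void
example_cb(nni_posix_pfd *pfd, int events, void *arg)
{
	(void) arg;
	if (events & (POLLERR | POLLHUP | POLLNVAL)) {
		// peer gone or descriptor invalid; arrange cleanup
		return;
	}
	if (events & POLLIN) {
		// read from nni_posix_pfd_fd(pfd) until EAGAIN ...
	}
	// Events are one-shot; ask again while still interested.
	(void) nni_posix_pfd_arm(pfd, POLLIN);
}

static int
example_watch(int fd)
{
	nni_posix_pfd *pfd;
	int            rv;

	if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
		return (rv); // fd is still owned by the caller on failure
	}
	nni_posix_pfd_set_cb(pfd, example_cb, NULL);
	if ((rv = nni_posix_pfd_arm(pfd, POLLIN)) != 0) {
		nni_posix_pfd_fini(pfd); // also closes the fd
		return (rv);
	}
	// Later: nni_posix_pfd_close(pfd) stops new events, and
	// nni_posix_pfd_fini(pfd) waits for the poller before freeing.
	return (0);
}
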
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index 0f6867da..a8d8693a 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -12,6 +12,7 @@
#ifdef NNG_HAVE_EPOLL
#include <errno.h>
+#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
@@ -22,179 +23,215 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+#ifndef EFD_CLOEXEC
+#define EFD_CLOEXEC 0
+#endif
+#ifndef EFD_NONBLOCK
+#define EFD_NONBLOCK 0
+#endif
+
#define NNI_MAX_EPOLL_EVENTS 64
// flags we always want enabled as long as at least one event is active
#define NNI_EPOLL_FLAGS (EPOLLONESHOT | EPOLLERR | EPOLLHUP)
+// Locking strategy:
+//
+// The pollq mutex protects its own reapq, close state, and the close
+// state of the individual pfds. It also protects the pfd cv, which is
+// only signaled when the pfd is closed. This mutex is only acquired
+// when shutting down the pollq, or closing a pfd. For normal hot-path
+// operations we don't need it.
+//
+// The pfd mutex protects the pfd's own "closing" flag (test and set),
+// the callback and arg, and its event mask. This mutex is used a lot,
+// but it should be uncontended excepting possibly when closing.
+
// nni_posix_pollq is a work structure that manages state for the epoll-based
// pollq implementation
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- int epfd; // epoll handle
- int evfd; // event fd
- bool close; // request for worker to exit
- bool started;
- nni_idhash * nodes;
- nni_thr thr; // worker thread
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int epfd; // epoll handle
+ int evfd; // event fd (to wake us for other stuff)
+ bool close; // request for worker to exit
+ nni_thr thr; // worker thread
+ nni_list reapq;
+};
+
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ nni_list_node node;
+ int fd;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ bool closed;
+ bool closing;
+ bool reap;
+ int events;
+ nni_mtx mtx;
+ nni_cv cv;
};
+// single global instance for now.
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
- int rv;
+ nni_posix_pfd * pfd;
nni_posix_pollq * pq;
struct epoll_event ev;
- uint64_t id;
-
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ int rv;
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- return (NNG_ESTATE);
- }
+ pq = &nni_posix_global_pollq;
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if ((rv = nni_idhash_alloc(pq->nodes, &id, node)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return (rv);
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
- node->index = (int) id;
- node->pq = pq;
- node->events = 0;
+ pfd->pq = pq;
+ pfd->fd = fd;
+ pfd->cb = NULL;
+ pfd->arg = NULL;
+ pfd->events = 0;
+ pfd->closing = false;
+ pfd->closed = false;
+
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pq->mtx);
+ NNI_LIST_NODE_INIT(&pfd->node);
// notifications disabled to begin with
ev.events = 0;
- ev.data.u64 = id;
+ ev.data.ptr = pfd;
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, node->fd, &ev);
- if (rv != 0) {
+ if ((rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev)) != 0) {
rv = nni_plat_errno(errno);
- nni_idhash_remove(pq->nodes, id);
- node->index = 0;
- node->pq = NULL;
+ nni_cv_fini(&pfd->cv);
+ NNI_FREE_STRUCT(pfd);
+ return (rv);
}
- nni_mtx_unlock(&pq->mtx);
- return (rv);
+ *pfdp = pfd;
+ return (0);
}
-// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
-// called while pq's lock is held
-static void
-nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
{
- int rv;
- struct epoll_event ev;
-
- node->events = 0;
- node->pq = NULL;
-
- ev.events = 0;
- ev.data.u64 = (uint64_t) node->index;
-
- if (node->index != 0) {
- // This deregisters the node. If the poller was blocked
- // then this keeps it from coming back in to find us.
- nni_idhash_remove(pq->nodes, (uint64_t) node->index);
- }
-
- // NB: EPOLL_CTL_DEL actually *ignores* the event, but older Linux
- // versions need it to be non-NULL.
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_DEL, node->fd, &ev);
- if (rv != 0) {
- NNI_ASSERT(errno == EBADF || errno == ENOENT);
+ nni_posix_pollq *pq = pfd->pq;
+
+ // NB: We depend on epoll event flags being the same as their POLLIN
+ // equivalents. I.e. POLLIN == EPOLLIN, POLLOUT == EPOLLOUT, and so
+ // forth. This turns out to be true both for Linux and the illumos
+ // epoll implementation.
+
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ struct epoll_event ev;
+ pfd->events |= events;
+ events = pfd->events;
+
+ ev.events = events | NNI_EPOLL_FLAGS;
+ ev.data.ptr = pfd;
+
+ if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) {
+ int rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&pfd->mtx);
+ return (rv);
+ }
}
+ nni_mtx_unlock(&pfd->mtx);
+ return (0);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
-
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
+ return (pfd->fd);
}
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
{
- node->index = 0;
- return (0);
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->arg = arg;
+ nni_mtx_unlock(&pfd->mtx);
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_close(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
-
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ nni_posix_pollq * pq = pfd->pq;
+ struct epoll_event ev; // Not actually used.
+ pfd->closing = true;
+
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
}
- nni_mtx_unlock(&pq->mtx);
+ nni_mtx_unlock(&pfd->mtx);
}
void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
{
- int rv;
- struct epoll_event ev;
- nni_posix_pollq * pq = node->pq;
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_posix_pfd_close(pfd);
- NNI_ASSERT(pq != NULL);
- if (events == 0) {
- return;
+ // We have to synchronize with the pollq thread (unless we are
+ // on that thread!)
+ if (!nni_thr_is_self(&pq->thr)) {
+
+ uint64_t one = 1;
+
+ nni_mtx_lock(&pq->mtx);
+ nni_list_append(&pq->reapq, pfd);
+
+ // Wake the remote side. For now we assume this always
+ // succeeds. The only failure modes here occur when we
+ // have already excessively signaled this (2^64 times
+ // with no read!!), or when the evfd is closed, or some
+ // kernel bug occurs. Those errors would manifest as
+ // a hang waiting for the poller to reap the pfd in fini,
+ // if it were possible for them to occur. (Barring other
+ // bugs, it isn't.)
+ (void) write(pq->evfd, &one, sizeof(one));
+
+ while (!pfd->closed) {
+ nni_cv_wait(&pfd->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
- nni_mtx_lock(&pq->mtx);
+ // We're exclusive now.
- node->events |= events;
- ev.events = node->events | NNI_EPOLL_FLAGS;
- ev.data.u64 = (uint64_t) node->index;
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+}
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_MOD, node->fd, &ev);
- NNI_ASSERT(rv == 0);
+static void
+nni_posix_pollq_reap(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pfd;
+ nni_mtx_lock(&pq->mtx);
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pfd);
+ // Let fini know we're done with it, and it's safe to
+ // remove.
+ pfd->closed = true;
+ nni_cv_wake(&pfd->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
@@ -204,101 +241,74 @@ nni_posix_poll_thr(void *arg)
nni_posix_pollq * pq = arg;
struct epoll_event events[NNI_MAX_EPOLL_EVENTS];
- nni_mtx_lock(&pq->mtx);
-
- while (!pq->close) {
- int i;
- int nevents;
-
- // block indefinitely, timers are handled separately
- nni_mtx_unlock(&pq->mtx);
-
- nevents =
- epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1);
+ for (;;) {
+ int n;
+ bool reap = false;
- nni_mtx_lock(&pq->mtx);
-
- if (nevents <= 0) {
- continue;
+ n = epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1);
+ if ((n < 0) && (errno == EBADF)) {
+ // Epoll fd closed, bail.
+ return;
}
// dispatch events
- for (i = 0; i < nevents; ++i) {
+ for (int i = 0; i < n; ++i) {
const struct epoll_event *ev;
- nni_posix_pollq_node * node;
ev = &events[i];
// If the waker pipe was signaled, read from it.
- if ((ev->data.u64 == 0) && (ev->events & POLLIN)) {
- int rv;
+ if ((ev->data.ptr == NULL) && (ev->events & POLLIN)) {
uint64_t clear;
- rv = read(pq->evfd, &clear, sizeof(clear));
- NNI_ASSERT(rv == sizeof(clear));
- continue;
- }
-
- if (nni_idhash_find(pq->nodes, ev->data.u64,
- (void **) &node) != 0) {
- // node was removed while we were blocking
- continue;
+ (void) read(pq->evfd, &clear, sizeof(clear));
+ reap = true;
+ } else {
+ nni_posix_pfd * pfd = ev->data.ptr;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ int events;
+
+ events = ev->events &
+ (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP);
+
+ nni_mtx_lock(&pfd->mtx);
+ pfd->events &= ~events;
+ cb = pfd->cb;
+ arg = pfd->arg;
+ nni_mtx_unlock(&pfd->mtx);
+
+ // Execute the callback with lock released
+ if (cb != NULL) {
+ cb(pfd, events, arg);
+ }
}
+ }
- node->revents = ev->events &
- (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP);
-
- // mark events as cleared
- node->events &= ~node->revents;
-
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pq->active = node;
-
- // Execute the callback with lock released
- nni_mtx_unlock(&pq->mtx);
- node->cb(node->data);
+ if (reap) {
+ nni_posix_pollq_reap(pq);
nni_mtx_lock(&pq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pq->active = NULL;
- if (pq->wait == node) {
- pq->wait = NULL;
- nni_cv_wake(&pq->cv);
+ if (pq->close) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
}
+ nni_mtx_unlock(&pq->mtx);
}
}
-
- nni_mtx_unlock(&pq->mtx);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- if (pq->started) {
- int rv;
- uint64_t wakeval = 1;
+ uint64_t one = 1;
- nni_mtx_lock(&pq->mtx);
- pq->close = true;
- pq->started = false;
- rv = write(pq->evfd, &wakeval, sizeof(wakeval));
- NNI_ASSERT(rv == sizeof(wakeval));
- nni_mtx_unlock(&pq->mtx);
- }
- nni_thr_fini(&pq->thr);
+ nni_mtx_lock(&pq->mtx);
+ pq->close = true;
+ (void) write(pq->evfd, &one, sizeof(one));
+ nni_mtx_unlock(&pq->mtx);
- if (pq->evfd >= 0) {
- close(pq->evfd);
- pq->evfd = -1;
- }
+ nni_thr_fini(&pq->thr);
+ close(pq->evfd);
close(pq->epfd);
- pq->epfd = -1;
-
- if (pq->nodes != NULL) {
- nni_idhash_fini(pq->nodes);
- }
nni_mtx_fini(&pq->mtx);
}
@@ -308,22 +318,25 @@ nni_posix_pollq_add_eventfd(nni_posix_pollq *pq)
{
// add event fd so we can wake ourself on exit
struct epoll_event ev;
- int rv;
+ int fd;
memset(&ev, 0, sizeof(ev));
- pq->evfd = eventfd(0, EFD_NONBLOCK);
- if (pq->evfd == -1) {
+ if ((fd = eventfd(0, EFD_NONBLOCK)) < 0) {
return (nni_plat_errno(errno));
}
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ // This is *NOT* one shot. We want to wake EVERY single time.
ev.events = EPOLLIN;
- ev.data.u64 = 0;
+ ev.data.ptr = 0;
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, pq->evfd, &ev);
- if (rv != 0) {
+ if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+ (void) close(fd);
return (nni_plat_errno(errno));
}
+ pq->evfd = fd;
return (0);
}
@@ -332,40 +345,30 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
- if ((pq->epfd = epoll_create1(0)) < 0) {
+ if ((pq->epfd = epoll_create1(EPOLL_CLOEXEC)) < 0) {
return (nni_plat_errno(errno));
}
- pq->evfd = -1;
pq->close = false;
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
- if (((rv = nni_idhash_init(&pq->nodes)) != 0) ||
- ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
- ((rv = nni_posix_pollq_add_eventfd(pq)) != 0)) {
- nni_posix_pollq_destroy(pq);
+ if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) {
+ (void) close(pq->epfd);
+ nni_mtx_fini(&pq->mtx);
+ return (rv);
+ }
+ if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ (void) close(pq->epfd);
+ (void) close(pq->evfd);
+ nni_mtx_fini(&pq->mtx);
return (rv);
}
-
- // Positive values only for node indices. (0 is reserved for eventfd).
- nni_idhash_set_limits(pq->nodes, 1, 0x7FFFFFFFu, 1);
- pq->started = true;
nni_thr_run(&pq->thr);
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
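
Two epoll idioms carry the weight in the backend above: descriptors are registered with EPOLLONESHOT and interest is re-added with EPOLL_CTL_MOD (what nni_posix_pfd_arm does), and a non-oneshot eventfd lets nni_posix_pfd_fini wake epoll_wait() so the poller can reap closed pfds. A standalone sketch of just those two mechanisms (function names are illustrative):

#include <stdint.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

// Illustrative only. rearm_oneshot() mirrors nni_posix_pfd_arm: with
// EPOLLONESHOT the fd is disarmed after each report, so interest must be
// re-added via EPOLL_CTL_MOD. wake_poller() mirrors the evfd write used
// by nni_posix_pfd_fini to make epoll_wait() return for reaping.
static int
rearm_oneshot(int epfd, int fd, void *ptr, uint32_t events)
{
	struct epoll_event ev;

	memset(&ev, 0, sizeof(ev));
	ev.events   = events | EPOLLONESHOT | EPOLLERR | EPOLLHUP;
	ev.data.ptr = ptr;
	return (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev));
}

static int
wake_poller(int evfd)
{
	uint64_t one = 1;

	return (write(evfd, &one, sizeof(one)) < 0 ? -1 : 0);
}
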
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index 0f312170..36ced3ff 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -12,196 +12,203 @@
#ifdef NNG_HAVE_KQUEUE
#include <errno.h>
+#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
#include <sys/event.h>
+#include <sys/socket.h>
#include <unistd.h>
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
-// TODO: can this be feature detected in cmake,
-// rather than relying on platform?
-#if defined NNG_PLATFORM_NETBSD
-#define kevent_udata_t intptr_t
-#else
-#define kevent_udata_t void *
-#endif
-
-#define NNI_MAX_KQUEUE_EVENTS 64
-
-// user event id used to shutdown the polling thread
-#define NNI_KQ_EV_EXIT_ID 0xF
+typedef struct nni_posix_pollq nni_posix_pollq;
// nni_posix_pollq is a work structure that manages state for the kqueue-based
// pollq implementation
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- int kq; // kqueue handle
- bool close; // request for worker to exit
- bool started;
- nni_thr thr; // worker thread
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int kq; // kqueue handle
+ nni_thr thr; // worker thread
+ nni_list reapq; // items to reap
};
+struct nni_posix_pfd {
+ nni_list_node node; // linkage into the reap list
+ nni_posix_pollq *pq; // associated pollq
+ int fd; // file descriptor to poll
+ void * data; // user data
+ nni_posix_pfd_cb cb; // user callback on event
+ nni_cv cv; // signaled when poller has unregistered
+ nni_mtx mtx;
+ int events;
+ bool closing;
+ bool closed;
+};
+
+#define NNI_MAX_KQUEUE_EVENTS 64
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
+ nni_posix_pfd * pf;
nni_posix_pollq *pq;
- struct kevent kevents[2];
+ struct kevent ev[2];
+ unsigned flags = EV_ADD | EV_DISABLE;
+
+	// Set this as soon as possible (narrow the close-exec race as

+ // much as we can; better options are system calls that suppress
+ // this behavior from descriptor creation.)
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+#ifdef SO_NOSIGPIPE
+ // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+ int one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ pq = &nni_posix_global_pollq;
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- return (NNG_ESTATE);
+ if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) {
+ return (NNG_ENOMEM);
}
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
-
- node->pq = pq;
- node->events = 0;
-
- EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ,
- EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
+ // Create entries in the kevent queue, without enabling them.
+ EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
+ EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
- EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
-
- if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) {
- nni_mtx_unlock(&pq->mtx);
+ // We update the kqueue list, without polling for events.
+ if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
+ NNI_FREE_STRUCT(pf);
return (nni_plat_errno(errno));
}
+ pf->fd = fd;
+ pf->cb = NULL;
+ pf->pq = pq;
+ nni_mtx_init(&pf->mtx);
+ nni_cv_init(&pf->cv, &pq->mtx);
+ NNI_LIST_NODE_INIT(&pf->node);
+ *pfdp = pf;
- nni_mtx_unlock(&pq->mtx);
return (0);
}
-// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
-// called while pq's lock is held
-static void
-nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+void
+nni_posix_pfd_close(nni_posix_pfd *pf)
{
- struct kevent kevents[2];
-
- node->events = 0;
- node->pq = NULL;
+ nni_posix_pollq *pq = pf->pq;
- EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0,
- (kevent_udata_t) node);
-
- EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0,
- 0, (kevent_udata_t) node);
-
- // So it turns out that we can get EBADF, ENOENT, and apparently
- // also EINPROGRESS (new on macOS Sierra). Frankly, we're deleting
- // an event, and its harmless if the event removal fails (worst
- // case would be a spurious wakeup), so lets ignore it.
- (void) kevent(pq->kq, kevents, 2, NULL, 0, NULL);
+ nni_mtx_lock(&pq->mtx);
+ if (!pf->closing) {
+ struct kevent ev[2];
+ pf->closing = true;
+ EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
+ EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
+ (void) shutdown(pf->fd, SHUT_RDWR);
+ // This should never fail -- no allocations, just deletion.
+ (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+nni_posix_pfd_fini(nni_posix_pfd *pf)
{
- nni_posix_pollq *pq = node->pq;
+ nni_posix_pollq *pq;
- if (pq == NULL) {
- return;
- }
+ pq = pf->pq;
- nni_mtx_lock(&pq->mtx);
- nni_posix_pollq_remove_helper(pq, node);
+ nni_posix_pfd_close(pf);
- if (pq->close) {
- nni_cv_wake(&pq->cv);
+ if (!nni_thr_is_self(&pq->thr)) {
+ struct kevent ev;
+ nni_mtx_lock(&pq->mtx);
+ nni_list_append(&pq->reapq, pf);
+ EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL);
+
+ // If this fails, the cleanup will stall. That should
+ // only occur in a memory pressure situation, and it
+ // will self-heal when the next event comes in.
+ (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
+ while (!pf->closed) {
+ nni_cv_wait(&pf->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
- nni_mtx_unlock(&pq->mtx);
+
+ (void) close(pf->fd);
+ nni_cv_fini(&pf->cv);
+ nni_mtx_fini(&pf->mtx);
+ NNI_FREE_STRUCT(pf);
}
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
+nni_posix_pfd_fd(nni_posix_pfd *pf)
{
- NNI_ARG_UNUSED(node);
- return (0);
+ return (pf->fd);
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
-
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
+ NNI_ASSERT(cb != NULL); // must not be null when established.
+ nni_mtx_lock(&pf->mtx);
+ pf->cb = cb;
+ pf->data = arg;
+ nni_mtx_unlock(&pf->mtx);
}
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pf, int events)
{
- nni_posix_pollq *pq = node->pq;
- struct kevent kevents[2];
- int nevents = 0;
+ struct kevent ev[2];
+ int nev = 0;
+ unsigned flags = EV_ENABLE | EV_DISPATCH;
+ nni_posix_pollq *pq = pf->pq;
+
+ nni_mtx_lock(&pf->mtx);
+ if (pf->closing) {
+ events = 0;
+ } else {
+ pf->events |= events;
+ events = pf->events;
+ }
+ nni_mtx_unlock(&pf->mtx);
- NNI_ASSERT(pq != NULL);
if (events == 0) {
- return;
+ // No events, and kqueue is oneshot, so nothing to do.
+ return (0);
}
- nni_mtx_lock(&pq->mtx);
-
- if (!(node->events & POLLIN) && (events & POLLIN)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ,
- EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ if (events & POLLIN) {
+ EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
-
- if (!(node->events & POLLOUT) && (events & POLLOUT)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ if (events & POLLOUT) {
+ EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf);
}
-
- if (nevents > 0) {
- // This call should never fail, really. The only possible
- // legitimate failure would be ENOMEM, but in that case
- // lots of other things are going to be failing, or ENOENT
- // or ESRCH, indicating we already lost interest; the
- // only consequence of ignoring these errors is that a given
- // descriptor might appear "stuck". This beats the alternative
- // of just blithely crashing the application with an assertion.
- (void) kevent(pq->kq, kevents, nevents, NULL, 0, NULL);
- node->events |= events;
+ while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
}
+ return (0);
+}
+static void
+nni_posix_pollq_reap(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pf;
+ nni_mtx_lock(&pq->mtx);
+ while ((pf = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pf);
+ pf->closed = true;
+ nni_cv_wake(&pf->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
@@ -209,117 +216,71 @@ static void
nni_posix_poll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
- struct kevent kevents[NNI_MAX_KQUEUE_EVENTS];
-
- nni_mtx_lock(&pq->mtx);
- while (!pq->close) {
- int i;
- int nevents;
-
- // block indefinitely, timers are handled separately
- nni_mtx_unlock(&pq->mtx);
- nevents = kevent(
- pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL);
- nni_mtx_lock(&pq->mtx);
-
- if (nevents < 0) {
- continue;
+ for (;;) {
+ int n;
+ struct kevent evs[NNI_MAX_KQUEUE_EVENTS];
+ nni_posix_pfd * pf;
+ nni_posix_pfd_cb cb;
+ void * cbarg;
+ int revents;
+ bool reap = false;
+
+ n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
+ if (n < 0) {
+ if (errno == EBADF) {
+ nni_posix_pollq_reap(pq);
+ return;
+ }
+ reap = true;
}
- // dispatch events
- for (i = 0; i < nevents; ++i) {
- struct kevent ev_disable;
- const struct kevent * ev;
- nni_posix_pollq_node *node;
+ for (int i = 0; i < n; i++) {
+ struct kevent *ev = &evs[i];
- ev = &kevents[i];
- if (ev->filter == EVFILT_USER &&
- ev->ident == NNI_KQ_EV_EXIT_ID) {
- // we've woken up to exit the polling thread
+ switch (ev->filter) {
+ case EVFILT_READ:
+ revents = POLLIN;
break;
- }
-
- node = (nni_posix_pollq_node *) ev->udata;
- if (node->pq == NULL) {
- // node was removed while we were blocking
+ case EVFILT_WRITE:
+ revents = POLLOUT;
+ break;
+ case EVFILT_USER:
+ default:
+ reap = true;
continue;
}
- node->revents = 0;
-
+ pf = (void *) ev->udata;
if (ev->flags & (EV_ERROR | EV_EOF)) {
- node->revents |= POLLHUP;
- }
- if (ev->filter == EVFILT_WRITE) {
- node->revents |= POLLOUT;
- } else if (ev->filter == EVFILT_READ) {
- node->revents |= POLLIN;
- } else {
- NNI_ASSERT(false); // unhandled filter
- break;
+ revents |= POLLHUP;
}
- // explicitly disable this event. we'd ideally rely on
- // the behavior of EV_DISPATCH to do this,
- // but that only happens once we've acknowledged the
- // event by reading/or writing the fd. because there
- // can currently be some latency between the time we
- // receive this event and the time we read/write in
- // response, disable the event in the meantime to avoid
- // needless wakeups.
- // revisit if we're able to reduce/remove this latency.
- EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter,
- EV_DISABLE, 0, 0, NULL);
- // this will only fail if the fd is already
- // closed/invalid which we don't mind anyway,
- // so ignore return value.
- (void) kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL);
-
- // mark events as cleared
- node->events &= ~node->revents;
-
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pq->active = node;
-
- // Execute the callback with lock released
- nni_mtx_unlock(&pq->mtx);
- node->cb(node->data);
- nni_mtx_lock(&pq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pq->active = NULL;
- if (pq->wait == node) {
- pq->wait = NULL;
- nni_cv_wake(&pq->cv);
+ nni_mtx_lock(&pf->mtx);
+ cb = pf->cb;
+ cbarg = pf->data;
+ pf->events &= ~(revents);
+ nni_mtx_unlock(&pf->mtx);
+
+ if (cb != NULL) {
+ cb(pf, revents, cbarg);
}
}
+ if (reap) {
+ nni_posix_pollq_reap(pq);
+ }
}
-
- nni_mtx_unlock(&pq->mtx);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- if (pq->started) {
- struct kevent ev;
- EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE,
- NOTE_TRIGGER, 0, NULL);
- nni_mtx_lock(&pq->mtx);
- pq->close = true;
- pq->started = false;
- (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
- nni_mtx_unlock(&pq->mtx);
- }
- nni_thr_fini(&pq->thr);
-
if (pq->kq >= 0) {
close(pq->kq);
pq->kq = -1;
}
+ nni_thr_fini(&pq->thr);
+
+ nni_posix_pollq_reap(pq);
nni_mtx_fini(&pq->mtx);
}
@@ -327,10 +288,17 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq)
static int
nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq)
{
- // add user event so we can wake ourself on exit
+ int rv;
struct kevent ev;
- EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL);
- return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL)));
+
+ EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL);
+ while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ }
+ return (0);
}
static int
@@ -342,10 +310,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (nni_plat_errno(errno));
}
- pq->close = false;
-
nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
(rv = nni_posix_pollq_add_wake_evt(pq)) != 0) {
@@ -353,24 +319,14 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (rv);
}
- pq->started = true;
nni_thr_run(&pq->thr);
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
+
return (nni_posix_pollq_create(&nni_posix_global_pollq));
}
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index efc9ff48..26753df6 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -11,8 +11,6 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
-#ifdef NNG_USE_POSIX_POLLQ_POLL
-
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
@@ -32,306 +30,285 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().
+
+// Locking strategy: We use the pollq lock to guard the lists on the pollq,
+// the nfds (which counts the number of items in the pollq), the pollq
+// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We
+// use a lock on the pfd only to protect the events field (which we treat
+// as an atomic bitfield), and the cb and arg pointers. Note that the pfd
+// lock is therefore a leaf lock, which is sometimes acquired while holding
+// the pq lock. Every reasonable effort is made to minimize holding locks.
+// (Note that pfd->fd is not guarded, because it is set at pfd creation and
+// persists until the pfd is destroyed.)
+
+typedef struct nni_posix_pollq nni_posix_pollq;
+
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- struct pollfd * fds;
- int nfds;
- int wakewfd; // write side of waker pipe
- int wakerfd; // read side of waker pipe
- int close; // request for worker to exit
- int started;
- nni_thr thr; // worker thread
- nni_list polled; // polled nodes
- nni_list armed; // armed nodes
- nni_list idle; // idle nodes
- int nnodes; // num of nodes in nodes list
- int inpoll; // poller asleep in poll
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int nfds;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ bool closing; // request for worker to exit
+ bool closed;
+ nni_thr thr; // worker thread
+ nni_list pollq; // armed nodes
+ nni_list reapq;
};
-static int
-nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
-{
- int grow = pq->nnodes + 2; // one for us, one for waker
- struct pollfd *newfds;
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ int fd;
+ nni_list_node node;
+ nni_cv cv;
+ nni_mtx mtx;
+ int events;
+ nni_posix_pfd_cb cb;
+ void * arg;
+};
+
+static nni_posix_pollq nni_posix_global_pollq;
- if (grow < pq->nfds) {
- return (0);
+int
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+{
+ nni_posix_pfd * pfd;
+ nni_posix_pollq *pq = &nni_posix_global_pollq;
+
+ // Set this as soon as possible (narrow the close-on-exec race as
+ // much as we can; a better option would be system calls that set
+ // close-on-exec at descriptor creation.)
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+#ifdef SO_NOSIGPIPE
+ // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+ // If this code is getting used, you really should be using the
+ // kqueue poller, or you need to upgrade your older system.
+ int one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
+
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
+ NNI_LIST_NODE_INIT(&pfd->node);
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pq->mtx);
+ pfd->fd = fd;
+ pfd->events = 0;
+ pfd->cb = NULL;
+ pfd->arg = NULL;
+ pfd->pq = pq;
+ nni_mtx_lock(&pq->mtx);
+ if (pq->closing) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+ return (NNG_ECLOSED);
+ }
+ nni_list_append(&pq->pollq, pfd);
+ pq->nfds++;
+ nni_mtx_unlock(&pq->mtx);
+ *pfdp = pfd;
+ return (0);
+}
- grow = grow + 128;
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
+{
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->arg = arg;
+ nni_mtx_unlock(&pfd->mtx);
+}
- if ((newfds = NNI_ALLOC_STRUCTS(newfds, grow)) == NULL) {
- return (NNG_ENOMEM);
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
+{
+ return (pfd->fd);
+}
+
+void
+nni_posix_pfd_close(nni_posix_pfd *pfd)
+{
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_posix_pfd_close(pfd);
+
+ nni_mtx_lock(&pq->mtx);
+ if (nni_list_active(&pq->pollq, pfd)) {
+ nni_list_remove(&pq->pollq, pfd);
+ pq->nfds--;
}
- if (pq->nfds != 0) {
- NNI_FREE_STRUCTS(pq->fds, pq->nfds);
+ if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ nni_list_append(&pq->reapq, pfd);
+ nni_plat_pipe_raise(pq->wakewfd);
+ while (nni_list_active(&pq->reapq, pfd)) {
+ nni_cv_wait(&pfd->cv);
+ }
}
- pq->fds = newfds;
- pq->nfds = grow;
+ nni_mtx_unlock(&pq->mtx);
+
+ // We're exclusive now.
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+}
+
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
+{
+ nni_posix_pollq *pq = pfd->pq;
+ nni_mtx_lock(&pfd->mtx);
+ pfd->events |= events;
+ nni_mtx_unlock(&pfd->mtx);
+
+ // If we're running in the callback (on the poller thread), don't
+ // bother to kick the pollq; otherwise wake it, since we cannot
+ // modify the poll set while the poller is blocked in poll().
+ if (!nni_thr_is_self(&pq->thr)) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
return (0);
}
static void
nni_posix_poll_thr(void *arg)
{
- nni_posix_pollq * pollq = arg;
- nni_posix_pollq_node *node;
+ nni_posix_pollq *pq = arg;
+ int nalloc = 0;
+ struct pollfd * fds = NULL;
+ nni_posix_pfd ** pfds = NULL;
- nni_mtx_lock(&pollq->mtx);
for (;;) {
- int rv;
int nfds;
- struct pollfd *fds;
+ int events;
+ nni_posix_pfd *pfd;
- if (pollq->close) {
- break;
+ nni_mtx_lock(&pq->mtx);
+ while (nalloc < (pq->nfds + 1)) {
+ int n = pq->nfds + 128;
+
+ // Drop the lock while we sleep or allocate. This
+ // allows additional items to be added or removed (!)
+ // while we wait.
+ nni_mtx_unlock(&pq->mtx);
+
+ // Toss the old ones first; avoids *doubling* memory
+ // consumption during alloc.
+ NNI_FREE_STRUCTS(fds, nalloc);
+ NNI_FREE_STRUCTS(pfds, nalloc);
+ nalloc = 0;
+
+ if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) {
+ nni_msleep(10); // sleep for a bit, try later
+ } else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) {
+ NNI_FREE_STRUCTS(pfds, n);
+ nni_msleep(10);
+ } else {
+ nalloc = n;
+ }
+ nni_mtx_lock(&pq->mtx);
}
- fds = pollq->fds;
- nfds = 0;
-
// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
- fds[nfds].fd = pollq->wakerfd;
- fds[nfds].events = POLLIN;
- fds[nfds].revents = 0;
- nfds++;
+ fds[0].fd = pq->wakerfd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+ pfds[0] = NULL;
+ nfds = 1;
+
+ // Also, let's reap anything that was in the reapq!
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pfd);
+ nni_cv_wake(&pfd->cv);
+ }
- // Set up the poll list.
- while ((node = nni_list_first(&pollq->armed)) != NULL) {
- nni_list_remove(&pollq->armed, node);
- nni_list_append(&pollq->polled, node);
- fds[nfds].fd = node->fd;
- fds[nfds].events = node->events;
- fds[nfds].revents = 0;
- node->index = nfds;
- nfds++;
+ // If we're closing down, bail now. This is done *after* we
+ // have ensured that the reapq is empty. Anything still in
+ // the pollq is not going to receive further callbacks.
+ if (pq->closing) {
+ pq->closed = true;
+ nni_mtx_unlock(&pq->mtx);
+ break;
}
- // Now poll it. We block indefinitely, since we use separate
- // timeouts to wake and remove the elements from the list.
- pollq->inpoll = 1;
- nni_mtx_unlock(&pollq->mtx);
- rv = poll(fds, nfds, -1);
- nni_mtx_lock(&pollq->mtx);
- pollq->inpoll = 0;
-
- if (rv < 0) {
- // This shouldn't happen really. If it does, we
- // just try again. (EINTR is probably the only
- // reasonable failure here, unless internal memory
- // allocations fail in the kernel, leading to EAGAIN.)
- continue;
+ // Set up the poll list.
+ NNI_LIST_FOREACH (&pq->pollq, pfd) {
+
+ nni_mtx_lock(&pfd->mtx);
+ events = pfd->events;
+ nni_mtx_unlock(&pfd->mtx);
+
+ if (events != 0) {
+ fds[nfds].fd = pfd->fd;
+ fds[nfds].events = events;
+ fds[nfds].revents = 0;
+ pfds[nfds] = pfd;
+ nfds++;
+ }
}
+ nni_mtx_unlock(&pq->mtx);
+
+ // We could use the return value from poll (the count of
+ // ready descriptors) to stop scanning the pollfd array early,
+ // but on average we would still walk half the list, and the
+ // extra check on each iteration (a conditional with a potential
+ // pipeline stall) adds complexity for no real benefit. It also
+ // makes the worst case even worse.
+ (void) poll(fds, nfds, -1);
// If the waker pipe was signaled, read from it.
if (fds[0].revents & POLLIN) {
- NNI_ASSERT(fds[0].fd == pollq->wakerfd);
- nni_plat_pipe_clear(pollq->wakerfd);
+ NNI_ASSERT(fds[0].fd == pq->wakerfd);
+ nni_plat_pipe_clear(pq->wakerfd);
}
- while ((node = nni_list_first(&pollq->polled)) != NULL) {
- int index = node->index;
-
- // We remove ourselves from the polled list, and
- // then put it on either the idle or armed list
- // depending on whether it remains armed.
- node->index = 0;
- nni_list_remove(&pollq->polled, node);
- NNI_ASSERT(index > 0);
- if (fds[index].revents == 0) {
- // If still watching for events, return it
- // to the armed list.
- if (node->events) {
- nni_list_append(&pollq->armed, node);
- } else {
- nni_list_append(&pollq->idle, node);
- }
- continue;
- }
+ for (int i = 1; i < nfds; i++) {
+ if ((events = fds[i].revents) != 0) {
+ nni_posix_pfd_cb cb;
+ void * arg;
- // We are calling the callback, so disarm
- // all events; the node can rearm them in its
- // callback.
- node->revents = fds[index].revents;
- node->events &= ~node->revents;
- if (node->events == 0) {
- nni_list_append(&pollq->idle, node);
- } else {
- nni_list_append(&pollq->armed, node);
- }
+ pfd = pfds[i];
+
+ nni_mtx_lock(&pfd->mtx);
+ cb = pfd->cb;
+ arg = pfd->arg;
+ pfd->events &= ~events;
+ nni_mtx_unlock(&pfd->mtx);
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pollq->active = node;
-
- // Execute the callback -- without locks held.
- nni_mtx_unlock(&pollq->mtx);
- node->cb(node->data);
- nni_mtx_lock(&pollq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pollq->active = NULL;
- if (pollq->wait == node) {
- pollq->wait = NULL;
- nni_cv_wake(&pollq->cv);
+ if (cb) {
+ cb(pfd, events, arg);
+ }
}
}
}
- nni_mtx_unlock(&pollq->mtx);
-}
-
-int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
-{
- int rv;
- nni_posix_pollq *pq;
-
- NNI_ASSERT(!nni_list_node_active(&node->node));
- pq = nni_posix_pollq_get(node->fd);
- if (node->pq != NULL) {
- return (NNG_ESTATE);
- }
-
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
- node->pq = pq;
- if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return (rv);
- }
- pq->nnodes++;
- nni_list_append(&pq->idle, node);
- nni_mtx_unlock(&pq->mtx);
- return (0);
+ NNI_FREE_STRUCTS(fds, nalloc);
+ NNI_FREE_STRUCTS(pfds, nalloc);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+static void
+nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- nni_posix_pollq *pq = node->pq;
-
- if (pq == NULL) {
- return;
- }
- node->pq = NULL;
nni_mtx_lock(&pq->mtx);
- if (nni_list_node_active(&node->node)) {
- nni_list_node_remove(&node->node);
- pq->nnodes--;
- }
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
+ pq->closing = true;
nni_mtx_unlock(&pq->mtx);
-}
-
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
-{
- NNI_LIST_NODE_INIT(&node->node);
- return (0);
-}
-
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
-void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
-{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
- node->pq = NULL;
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
- if (nni_list_node_active(&node->node)) {
- nni_list_node_remove(&node->node);
- pq->nnodes--;
- }
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
-}
+ nni_plat_pipe_raise(pq->wakewfd);
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
-{
- nni_posix_pollq *pq = node->pq;
- int oevents;
-
- NNI_ASSERT(pq != NULL);
-
- nni_mtx_lock(&pq->mtx);
- oevents = node->events;
- node->events |= events;
-
- // We move this to the armed list if its not armed, or already
- // on the polled list. The polled list would be the case where
- // the index is set to a positive value.
- if ((oevents == 0) && (events != 0) && (node->index < 1)) {
- nni_list_node_remove(&node->node);
- nni_list_append(&pq->armed, node);
- }
- if ((events != 0) && (oevents != events)) {
- // Possibly wake up poller since we're looking for new events.
- if (pq->inpoll) {
- nni_plat_pipe_raise(pq->wakewfd);
- }
- }
- nni_mtx_unlock(&pq->mtx);
-}
-
-static void
-nni_posix_pollq_destroy(nni_posix_pollq *pq)
-{
- if (pq->started) {
- nni_mtx_lock(&pq->mtx);
- pq->close = 1;
- pq->started = 0;
- nni_plat_pipe_raise(pq->wakewfd);
- nni_mtx_unlock(&pq->mtx);
- }
nni_thr_fini(&pq->thr);
-
- // All pipes should have been closed before this is called.
- NNI_ASSERT(nni_list_empty(&pq->polled));
- NNI_ASSERT(nni_list_empty(&pq->armed));
- NNI_ASSERT(nni_list_empty(&pq->idle));
- NNI_ASSERT(pq->nnodes == 0);
-
- if (pq->wakewfd >= 0) {
- nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
- pq->wakewfd = pq->wakerfd = -1;
- }
- if (pq->nfds != 0) {
- NNI_FREE_STRUCTS(pq->fds, pq->nfds);
- pq->fds = NULL;
- pq->nfds = 0;
- }
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
nni_mtx_fini(&pq->mtx);
}
@@ -340,50 +317,27 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
- NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node);
- NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node);
- NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node);
- pq->wakewfd = -1;
- pq->wakerfd = -1;
- pq->close = 0;
-
- nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
+ NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
+ pq->closing = false;
+ pq->closed = false;
- if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
- ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
- ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
- nni_posix_pollq_destroy(pq);
+ if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) {
return (rv);
}
- pq->started = 1;
+ if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ return (rv);
+ }
+ nni_mtx_init(&pq->mtx);
nni_thr_run(&pq->thr);
return (0);
}
-// We use a single pollq for the entire system, which means only a single
-// thread is polling. This may be somewhat less than optimally efficient,
-// and it may be worth investigating having multiple threads to improve
-// efficiency and scalability. (This would shorten the linked lists,
-// improving C10K scalability, and also allow us to engage multiple cores.)
-// It's not entirely clear how many threads are "optimal".
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- // This is the point where we could choose a pollq based on FD.
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
- int rv;
-
- rv = nni_posix_pollq_create(&nni_posix_global_pollq);
- return (rv);
+ return (nni_posix_pollq_create(&nni_posix_global_pollq));
}
void
@@ -391,5 +345,3 @@ nni_posix_pollq_sysfini(void)
{
nni_posix_pollq_destroy(&nni_posix_global_pollq);
}
-
-#endif // NNG_USE_POSIX_POLLQ_POLL
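The poll() backend above hinges on the waker pipe occupying slot 0 of the pollfd array: any other thread can interrupt a blocked poll() by writing a byte, after which the poller drains the pipe and rebuilds its poll set. A rough sketch of that mechanism follows; the helper name poll_once is hypothetical, and the sketch assumes the waker pipe is non-blocking.

#include <poll.h>
#include <unistd.h>

// Sketch only: slot 0 of the array is the read side of the waker pipe.
static void
poll_once(int wakerfd, struct pollfd *fds, int nfds)
{
	fds[0].fd      = wakerfd;
	fds[0].events  = POLLIN;
	fds[0].revents = 0;

	(void) poll(fds, nfds, -1); // block until activity or a wakeup

	if (fds[0].revents & POLLIN) {
		char buf[32];
		// Drain the pipe (assumed non-blocking) so the next
		// poll() can block again.
		while (read(wakerfd, buf, sizeof(buf)) > 0) {
			continue;
		}
	}
}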
diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c
index 7231cb27..63c1d1d1 100644
--- a/src/platform/posix/posix_pollq_port.c
+++ b/src/platform/posix/posix_pollq_port.c
@@ -1,7 +1,6 @@
//
// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Liam Staskawicz <liam@stask.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -12,7 +11,9 @@
#ifdef NNG_HAVE_PORT_CREATE
#include <errno.h>
+#include <fcntl.h>
#include <port.h>
+#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
@@ -21,6 +22,9 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
+#define NNI_MAX_PORTEV 64
+typedef struct nni_posix_pollq nni_posix_pollq;
+
// nni_posix_pollq is a work structure that manages state for the port-event
// based pollq implementation. We only really need to keep track of the
// single thread, and the associated port itself.
@@ -29,190 +33,178 @@ struct nni_posix_pollq {
nni_thr thr; // worker thread
};
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ int fd;
+ nni_mtx mtx;
+ nni_cv cv;
+ int events;
+ bool closed;
+ bool closing;
+ nni_posix_pfd_cb cb;
+ void * data;
+};
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
nni_posix_pollq *pq;
+ nni_posix_pfd * pfd;
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ pq = &nni_posix_global_pollq;
- nni_mtx_lock(&node->mx);
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- nni_mtx_unlock(&node->mx);
- return (NNG_ESTATE);
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
-
- node->pq = pq;
- node->events = 0;
- node->armed = false;
- nni_mtx_unlock(&node->mx);
-
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pfd->mtx);
+ pfd->closed = false;
+ pfd->closing = false;
+ pfd->fd = fd;
+ pfd->pq = pq;
+ pfd->cb = NULL;
+ pfd->data = NULL;
+ *pfdp = pfd;
return (0);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
+ return (pfd->fd);
+}
- if (pq == NULL) {
- return;
- }
+void
+nni_posix_pfd_close(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
- nni_mtx_lock(&node->mx);
- node->events = 0;
- if (node->armed) {
- // Failure modes that can occur are uninteresting.
- (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd);
- node->armed = false;
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ pfd->closing = true;
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd);
}
- nni_mtx_unlock(&node->mx);
-}
+ nni_mtx_unlock(&pfd->mtx);
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
-{
- nni_mtx_init(&node->mx);
- nni_cv_init(&node->cv, &node->mx);
- node->pq = NULL;
- node->armed = false;
- NNI_LIST_NODE_INIT(&node->node);
- return (0);
+ // The wake event that synchronizes with the poller is sent from
+ // nni_posix_pfd_fini(); note that port_send should only really
+ // fail if we are out of memory or run into a resource limit.
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the node is removed properly.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
+ nni_posix_pollq *pq = pfd->pq;
- nni_mtx_lock(&node->mx);
- if ((pq = node->pq) != NULL) {
- // Dissociate the port; if it isn't already associated we
- // don't care. (An extra syscall, but it should not matter.)
- (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd);
- node->armed = false;
+ nni_posix_pfd_close(pfd);
- for (;;) {
- if (port_send(pq->port, 0, node) == 0) {
- break;
- }
- switch (errno) {
- case EAGAIN:
- case ENOMEM:
- // Resource exhaustion.
- // Best bet in these cases is to sleep it off.
- // This may appear like a total application
- // hang, but by sleeping here maybe we give
- // a chance for things to clear up.
- nni_mtx_unlock(&node->mx);
- nni_msleep(5000);
- nni_mtx_lock(&node->mx);
- continue;
- case EBADFD:
- case EBADF:
- // Most likely these indicate that the pollq
- // itself has been closed. That's ok.
+ if (!nni_thr_is_self(&pq->thr)) {
+
+ while (port_send(pq->port, 1, pfd) != 0) {
+ if ((errno == EBADF) || (errno == EBADFD)) {
+ pfd->closed = true;
break;
}
+ sched_yield(); // try again later...
}
- // Wait for the pollq thread to tell us with certainty that
- // they are done. This is needed to ensure that the pollq
- // thread isn't executing (or about to execute) the callback
- // before we destroy it.
- while (node->pq != NULL) {
- nni_cv_wait(&node->cv);
+
+ nni_mtx_lock(&pfd->mtx);
+ while (!pfd->closed) {
+ nni_cv_wait(&pfd->cv);
}
+ nni_mtx_unlock(&pfd->mtx);
}
- nni_mtx_unlock(&node->mx);
- nni_cv_fini(&node->cv);
- nni_mtx_fini(&node->mx);
+
+ // We're exclusive now.
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
}
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
{
- nni_posix_pollq *pq = node->pq;
-
- NNI_ASSERT(pq != NULL);
- if (events == 0) {
- return;
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ pfd->events |= events;
+ if (port_associate(pq->port, PORT_SOURCE_FD, pfd->fd,
+ pfd->events, pfd) != 0) {
+ int rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&pfd->mtx);
+ return (rv);
+ }
}
-
- nni_mtx_lock(&node->mx);
- node->events |= events;
- node->armed = true;
- (void) port_associate(
- pq->port, PORT_SOURCE_FD, node->fd, node->events, node);
-
- // Possible errors here are:
- //
- // EBADF -- programming error on our part
- // EBADFD -- programming error on our part
- // ENOMEM -- not much we can do here
- // EAGAIN -- too many port events registered (65K!!)
- //
- // For now we ignore them all. (We need to be able to return
- // errors to our caller.) Effect on the application will appear
- // to be a stalled file descriptor (no notifications).
- nni_mtx_unlock(&node->mx);
+ nni_mtx_unlock(&pfd->mtx);
+ return (0);
}
static void
nni_posix_poll_thr(void *arg)
{
-
for (;;) {
- nni_posix_pollq * pq = arg;
- port_event_t ev;
- nni_posix_pollq_node *node;
-
- if (port_get(pq->port, &ev, NULL) != 0) {
+ nni_posix_pollq *pq = arg;
+ port_event_t ev[NNI_MAX_PORTEV];
+ nni_posix_pfd * pfd;
+ int events;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ unsigned n;
+
+ n = 1; // wake us even on just one event
+ if (port_getn(pq->port, ev, NNI_MAX_PORTEV, &n, NULL) != 0) {
if (errno == EINTR) {
continue;
}
return;
}
- switch (ev.portev_source) {
- case PORT_SOURCE_ALERT:
- return;
-
- case PORT_SOURCE_FD:
- node = ev.portev_user;
+ // We run through the returned events twice. First we
+ // run the callbacks. Then we do the reaps. This way
+ // we ensure that we only reap *after* callbacks have run.
+ for (unsigned i = 0; i < n; i++) {
+ if (ev[i].portev_source != PORT_SOURCE_FD) {
+ continue;
+ }
+ pfd = ev[i].portev_user;
+ events = ev[i].portev_events;
- nni_mtx_lock(&node->mx);
- node->revents = ev.portev_events;
- // mark events as cleared
- node->events &= ~node->revents;
- node->armed = false;
- nni_mtx_unlock(&node->mx);
+ nni_mtx_lock(&pfd->mtx);
+ cb = pfd->cb;
+ arg = pfd->data;
+ pfd->events &= ~events;
+ nni_mtx_unlock(&pfd->mtx);
- node->cb(node->data);
- break;
+ if (cb != NULL) {
+ cb(pfd, events, arg);
+ }
+ }
+ for (unsigned i = 0; i < n; i++) {
+ if (ev[i].portev_source != PORT_SOURCE_USER) {
+ continue;
+ }
- case PORT_SOURCE_USER:
// User event telling us to stop doing things.
- // We signal back to use this as a coordination event
- // between the pollq and the thread handler.
- // NOTE: It is absolutely critical that there is only
- // a single thread per pollq. Otherwise we cannot
- // be sure that we are blocked completely,
- node = ev.portev_user;
- nni_mtx_lock(&node->mx);
- node->pq = NULL;
- nni_cv_wake(&node->cv);
- nni_mtx_unlock(&node->mx);
+ // We signal back, using this as a coordination
+ // event between the pollq and the thread
+ // handler. NOTE: It is absolutely critical
+ // that there is only a single thread per
+ // pollq; otherwise we cannot be sure that we
+ // are completely blocked.
+ pfd = ev[i].portev_user;
+ nni_mtx_lock(&pfd->mtx);
+ pfd->closed = true;
+ nni_cv_wake(&pfd->cv);
+ nni_mtx_unlock(&pfd->mtx);
}
}
}
@@ -220,7 +212,6 @@ nni_posix_poll_thr(void *arg)
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- port_alert(pq->port, PORT_ALERT_SET, 1, NULL);
(void) close(pq->port);
nni_thr_fini(&pq->thr);
}
@@ -243,14 +234,15 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
+ NNI_ASSERT(cb != NULL); // must not be null when established.
+
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->data = arg;
+ nni_mtx_unlock(&pfd->mtx);
}
int
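The port-event backend re-associates the descriptor on every nni_posix_pfd_arm() call because an event-port association is consumed when its event is delivered. A minimal sketch of that one-shot behavior follows; the helper name wait_one is hypothetical and error handling is reduced to the bare minimum.

#include <sys/types.h>
#include <poll.h>
#include <port.h>

// Sketch only: associate, wait for one event, and note that the caller
// must re-associate (re-arm) before it can see further events.
static int
wait_one(int port, int fd, void *user)
{
	port_event_t ev;
	uint_t       n = 1; // return as soon as one event is ready

	if (port_associate(port, PORT_SOURCE_FD, fd, POLLIN, user) != 0) {
		return (-1);
	}
	if (port_getn(port, &ev, 1, &n, NULL) != 0) {
		return (-1);
	}
	// ev.portev_events holds poll-style bits; the association is now
	// gone, so further events require another port_associate().
	return (ev.portev_events);
}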
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index d86a2008..96d8debd 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -37,50 +37,51 @@
#define NNG_POSIX_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *nni_posix_resolv_tq = NULL;
-static nni_mtx nni_posix_resolv_mtx;
-
-typedef struct nni_posix_resolv_item nni_posix_resolv_item;
-struct nni_posix_resolv_item {
- int family;
- int passive;
- const char *name;
- const char *serv;
- int proto;
- nni_aio * aio;
- nni_task task;
+static nni_taskq *resolv_tq = NULL;
+static nni_mtx resolv_mtx;
+
+typedef struct resolv_item resolv_item;
+struct resolv_item {
+ int family;
+ int passive;
+ const char * name;
+ const char * serv;
+ int proto;
+ nni_aio * aio;
+ nni_task * task;
+ nng_sockaddr sa;
};
static void
-nni_posix_resolv_finish(nni_posix_resolv_item *item, int rv)
+resolv_finish(resolv_item *item, int rv)
{
nni_aio *aio;
- if ((aio = item->aio) != NULL) {
- if (nni_aio_get_prov_data(aio) == item) {
- nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- nni_aio_finish(aio, rv, 0);
- NNI_FREE_STRUCT(item);
- }
+ if (((aio = item->aio) != NULL) &&
+ (nni_aio_get_prov_data(aio) == item)) {
+ nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ item->aio = NULL;
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
}
}
static void
-nni_posix_resolv_cancel(nni_aio *aio, int rv)
+resolv_cancel(nni_aio *aio, int rv)
{
- nni_posix_resolv_item *item;
+ resolv_item *item;
- nni_mtx_lock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_unlock(&resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
item->aio = NULL;
- nni_mtx_unlock(&nni_posix_resolv_mtx);
- nni_task_cancel(&item->task);
- NNI_FREE_STRUCT(item);
+ nni_mtx_unlock(&resolv_mtx);
nni_aio_finish_error(aio, rv);
}
@@ -116,14 +117,23 @@ nni_posix_gai_errno(int rv)
}
static void
-nni_posix_resolv_task(void *arg)
+resolv_task(void *arg)
{
- nni_posix_resolv_item *item = arg;
- nni_aio * aio = item->aio;
- struct addrinfo hints;
- struct addrinfo * results;
- struct addrinfo * probe;
- int rv;
+ resolv_item * item = arg;
+ struct addrinfo hints;
+ struct addrinfo *results;
+ struct addrinfo *probe;
+ int rv;
+
+ nni_mtx_lock(&resolv_mtx);
+ if (item->aio == NULL) {
+ nni_mtx_unlock(&resolv_mtx);
+ // Caller canceled, and no longer cares about this.
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ return;
+ }
+ nni_mtx_unlock(&resolv_mtx);
results = NULL;
@@ -170,7 +180,7 @@ nni_posix_resolv_task(void *arg)
if (probe != NULL) {
struct sockaddr_in * sin;
struct sockaddr_in6 *sin6;
- nng_sockaddr * sa = nni_aio_get_input(aio, 0);
+ nng_sockaddr * sa = &item->sa;
switch (probe->ai_addr->sa_family) {
case AF_INET:
@@ -196,17 +206,18 @@ done:
freeaddrinfo(results);
}
- nni_mtx_lock(&nni_posix_resolv_mtx);
- nni_posix_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
+ resolv_finish(item, rv);
+ nni_mtx_unlock(&resolv_mtx);
}
static void
-nni_posix_resolv_ip(const char *host, const char *serv, int passive,
- int family, int proto, nni_aio *aio)
+resolv_ip(const char *host, const char *serv, int passive, int family,
+ int proto, nni_aio *aio)
{
- nni_posix_resolv_item *item;
- sa_family_t fam;
+ resolv_item *item;
+ sa_family_t fam;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -231,10 +242,15 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
return;
}
- nni_task_init(
- nni_posix_resolv_tq, &item->task, nni_posix_resolv_task, item);
+ if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) !=
+ 0) {
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ };
// NB: host and serv must remain valid until this is completed.
+ memset(&item->sa, 0, sizeof(item->sa));
item->passive = passive;
item->name = host;
item->serv = serv;
@@ -242,24 +258,30 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
item->aio = aio;
item->family = fam;
- nni_mtx_lock(&nni_posix_resolv_mtx);
- nni_aio_schedule(aio, nni_posix_resolv_cancel, item);
- nni_task_dispatch(&item->task);
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
+ if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) {
+ nni_mtx_unlock(&resolv_mtx);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_task_dispatch(item->task);
+ nni_mtx_unlock(&resolv_mtx);
}
void
nni_plat_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
+ resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
nni_plat_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
+ resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
int
@@ -267,10 +289,10 @@ nni_posix_resolv_sysinit(void)
{
int rv;
- nni_mtx_init(&nni_posix_resolv_mtx);
+ nni_mtx_init(&resolv_mtx);
- if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) {
- nni_mtx_fini(&nni_posix_resolv_mtx);
+ if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) {
+ nni_mtx_fini(&resolv_mtx);
return (rv);
}
return (0);
@@ -279,11 +301,11 @@ nni_posix_resolv_sysinit(void)
void
nni_posix_resolv_sysfini(void)
{
- if (nni_posix_resolv_tq != NULL) {
- nni_taskq_fini(nni_posix_resolv_tq);
- nni_posix_resolv_tq = NULL;
+ if (resolv_tq != NULL) {
+ nni_taskq_fini(resolv_tq);
+ resolv_tq = NULL;
}
- nni_mtx_fini(&nni_posix_resolv_mtx);
+ nni_mtx_fini(&resolv_mtx);
}
#endif // NNG_USE_POSIX_RESOLV_GAI
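The resolver changes above adopt a detach-under-lock cancellation handshake: the cancel path only clears item->aio under the mutex, and the worker checks for that before doing any real work, freeing the item itself if the aio is gone. A condensed sketch of the pattern follows; work_item, work_mtx, work_cancel and work_fn are hypothetical names, not part of the change.

#include "core/nng_impl.h"

typedef struct work_item {
	nni_aio *aio;
} work_item;

static nni_mtx work_mtx; // initialized elsewhere with nni_mtx_init()

static void
work_cancel(nni_aio *aio, work_item *item, int rv)
{
	nni_mtx_lock(&work_mtx);
	item->aio = NULL; // detach; the worker frees the item later
	nni_mtx_unlock(&work_mtx);
	nni_aio_finish_error(aio, rv);
}

static void
work_fn(work_item *item)
{
	nni_mtx_lock(&work_mtx);
	if (item->aio == NULL) {
		// Canceled: nobody else will free the item now.
		nni_mtx_unlock(&work_mtx);
		NNI_FREE_STRUCT(item);
		return;
	}
	nni_mtx_unlock(&work_mtx);
	// ... perform the slow lookup, then retake the lock and finish
	// the aio (if it is still attached) exactly once.
}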
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index cca8165c..0a521513 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -422,6 +422,12 @@ nni_plat_thr_fini(nni_plat_thr *thr)
}
}
+bool
+nni_plat_thr_is_self(nni_plat_thr *thr)
+{
+ return (pthread_self() == thr->tid);
+}
+
void
nni_atfork_child(void)
{
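nni_plat_thr_is_self() exists so that teardown paths (such as nni_posix_pfd_fini() above) can detect that they are already running on the worker thread and skip waiting for it, which would otherwise deadlock on itself. A minimal pthread sketch of the idea follows; the worker type and helpers are hypothetical, and pthread_equal() is simply the portable spelling of the comparison the patch performs directly.

#include <pthread.h>
#include <stdbool.h>

typedef struct worker {
	pthread_t tid;
} worker;

static bool
worker_is_self(worker *w)
{
	return (pthread_equal(pthread_self(), w->tid) != 0);
}

static void
worker_fini(worker *w)
{
	if (!worker_is_self(w)) {
		// Only join from some other thread; a thread cannot
		// join itself without deadlocking.
		(void) pthread_join(w->tid, NULL);
	}
}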
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 654d31e3..759bdb96 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -36,24 +36,29 @@
#endif
struct nni_plat_udp {
- nni_posix_pollq_node udp_pitem;
- int udp_fd;
- nni_list udp_recvq;
- nni_list udp_sendq;
- nni_mtx udp_mtx;
+ nni_posix_pfd *udp_pfd;
+ int udp_fd;
+ nni_list udp_recvq;
+ nni_list udp_sendq;
+ nni_mtx udp_mtx;
};
static void
-nni_posix_udp_doclose(nni_plat_udp *udp)
+nni_posix_udp_doerror(nni_plat_udp *udp, int rv)
{
nni_aio *aio;
while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) ||
((aio = nni_list_first(&udp->udp_sendq)) != NULL)) {
nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_aio_finish_error(aio, rv);
}
- // Underlying socket left open until close API called.
+}
+
+static void
+nni_posix_udp_doclose(nni_plat_udp *udp)
+{
+ nni_posix_udp_doerror(udp, NNG_ECLOSED);
}
static void
@@ -169,23 +174,22 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
// This function is called by the poller on activity on the FD.
static void
-nni_posix_udp_cb(void *arg)
+nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_plat_udp *udp = arg;
- int revents;
+ NNI_ASSERT(pfd == udp->udp_pfd);
nni_mtx_lock(&udp->udp_mtx);
- revents = udp->udp_pitem.revents;
- if (revents & POLLIN) {
+ if (events & POLLIN) {
nni_posix_udp_dorecv(udp);
}
- if (revents & POLLOUT) {
+ if (events & POLLOUT) {
nni_posix_udp_dosend(udp);
}
- if (revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_udp_doclose(udp);
} else {
- int events = 0;
+ events = 0;
if (!nni_list_empty(&udp->udp_sendq)) {
events |= POLLOUT;
}
@@ -193,7 +197,11 @@ nni_posix_udp_cb(void *arg)
events |= POLLIN;
}
if (events) {
- nni_posix_pollq_arm(&udp->udp_pitem, events);
+ int rv;
+ rv = nni_posix_pfd_arm(udp->udp_pfd, events);
+ if (rv != 0) {
+ nni_posix_udp_doerror(udp, rv);
+ }
}
}
nni_mtx_unlock(&udp->udp_mtx);
@@ -232,22 +240,16 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
NNI_FREE_STRUCT(udp);
return (rv);
}
- udp->udp_pitem.fd = udp->udp_fd;
- udp->udp_pitem.cb = nni_posix_udp_cb;
- udp->udp_pitem.data = udp;
-
- (void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK);
-
- nni_aio_list_init(&udp->udp_recvq);
- nni_aio_list_init(&udp->udp_sendq);
-
- if (((rv = nni_posix_pollq_init(&udp->udp_pitem)) != 0) ||
- ((rv = nni_posix_pollq_add(&udp->udp_pitem)) != 0)) {
+ if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) {
(void) close(udp->udp_fd);
nni_mtx_fini(&udp->udp_mtx);
NNI_FREE_STRUCT(udp);
return (rv);
}
+ nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp);
+
+ nni_aio_list_init(&udp->udp_recvq);
+ nni_aio_list_init(&udp->udp_sendq);
*upp = udp;
return (0);
@@ -256,8 +258,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
void
nni_plat_udp_close(nni_plat_udp *udp)
{
- // We're no longer interested in events.
- nni_posix_pollq_fini(&udp->udp_pitem);
+ nni_posix_pfd_fini(udp->udp_pfd);
nni_mtx_lock(&udp->udp_mtx);
nni_posix_udp_doclose(udp);
@@ -284,26 +285,46 @@ nni_plat_udp_cancel(nni_aio *aio, int rv)
void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&udp->udp_mtx);
- nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) {
+ nni_mtx_unlock(&udp->udp_mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&udp->udp_recvq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ if (nni_list_first(&udp->udp_recvq) == aio) {
+ if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ }
nni_mtx_unlock(&udp->udp_mtx);
}
void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&udp->udp_mtx);
- nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) {
+ nni_mtx_unlock(&udp->udp_mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&udp->udp_sendq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
+ if (nni_list_first(&udp->udp_sendq) == aio) {
+ if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ }
nni_mtx_unlock(&udp->udp_mtx);
}
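The UDP send and receive paths above only arm the poller when the freshly queued aio is the first entry in its list; an earlier entry implies the poller was already armed for that direction. A small sketch of the guard follows; enqueue_and_arm is a hypothetical helper, and the caller is assumed to hold the lock protecting the queue.

#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"

static int
enqueue_and_arm(nni_list *q, nni_aio *aio, nni_posix_pfd *pfd, int ev)
{
	int rv = 0;

	nni_list_append(q, aio);
	if (nni_list_first(q) == aio) {
		// We are the only waiter, so the poller is not yet armed
		// for this direction; ask it to watch for the event.
		rv = nni_posix_pfd_arm(pfd, ev);
	}
	return (rv);
}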
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index b3c4738f..0bd12b24 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -34,6 +34,7 @@ struct nni_plat_thr {
void (*func)(void *);
void * arg;
HANDLE handle;
+ DWORD id;
};
struct nni_plat_mtx {
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index a3ae3748..5ead9cbc 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -155,11 +155,16 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
void
nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&evt->mtx);
- nni_aio_schedule(aio, nni_win_event_cancel, evt);
+ if ((rv = nni_aio_schedule(aio, nni_win_event_cancel, evt)) != 0) {
+ nni_mtx_unlock(&evt->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&evt->aios, aio);
nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index c6376cc7..169a2e00 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -572,17 +572,22 @@ void
nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&w->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
NNI_ASSERT(!nni_list_active(&w->waiters, ep));
ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
- nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
}
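Both Windows changes above move the nni_aio_schedule() call ahead of any list insertion and check its result, so a failed schedule never leaves an aio dangling on a work list. The submit ordering reduces to roughly the following; work_queue, queue_cancel and the field names are hypothetical.

#include "core/nng_impl.h"

typedef struct work_queue {
	nni_mtx  mtx;
	nni_list aios;
} work_queue;

static void
queue_cancel(nni_aio *aio, int rv)
{
	NNI_ARG_UNUSED(aio);
	NNI_ARG_UNUSED(rv);
	// Real code would remove the aio from its list and finish it.
}

static void
queue_submit(work_queue *q, nni_aio *aio)
{
	int rv;

	if (nni_aio_begin(aio) != 0) {
		return;
	}
	nni_mtx_lock(&q->mtx);
	if ((rv = nni_aio_schedule(aio, queue_cancel, q)) != 0) {
		// The aio never reached the list, so there is nothing
		// to unwind on failure.
		nni_mtx_unlock(&q->mtx);
		nni_aio_finish_error(aio, rv);
		return;
	}
	nni_aio_list_append(&q->aios, aio);
	nni_mtx_unlock(&q->mtx);
}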
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 070bf40d..2bc68c32 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -29,49 +29,55 @@
#define NNG_WIN_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *nni_win_resolv_tq = NULL;
-static nni_mtx nni_win_resolv_mtx;
-
-typedef struct nni_win_resolv_item nni_win_resolv_item;
-struct nni_win_resolv_item {
- int family;
- int passive;
- const char *name;
- const char *serv;
- int proto;
- nni_aio * aio;
- nni_task task;
+static nni_taskq *win_resolv_tq = NULL;
+static nni_mtx win_resolv_mtx;
+
+typedef struct win_resolv_item win_resolv_item;
+struct win_resolv_item {
+ int family;
+ int passive;
+ const char * name;
+ const char * serv;
+ int proto;
+ nni_aio * aio;
+ nni_task * task;
+ nng_sockaddr sa;
};
static void
-nni_win_resolv_finish(nni_win_resolv_item *item, int rv)
+win_resolv_finish(win_resolv_item *item, int rv)
{
- nni_aio *aio = item->aio;
-
- nni_aio_set_prov_data(aio, NULL);
- nni_aio_finish(aio, rv, 0);
- NNI_FREE_STRUCT(item);
+ nni_aio *aio;
+
+ if (((aio = item->aio) != NULL) &&
+ (nni_aio_get_prov_data(aio) == item)) {
+ nni_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ }
}
static void
-nni_win_resolv_cancel(nni_aio *aio, int rv)
+win_resolv_cancel(nni_aio *aio, int rv)
{
- nni_win_resolv_item *item;
+ win_resolv_item *item;
- nni_mtx_lock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_unlock(&win_resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
- nni_mtx_unlock(&nni_win_resolv_mtx);
- nni_task_cancel(&item->task);
- NNI_FREE_STRUCT(item);
+ item->aio = NULL;
+ nni_mtx_unlock(&win_resolv_mtx);
nni_aio_finish_error(aio, rv);
}
static int
-nni_win_gai_errno(int rv)
+win_gai_errno(int rv)
{
switch (rv) {
case 0:
@@ -98,17 +104,26 @@ nni_win_gai_errno(int rv)
}
static void
-nni_win_resolv_task(void *arg)
+win_resolv_task(void *arg)
{
- nni_win_resolv_item *item = arg;
- nni_aio * aio = item->aio;
- struct addrinfo hints;
- struct addrinfo * results;
- struct addrinfo * probe;
- int rv;
+ win_resolv_item *item = arg;
+ struct addrinfo hints;
+ struct addrinfo *results;
+ struct addrinfo *probe;
+ int rv;
results = NULL;
+ nni_mtx_lock(&win_resolv_mtx);
+ if (item->aio == NULL) {
+ nni_mtx_unlock(&win_resolv_mtx);
+ // Caller canceled, and no longer cares about this.
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ return;
+ }
+ nni_mtx_unlock(&win_resolv_mtx);
+
// We treat these all as IP addresses. The service and the
// host part are split.
memset(&hints, 0, sizeof(hints));
@@ -124,7 +139,7 @@ nni_win_resolv_task(void *arg)
rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
- rv = nni_win_gai_errno(rv);
+ rv = win_gai_errno(rv);
goto done;
}
@@ -142,7 +157,7 @@ nni_win_resolv_task(void *arg)
if (probe != NULL) {
struct sockaddr_in * sin;
struct sockaddr_in6 *sin6;
- nni_sockaddr * sa = nni_aio_get_input(aio, 0);
+ nni_sockaddr * sa = &item->sa;
switch (probe->ai_addr->sa_family) {
case AF_INET:
@@ -167,17 +182,18 @@ done:
if (results != NULL) {
freeaddrinfo(results);
}
- nni_mtx_lock(&nni_win_resolv_mtx);
- nni_win_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
+ win_resolv_finish(item, rv);
+ nni_mtx_unlock(&win_resolv_mtx);
}
static void
-nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
+win_resolv_ip(const char *host, const char *serv, int passive, int family,
int proto, nni_aio *aio)
{
- nni_win_resolv_item *item;
- int fam;
+ win_resolv_item *item;
+ int fam;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -202,8 +218,12 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- nni_task_init(
- nni_win_resolv_tq, &item->task, nni_win_resolv_task, item);
+ rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item);
+ if (rv != 0) {
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
item->passive = passive;
item->name = host;
@@ -212,24 +232,30 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
item->aio = aio;
item->family = fam;
- nni_mtx_lock(&nni_win_resolv_mtx);
- nni_aio_schedule(aio, nni_win_resolv_cancel, item);
- nni_task_dispatch(&item->task);
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
+ if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) {
+ nni_mtx_unlock(&win_resolv_mtx);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_task_dispatch(item->task);
+ nni_mtx_unlock(&win_resolv_mtx);
}
void
nni_plat_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
+ win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
nni_plat_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
+ win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
int
@@ -237,10 +263,10 @@ nni_win_resolv_sysinit(void)
{
int rv;
- nni_mtx_init(&nni_win_resolv_mtx);
+ nni_mtx_init(&win_resolv_mtx);
- if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) {
- nni_mtx_fini(&nni_win_resolv_mtx);
+ if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) {
+ nni_mtx_fini(&win_resolv_mtx);
return (rv);
}
return (0);
@@ -249,11 +275,11 @@ nni_win_resolv_sysinit(void)
void
nni_win_resolv_sysfini(void)
{
- if (nni_win_resolv_tq != NULL) {
- nni_taskq_fini(nni_win_resolv_tq);
- nni_win_resolv_tq = NULL;
+ if (win_resolv_tq != NULL) {
+ nni_taskq_fini(win_resolv_tq);
+ win_resolv_tq = NULL;
}
- nni_mtx_fini(&nni_win_resolv_mtx);
+ nni_mtx_fini(&win_resolv_mtx);
}
#endif // NNG_PLATFORM_WINDOWS
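Both resolvers now stage the resolved address in item->sa and copy it into the caller's sockaddr only in the finish path, under the lock and only while the aio is still attached, so a canceled aio never has its output buffer written. The finish step reduces to roughly the following; resolv_like_item is a hypothetical stand-in for the two item structures, and the caller is assumed to hold the resolver mutex.

#include <string.h>

#include "core/nng_impl.h"

typedef struct resolv_like_item {
	nni_aio *    aio;
	nng_sockaddr sa; // staged result, filled in by the worker
} resolv_like_item;

static void
finish_locked(resolv_like_item *item, int rv)
{
	nni_aio *aio;

	if (((aio = item->aio) != NULL) &&
	    (nni_aio_get_prov_data(aio) == item)) {
		nng_sockaddr *sa = nni_aio_get_input(aio, 0);

		nni_aio_set_prov_data(aio, NULL);
		item->aio = NULL;
		// Copy the staged address out only now, when we know the
		// aio was not canceled out from under us.
		memcpy(sa, &item->sa, sizeof(*sa));
		nni_aio_finish(aio, rv, 0);
	}
}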
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index a2ae72fe..2e9d58d7 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -107,6 +107,7 @@ static unsigned int __stdcall nni_plat_thr_main(void *arg)
{
nni_plat_thr *thr = arg;
+ thr->id = GetCurrentThreadId();
thr->func(thr->arg);
return (0);
}
@@ -138,6 +139,12 @@ nni_plat_thr_fini(nni_plat_thr *thr)
}
}
+bool
+nni_plat_thr_is_self(nni_plat_thr *thr)
+{
+ return (GetCurrentThreadId() == thr->id);
+}
+
static LONG plat_inited = 0;
int