aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-09 17:21:27 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-14 17:09:20 -0700
commit16b4c4019c7b7904de171c588ed8c72ca732d2cf (patch)
tree9e5a8416470631cfb48f5a6ebdd4b16e4b1be3d6 /src/platform
parente0beb13b066d27ce32347a1c18c9d441828dc553 (diff)
downloadnng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.gz
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.bz2
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.zip
fixes #352 aio lock is burning hot
fixes #326 consider nni_taskq_exec_synch() fixes #410 kqueue implementation could be smarter fixes #411 epoll implementation could be smarter fixes #426 synchronous completion can lead to panic fixes #421 pipe close race condition/duplicate destroy This is a major refactoring of two significant parts of the code base, which are closely interrelated. First the aio and taskq framework have undergone a number of simplifications, and improvements. We have ditched a few parts of the internal API (for example tasks no longer support cancellation) that weren't terribly useful but added a lot of complexity, and we've made aio_schedule something that now checks for cancellation or other "premature" completions. The aio framework now uses the tasks more tightly, so that aio wait can devolve into just nni_task_wait(). We did have to add a "task_prep()" step to prevent race conditions. Second, the entire POSIX poller framework has been simplified, and made more robust, and more scalable. There were some fairly inherent race conditions around the shutdown/close code, where we *thought* we were synchronizing against the other thread, but weren't doing so adequately. With a cleaner design, we've been able to tighten up the implementation to remove these race conditions, while substantially reducing the chance for lock contention, thereby improving scalability. The illumos poller also got a performance boost by polling for multiple events. In highly "busy" systems, we expect to see vast reductions in lock contention, and therefore greater scalability, in addition to overall improved reliability. One area where we currently can do better is that there is still only a single poller thread run. Scaling this out is a task that has to be done differently for each poller, and carefully to ensure that close conditions are safe on all pollers, and that no chance for deadlock/livelock waiting for pfd finalizers can occur.
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_aio.h3
-rw-r--r--src/platform/posix/posix_epdesc.c476
-rw-r--r--src/platform/posix/posix_pipedesc.c212
-rw-r--r--src/platform/posix/posix_pollq.h38
-rw-r--r--src/platform/posix/posix_pollq_epoll.c445
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c432
-rw-r--r--src/platform/posix/posix_pollq_poll.c532
-rw-r--r--src/platform/posix/posix_pollq_port.c288
-rw-r--r--src/platform/posix/posix_resolv_gai.c136
-rw-r--r--src/platform/posix/posix_thread.c6
-rw-r--r--src/platform/posix/posix_udp.c87
-rw-r--r--src/platform/windows/win_impl.h1
-rw-r--r--src/platform/windows/win_iocp.c7
-rw-r--r--src/platform/windows/win_ipc.c7
-rw-r--r--src/platform/windows/win_resolv.c138
-rw-r--r--src/platform/windows/win_thread.c7
16 files changed, 1425 insertions, 1390 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 15d91db2..08ae99ca 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -18,11 +18,12 @@
// one of several possible different backends.
#include "core/nng_impl.h"
+#include "posix_pollq.h"
typedef struct nni_posix_pipedesc nni_posix_pipedesc;
typedef struct nni_posix_epdesc nni_posix_epdesc;
-extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
+extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, nni_posix_pfd *);
extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *);
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 7431dedf..0065806d 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -39,7 +39,7 @@
#endif
struct nni_posix_epdesc {
- nni_posix_pollq_node node;
+ nni_posix_pfd * pfd;
nni_list connectq;
nni_list acceptq;
bool closed;
@@ -53,10 +53,14 @@ struct nni_posix_epdesc {
nni_mtx mtx;
};
+static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *);
+static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *);
+
static void
-nni_posix_epdesc_cancel(nni_aio *aio, int rv)
+nni_epdesc_cancel(nni_aio *aio, int rv)
{
- nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
+ nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
+ nni_posix_pfd * pfd = NULL;
NNI_ASSERT(rv != 0);
nni_mtx_lock(&ed->mtx);
@@ -64,200 +68,136 @@ nni_posix_epdesc_cancel(nni_aio *aio, int rv)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
+ if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) &&
+ ((pfd = ed->pfd) != NULL)) {
+ nni_posix_pfd_close(pfd);
+ }
nni_mtx_unlock(&ed->mtx);
}
static void
-nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
+nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd)
{
nni_posix_pipedesc *pd = NULL;
// acceptq or connectq.
nni_aio_list_remove(aio);
- if (rv == 0) {
- if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) {
- (void) close(newfd);
- }
- }
if (rv != 0) {
+ NNI_ASSERT(newpfd == NULL);
nni_aio_finish_error(aio, rv);
- } else {
- nni_aio_set_output(aio, 0, pd);
- nni_aio_finish(aio, 0, 0);
+ return;
}
-}
-
-static void
-nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
-{
- nni_aio * aio;
- socklen_t sz;
- int rv;
-
- // Note that normally there will only be a single connect AIO...
- // A socket that is here will have *initiated* with a connect()
- // call, which returned EINPROGRESS. When the connection attempt
- // is done, either way, the descriptor will be noted as writable.
- // getsockopt() with SOL_SOCKET, SO_ERROR to determine the actual
- // status of the connection attempt...
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- rv = -1;
- sz = sizeof(rv);
- if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) <
- 0) {
- rv = errno;
- }
- switch (rv) {
- case 0:
- // Success!
- nni_posix_pollq_remove(&ed->node);
- nni_posix_epdesc_finish(aio, 0, ed->node.fd);
- ed->node.fd = -1;
- continue;
-
- case EINPROGRESS:
- // Still in progress... keep trying
- return;
- default:
- if (rv == ENOENT) {
- rv = ECONNREFUSED;
- }
- nni_posix_pollq_remove(&ed->node);
- nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
- (void) close(ed->node.fd);
- ed->node.fd = -1;
- continue;
- }
+ NNI_ASSERT(newpfd != NULL);
+ if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) {
+ nni_posix_pfd_fini(newpfd);
+ nni_aio_finish_error(aio, rv);
+ return;
}
+ nni_aio_set_output(aio, 0, pd);
+ nni_aio_finish(aio, 0, 0);
}
static void
-nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
+nni_epdesc_doaccept(nni_posix_epdesc *ed)
{
nni_aio *aio;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- int newfd;
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+
+ fd = nni_posix_pfd_fd(ed->pfd);
#ifdef NNG_USE_ACCEPT4
- newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC);
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(ed->node.fd, NULL, NULL);
+ newfd = accept(fd, NULL, NULL);
}
#else
- newfd = accept(ed->node.fd, NULL, NULL);
+ newfd = accept(fd, NULL, NULL);
#endif
-
- if (newfd >= 0) {
- // successful connection request!
- nni_posix_epdesc_finish(aio, 0, newfd);
- continue;
- }
-
- if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) {
- // Well, let's try later. Note that EWOULDBLOCK
- // is required by standards, but some platforms may
- // use EAGAIN. The values may be the same, so we
- // can't use switch.
- return;
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(ed->pfd, POLLIN);
+ if (rv != 0) {
+ nni_epdesc_finish(aio, rv, NULL);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ nni_epdesc_finish(aio, rv, NULL);
+ continue;
+ }
}
- if ((errno == ECONNABORTED) || (errno == ECONNRESET)) {
- // Let's just eat this one. Perhaps it may be
- // better to report it to the application, but we
- // think most applications don't want to see this.
- // Only someone with a packet trace is going to
- // notice this.
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_epdesc_finish(aio, rv, NULL);
continue;
}
- nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
- }
-}
-
-static void
-nni_posix_epdesc_doerror(nni_posix_epdesc *ed)
-{
- nni_aio * aio;
- int rv = 1;
- socklen_t sz = sizeof(rv);
-
- if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- rv = errno;
- }
- if (rv == 0) {
- return;
- }
- rv = nni_plat_errno(rv);
-
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_posix_epdesc_finish(aio, rv, 0);
- }
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_posix_epdesc_finish(aio, rv, 0);
+ nni_epdesc_finish(aio, 0, pfd);
}
}
static void
-nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
+nni_epdesc_doclose(nni_posix_epdesc *ed)
{
nni_aio *aio;
- int fd;
ed->closed = true;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_epdesc_finish(aio, NNG_ECLOSED, 0);
}
while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_epdesc_finish(aio, NNG_ECLOSED, 0);
}
- nni_posix_pollq_remove(&ed->node);
+ if (ed->pfd != NULL) {
- if ((fd = ed->node.fd) != -1) {
+ nni_posix_pfd_close(ed->pfd);
+ }
+
+ // clean up stale UNIX socket when closing the server.
+ if ((ed->mode == NNI_EP_MODE_LISTEN) && (ed->loclen != 0) &&
+ (ed->locaddr.ss_family == AF_UNIX)) {
struct sockaddr_un *sun = (void *) &ed->locaddr;
- ed->node.fd = -1;
- (void) shutdown(fd, SHUT_RDWR);
- (void) close(fd);
- if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
- (void) unlink(sun->sun_path);
- }
+ (void) unlink(sun->sun_path);
}
}
static void
-nni_posix_epdesc_cb(void *arg)
+nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_epdesc *ed = arg;
- int events;
nni_mtx_lock(&ed->mtx);
-
- if (ed->node.revents & POLLIN) {
- nni_posix_epdesc_doaccept(ed);
- }
- if (ed->node.revents & POLLOUT) {
- nni_posix_epdesc_doconnect(ed);
- }
- if (ed->node.revents & (POLLERR | POLLHUP)) {
- nni_posix_epdesc_doerror(ed);
- }
- if (ed->node.revents & POLLNVAL) {
- nni_posix_epdesc_doclose(ed);
+ if (events & POLLNVAL) {
+ nni_epdesc_doclose(ed);
+ nni_mtx_unlock(&ed->mtx);
+ return;
}
+ NNI_ASSERT(pfd == ed->pfd);
- events = 0;
- if (!nni_list_empty(&ed->connectq)) {
- events |= POLLOUT;
- }
- if (!nni_list_empty(&ed->acceptq)) {
- events |= POLLIN;
- }
- if ((!ed->closed) && (events != 0)) {
- nni_posix_pollq_arm(&ed->node, events);
- }
+ // Anything else will turn up in accept.
+ nni_epdesc_doaccept(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -265,7 +205,7 @@ void
nni_posix_epdesc_close(nni_posix_epdesc *ed)
{
nni_mtx_lock(&ed->mtx);
- nni_posix_epdesc_doclose(ed);
+ nni_epdesc_doclose(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -276,9 +216,23 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
struct sockaddr_storage *ss;
int rv;
int fd;
+ nni_posix_pfd * pfd;
nni_mtx_lock(&ed->mtx);
+ if (ed->started) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_ESTATE);
+ }
+ if (ed->closed) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_ECLOSED);
+ }
+ if ((len = ed->loclen) == 0) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_EADDRINVAL);
+ }
+
ss = &ed->locaddr;
len = ed->loclen;
@@ -286,18 +240,17 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
nni_mtx_unlock(&ed->mtx);
return (nni_plat_errno(errno));
}
- (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
-#ifdef SO_NOSIGPIPE
- // Darwin lacks MSG_NOSIGNAL, but has a socket option.
- int one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
-#endif
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_posix_pfd_fini(pfd);
+ return (rv);
+ }
if (bind(fd, (struct sockaddr *) ss, len) < 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
@@ -314,7 +267,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
if ((rv = chmod(sun->sun_path, perms)) != 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
}
@@ -324,27 +277,24 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
if (listen(fd, 128) != 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed);
- ed->node.fd = fd;
- if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- (void) close(fd);
- ed->node.fd = -1;
- nni_mtx_unlock(&ed->mtx);
- return (rv);
- }
+ ed->pfd = pfd;
ed->started = true;
nni_mtx_unlock(&ed->mtx);
+
return (0);
}
void
nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
{
+ int rv;
+
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
@@ -354,14 +304,25 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
}
nni_mtx_lock(&ed->mtx);
+ if (!ed->started) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
if (ed->closed) {
nni_mtx_unlock(&ed->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&ed->acceptq, aio);
- nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
- nni_posix_pollq_arm(&ed->node, POLLIN);
+ if (nni_list_first(&ed->acceptq) == aio) {
+ nni_epdesc_doaccept(ed);
+ }
nni_mtx_unlock(&ed->mtx);
}
@@ -370,79 +331,171 @@ nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = -1;
+
+ nni_mtx_lock(&ed->mtx);
+ if (ed->pfd != NULL) {
+ fd = nni_posix_pfd_fd(ed->pfd);
+ }
+ nni_mtx_unlock(&ed->mtx);
- if (getsockname(ed->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
}
-void
-nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+static void
+nni_epdesc_connect_start(nni_posix_epdesc *ed)
{
- // NB: We assume that the FD is already set to nonblocking mode.
- int rv;
- int fd;
+ nni_posix_pfd *pfd;
+ int fd;
+ int rv;
+ nni_aio * aio;
- if (nni_aio_begin(aio) != 0) {
+loop:
+ if ((aio = nni_list_first(&ed->connectq)) == NULL) {
return;
}
- nni_mtx_lock(&ed->mtx);
+
+ NNI_ASSERT(ed->pfd == NULL);
+ if (ed->closed) {
+ nni_epdesc_finish(aio, NNG_ECLOSED, NULL);
+ goto loop;
+ }
+ ed->started = true;
if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
+ nni_epdesc_finish(aio, rv, NULL);
+ goto loop;
}
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_epdesc_finish(aio, rv, NULL);
+ goto loop;
+ }
// Possibly bind.
if ((ed->loclen != 0) &&
(bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
- nni_aio_finish_error(aio, rv);
- return;
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
}
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
-
if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
- ed->started = true;
- nni_posix_epdesc_finish(aio, 0, fd);
- nni_mtx_unlock(&ed->mtx);
- return;
+ nni_epdesc_finish(aio, 0, pfd);
+ goto loop;
}
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
if (errno == ENOENT) { // For UNIX domain sockets
- errno = ECONNREFUSED;
+ rv = NNG_ECONNREFUSED;
+ } else {
+ rv = nni_plat_errno(errno);
}
- rv = nni_plat_errno(errno);
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
+ }
+ nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed);
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
+ }
+ ed->pfd = pfd;
+ // all done... wait for this to signal via callback
+}
+
+void
+nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ if (ed->closed) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
nni_aio_finish_error(aio, rv);
return;
}
- // We have to submit to the pollq, because the connection is pending.
- ed->node.fd = fd;
ed->started = true;
- if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- ed->node.fd = -1;
+ nni_list_append(&ed->connectq, aio);
+ if (nni_list_first(&ed->connectq) == aio) {
+ // If there was a stale pfd (probably from an aborted or
+ // canceled connect attempt), discard it so we start fresh.
+ if (ed->pfd != NULL) {
+ nni_posix_pfd_fini(ed->pfd);
+ ed->pfd = NULL;
+ }
+ nni_epdesc_connect_start(ed);
+ }
+ nni_mtx_unlock(&ed->mtx);
+}
+
+static void
+nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_posix_epdesc *ed = arg;
+ nni_aio * aio;
+ socklen_t sz;
+ int rv;
+ int fd;
+
+ nni_mtx_lock(&ed->mtx);
+ if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) ||
+ (pfd != ed->pfd)) {
+ // Spurious completion. Ignore it, but discard the PFD.
+ if (ed->pfd == pfd) {
+ ed->pfd = NULL;
+ }
+ nni_posix_pfd_fini(pfd);
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+
+ fd = nni_posix_pfd_fd(pfd);
+ sz = sizeof(rv);
+
+ if ((events & POLLNVAL) != 0) {
+ rv = EBADF;
+
+ } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+
+ switch (rv) {
+ case 0:
+ // Good connect!
+ ed->pfd = NULL;
+ nni_epdesc_finish(aio, 0, pfd);
+ break;
+ case EINPROGRESS: // still connecting... come back later
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
- nni_aio_finish_error(aio, rv);
return;
+ default:
+ ed->pfd = NULL;
+ nni_epdesc_finish(aio, nni_plat_errno(rv), NULL);
+ nni_posix_pfd_fini(pfd);
+ break;
}
- nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
- nni_aio_list_append(&ed->connectq, aio);
- nni_posix_pollq_arm(&ed->node, POLLOUT);
+ // Start another connect running, if any is waiting.
+ nni_epdesc_connect_start(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -450,7 +503,6 @@ int
nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
{
nni_posix_epdesc *ed;
- int rv;
if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
return (NNG_ENOMEM);
@@ -458,28 +510,14 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
nni_mtx_init(&ed->mtx);
- // We could randomly choose a different pollq, or for efficiencies
- // sake we could take a modulo of the file desc number to choose
- // one. For now we just have a global pollq. Note that by tying
- // the ed to a single pollq we may get some kind of cache warmth.
-
- ed->node.index = 0;
- ed->node.cb = nni_posix_epdesc_cb;
- ed->node.data = ed;
- ed->node.fd = -1;
- ed->closed = false;
- ed->started = false;
- ed->perms = 0; // zero means use default (no change)
- ed->mode = mode;
+ ed->pfd = NULL;
+ ed->closed = false;
+ ed->started = false;
+ ed->perms = 0; // zero means use default (no change)
+ ed->mode = mode;
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
-
- if ((rv = nni_posix_pollq_init(&ed->node)) != 0) {
- nni_mtx_fini(&ed->mtx);
- NNI_FREE_STRUCT(ed);
- return (rv);
- }
*edp = ed;
return (0);
}
@@ -532,14 +570,16 @@ nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode)
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
- int fd;
+ nni_posix_pfd *pfd;
+
nni_mtx_lock(&ed->mtx);
- if ((fd = ed->node.fd) != -1) {
- (void) close(ed->node.fd);
- nni_posix_epdesc_doclose(ed);
- }
+ nni_epdesc_doclose(ed);
+ pfd = ed->pfd;
nni_mtx_unlock(&ed->mtx);
- nni_posix_pollq_fini(&ed->node);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
}
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index e7225395..b11036ea 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -40,11 +40,11 @@
// file descriptor for TCP socket, etc.) This contains the list of pending
// aios for that underlying socket, as well as the socket itself.
struct nni_posix_pipedesc {
- nni_posix_pollq_node node;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
+ nni_posix_pfd *pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
};
static void
@@ -66,16 +66,19 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
- if (pd->node.fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->node.fd, SHUT_RDWR);
- }
+ nni_posix_pfd_close(pd->pfd);
}
static void
nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
unsigned i;
@@ -122,20 +125,28 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
hdr.msg_iovlen = niov;
hdr.msg_iov = iovec;
- n = sendmsg(pd->node.fd, &hdr, MSG_NOSIGNAL);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
nni_aio_bump_count(aio, n);
// We completed the entire operation on this aioq.
+ // (Sendmsg never returns a partial result.)
nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
@@ -147,6 +158,12 @@ static void
nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->readq)) != NULL) {
unsigned i;
@@ -181,16 +198,18 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
}
- n = readv(pd->node.fd, iovec, niov);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
if (n == 0) {
@@ -211,21 +230,21 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
static void
-nni_posix_pipedesc_cb(void *arg)
+nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_pipedesc *pd = arg;
nni_mtx_lock(&pd->mtx);
- if (pd->node.revents & POLLIN) {
+ if (events & POLLIN) {
nni_posix_pipedesc_doread(pd);
}
- if (pd->node.revents & POLLOUT) {
+ if (events & POLLOUT) {
nni_posix_pipedesc_dowrite(pd);
}
- if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_pipedesc_doclose(pd);
} else {
- int events = 0;
+ events = 0;
if (!nni_list_empty(&pd->writeq)) {
events |= POLLOUT;
}
@@ -233,7 +252,7 @@ nni_posix_pipedesc_cb(void *arg)
events |= POLLIN;
}
if ((!pd->closed) && (events != 0)) {
- nni_posix_pollq_arm(&pd->node, events);
+ nni_posix_pfd_arm(pfd, events);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -242,8 +261,7 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq_remove(&pd->node);
-
+ // NB: Events may still occur.
nni_mtx_lock(&pd->mtx);
nni_posix_pipedesc_doclose(pd);
nni_mtx_unlock(&pd->mtx);
@@ -265,6 +283,8 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -276,18 +296,24 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->readq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
- // If we are only job on the list, go ahead and try to do an immediate
- // transfer. This allows for faster completions in many cases. We
- // also need not arm a list if it was already armed.
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
if (nni_list_first(&pd->readq) == aio) {
nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLIN);
+ nni_posix_pfd_arm(pd->pfd, POLLIN);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -296,6 +322,8 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -307,15 +335,20 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->writeq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLOUT);
+ nni_posix_pfd_arm(pd->pfd, POLLOUT);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -326,8 +359,9 @@ nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getpeername(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getpeername(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -338,8 +372,9 @@ nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getsockname(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -349,9 +384,9 @@ int
nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
{
int val = nodelay ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, IPPROTO_TCP, TCP_NODELAY, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
@@ -361,61 +396,19 @@ int
nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
{
int val = keep ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, SOL_SOCKET, SO_KEEPALIVE, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
}
int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
-{
- nni_posix_pipedesc *pd;
- int rv;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // We could randomly choose a different pollq, or for efficiencies
- // sake we could take a modulo of the file desc number to choose
- // one. For now we just have a global pollq. Note that by tying
- // the pd to a single pollq we may get some kind of cache warmth.
-
- pd->closed = false;
- pd->node.fd = fd;
- pd->node.cb = nni_posix_pipedesc_cb;
- pd->node.data = pd;
-
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
-
-#ifdef SO_NOSIGPIPE
- // Darwin lacks MSG_NOSIGNAL, but has a socket option.
- int one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
-#endif
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- if (((rv = nni_posix_pollq_init(&pd->node)) != 0) ||
- ((rv = nni_posix_pollq_add(&pd->node)) != 0)) {
- nni_mtx_fini(&pd->mtx);
- NNI_FREE_STRUCT(pd);
- return (rv);
- }
- *pdp = pd;
- return (0);
-}
-
-int
nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
uint64_t *egid, uint64_t *prid, uint64_t *znid)
{
- int fd = pd->node.fd;
+ int fd = nni_posix_pfd_fd(pd->pfd);
#if defined(NNG_HAVE_GETPEEREID)
uid_t uid;
gid_t gid;
@@ -458,7 +451,8 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
}
*euid = xu.cr_uid;
*egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented LOCAL_PEERPID...
+ *prid = (uint64_t) -1; // XXX: macOS has undocumented
+ // LOCAL_PEERPID...
*znid = (uint64_t) -1;
return (0);
#else
@@ -473,16 +467,34 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
#endif
}
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
+{
+ nni_posix_pipedesc *pd;
+
+ if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ pd->closed = false;
+ pd->pfd = pfd;
+
+ nni_mtx_init(&pd->mtx);
+ nni_aio_list_init(&pd->readq);
+ nni_aio_list_init(&pd->writeq);
+
+ nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
+
+ *pdp = pd;
+ return (0);
+}
+
void
nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
{
- // Make sure no other polling activity is pending.
nni_posix_pipedesc_close(pd);
- nni_posix_pollq_fini(&pd->node);
- if (pd->node.fd >= 0) {
- (void) close(pd->node.fd);
- }
-
+ nni_posix_pfd_fini(pd->pfd);
+ pd->pfd = NULL;
nni_mtx_fini(&pd->mtx);
NNI_FREE_STRUCT(pd);
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index 2c855da1..b9786330 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -22,32 +22,18 @@
#include "core/nng_impl.h"
#include <poll.h>
-typedef struct nni_posix_pollq_node nni_posix_pollq_node;
-typedef struct nni_posix_pollq nni_posix_pollq;
-
-struct nni_posix_pollq_node {
- nni_list_node node; // linkage into the pollq list
- nni_posix_pollq *pq; // associated pollq
- int index; // used by the poller impl
- int armed; // used by the poller impl
- int fd; // file descriptor to poll
- int events; // events to watch for
- int revents; // events received
- void * data; // user data
- nni_cb cb; // user callback on event
- nni_mtx mx;
- nni_cv cv;
-};
-
-extern nni_posix_pollq *nni_posix_pollq_get(int);
-extern int nni_posix_pollq_sysinit(void);
-extern void nni_posix_pollq_sysfini(void);
-
-extern int nni_posix_pollq_init(nni_posix_pollq_node *);
-extern void nni_posix_pollq_fini(nni_posix_pollq_node *);
-extern int nni_posix_pollq_add(nni_posix_pollq_node *);
-extern void nni_posix_pollq_remove(nni_posix_pollq_node *);
-extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int);
+typedef struct nni_posix_pfd nni_posix_pfd;
+typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, int, void *);
+
+extern int nni_posix_pollq_sysinit(void);
+extern void nni_posix_pollq_sysfini(void);
+
+extern int nni_posix_pfd_init(nni_posix_pfd **, int);
+extern void nni_posix_pfd_fini(nni_posix_pfd *);
+extern int nni_posix_pfd_arm(nni_posix_pfd *, int);
+extern int nni_posix_pfd_fd(nni_posix_pfd *);
+extern void nni_posix_pfd_close(nni_posix_pfd *);
+extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index 0f6867da..a8d8693a 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -12,6 +12,7 @@
#ifdef NNG_HAVE_EPOLL
#include <errno.h>
+#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
@@ -22,179 +23,215 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+#ifndef EFD_CLOEXEC
+#define EFD_CLOEXEC 0
+#endif
+#ifndef EFD_NONBLOCK
+#define EFD_NONBLOCK 0
+#endif
+
#define NNI_MAX_EPOLL_EVENTS 64
// flags we always want enabled as long as at least one event is active
#define NNI_EPOLL_FLAGS (EPOLLONESHOT | EPOLLERR | EPOLLHUP)
+// Locking strategy:
+//
+// The pollq mutex protects its own reapq, close state, and the close
+// state of the individual pfds. It also protects the pfd cv, which is
+// only signaled when the pfd is closed. This mutex is only acquired
+// when shutting down the pollq, or closing a pfd. For normal hot-path
+// operations we don't need it.
+//
+// The pfd mutex protects the pfd's own "closing" flag (test and set),
+// the callback and arg, and its event mask. This mutex is used a lot,
+// but it should be uncontended except possibly when closing.
+
// nni_posix_pollq is a work structure that manages state for the epoll-based
// pollq implementation
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- int epfd; // epoll handle
- int evfd; // event fd
- bool close; // request for worker to exit
- bool started;
- nni_idhash * nodes;
- nni_thr thr; // worker thread
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int epfd; // epoll handle
+ int evfd; // event fd (to wake us for other stuff)
+ bool close; // request for worker to exit
+ nni_thr thr; // worker thread
+ nni_list reapq;
+};
+
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ nni_list_node node;
+ int fd;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ bool closed;
+ bool closing;
+ bool reap;
+ int events;
+ nni_mtx mtx;
+ nni_cv cv;
};
+// single global instance for now.
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
- int rv;
+ nni_posix_pfd * pfd;
nni_posix_pollq * pq;
struct epoll_event ev;
- uint64_t id;
-
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ int rv;
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- return (NNG_ESTATE);
- }
+ pq = &nni_posix_global_pollq;
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if ((rv = nni_idhash_alloc(pq->nodes, &id, node)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return (rv);
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
- node->index = (int) id;
- node->pq = pq;
- node->events = 0;
+ pfd->pq = pq;
+ pfd->fd = fd;
+ pfd->cb = NULL;
+ pfd->arg = NULL;
+ pfd->events = 0;
+ pfd->closing = false;
+ pfd->closed = false;
+
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pq->mtx);
+ NNI_LIST_NODE_INIT(&pfd->node);
// notifications disabled to begin with
ev.events = 0;
- ev.data.u64 = id;
+ ev.data.ptr = pfd;
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, node->fd, &ev);
- if (rv != 0) {
+ if ((rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev)) != 0) {
rv = nni_plat_errno(errno);
- nni_idhash_remove(pq->nodes, id);
- node->index = 0;
- node->pq = NULL;
+ nni_cv_fini(&pfd->cv);
+ NNI_FREE_STRUCT(pfd);
+ return (rv);
}
- nni_mtx_unlock(&pq->mtx);
- return (rv);
+ *pfdp = pfd;
+ return (0);
}
-// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
-// called while pq's lock is held
-static void
-nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
{
- int rv;
- struct epoll_event ev;
-
- node->events = 0;
- node->pq = NULL;
-
- ev.events = 0;
- ev.data.u64 = (uint64_t) node->index;
-
- if (node->index != 0) {
- // This deregisters the node. If the poller was blocked
- // then this keeps it from coming back in to find us.
- nni_idhash_remove(pq->nodes, (uint64_t) node->index);
- }
-
- // NB: EPOLL_CTL_DEL actually *ignores* the event, but older Linux
- // versions need it to be non-NULL.
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_DEL, node->fd, &ev);
- if (rv != 0) {
- NNI_ASSERT(errno == EBADF || errno == ENOENT);
+ nni_posix_pollq *pq = pfd->pq;
+
+ // NB: We depend on epoll event flags being the same as their POLLIN
+ // equivalents. I.e. POLLIN == EPOLLIN, POLLOUT == EPOLLOUT, and so
+ // forth. This turns out to be true both for Linux and the illumos
+ // epoll implementation.
+
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ struct epoll_event ev;
+ pfd->events |= events;
+ events = pfd->events;
+
+ ev.events = events | NNI_EPOLL_FLAGS;
+ ev.data.ptr = pfd;
+
+ if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) {
+ int rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&pfd->mtx);
+ return (rv);
+ }
}
+ nni_mtx_unlock(&pfd->mtx);
+ return (0);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
-
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
+ return (pfd->fd);
}
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
{
- node->index = 0;
- return (0);
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->arg = arg;
+ nni_mtx_unlock(&pfd->mtx);
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_close(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
-
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ nni_posix_pollq * pq = pfd->pq;
+ struct epoll_event ev; // Not actually used.
+ pfd->closing = true;
+
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
}
- nni_mtx_unlock(&pq->mtx);
+ nni_mtx_unlock(&pfd->mtx);
}
void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
{
- int rv;
- struct epoll_event ev;
- nni_posix_pollq * pq = node->pq;
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_posix_pfd_close(pfd);
- NNI_ASSERT(pq != NULL);
- if (events == 0) {
- return;
+ // We have to synchronize with the pollq thread (unless we are
+ // on that thread!)
+ if (!nni_thr_is_self(&pq->thr)) {
+
+ uint64_t one = 1;
+
+ nni_mtx_lock(&pq->mtx);
+ nni_list_append(&pq->reapq, pfd);
+
+ // Wake the remote side. For now we assume this always
+ // succeeds. The only failure modes here occur when we
+ // have already excessively signaled this (2^64 times
+ // with no read!!), or when the evfd is closed, or some
+ // kernel bug occurs. Those errors would manifest as
+ // a hang waiting for the poller to reap the pfd in fini,
+ // if it were possible for them to occur. (Barring other
+ // bugs, it isn't.)
+ (void) write(pq->evfd, &one, sizeof(one));
+
+ while (!pfd->closed) {
+ nni_cv_wait(&pfd->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
- nni_mtx_lock(&pq->mtx);
+ // We're exclusive now.
- node->events |= events;
- ev.events = node->events | NNI_EPOLL_FLAGS;
- ev.data.u64 = (uint64_t) node->index;
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+}
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_MOD, node->fd, &ev);
- NNI_ASSERT(rv == 0);
+static void
+nni_posix_pollq_reap(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pfd;
+ nni_mtx_lock(&pq->mtx);
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pfd);
+ // Let fini know we're done with it, and it's safe to
+ // remove.
+ pfd->closed = true;
+ nni_cv_wake(&pfd->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
@@ -204,101 +241,74 @@ nni_posix_poll_thr(void *arg)
nni_posix_pollq * pq = arg;
struct epoll_event events[NNI_MAX_EPOLL_EVENTS];
- nni_mtx_lock(&pq->mtx);
-
- while (!pq->close) {
- int i;
- int nevents;
-
- // block indefinitely, timers are handled separately
- nni_mtx_unlock(&pq->mtx);
-
- nevents =
- epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1);
+ for (;;) {
+ int n;
+ bool reap = false;
- nni_mtx_lock(&pq->mtx);
-
- if (nevents <= 0) {
- continue;
+ n = epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1);
+ if ((n < 0) && (errno == EBADF)) {
+ // Epoll fd closed, bail.
+ return;
}
// dispatch events
- for (i = 0; i < nevents; ++i) {
+ for (int i = 0; i < n; ++i) {
const struct epoll_event *ev;
- nni_posix_pollq_node * node;
ev = &events[i];
// If the waker pipe was signaled, read from it.
- if ((ev->data.u64 == 0) && (ev->events & POLLIN)) {
- int rv;
+ if ((ev->data.ptr == NULL) && (ev->events & POLLIN)) {
uint64_t clear;
- rv = read(pq->evfd, &clear, sizeof(clear));
- NNI_ASSERT(rv == sizeof(clear));
- continue;
- }
-
- if (nni_idhash_find(pq->nodes, ev->data.u64,
- (void **) &node) != 0) {
- // node was removed while we were blocking
- continue;
+ (void) read(pq->evfd, &clear, sizeof(clear));
+ reap = true;
+ } else {
+ nni_posix_pfd * pfd = ev->data.ptr;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ int events;
+
+ events = ev->events &
+ (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP);
+
+ nni_mtx_lock(&pfd->mtx);
+ pfd->events &= ~events;
+ cb = pfd->cb;
+ arg = pfd->arg;
+ nni_mtx_unlock(&pfd->mtx);
+
+ // Execute the callback with lock released
+ if (cb != NULL) {
+ cb(pfd, events, arg);
+ }
}
+ }
- node->revents = ev->events &
- (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP);
-
- // mark events as cleared
- node->events &= ~node->revents;
-
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pq->active = node;
-
- // Execute the callback with lock released
- nni_mtx_unlock(&pq->mtx);
- node->cb(node->data);
+ if (reap) {
+ nni_posix_pollq_reap(pq);
nni_mtx_lock(&pq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pq->active = NULL;
- if (pq->wait == node) {
- pq->wait = NULL;
- nni_cv_wake(&pq->cv);
+ if (pq->close) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
}
+ nni_mtx_unlock(&pq->mtx);
}
}
-
- nni_mtx_unlock(&pq->mtx);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- if (pq->started) {
- int rv;
- uint64_t wakeval = 1;
+ uint64_t one = 1;
- nni_mtx_lock(&pq->mtx);
- pq->close = true;
- pq->started = false;
- rv = write(pq->evfd, &wakeval, sizeof(wakeval));
- NNI_ASSERT(rv == sizeof(wakeval));
- nni_mtx_unlock(&pq->mtx);
- }
- nni_thr_fini(&pq->thr);
+ nni_mtx_lock(&pq->mtx);
+ pq->close = true;
+ (void) write(pq->evfd, &one, sizeof(one));
+ nni_mtx_unlock(&pq->mtx);
- if (pq->evfd >= 0) {
- close(pq->evfd);
- pq->evfd = -1;
- }
+ nni_thr_fini(&pq->thr);
+ close(pq->evfd);
close(pq->epfd);
- pq->epfd = -1;
-
- if (pq->nodes != NULL) {
- nni_idhash_fini(pq->nodes);
- }
nni_mtx_fini(&pq->mtx);
}
@@ -308,22 +318,25 @@ nni_posix_pollq_add_eventfd(nni_posix_pollq *pq)
{
// add event fd so we can wake ourself on exit
struct epoll_event ev;
- int rv;
+ int fd;
memset(&ev, 0, sizeof(ev));
- pq->evfd = eventfd(0, EFD_NONBLOCK);
- if (pq->evfd == -1) {
+ if ((fd = eventfd(0, EFD_NONBLOCK)) < 0) {
return (nni_plat_errno(errno));
}
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ // This is *NOT* one shot. We want to wake EVERY single time.
ev.events = EPOLLIN;
- ev.data.u64 = 0;
+ ev.data.ptr = 0;
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, pq->evfd, &ev);
- if (rv != 0) {
+ if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+ (void) close(fd);
return (nni_plat_errno(errno));
}
+ pq->evfd = fd;
return (0);
}
@@ -332,40 +345,30 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
- if ((pq->epfd = epoll_create1(0)) < 0) {
+ if ((pq->epfd = epoll_create1(EPOLL_CLOEXEC)) < 0) {
return (nni_plat_errno(errno));
}
- pq->evfd = -1;
pq->close = false;
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
- if (((rv = nni_idhash_init(&pq->nodes)) != 0) ||
- ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
- ((rv = nni_posix_pollq_add_eventfd(pq)) != 0)) {
- nni_posix_pollq_destroy(pq);
+ if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) {
+ (void) close(pq->epfd);
+ nni_mtx_fini(&pq->mtx);
+ return (rv);
+ }
+ if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ (void) close(pq->epfd);
+ (void) close(pq->evfd);
+ nni_mtx_fini(&pq->mtx);
return (rv);
}
-
- // Positive values only for node indices. (0 is reserved for eventfd).
- nni_idhash_set_limits(pq->nodes, 1, 0x7FFFFFFFu, 1);
- pq->started = true;
nni_thr_run(&pq->thr);
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index 0f312170..36ced3ff 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -12,196 +12,203 @@
#ifdef NNG_HAVE_KQUEUE
#include <errno.h>
+#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
#include <sys/event.h>
+#include <sys/socket.h>
#include <unistd.h>
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
-// TODO: can this be feature detected in cmake,
-// rather than relying on platform?
-#if defined NNG_PLATFORM_NETBSD
-#define kevent_udata_t intptr_t
-#else
-#define kevent_udata_t void *
-#endif
-
-#define NNI_MAX_KQUEUE_EVENTS 64
-
-// user event id used to shutdown the polling thread
-#define NNI_KQ_EV_EXIT_ID 0xF
+typedef struct nni_posix_pollq nni_posix_pollq;
// nni_posix_pollq is a work structure that manages state for the kqueue-based
// pollq implementation
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- int kq; // kqueue handle
- bool close; // request for worker to exit
- bool started;
- nni_thr thr; // worker thread
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int kq; // kqueue handle
+ nni_thr thr; // worker thread
+ nni_list reapq; // items to reap
};
+struct nni_posix_pfd {
+ nni_list_node node; // linkage into the reap list
+ nni_posix_pollq *pq; // associated pollq
+ int fd; // file descriptor to poll
+ void * data; // user data
+ nni_posix_pfd_cb cb; // user callback on event
+ nni_cv cv; // signaled when poller has unregistered
+ nni_mtx mtx;
+ int events;
+ bool closing;
+ bool closed;
+};
+
+#define NNI_MAX_KQUEUE_EVENTS 64
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
+ nni_posix_pfd * pf;
nni_posix_pollq *pq;
- struct kevent kevents[2];
+ struct kevent ev[2];
+ unsigned flags = EV_ADD | EV_DISABLE;
+
+ // Set this as soon as possible (narrow the close-exec race as
+ // much as we can; better options are system calls that suppress
+ // this behavior from descriptor creation.)
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+#ifdef SO_NOSIGPIPE
+ // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+ int one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ pq = &nni_posix_global_pollq;
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- return (NNG_ESTATE);
+ if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) {
+ return (NNG_ENOMEM);
}
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
-
- node->pq = pq;
- node->events = 0;
-
- EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ,
- EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
+ // Create entries in the kevent queue, without enabling them.
+ EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
+ EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
- EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
-
- if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) {
- nni_mtx_unlock(&pq->mtx);
+ // We update the kqueue list, without polling for events.
+ if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
+ NNI_FREE_STRUCT(pf);
return (nni_plat_errno(errno));
}
+ pf->fd = fd;
+ pf->cb = NULL;
+ pf->pq = pq;
+ nni_mtx_init(&pf->mtx);
+ nni_cv_init(&pf->cv, &pq->mtx);
+ NNI_LIST_NODE_INIT(&pf->node);
+ *pfdp = pf;
- nni_mtx_unlock(&pq->mtx);
return (0);
}
-// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
-// called while pq's lock is held
-static void
-nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+void
+nni_posix_pfd_close(nni_posix_pfd *pf)
{
- struct kevent kevents[2];
-
- node->events = 0;
- node->pq = NULL;
+ nni_posix_pollq *pq = pf->pq;
- EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0,
- (kevent_udata_t) node);
-
- EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0,
- 0, (kevent_udata_t) node);
-
- // So it turns out that we can get EBADF, ENOENT, and apparently
- // also EINPROGRESS (new on macOS Sierra). Frankly, we're deleting
- // an event, and its harmless if the event removal fails (worst
- // case would be a spurious wakeup), so lets ignore it.
- (void) kevent(pq->kq, kevents, 2, NULL, 0, NULL);
+ nni_mtx_lock(&pq->mtx);
+ if (!pf->closing) {
+ struct kevent ev[2];
+ pf->closing = true;
+ EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
+ EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
+ (void) shutdown(pf->fd, SHUT_RDWR);
+ // This should never fail -- no allocations, just deletion.
+ (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+nni_posix_pfd_fini(nni_posix_pfd *pf)
{
- nni_posix_pollq *pq = node->pq;
+ nni_posix_pollq *pq;
- if (pq == NULL) {
- return;
- }
+ pq = pf->pq;
- nni_mtx_lock(&pq->mtx);
- nni_posix_pollq_remove_helper(pq, node);
+ nni_posix_pfd_close(pf);
- if (pq->close) {
- nni_cv_wake(&pq->cv);
+ if (!nni_thr_is_self(&pq->thr)) {
+ struct kevent ev;
+ nni_mtx_lock(&pq->mtx);
+ nni_list_append(&pq->reapq, pf);
+ EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL);
+
+ // If this fails, the cleanup will stall. That should
+ // only occur in a memory pressure situation, and it
+ // will self-heal when the next event comes in.
+ (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
+ while (!pf->closed) {
+ nni_cv_wait(&pf->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
- nni_mtx_unlock(&pq->mtx);
+
+ (void) close(pf->fd);
+ nni_cv_fini(&pf->cv);
+ nni_mtx_fini(&pf->mtx);
+ NNI_FREE_STRUCT(pf);
}
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
+nni_posix_pfd_fd(nni_posix_pfd *pf)
{
- NNI_ARG_UNUSED(node);
- return (0);
+ return (pf->fd);
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
-
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
+ NNI_ASSERT(cb != NULL); // must not be null when established.
+ nni_mtx_lock(&pf->mtx);
+ pf->cb = cb;
+ pf->data = arg;
+ nni_mtx_unlock(&pf->mtx);
}
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pf, int events)
{
- nni_posix_pollq *pq = node->pq;
- struct kevent kevents[2];
- int nevents = 0;
+ struct kevent ev[2];
+ int nev = 0;
+ unsigned flags = EV_ENABLE | EV_DISPATCH;
+ nni_posix_pollq *pq = pf->pq;
+
+ nni_mtx_lock(&pf->mtx);
+ if (pf->closing) {
+ events = 0;
+ } else {
+ pf->events |= events;
+ events = pf->events;
+ }
+ nni_mtx_unlock(&pf->mtx);
- NNI_ASSERT(pq != NULL);
if (events == 0) {
- return;
+ // No events, and kqueue is oneshot, so nothing to do.
+ return (0);
}
- nni_mtx_lock(&pq->mtx);
-
- if (!(node->events & POLLIN) && (events & POLLIN)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ,
- EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ if (events & POLLIN) {
+ EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
-
- if (!(node->events & POLLOUT) && (events & POLLOUT)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ if (events & POLLOUT) {
+ EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf);
}
-
- if (nevents > 0) {
- // This call should never fail, really. The only possible
- // legitimate failure would be ENOMEM, but in that case
- // lots of other things are going to be failing, or ENOENT
- // or ESRCH, indicating we already lost interest; the
- // only consequence of ignoring these errors is that a given
- // descriptor might appear "stuck". This beats the alternative
- // of just blithely crashing the application with an assertion.
- (void) kevent(pq->kq, kevents, nevents, NULL, 0, NULL);
- node->events |= events;
+ while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
}
+ return (0);
+}
+static void
+nni_posix_pollq_reap(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pf;
+ nni_mtx_lock(&pq->mtx);
+ while ((pf = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pf);
+ pf->closed = true;
+ nni_cv_wake(&pf->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
@@ -209,117 +216,71 @@ static void
nni_posix_poll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
- struct kevent kevents[NNI_MAX_KQUEUE_EVENTS];
-
- nni_mtx_lock(&pq->mtx);
- while (!pq->close) {
- int i;
- int nevents;
-
- // block indefinitely, timers are handled separately
- nni_mtx_unlock(&pq->mtx);
- nevents = kevent(
- pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL);
- nni_mtx_lock(&pq->mtx);
-
- if (nevents < 0) {
- continue;
+ for (;;) {
+ int n;
+ struct kevent evs[NNI_MAX_KQUEUE_EVENTS];
+ nni_posix_pfd * pf;
+ nni_posix_pfd_cb cb;
+ void * cbarg;
+ int revents;
+ bool reap = false;
+
+ n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
+ if (n < 0) {
+ if (errno == EBADF) {
+ nni_posix_pollq_reap(pq);
+ return;
+ }
+ reap = true;
}
- // dispatch events
- for (i = 0; i < nevents; ++i) {
- struct kevent ev_disable;
- const struct kevent * ev;
- nni_posix_pollq_node *node;
+ for (int i = 0; i < n; i++) {
+ struct kevent *ev = &evs[i];
- ev = &kevents[i];
- if (ev->filter == EVFILT_USER &&
- ev->ident == NNI_KQ_EV_EXIT_ID) {
- // we've woken up to exit the polling thread
+ switch (ev->filter) {
+ case EVFILT_READ:
+ revents = POLLIN;
break;
- }
-
- node = (nni_posix_pollq_node *) ev->udata;
- if (node->pq == NULL) {
- // node was removed while we were blocking
+ case EVFILT_WRITE:
+ revents = POLLOUT;
+ break;
+ case EVFILT_USER:
+ default:
+ reap = true;
continue;
}
- node->revents = 0;
-
+ pf = (void *) ev->udata;
if (ev->flags & (EV_ERROR | EV_EOF)) {
- node->revents |= POLLHUP;
- }
- if (ev->filter == EVFILT_WRITE) {
- node->revents |= POLLOUT;
- } else if (ev->filter == EVFILT_READ) {
- node->revents |= POLLIN;
- } else {
- NNI_ASSERT(false); // unhandled filter
- break;
+ revents |= POLLHUP;
}
- // explicitly disable this event. we'd ideally rely on
- // the behavior of EV_DISPATCH to do this,
- // but that only happens once we've acknowledged the
- // event by reading/or writing the fd. because there
- // can currently be some latency between the time we
- // receive this event and the time we read/write in
- // response, disable the event in the meantime to avoid
- // needless wakeups.
- // revisit if we're able to reduce/remove this latency.
- EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter,
- EV_DISABLE, 0, 0, NULL);
- // this will only fail if the fd is already
- // closed/invalid which we don't mind anyway,
- // so ignore return value.
- (void) kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL);
-
- // mark events as cleared
- node->events &= ~node->revents;
-
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pq->active = node;
-
- // Execute the callback with lock released
- nni_mtx_unlock(&pq->mtx);
- node->cb(node->data);
- nni_mtx_lock(&pq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pq->active = NULL;
- if (pq->wait == node) {
- pq->wait = NULL;
- nni_cv_wake(&pq->cv);
+ nni_mtx_lock(&pf->mtx);
+ cb = pf->cb;
+ cbarg = pf->data;
+ pf->events &= ~(revents);
+ nni_mtx_unlock(&pf->mtx);
+
+ if (cb != NULL) {
+ cb(pf, revents, cbarg);
}
}
+ if (reap) {
+ nni_posix_pollq_reap(pq);
+ }
}
-
- nni_mtx_unlock(&pq->mtx);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- if (pq->started) {
- struct kevent ev;
- EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE,
- NOTE_TRIGGER, 0, NULL);
- nni_mtx_lock(&pq->mtx);
- pq->close = true;
- pq->started = false;
- (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
- nni_mtx_unlock(&pq->mtx);
- }
- nni_thr_fini(&pq->thr);
-
if (pq->kq >= 0) {
close(pq->kq);
pq->kq = -1;
}
+ nni_thr_fini(&pq->thr);
+
+ nni_posix_pollq_reap(pq);
nni_mtx_fini(&pq->mtx);
}
@@ -327,10 +288,17 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq)
static int
nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq)
{
- // add user event so we can wake ourself on exit
+ int rv;
struct kevent ev;
- EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL);
- return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL)));
+
+ EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL);
+ while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ }
+ return (0);
}
static int
@@ -342,10 +310,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (nni_plat_errno(errno));
}
- pq->close = false;
-
nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
(rv = nni_posix_pollq_add_wake_evt(pq)) != 0) {
@@ -353,24 +319,14 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (rv);
}
- pq->started = true;
nni_thr_run(&pq->thr);
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
+
return (nni_posix_pollq_create(&nni_posix_global_pollq));
}
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index efc9ff48..26753df6 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -11,8 +11,6 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
-#ifdef NNG_USE_POSIX_POLLQ_POLL
-
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
@@ -32,306 +30,285 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().
+
+// Locking strategy: We use the pollq lock to guard the lists on the pollq,
+// the nfds (which counts the number of items in the pollq), the pollq
+// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We
+// use a lock on the pfd only to protect the the events field (which we treat
+// as an atomic bitfield), and the cb and arg pointers. Note that the pfd
+// lock is therefore a leaf lock, which is sometimes acquired while holding
+// the pq lock. Every reasonable effort is made to minimize holding locks.
+// (Btw, pfd->fd is not guarded, because it is set at pfd creation and
+// persists until the pfd is destroyed.)
+
+typedef struct nni_posix_pollq nni_posix_pollq;
+
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- struct pollfd * fds;
- int nfds;
- int wakewfd; // write side of waker pipe
- int wakerfd; // read side of waker pipe
- int close; // request for worker to exit
- int started;
- nni_thr thr; // worker thread
- nni_list polled; // polled nodes
- nni_list armed; // armed nodes
- nni_list idle; // idle nodes
- int nnodes; // num of nodes in nodes list
- int inpoll; // poller asleep in poll
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int nfds;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ bool closing; // request for worker to exit
+ bool closed;
+ nni_thr thr; // worker thread
+ nni_list pollq; // armed nodes
+ nni_list reapq;
};
-static int
-nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
-{
- int grow = pq->nnodes + 2; // one for us, one for waker
- struct pollfd *newfds;
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ int fd;
+ nni_list_node node;
+ nni_cv cv;
+ nni_mtx mtx;
+ int events;
+ nni_posix_pfd_cb cb;
+ void * arg;
+};
+
+static nni_posix_pollq nni_posix_global_pollq;
- if (grow < pq->nfds) {
- return (0);
+int
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+{
+ nni_posix_pfd * pfd;
+ nni_posix_pollq *pq = &nni_posix_global_pollq;
+
+	// Set this as soon as possible (narrow the close-exec race as
+ // much as we can; better options are system calls that suppress
+ // this behavior from descriptor creation.)
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+#ifdef SO_NOSIGPIPE
+ // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+ // If this code is getting used, you really should be using the
+ // kqueue poller, or you need to upgrade your older system.
+ int one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
+
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
+ NNI_LIST_NODE_INIT(&pfd->node);
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pq->mtx);
+ pfd->fd = fd;
+ pfd->events = 0;
+ pfd->cb = NULL;
+ pfd->arg = NULL;
+ pfd->pq = pq;
+ nni_mtx_lock(&pq->mtx);
+ if (pq->closing) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+ return (NNG_ECLOSED);
+ }
+ nni_list_append(&pq->pollq, pfd);
+ pq->nfds++;
+ nni_mtx_unlock(&pq->mtx);
+ *pfdp = pfd;
+ return (0);
+}
- grow = grow + 128;
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
+{
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->arg = arg;
+ nni_mtx_unlock(&pfd->mtx);
+}
- if ((newfds = NNI_ALLOC_STRUCTS(newfds, grow)) == NULL) {
- return (NNG_ENOMEM);
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
+{
+ return (pfd->fd);
+}
+
+void
+nni_posix_pfd_close(nni_posix_pfd *pfd)
+{
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_posix_pfd_close(pfd);
+
+ nni_mtx_lock(&pq->mtx);
+ if (nni_list_active(&pq->pollq, pfd)) {
+ nni_list_remove(&pq->pollq, pfd);
+ pq->nfds--;
}
- if (pq->nfds != 0) {
- NNI_FREE_STRUCTS(pq->fds, pq->nfds);
+ if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ nni_list_append(&pq->reapq, pfd);
+ nni_plat_pipe_raise(pq->wakewfd);
+ while (nni_list_active(&pq->reapq, pfd)) {
+ nni_cv_wait(&pfd->cv);
+ }
}
- pq->fds = newfds;
- pq->nfds = grow;
+ nni_mtx_unlock(&pq->mtx);
+
+ // We're exclusive now.
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+}
+
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
+{
+ nni_posix_pollq *pq = pfd->pq;
+ nni_mtx_lock(&pfd->mtx);
+ pfd->events |= events;
+ nni_mtx_unlock(&pfd->mtx);
+
+ // If we're running on the callback, then don't bother to kick
+ // the pollq again. This is necessary because we cannot modify
+ // the poller while it is polling.
+ if (!nni_thr_is_self(&pq->thr)) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
return (0);
}
static void
nni_posix_poll_thr(void *arg)
{
- nni_posix_pollq * pollq = arg;
- nni_posix_pollq_node *node;
+ nni_posix_pollq *pq = arg;
+ int nalloc = 0;
+ struct pollfd * fds = NULL;
+ nni_posix_pfd ** pfds = NULL;
- nni_mtx_lock(&pollq->mtx);
for (;;) {
- int rv;
int nfds;
- struct pollfd *fds;
+ int events;
+ nni_posix_pfd *pfd;
- if (pollq->close) {
- break;
+ nni_mtx_lock(&pq->mtx);
+ while (nalloc < (pq->nfds + 1)) {
+ int n = pq->nfds + 128;
+
+ // Drop the lock while we sleep or allocate. This
+ // allows additional items to be added or removed (!)
+ // while we wait.
+ nni_mtx_unlock(&pq->mtx);
+
+ // Toss the old ones first; avoids *doubling* memory
+ // consumption during alloc.
+ NNI_FREE_STRUCTS(fds, nalloc);
+ NNI_FREE_STRUCTS(pfds, nalloc);
+ nalloc = 0;
+
+ if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) {
+ nni_msleep(10); // sleep for a bit, try later
+ } else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) {
+ NNI_FREE_STRUCTS(pfds, n);
+ nni_msleep(10);
+ } else {
+ nalloc = n;
+ }
+ nni_mtx_lock(&pq->mtx);
}
- fds = pollq->fds;
- nfds = 0;
-
// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
- fds[nfds].fd = pollq->wakerfd;
- fds[nfds].events = POLLIN;
- fds[nfds].revents = 0;
- nfds++;
+ fds[0].fd = pq->wakerfd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+ pfds[0] = NULL;
+ nfds = 1;
+
+		// Also let's reap anything that was in the reaplist!
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pfd);
+ nni_cv_wake(&pfd->cv);
+ }
- // Set up the poll list.
- while ((node = nni_list_first(&pollq->armed)) != NULL) {
- nni_list_remove(&pollq->armed, node);
- nni_list_append(&pollq->polled, node);
- fds[nfds].fd = node->fd;
- fds[nfds].events = node->events;
- fds[nfds].revents = 0;
- node->index = nfds;
- nfds++;
+ // If we're closing down, bail now. This is done *after* we
+ // have ensured that the reapq is empty. Anything still in
+ // the pollq is not going to receive further callbacks.
+ if (pq->closing) {
+ pq->closed = true;
+ nni_mtx_unlock(&pq->mtx);
+ break;
}
- // Now poll it. We block indefinitely, since we use separate
- // timeouts to wake and remove the elements from the list.
- pollq->inpoll = 1;
- nni_mtx_unlock(&pollq->mtx);
- rv = poll(fds, nfds, -1);
- nni_mtx_lock(&pollq->mtx);
- pollq->inpoll = 0;
-
- if (rv < 0) {
- // This shouldn't happen really. If it does, we
- // just try again. (EINTR is probably the only
- // reasonable failure here, unless internal memory
- // allocations fail in the kernel, leading to EAGAIN.)
- continue;
+ // Set up the poll list.
+ NNI_LIST_FOREACH (&pq->pollq, pfd) {
+
+ nni_mtx_lock(&pfd->mtx);
+ events = pfd->events;
+ nni_mtx_unlock(&pfd->mtx);
+
+ if (events != 0) {
+ fds[nfds].fd = pfd->fd;
+ fds[nfds].events = events;
+ fds[nfds].revents = 0;
+ pfds[nfds] = pfd;
+ nfds++;
+ }
}
+ nni_mtx_unlock(&pq->mtx);
+
+ // We could get the result from poll, and avoid iterating
+ // over the entire set of pollfds, but since on average we
+ // will be walking half the list, doubling the work we do
+ // (the condition with a potential pipeline stall) seems like
+ // adding complexity with no real benefit. It also makes the
+ // worst case even worse.
+ (void) poll(fds, nfds, -1);
// If the waker pipe was signaled, read from it.
if (fds[0].revents & POLLIN) {
- NNI_ASSERT(fds[0].fd == pollq->wakerfd);
- nni_plat_pipe_clear(pollq->wakerfd);
+ NNI_ASSERT(fds[0].fd == pq->wakerfd);
+ nni_plat_pipe_clear(pq->wakerfd);
}
- while ((node = nni_list_first(&pollq->polled)) != NULL) {
- int index = node->index;
-
- // We remove ourselves from the polled list, and
- // then put it on either the idle or armed list
- // depending on whether it remains armed.
- node->index = 0;
- nni_list_remove(&pollq->polled, node);
- NNI_ASSERT(index > 0);
- if (fds[index].revents == 0) {
- // If still watching for events, return it
- // to the armed list.
- if (node->events) {
- nni_list_append(&pollq->armed, node);
- } else {
- nni_list_append(&pollq->idle, node);
- }
- continue;
- }
+ for (int i = 1; i < nfds; i++) {
+ if ((events = fds[i].revents) != 0) {
+ nni_posix_pfd_cb cb;
+ void * arg;
- // We are calling the callback, so disarm
- // all events; the node can rearm them in its
- // callback.
- node->revents = fds[index].revents;
- node->events &= ~node->revents;
- if (node->events == 0) {
- nni_list_append(&pollq->idle, node);
- } else {
- nni_list_append(&pollq->armed, node);
- }
+ pfd = pfds[i];
+
+ nni_mtx_lock(&pfd->mtx);
+ cb = pfd->cb;
+ arg = pfd->arg;
+ pfd->events &= ~events;
+ nni_mtx_unlock(&pfd->mtx);
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pollq->active = node;
-
- // Execute the callback -- without locks held.
- nni_mtx_unlock(&pollq->mtx);
- node->cb(node->data);
- nni_mtx_lock(&pollq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pollq->active = NULL;
- if (pollq->wait == node) {
- pollq->wait = NULL;
- nni_cv_wake(&pollq->cv);
+ if (cb) {
+ cb(pfd, events, arg);
+ }
}
}
}
- nni_mtx_unlock(&pollq->mtx);
-}
-
-int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
-{
- int rv;
- nni_posix_pollq *pq;
-
- NNI_ASSERT(!nni_list_node_active(&node->node));
- pq = nni_posix_pollq_get(node->fd);
- if (node->pq != NULL) {
- return (NNG_ESTATE);
- }
-
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
- node->pq = pq;
- if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return (rv);
- }
- pq->nnodes++;
- nni_list_append(&pq->idle, node);
- nni_mtx_unlock(&pq->mtx);
- return (0);
+ NNI_FREE_STRUCTS(fds, nalloc);
+ NNI_FREE_STRUCTS(pfds, nalloc);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+static void
+nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- nni_posix_pollq *pq = node->pq;
-
- if (pq == NULL) {
- return;
- }
- node->pq = NULL;
nni_mtx_lock(&pq->mtx);
- if (nni_list_node_active(&node->node)) {
- nni_list_node_remove(&node->node);
- pq->nnodes--;
- }
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
+ pq->closing = 1;
nni_mtx_unlock(&pq->mtx);
-}
-
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
-{
- NNI_LIST_NODE_INIT(&node->node);
- return (0);
-}
-
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
-void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
-{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
- node->pq = NULL;
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
- if (nni_list_node_active(&node->node)) {
- nni_list_node_remove(&node->node);
- pq->nnodes--;
- }
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
-}
+ nni_plat_pipe_raise(pq->wakewfd);
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
-{
- nni_posix_pollq *pq = node->pq;
- int oevents;
-
- NNI_ASSERT(pq != NULL);
-
- nni_mtx_lock(&pq->mtx);
- oevents = node->events;
- node->events |= events;
-
- // We move this to the armed list if its not armed, or already
- // on the polled list. The polled list would be the case where
- // the index is set to a positive value.
- if ((oevents == 0) && (events != 0) && (node->index < 1)) {
- nni_list_node_remove(&node->node);
- nni_list_append(&pq->armed, node);
- }
- if ((events != 0) && (oevents != events)) {
- // Possibly wake up poller since we're looking for new events.
- if (pq->inpoll) {
- nni_plat_pipe_raise(pq->wakewfd);
- }
- }
- nni_mtx_unlock(&pq->mtx);
-}
-
-static void
-nni_posix_pollq_destroy(nni_posix_pollq *pq)
-{
- if (pq->started) {
- nni_mtx_lock(&pq->mtx);
- pq->close = 1;
- pq->started = 0;
- nni_plat_pipe_raise(pq->wakewfd);
- nni_mtx_unlock(&pq->mtx);
- }
nni_thr_fini(&pq->thr);
-
- // All pipes should have been closed before this is called.
- NNI_ASSERT(nni_list_empty(&pq->polled));
- NNI_ASSERT(nni_list_empty(&pq->armed));
- NNI_ASSERT(nni_list_empty(&pq->idle));
- NNI_ASSERT(pq->nnodes == 0);
-
- if (pq->wakewfd >= 0) {
- nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
- pq->wakewfd = pq->wakerfd = -1;
- }
- if (pq->nfds != 0) {
- NNI_FREE_STRUCTS(pq->fds, pq->nfds);
- pq->fds = NULL;
- pq->nfds = 0;
- }
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
nni_mtx_fini(&pq->mtx);
}
@@ -340,50 +317,27 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
- NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node);
- NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node);
- NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node);
- pq->wakewfd = -1;
- pq->wakerfd = -1;
- pq->close = 0;
-
- nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
+ NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
+ pq->closing = false;
+ pq->closed = false;
- if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
- ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
- ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
- nni_posix_pollq_destroy(pq);
+ if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) {
return (rv);
}
- pq->started = 1;
+ if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ return (rv);
+ }
+ nni_mtx_init(&pq->mtx);
nni_thr_run(&pq->thr);
return (0);
}
-// We use a single pollq for the entire system, which means only a single
-// thread is polling. This may be somewhat less than optimally efficient,
-// and it may be worth investigating having multiple threads to improve
-// efficiency and scalability. (This would shorten the linked lists,
-// improving C10K scalability, and also allow us to engage multiple cores.)
-// It's not entirely clear how many threads are "optimal".
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- // This is the point where we could choose a pollq based on FD.
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
- int rv;
-
- rv = nni_posix_pollq_create(&nni_posix_global_pollq);
- return (rv);
+ return (nni_posix_pollq_create(&nni_posix_global_pollq));
}
void
@@ -391,5 +345,3 @@ nni_posix_pollq_sysfini(void)
{
nni_posix_pollq_destroy(&nni_posix_global_pollq);
}
-
-#endif // NNG_USE_POSIX_POLLQ_POLL
diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c
index 7231cb27..63c1d1d1 100644
--- a/src/platform/posix/posix_pollq_port.c
+++ b/src/platform/posix/posix_pollq_port.c
@@ -1,7 +1,6 @@
//
// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Liam Staskawicz <liam@stask.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -12,7 +11,9 @@
#ifdef NNG_HAVE_PORT_CREATE
#include <errno.h>
+#include <fcntl.h>
#include <port.h>
+#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
@@ -21,6 +22,9 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
+#define NNI_MAX_PORTEV 64
+typedef struct nni_posix_pollq nni_posix_pollq;
+
// nni_posix_pollq is a work structure that manages state for the port-event
// based pollq implementation. We only really need to keep track of the
// single thread, and the associated port itself.
@@ -29,190 +33,178 @@ struct nni_posix_pollq {
nni_thr thr; // worker thread
};
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ int fd;
+ nni_mtx mtx;
+ nni_cv cv;
+ int events;
+ bool closed;
+ bool closing;
+ nni_posix_pfd_cb cb;
+ void * data;
+};
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
nni_posix_pollq *pq;
+ nni_posix_pfd * pfd;
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ pq = &nni_posix_global_pollq;
- nni_mtx_lock(&node->mx);
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- nni_mtx_unlock(&node->mx);
- return (NNG_ESTATE);
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
-
- node->pq = pq;
- node->events = 0;
- node->armed = false;
- nni_mtx_unlock(&node->mx);
-
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pfd->mtx);
+ pfd->closed = false;
+ pfd->closing = false;
+ pfd->fd = fd;
+ pfd->pq = pq;
+ pfd->cb = NULL;
+ pfd->data = NULL;
+ *pfdp = pfd;
return (0);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
+ return (pfd->fd);
+}
- if (pq == NULL) {
- return;
- }
+void
+nni_posix_pfd_close(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
- nni_mtx_lock(&node->mx);
- node->events = 0;
- if (node->armed) {
- // Failure modes that can occur are uninteresting.
- (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd);
- node->armed = false;
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ pfd->closing = true;
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd);
}
- nni_mtx_unlock(&node->mx);
-}
+ nni_mtx_unlock(&pfd->mtx);
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
-{
- nni_mtx_init(&node->mx);
- nni_cv_init(&node->cv, &node->mx);
- node->pq = NULL;
- node->armed = false;
- NNI_LIST_NODE_INIT(&node->node);
- return (0);
+ // Send the wake event to the poller to synchronize with it.
+ // Note that port_send should only really fail if out of memory
+ // or we run into a resource limit.
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the node is removed properly.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
+ nni_posix_pollq *pq = pfd->pq;
- nni_mtx_lock(&node->mx);
- if ((pq = node->pq) != NULL) {
- // Dissociate the port; if it isn't already associated we
- // don't care. (An extra syscall, but it should not matter.)
- (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd);
- node->armed = false;
+ nni_posix_pfd_close(pfd);
- for (;;) {
- if (port_send(pq->port, 0, node) == 0) {
- break;
- }
- switch (errno) {
- case EAGAIN:
- case ENOMEM:
- // Resource exhaustion.
- // Best bet in these cases is to sleep it off.
- // This may appear like a total application
- // hang, but by sleeping here maybe we give
- // a chance for things to clear up.
- nni_mtx_unlock(&node->mx);
- nni_msleep(5000);
- nni_mtx_lock(&node->mx);
- continue;
- case EBADFD:
- case EBADF:
- // Most likely these indicate that the pollq
- // itself has been closed. That's ok.
+ if (!nni_thr_is_self(&pq->thr)) {
+
+ while (port_send(pq->port, 1, pfd) != 0) {
+ if ((errno == EBADF) || (errno == EBADFD)) {
+ pfd->closed = true;
break;
}
+ sched_yield(); // try again later...
}
- // Wait for the pollq thread to tell us with certainty that
- // they are done. This is needed to ensure that the pollq
- // thread isn't executing (or about to execute) the callback
- // before we destroy it.
- while (node->pq != NULL) {
- nni_cv_wait(&node->cv);
+
+ nni_mtx_lock(&pfd->mtx);
+ while (!pfd->closed) {
+ nni_cv_wait(&pfd->cv);
}
+ nni_mtx_unlock(&pfd->mtx);
}
- nni_mtx_unlock(&node->mx);
- nni_cv_fini(&node->cv);
- nni_mtx_fini(&node->mx);
+
+ // We're exclusive now.
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
}
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
{
- nni_posix_pollq *pq = node->pq;
-
- NNI_ASSERT(pq != NULL);
- if (events == 0) {
- return;
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ pfd->events |= events;
+ if (port_associate(pq->port, PORT_SOURCE_FD, pfd->fd,
+ pfd->events, pfd) != 0) {
+ int rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&pfd->mtx);
+ return (rv);
+ }
}
-
- nni_mtx_lock(&node->mx);
- node->events |= events;
- node->armed = true;
- (void) port_associate(
- pq->port, PORT_SOURCE_FD, node->fd, node->events, node);
-
- // Possible errors here are:
- //
- // EBADF -- programming error on our part
- // EBADFD -- programming error on our part
- // ENOMEM -- not much we can do here
- // EAGAIN -- too many port events registered (65K!!)
- //
- // For now we ignore them all. (We need to be able to return
- // errors to our caller.) Effect on the application will appear
- // to be a stalled file descriptor (no notifications).
- nni_mtx_unlock(&node->mx);
+ nni_mtx_unlock(&pfd->mtx);
+ return (0);
}
static void
nni_posix_poll_thr(void *arg)
{
-
for (;;) {
- nni_posix_pollq * pq = arg;
- port_event_t ev;
- nni_posix_pollq_node *node;
-
- if (port_get(pq->port, &ev, NULL) != 0) {
+ nni_posix_pollq *pq = arg;
+ port_event_t ev[NNI_MAX_PORTEV];
+ nni_posix_pfd * pfd;
+ int events;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ unsigned n;
+
+ n = 1; // wake us even on just one event
+ if (port_getn(pq->port, ev, NNI_MAX_PORTEV, &n, NULL) != 0) {
if (errno == EINTR) {
continue;
}
return;
}
- switch (ev.portev_source) {
- case PORT_SOURCE_ALERT:
- return;
-
- case PORT_SOURCE_FD:
- node = ev.portev_user;
+ // We run through the returned ports twice. First we
+ // get the callbacks. Then we do the reaps. This way
+ // we ensure that we only reap *after* callbacks have run.
+ for (unsigned i = 0; i < n; i++) {
+ if (ev[i].portev_source != PORT_SOURCE_FD) {
+ continue;
+ }
+ pfd = ev[i].portev_user;
+ events = ev[i].portev_events;
- nni_mtx_lock(&node->mx);
- node->revents = ev.portev_events;
- // mark events as cleared
- node->events &= ~node->revents;
- node->armed = false;
- nni_mtx_unlock(&node->mx);
+ nni_mtx_lock(&pfd->mtx);
+ cb = pfd->cb;
+ arg = pfd->data;
+ pfd->events &= ~events;
+ nni_mtx_unlock(&pfd->mtx);
- node->cb(node->data);
- break;
+ if (cb != NULL) {
+ cb(pfd, events, arg);
+ }
+ }
+ for (unsigned i = 0; i < n; i++) {
+ if (ev[i].portev_source != PORT_SOURCE_USER) {
+ continue;
+ }
- case PORT_SOURCE_USER:
// User event telling us to stop doing things.
- // We signal back to use this as a coordination event
- // between the pollq and the thread handler.
- // NOTE: It is absolutely critical that there is only
- // a single thread per pollq. Otherwise we cannot
- // be sure that we are blocked completely,
- node = ev.portev_user;
- nni_mtx_lock(&node->mx);
- node->pq = NULL;
- nni_cv_wake(&node->cv);
- nni_mtx_unlock(&node->mx);
+ // We signal back to use this as a coordination
+ // event between the pollq and the thread
+ // handler. NOTE: It is absolutely critical
+ // that there is only a single thread per
+ // pollq. Otherwise we cannot be sure that we
+			// are blocked completely.
+ pfd = ev[i].portev_user;
+ nni_mtx_lock(&pfd->mtx);
+ pfd->closed = true;
+ nni_cv_wake(&pfd->cv);
+ nni_mtx_unlock(&pfd->mtx);
}
}
}
@@ -220,7 +212,6 @@ nni_posix_poll_thr(void *arg)
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- port_alert(pq->port, PORT_ALERT_SET, 1, NULL);
(void) close(pq->port);
nni_thr_fini(&pq->thr);
}
@@ -243,14 +234,15 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
+ NNI_ASSERT(cb != NULL); // must not be null when established.
+
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->data = arg;
+ nni_mtx_unlock(&pfd->mtx);
}
int
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index d86a2008..96d8debd 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -37,50 +37,51 @@
#define NNG_POSIX_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *nni_posix_resolv_tq = NULL;
-static nni_mtx nni_posix_resolv_mtx;
-
-typedef struct nni_posix_resolv_item nni_posix_resolv_item;
-struct nni_posix_resolv_item {
- int family;
- int passive;
- const char *name;
- const char *serv;
- int proto;
- nni_aio * aio;
- nni_task task;
+static nni_taskq *resolv_tq = NULL;
+static nni_mtx resolv_mtx;
+
+typedef struct resolv_item resolv_item;
+struct resolv_item {
+ int family;
+ int passive;
+ const char * name;
+ const char * serv;
+ int proto;
+ nni_aio * aio;
+ nni_task * task;
+ nng_sockaddr sa;
};
static void
-nni_posix_resolv_finish(nni_posix_resolv_item *item, int rv)
+resolv_finish(resolv_item *item, int rv)
{
nni_aio *aio;
- if ((aio = item->aio) != NULL) {
- if (nni_aio_get_prov_data(aio) == item) {
- nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- nni_aio_finish(aio, rv, 0);
- NNI_FREE_STRUCT(item);
- }
+ if (((aio = item->aio) != NULL) &&
+ (nni_aio_get_prov_data(aio) == item)) {
+ nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ item->aio = NULL;
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
}
}
static void
-nni_posix_resolv_cancel(nni_aio *aio, int rv)
+resolv_cancel(nni_aio *aio, int rv)
{
- nni_posix_resolv_item *item;
+ resolv_item *item;
- nni_mtx_lock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_unlock(&resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
item->aio = NULL;
- nni_mtx_unlock(&nni_posix_resolv_mtx);
- nni_task_cancel(&item->task);
- NNI_FREE_STRUCT(item);
+ nni_mtx_unlock(&resolv_mtx);
nni_aio_finish_error(aio, rv);
}
@@ -116,14 +117,23 @@ nni_posix_gai_errno(int rv)
}
static void
-nni_posix_resolv_task(void *arg)
+resolv_task(void *arg)
{
- nni_posix_resolv_item *item = arg;
- nni_aio * aio = item->aio;
- struct addrinfo hints;
- struct addrinfo * results;
- struct addrinfo * probe;
- int rv;
+ resolv_item * item = arg;
+ struct addrinfo hints;
+ struct addrinfo *results;
+ struct addrinfo *probe;
+ int rv;
+
+ nni_mtx_lock(&resolv_mtx);
+ if (item->aio == NULL) {
+ nni_mtx_unlock(&resolv_mtx);
+ // Caller canceled, and no longer cares about this.
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ return;
+ }
+ nni_mtx_unlock(&resolv_mtx);
results = NULL;
@@ -170,7 +180,7 @@ nni_posix_resolv_task(void *arg)
if (probe != NULL) {
struct sockaddr_in * sin;
struct sockaddr_in6 *sin6;
- nng_sockaddr * sa = nni_aio_get_input(aio, 0);
+ nng_sockaddr * sa = &item->sa;
switch (probe->ai_addr->sa_family) {
case AF_INET:
@@ -196,17 +206,18 @@ done:
freeaddrinfo(results);
}
- nni_mtx_lock(&nni_posix_resolv_mtx);
- nni_posix_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
+ resolv_finish(item, rv);
+ nni_mtx_unlock(&resolv_mtx);
}
static void
-nni_posix_resolv_ip(const char *host, const char *serv, int passive,
- int family, int proto, nni_aio *aio)
+resolv_ip(const char *host, const char *serv, int passive, int family,
+ int proto, nni_aio *aio)
{
- nni_posix_resolv_item *item;
- sa_family_t fam;
+ resolv_item *item;
+ sa_family_t fam;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -231,10 +242,15 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
return;
}
- nni_task_init(
- nni_posix_resolv_tq, &item->task, nni_posix_resolv_task, item);
+ if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) !=
+ 0) {
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ };
// NB: host and serv must remain valid until this is completed.
+ memset(&item->sa, 0, sizeof(item->sa));
item->passive = passive;
item->name = host;
item->serv = serv;
@@ -242,24 +258,30 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
item->aio = aio;
item->family = fam;
- nni_mtx_lock(&nni_posix_resolv_mtx);
- nni_aio_schedule(aio, nni_posix_resolv_cancel, item);
- nni_task_dispatch(&item->task);
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
+ if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) {
+ nni_mtx_unlock(&resolv_mtx);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_task_dispatch(item->task);
+ nni_mtx_unlock(&resolv_mtx);
}
void
nni_plat_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
+ resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
nni_plat_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
+ resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
int
@@ -267,10 +289,10 @@ nni_posix_resolv_sysinit(void)
{
int rv;
- nni_mtx_init(&nni_posix_resolv_mtx);
+ nni_mtx_init(&resolv_mtx);
- if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) {
- nni_mtx_fini(&nni_posix_resolv_mtx);
+ if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) {
+ nni_mtx_fini(&resolv_mtx);
return (rv);
}
return (0);
@@ -279,11 +301,11 @@ nni_posix_resolv_sysinit(void)
void
nni_posix_resolv_sysfini(void)
{
- if (nni_posix_resolv_tq != NULL) {
- nni_taskq_fini(nni_posix_resolv_tq);
- nni_posix_resolv_tq = NULL;
+ if (resolv_tq != NULL) {
+ nni_taskq_fini(resolv_tq);
+ resolv_tq = NULL;
}
- nni_mtx_fini(&nni_posix_resolv_mtx);
+ nni_mtx_fini(&resolv_mtx);
}
#endif // NNG_USE_POSIX_RESOLV_GAI
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index cca8165c..0a521513 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -422,6 +422,12 @@ nni_plat_thr_fini(nni_plat_thr *thr)
}
}
+bool
+nni_plat_thr_is_self(nni_plat_thr *thr)
+{
+ return (pthread_self() == thr->tid);
+}
+
void
nni_atfork_child(void)
{
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 654d31e3..759bdb96 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -36,24 +36,29 @@
#endif
struct nni_plat_udp {
- nni_posix_pollq_node udp_pitem;
- int udp_fd;
- nni_list udp_recvq;
- nni_list udp_sendq;
- nni_mtx udp_mtx;
+ nni_posix_pfd *udp_pfd;
+ int udp_fd;
+ nni_list udp_recvq;
+ nni_list udp_sendq;
+ nni_mtx udp_mtx;
};
static void
-nni_posix_udp_doclose(nni_plat_udp *udp)
+nni_posix_udp_doerror(nni_plat_udp *udp, int rv)
{
nni_aio *aio;
while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) ||
((aio = nni_list_first(&udp->udp_sendq)) != NULL)) {
nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_aio_finish_error(aio, rv);
}
- // Underlying socket left open until close API called.
+}
+
+static void
+nni_posix_udp_doclose(nni_plat_udp *udp)
+{
+ nni_posix_udp_doerror(udp, NNG_ECLOSED);
}
static void
@@ -169,23 +174,22 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
// This function is called by the poller on activity on the FD.
static void
-nni_posix_udp_cb(void *arg)
+nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_plat_udp *udp = arg;
- int revents;
+ NNI_ASSERT(pfd == udp->udp_pfd);
nni_mtx_lock(&udp->udp_mtx);
- revents = udp->udp_pitem.revents;
- if (revents & POLLIN) {
+ if (events & POLLIN) {
nni_posix_udp_dorecv(udp);
}
- if (revents & POLLOUT) {
+ if (events & POLLOUT) {
nni_posix_udp_dosend(udp);
}
- if (revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_udp_doclose(udp);
} else {
- int events = 0;
+ events = 0;
if (!nni_list_empty(&udp->udp_sendq)) {
events |= POLLOUT;
}
@@ -193,7 +197,11 @@ nni_posix_udp_cb(void *arg)
events |= POLLIN;
}
if (events) {
- nni_posix_pollq_arm(&udp->udp_pitem, events);
+ int rv;
+ rv = nni_posix_pfd_arm(udp->udp_pfd, events);
+ if (rv != 0) {
+ nni_posix_udp_doerror(udp, rv);
+ }
}
}
nni_mtx_unlock(&udp->udp_mtx);
@@ -232,22 +240,16 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
NNI_FREE_STRUCT(udp);
return (rv);
}
- udp->udp_pitem.fd = udp->udp_fd;
- udp->udp_pitem.cb = nni_posix_udp_cb;
- udp->udp_pitem.data = udp;
-
- (void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK);
-
- nni_aio_list_init(&udp->udp_recvq);
- nni_aio_list_init(&udp->udp_sendq);
-
- if (((rv = nni_posix_pollq_init(&udp->udp_pitem)) != 0) ||
- ((rv = nni_posix_pollq_add(&udp->udp_pitem)) != 0)) {
+ if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) {
(void) close(udp->udp_fd);
nni_mtx_fini(&udp->udp_mtx);
NNI_FREE_STRUCT(udp);
return (rv);
}
+ nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp);
+
+ nni_aio_list_init(&udp->udp_recvq);
+ nni_aio_list_init(&udp->udp_sendq);
*upp = udp;
return (0);
@@ -256,8 +258,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
void
nni_plat_udp_close(nni_plat_udp *udp)
{
- // We're no longer interested in events.
- nni_posix_pollq_fini(&udp->udp_pitem);
+ nni_posix_pfd_fini(udp->udp_pfd);
nni_mtx_lock(&udp->udp_mtx);
nni_posix_udp_doclose(udp);
@@ -284,26 +285,46 @@ nni_plat_udp_cancel(nni_aio *aio, int rv)
void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&udp->udp_mtx);
- nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) {
+ nni_mtx_unlock(&udp->udp_mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&udp->udp_recvq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ if (nni_list_first(&udp->udp_recvq) == aio) {
+ if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ }
nni_mtx_unlock(&udp->udp_mtx);
}
void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&udp->udp_mtx);
- nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) {
+ nni_mtx_unlock(&udp->udp_mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&udp->udp_sendq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
+ if (nni_list_first(&udp->udp_sendq) == aio) {
+ if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ }
nni_mtx_unlock(&udp->udp_mtx);
}
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index b3c4738f..0bd12b24 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -34,6 +34,7 @@ struct nni_plat_thr {
void (*func)(void *);
void * arg;
HANDLE handle;
+ DWORD id;
};
struct nni_plat_mtx {
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index a3ae3748..5ead9cbc 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -155,11 +155,16 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
void
nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&evt->mtx);
- nni_aio_schedule(aio, nni_win_event_cancel, evt);
+ if ((rv = nni_aio_schedule(aio, nni_win_event_cancel, evt)) != 0) {
+ nni_mtx_unlock(&evt->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&evt->aios, aio);
nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index c6376cc7..169a2e00 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -572,17 +572,22 @@ void
nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&w->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
NNI_ASSERT(!nni_list_active(&w->waiters, ep));
ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
- nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 070bf40d..2bc68c32 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -29,49 +29,55 @@
#define NNG_WIN_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *nni_win_resolv_tq = NULL;
-static nni_mtx nni_win_resolv_mtx;
-
-typedef struct nni_win_resolv_item nni_win_resolv_item;
-struct nni_win_resolv_item {
- int family;
- int passive;
- const char *name;
- const char *serv;
- int proto;
- nni_aio * aio;
- nni_task task;
+static nni_taskq *win_resolv_tq = NULL;
+static nni_mtx win_resolv_mtx;
+
+typedef struct win_resolv_item win_resolv_item;
+struct win_resolv_item {
+ int family;
+ int passive;
+ const char * name;
+ const char * serv;
+ int proto;
+ nni_aio * aio;
+ nni_task * task;
+ nng_sockaddr sa;
};
static void
-nni_win_resolv_finish(nni_win_resolv_item *item, int rv)
+win_resolv_finish(win_resolv_item *item, int rv)
{
- nni_aio *aio = item->aio;
-
- nni_aio_set_prov_data(aio, NULL);
- nni_aio_finish(aio, rv, 0);
- NNI_FREE_STRUCT(item);
+ nni_aio *aio;
+
+ if (((aio = item->aio) != NULL) &&
+ (nni_aio_get_prov_data(aio) == item)) {
+ nni_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ }
}
static void
-nni_win_resolv_cancel(nni_aio *aio, int rv)
+win_resolv_cancel(nni_aio *aio, int rv)
{
- nni_win_resolv_item *item;
+ win_resolv_item *item;
- nni_mtx_lock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_unlock(&win_resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
- nni_mtx_unlock(&nni_win_resolv_mtx);
- nni_task_cancel(&item->task);
- NNI_FREE_STRUCT(item);
+ item->aio = NULL;
+ nni_mtx_unlock(&win_resolv_mtx);
nni_aio_finish_error(aio, rv);
}
static int
-nni_win_gai_errno(int rv)
+win_gai_errno(int rv)
{
switch (rv) {
case 0:
@@ -98,17 +104,26 @@ nni_win_gai_errno(int rv)
}
static void
-nni_win_resolv_task(void *arg)
+win_resolv_task(void *arg)
{
- nni_win_resolv_item *item = arg;
- nni_aio * aio = item->aio;
- struct addrinfo hints;
- struct addrinfo * results;
- struct addrinfo * probe;
- int rv;
+ win_resolv_item *item = arg;
+ struct addrinfo hints;
+ struct addrinfo *results;
+ struct addrinfo *probe;
+ int rv;
results = NULL;
+ nni_mtx_lock(&win_resolv_mtx);
+ if (item->aio == NULL) {
+ nni_mtx_unlock(&win_resolv_mtx);
+ // Caller canceled, and no longer cares about this.
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ return;
+ }
+ nni_mtx_unlock(&win_resolv_mtx);
+
// We treat these all as IP addresses. The service and the
// host part are split.
memset(&hints, 0, sizeof(hints));
@@ -124,7 +139,7 @@ nni_win_resolv_task(void *arg)
rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
- rv = nni_win_gai_errno(rv);
+ rv = win_gai_errno(rv);
goto done;
}
@@ -142,7 +157,7 @@ nni_win_resolv_task(void *arg)
if (probe != NULL) {
struct sockaddr_in * sin;
struct sockaddr_in6 *sin6;
- nni_sockaddr * sa = nni_aio_get_input(aio, 0);
+ nni_sockaddr * sa = &item->sa;
switch (probe->ai_addr->sa_family) {
case AF_INET:
@@ -167,17 +182,18 @@ done:
if (results != NULL) {
freeaddrinfo(results);
}
- nni_mtx_lock(&nni_win_resolv_mtx);
- nni_win_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
+ win_resolv_finish(item, rv);
+ nni_mtx_unlock(&win_resolv_mtx);
}
static void
-nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
+win_resolv_ip(const char *host, const char *serv, int passive, int family,
int proto, nni_aio *aio)
{
- nni_win_resolv_item *item;
- int fam;
+ win_resolv_item *item;
+ int fam;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -202,8 +218,12 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- nni_task_init(
- nni_win_resolv_tq, &item->task, nni_win_resolv_task, item);
+ rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item);
+ if (rv != 0) {
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
item->passive = passive;
item->name = host;
@@ -212,24 +232,30 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
item->aio = aio;
item->family = fam;
- nni_mtx_lock(&nni_win_resolv_mtx);
- nni_aio_schedule(aio, nni_win_resolv_cancel, item);
- nni_task_dispatch(&item->task);
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
+ if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) {
+ nni_mtx_unlock(&win_resolv_mtx);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_task_dispatch(item->task);
+ nni_mtx_unlock(&win_resolv_mtx);
}
void
nni_plat_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
+ win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
nni_plat_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
+ win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
int
@@ -237,10 +263,10 @@ nni_win_resolv_sysinit(void)
{
int rv;
- nni_mtx_init(&nni_win_resolv_mtx);
+ nni_mtx_init(&win_resolv_mtx);
- if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) {
- nni_mtx_fini(&nni_win_resolv_mtx);
+ if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) {
+ nni_mtx_fini(&win_resolv_mtx);
return (rv);
}
return (0);
@@ -249,11 +275,11 @@ nni_win_resolv_sysinit(void)
void
nni_win_resolv_sysfini(void)
{
- if (nni_win_resolv_tq != NULL) {
- nni_taskq_fini(nni_win_resolv_tq);
- nni_win_resolv_tq = NULL;
+ if (win_resolv_tq != NULL) {
+ nni_taskq_fini(win_resolv_tq);
+ win_resolv_tq = NULL;
}
- nni_mtx_fini(&nni_win_resolv_mtx);
+ nni_mtx_fini(&win_resolv_mtx);
}
#endif // NNG_PLATFORM_WINDOWS
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index a2ae72fe..2e9d58d7 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -107,6 +107,7 @@ static unsigned int __stdcall nni_plat_thr_main(void *arg)
{
nni_plat_thr *thr = arg;
+ thr->id = GetCurrentThreadId();
thr->func(thr->arg);
return (0);
}
@@ -138,6 +139,12 @@ nni_plat_thr_fini(nni_plat_thr *thr)
}
}
+bool
+nni_plat_thr_is_self(nni_plat_thr *thr)
+{
+ return (GetCurrentThreadId() == thr->id);
+}
+
static LONG plat_inited = 0;
int