aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-04 16:15:02 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-04 16:15:02 -0700
commit16a43040ef29f77375d226f669770e64a42d278c (patch)
tree1fd80d0d761fbdf812c257817a3b23ae1dc6519c
parent58c5fbb731f50a952864bc500a8efd3b7077ee65 (diff)
downloadnng-16a43040ef29f77375d226f669770e64a42d278c.tar.gz
nng-16a43040ef29f77375d226f669770e64a42d278c.tar.bz2
nng-16a43040ef29f77375d226f669770e64a42d278c.zip
Separate out poller/pollq from basic socket operations.
-rw-r--r--src/CMakeLists.txt4
-rw-r--r--src/platform/posix/posix_aio.h4
-rw-r--r--src/platform/posix/posix_config.h2
-rw-r--r--src/platform/posix/posix_epdesc.c347
-rw-r--r--src/platform/posix/posix_impl.h6
-rw-r--r--src/platform/posix/posix_pipedesc.c382
-rw-r--r--src/platform/posix/posix_pollq.h43
-rw-r--r--src/platform/posix/posix_pollq_poll.c367
-rw-r--r--src/platform/posix/posix_thread.c6
9 files changed, 1153 insertions, 8 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index cfe66dd5..4a8aa8ef 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -80,15 +80,19 @@ set (NNG_SOURCES
platform/posix/posix_impl.h
platform/posix/posix_config.h
platform/posix/posix_aio.h
+ platform/posix/posix_pollq.h
platform/posix/posix_socket.h
platform/posix/posix_alloc.c
platform/posix/posix_clock.c
platform/posix/posix_debug.c
+ platform/posix/posix_epdesc.c
platform/posix/posix_ipc.c
platform/posix/posix_net.c
platform/posix/posix_pipe.c
+ platform/posix/posix_pipedesc.c
platform/posix/posix_poll.c
+ platform/posix/posix_pollq_poll.c
platform/posix/posix_rand.c
platform/posix/posix_socket.c
platform/posix/posix_thread.c
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 01d042f8..186f586f 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -18,12 +18,12 @@
#include "core/nng_impl.h"
+typedef struct nni_posix_pollq nni_posix_pollq;
+
typedef struct nni_posix_pipedesc nni_posix_pipedesc;
typedef struct nni_posix_epdesc nni_posix_epdesc;
-extern int nni_posix_pipedesc_sysinit(void);
-extern void nni_posix_pipedesc_sysfini(void);
extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *);
diff --git a/src/platform/posix/posix_config.h b/src/platform/posix/posix_config.h
index bf121243..7fc2f5d5 100644
--- a/src/platform/posix/posix_config.h
+++ b/src/platform/posix/posix_config.h
@@ -58,4 +58,4 @@
#define NNG_USE_CLOCKID CLOCK_REALTIME
#endif // CLOCK_REALTIME
-#define NNG_USE_POSIX_AIOPOLL 1
+#define NNG_USE_POSIX_POLLQ_POLL 1
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
new file mode 100644
index 00000000..9d7cf538
--- /dev/null
+++ b/src/platform/posix/posix_epdesc.c
@@ -0,0 +1,347 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_pollq.h"
+
+#ifdef PLATFORM_POSIX_EPDESC
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// nni_posix_epdesc is the per-endpoint descriptor: it wraps one socket
+// file descriptor together with the aios queued against it.
+struct nni_posix_epdesc {
+ int fd; // socket descriptor supplied to nni_posix_epdesc_init
+ int index; // NOTE(review): not referenced in this file -- confirm still needed
+ nni_list connectq; // aios waiting for an outgoing connect to finish
+ nni_list acceptq; // aios waiting for an incoming connection
+ nni_posix_pollq_node node; // this endpoint's record for the pollq
+ nni_posix_pollq * pq; // pollq returned by nni_posix_pollq_get at init
+ struct sockaddr_storage locaddr; // presumably the local address -- TODO confirm
+ struct sockaddr_storage remaddr; // remote address handed to connect()
+ socklen_t loclen; // presumably the length of locaddr -- TODO confirm
+ socklen_t remlen; // length of remaddr handed to connect()
+ nni_mtx mtx; // created in init, destroyed in fini
+};
+
+
+#if 0
+// Cancellation hook registered with nni_aio_start for endpoint aios.
+// Unlinks the aio from whatever queue it is on while holding the pollq
+// lock.  (This code is currently compiled out by the enclosing #if 0.)
+static void
+nni_posix_epdesc_cancel(nni_aio *aio)
+{
+	nni_posix_epdesc *ed = aio->a_prov_data;
+	nni_posix_pollq *pq = ed->pq;
+
+	nni_mtx_lock(&pq->mtx);
+	nni_list_node_remove(&aio->a_prov_node);
+	nni_mtx_unlock(&pq->mtx);
+}
+
+
+// Complete an endpoint aio with result rv.
+//
+// The aio is first unlinked from whichever queue it is on.  When rv is
+// zero, newfd is wrapped in a newly created pipedesc, which is handed to
+// the consumer through aio->a_pipe.  If creating the pipedesc fails, newfd
+// is closed and the aio completes with that error instead.
+static void
+nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
+{
+	nni_posix_pipedesc *pd;
+
+	// The aio may be on the acceptq or on the connectq, and the callers
+	// that drain either queue rely on it being unlinked here.  It may
+	// also be on neither queue when it is failed before being queued;
+	// the pipedesc code calls nni_aio_list_remove in that state as well.
+	nni_aio_list_remove(aio);
+
+	if (rv == 0) {
+		if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) {
+			(void) close(newfd);
+		} else {
+			// Publish the new pipe descriptor.  (The previous code
+			// stored the identifier "pipe", which is not a local.)
+			aio->a_pipe = pd;
+		}
+	}
+	nni_aio_finish(aio, rv, 0);
+}
+
+
+// Called when the endpoint's descriptor is reported ready while connect
+// aios are pending.
+//
+// A socket gets here after connect() returned EINPROGRESS.  Once the
+// attempt is over (successfully or not) the descriptor becomes writable,
+// and SO_ERROR reports the outcome.  Normally there is only one aio on
+// the connectq.
+static void
+nni_posix_poll_connect(nni_posix_epdesc *ed)
+{
+	nni_aio *aio;
+
+	for (;;) {
+		int err = -1;
+		socklen_t len = sizeof (err);
+
+		if ((aio = nni_list_first(&ed->connectq)) == NULL) {
+			return;
+		}
+
+		if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) {
+			err = errno;
+		}
+
+		if (err == EINPROGRESS) {
+			// Not finished yet; wait for the next event.
+			return;
+		}
+
+		if (err == 0) {
+			// Connected: the endpoint's own fd becomes the pipe.
+			nni_posix_epdesc_finish(aio, 0, ed->fd);
+		} else {
+			nni_posix_epdesc_finish(aio, nni_plat_errno(err), 0);
+		}
+	}
+}
+
+
+// Called when the listening descriptor is reported readable.  Accepts
+// connections for as long as there are aios on the acceptq, completing
+// one aio per accepted connection.
+//
+// The peer address is deliberately not collected; a consumer that wants
+// it can call getpeername() on the new descriptor.
+static void
+nni_posix_poll_accept(nni_posix_epdesc *ed)
+{
+	nni_aio *aio;
+	int newfd;
+	int err;
+
+	while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
+#ifdef NNG_USE_ACCEPT4
+		newfd = accept4(ed->fd, NULL, NULL, SOCK_CLOEXEC);
+		if ((newfd < 0) &&
+		    ((errno == ENOSYS) || (errno == ENOTSUP))) {
+			// accept4 is not available at run time; fall back.
+			newfd = accept(ed->fd, NULL, NULL);
+		}
+#else
+		newfd = accept(ed->fd, NULL, NULL);
+#endif
+
+		if (newfd >= 0) {
+			// Successful connection request.
+			nni_posix_epdesc_finish(aio, 0, newfd);
+			continue;
+		}
+
+		// Capture errno once so later calls cannot disturb it.
+		err = errno;
+
+		// EWOULDBLOCK is what the standards require, but some
+		// platforms use EAGAIN.  The two may have the same value,
+		// so they cannot be separate switch cases.
+		if ((err == EWOULDBLOCK) || (err == EAGAIN)) {
+			// Nothing more to accept right now; try again later.
+			return;
+		}
+
+		if (err == ECONNABORTED) {
+			// The peer gave up before we accepted.  Most
+			// applications do not want to hear about this, so it
+			// is swallowed and the same aio is retried.
+			continue;
+		}
+
+		nni_posix_epdesc_finish(aio, nni_plat_errno(err), 0);
+	}
+}
+
+
+// Fail every aio still queued on the endpoint with NNG_ECLOSED.  The
+// loops terminate because finishing an aio is expected to take it off
+// its queue.
+static void
+nni_posix_poll_epclose(nni_posix_epdesc *ed)
+{
+	nni_aio *aio;
+
+	aio = nni_list_first(&ed->acceptq);
+	while (aio != NULL) {
+		nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+		aio = nni_list_first(&ed->acceptq);
+	}
+
+	aio = nni_list_first(&ed->connectq);
+	while (aio != NULL) {
+		nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+		aio = nni_list_first(&ed->connectq);
+	}
+}
+
+
+// Register the endpoint with the pollq unless it is already registered.
+// Returns 0 on success, or the error from growing the poll array.
+static int
+nni_posix_epdesc_add(nni_posix_pollq *pq, nni_posix_epdesc *ed)
+{
+	int rv;
+
+	if (nni_list_active(&pq->eds, ed)) {
+		// Already present; nothing to do.
+		return (0);
+	}
+
+	rv = nni_posix_poll_grow(pq);
+	if (rv != 0) {
+		return (rv);
+	}
+
+	nni_list_append(&pq->eds, ed);
+	pq->neds++;
+	return (0);
+}
+
+
+// Start an asynchronous connect on the endpoint.
+//
+// The descriptor is assumed to already be in non-blocking mode.  The aio
+// is completed immediately if the connect succeeds or fails synchronously;
+// otherwise it is placed on the connectq and the poller is woken if this
+// is the first pending connect.
+void
+nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+{
+	nni_posix_pollq *pq = ed->pq;
+	int rv;
+	int wake;
+
+	nni_mtx_lock(&pq->mtx);
+
+	// If we can't start, it means that the AIO was stopped.
+	if (nni_aio_start(aio, nni_posix_epdesc_cancel, ed) != 0) {
+		goto out;
+	}
+
+	if (ed->fd < 0) {
+		nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+		goto out;
+	}
+
+	if (connect(ed->fd, (void *) &ed->remaddr, ed->remlen) == 0) {
+		// Immediate connect.  This probably only happens on
+		// loopback, and probably not on every platform.  The
+		// connected descriptor is the endpoint's own fd, the same
+		// value nni_posix_poll_connect passes on deferred success
+		// (passing 0 here would wrap descriptor 0 as the pipe).
+		nni_posix_epdesc_finish(aio, 0, ed->fd);
+		goto out;
+	}
+
+	if (errno != EINPROGRESS) {
+		// Some immediate failure occurred.
+		nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+		goto out;
+	}
+
+	// The connection is pending, so the pollq has to watch it.
+	if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
+		nni_posix_epdesc_finish(aio, rv, 0);
+		goto out;
+	}
+
+	NNI_ASSERT(!nni_list_active(&ed->connectq, aio));
+	wake = nni_list_empty(&ed->connectq);
+	nni_aio_list_append(&ed->connectq, aio);
+	if (wake) {
+		nni_plat_pipe_raise(pq->wakewfd);
+	}
+
+out:
+	nni_mtx_unlock(&pq->mtx);
+}
+
+
+// Queue an asynchronous accept on the endpoint.
+//
+// The descriptor is assumed to already be in non-blocking mode.  Accept
+// is simpler than connect: we only need to wait for the socket to become
+// readable, which indicates an incoming connection is ready.  The aio is
+// placed on the acceptq and the poller is woken if this is the first
+// pending accept.
+void
+nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
+{
+	nni_posix_pollq *pq = ed->pq;
+	int rv;
+	int wake;
+
+	nni_mtx_lock(&pq->mtx);
+
+	// If we can't start, it means that the AIO was stopped.
+	if (nni_aio_start(aio, nni_posix_epdesc_cancel, ed) != 0) {
+		goto out;
+	}
+
+	if (ed->fd < 0) {
+		// Finish while still holding the lock, as connect does.
+		nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+		goto out;
+	}
+
+	// The pollq has to watch the descriptor for us.
+	if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
+		// The aio is done; it must not also be queued below, and
+		// the lock we already hold must not be taken a second time.
+		nni_posix_epdesc_finish(aio, rv, 0);
+		goto out;
+	}
+
+	NNI_ASSERT(!nni_list_active(&ed->acceptq, aio));
+	wake = nni_list_empty(&ed->acceptq);
+	nni_aio_list_append(&ed->acceptq, aio);
+	if (wake) {
+		nni_plat_pipe_raise(pq->wakewfd);
+	}
+
+out:
+	nni_mtx_unlock(&pq->mtx);
+}
+
+
+#endif
+
+// Allocate and initialise an endpoint descriptor wrapping fd.
+//
+// On success stores the new descriptor in *edp and returns 0.  Returns
+// NNG_ENOMEM if allocation fails, or the error from nni_mtx_init.  The
+// descriptor is switched to non-blocking mode.
+int
+nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
+{
+	nni_posix_epdesc *ed;
+	int rv;
+
+	if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
+		return (NNG_ENOMEM);
+	}
+	// Start from a known state, as nni_posix_pipedesc_init does, so that
+	// fields not assigned below (addresses, lengths, node bookkeeping)
+	// are zero rather than indeterminate.
+	memset(ed, 0, sizeof (*ed));
+
+	if ((rv = nni_mtx_init(&ed->mtx)) != 0) {
+		NNI_FREE_STRUCT(ed);
+		return (rv);
+	}
+
+	// We could randomly choose a different pollq, or for efficiency's
+	// sake we could take a modulo of the file desc number to choose
+	// one.  For now we just have a global pollq.  Note that by tying
+	// the ed to a single pollq we may get some kind of cache warmth.
+
+	ed->pq = nni_posix_pollq_get(fd);
+	ed->fd = fd;
+	ed->node.fd = fd; // the poller reads the descriptor from the node
+	ed->node.index = 0;
+	ed->node.cb = NULL; // XXXX:
+	ed->node.data = ed;
+
+	// Ensure we are in non-blocking mode.
+	// NOTE(review): F_SETFL with only O_NONBLOCK replaces the other
+	// file status flags -- confirm that is intended.
+	(void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+	nni_aio_list_init(&ed->connectq);
+	nni_aio_list_init(&ed->acceptq);
+
+	*edp = ed;
+	return (0);
+}
+
+
+// Release an endpoint descriptor created by nni_posix_epdesc_init: the
+// mutex is destroyed and the structure freed.
+// NOTE(review): the fd is not closed here and queued aios are not
+// completed; presumably that is the outstanding work flagged below.
+void
+nni_posix_epdesc_fini(nni_posix_epdesc *ed)
+{
+ // XXX: MORE WORK HERE.
+ nni_mtx_fini(&ed->mtx);
+ NNI_FREE_STRUCT(ed);
+}
+
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_epdesc_not_used = 0;
+
+#endif // PLATFORM_POSIX_EPDESC
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 83b4914e..3a2e29e1 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -26,6 +26,8 @@
#define PLATFORM_POSIX_RANDOM
#define PLATFORM_POSIX_SOCKET
#define PLATFORM_POSIX_THREAD
+#define PLATFORM_POSIX_PIPEDESC
+#define PLATFORM_POSIX_EPDESC
#include "platform/posix/posix_config.h"
#endif
@@ -65,7 +67,7 @@ struct nni_plat_cv {
#endif
-extern int nni_posix_pipedesc_sysinit(void);
-extern void nni_posix_pipedesc_sysfini(void);
+extern int nni_posix_pollq_sysinit(void);
+extern void nni_posix_pollq_sysfini(void);
#endif // PLATFORM_POSIX_IMPL_H
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
new file mode 100644
index 00000000..14a5c8ab
--- /dev/null
+++ b/src/platform/posix/posix_pipedesc.c
@@ -0,0 +1,382 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
+#include "platform/posix/posix_pollq.h"
+
+#ifdef PLATFORM_POSIX_PIPEDESC
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// nni_posix_pipedesc is a descriptor kept one per transport pipe (i.e. open
+// file descriptor for TCP socket, etc.) This contains the list of pending
+// aios for that underlying socket, as well as the socket itself.
+struct nni_posix_pipedesc {
+ nni_posix_pollq * pq; // pollq returned by nni_posix_pollq_get at init
+ int fd; // underlying descriptor; readv/writev/shutdown target
+ nni_list readq; // pending receive aios, serviced in order
+ nni_list writeq; // pending send aios, serviced in order
+ nni_posix_pollq_node node; // record submitted to the pollq; cb is nni_posix_pipedesc_cb
+ nni_mtx mtx; // protects the queues and node.events
+};
+
+
+// Complete an aio with result rv: unlink it from whatever queue it is on
+// and report the byte count accumulated so far in aio->a_count.  Callers
+// in this file invoke it with the pipedesc mutex held.
+static void
+nni_posix_pipedesc_finish(nni_aio *aio, int rv)
+{
+ nni_aio_list_remove(aio);
+ nni_aio_finish(aio, rv, aio->a_count);
+}
+
+
+// Push as much queued output as the descriptor will take.
+//
+// Works through the writeq in order.  Each aio's iov array is consumed
+// in place (a_iov / a_niov are updated as data is sent), so a partially
+// sent aio resumes where it left off on the next call.  An aio is only
+// completed once every one of its iovs has been fully written.
+static void
+nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
+{
+	int n;
+	int i;
+	struct iovec iovec[4];
+	nni_aio *aio;
+
+	while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+		// The local array bounds how many iovs can be submitted.
+		NNI_ASSERT(aio->a_niov <= 4);
+		for (i = 0; i < aio->a_niov; i++) {
+			iovec[i].iov_len = aio->a_iov[i].iov_len;
+			iovec[i].iov_base = aio->a_iov[i].iov_buf;
+		}
+
+		n = writev(pd->fd, iovec, aio->a_niov);
+		if (n < 0) {
+			if ((errno == EAGAIN) || (errno == EINTR)) {
+				// Can't write more right now.  We're done
+				// on this fd for now.
+				return;
+			}
+			nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
+			return;
+		}
+
+		aio->a_count += n;
+
+		// Consume the bytes just written from the front of the iov
+		// array.  The loop runs until the array is empty rather than
+		// until n reaches zero: a short write that happens to end
+		// exactly on an iov boundary must not be mistaken for the
+		// whole operation being complete.
+		while (aio->a_niov > 0) {
+			if (n < aio->a_iov[0].iov_len) {
+				// The first iov is not fully written yet.
+				// Record progress and wait for the next
+				// writable event.
+				aio->a_iov[0].iov_buf += n;
+				aio->a_iov[0].iov_len -= n;
+				return;
+			}
+
+			// The first iov was consumed entirely; shift the
+			// remaining ones up.
+			n -= aio->a_iov[0].iov_len;
+			for (i = 1; i < aio->a_niov; i++) {
+				aio->a_iov[i-1] = aio->a_iov[i];
+			}
+			aio->a_niov--;
+		}
+
+		// Every iov has been written.  Finishing the aio also
+		// unlinks it from the writeq, so no separate removal is
+		// needed (the read path relies on the same thing).
+		nni_posix_pipedesc_finish(aio, 0);
+
+		// Go back to the start of the loop to see if there is
+		// another aio ready for us to process.
+	}
+}
+
+
+// Pull in as much input as is available for the queued receive aios.
+//
+// Works through the readq in order.  Each aio's iov array is consumed in
+// place (a_iov / a_niov are updated as data arrives), so a partially
+// filled aio resumes where it left off on the next call.  An aio is only
+// completed once every one of its iovs has been completely filled.  A
+// zero-byte read is reported as NNG_ECLOSED.
+static void
+nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
+{
+	int n;
+	int i;
+	struct iovec iovec[4];
+	nni_aio *aio;
+
+	while ((aio = nni_list_first(&pd->readq)) != NULL) {
+		// The local array bounds how many iovs can be submitted.
+		NNI_ASSERT(aio->a_niov <= 4);
+		for (i = 0; i < aio->a_niov; i++) {
+			iovec[i].iov_len = aio->a_iov[i].iov_len;
+			iovec[i].iov_base = aio->a_iov[i].iov_buf;
+		}
+
+		n = readv(pd->fd, iovec, aio->a_niov);
+		if (n < 0) {
+			if ((errno == EAGAIN) || (errno == EINTR)) {
+				// Nothing more to read right now.  We're
+				// done on this fd for now.
+				return;
+			}
+			nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
+			return;
+		}
+
+		if (n == 0) {
+			// No bytes indicates a closed descriptor.
+			nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+			return;
+		}
+
+		aio->a_count += n;
+
+		// Consume the bytes just read from the front of the iov
+		// array.  The loop runs until the array is empty rather than
+		// until n reaches zero: a short read that happens to end
+		// exactly on an iov boundary must not be mistaken for the
+		// whole operation being complete.
+		while (aio->a_niov > 0) {
+			if (n < aio->a_iov[0].iov_len) {
+				// The first iov is not completely filled
+				// yet.  Record progress and wait for the
+				// next readable event.
+				aio->a_iov[0].iov_buf += n;
+				aio->a_iov[0].iov_len -= n;
+				return;
+			}
+
+			// The first iov was filled entirely; shift the
+			// remaining ones up.
+			n -= aio->a_iov[0].iov_len;
+			for (i = 1; i < aio->a_niov; i++) {
+				aio->a_iov[i-1] = aio->a_iov[i];
+			}
+			aio->a_niov--;
+		}
+
+		// Every iov has been filled; finishing also unlinks the aio
+		// from the readq.
+		nni_posix_pipedesc_finish(aio, 0);
+
+		// Go back to the start of the loop to see if there is
+		// another aio ready for us to process.
+	}
+}
+
+
+// Shut the connection down and fail every aio still queued in either
+// direction with NNG_ECLOSED.  Finishing an aio unlinks it from its
+// queue, which is what lets the drain loops terminate.
+static void
+nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
+{
+	nni_aio *aio;
+
+	// Let any peer know we are closing.
+	if (pd->fd != -1) {
+		(void) shutdown(pd->fd, SHUT_RDWR);
+	}
+
+	aio = nni_list_first(&pd->readq);
+	while (aio != NULL) {
+		nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+		aio = nni_list_first(&pd->readq);
+	}
+
+	aio = nni_list_first(&pd->writeq);
+	while (aio != NULL) {
+		nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+		aio = nni_list_first(&pd->writeq);
+	}
+}
+
+
+// Pollq callback for a pipedesc; arg is the pipedesc itself.
+//
+// Services whichever directions the poller reported in node.revents,
+// then recomputes the set of events still wanted from the state of the
+// two queues and re-submits the node if anything remains outstanding.
+static void
+nni_posix_pipedesc_cb(void *arg)
+{
+	nni_posix_pipedesc *pd = arg;
+	int fired;
+	int want = 0;
+
+	nni_mtx_lock(&pd->mtx);
+
+	fired = pd->node.revents;
+	if (fired & POLLIN) {
+		nni_posix_pipedesc_doread(pd);
+	}
+	if (fired & POLLOUT) {
+		nni_posix_pipedesc_dowrite(pd);
+	}
+	if (fired & (POLLHUP|POLLERR|POLLNVAL)) {
+		nni_posix_pipedesc_doclose(pd);
+	}
+
+	// Work out what we still need to be told about.
+	if (!nni_list_empty(&pd->readq)) {
+		want |= POLLIN;
+	}
+	if (!nni_list_empty(&pd->writeq)) {
+		want |= POLLOUT;
+	}
+	pd->node.revents = 0;
+	pd->node.events = want;
+
+	// If we still have uncompleted operations, resubmit us.
+	if (want != 0) {
+		nni_posix_pollq_submit(pd->pq, &pd->node);
+	}
+	nni_mtx_unlock(&pd->mtx);
+}
+
+
+// Close a pipedesc: withdraw its node from the pollq (waiting out any
+// callback in progress), then shut the connection down and fail every
+// queued aio with NNG_ECLOSED.  The descriptor itself is not closed here.
+void
+nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
+{
+	nni_posix_pollq_cancel(pd->pq, &pd->node);
+
+	// The shutdown-and-drain sequence is exactly what the poll
+	// callback's close path performs, so share it.
+	nni_mtx_lock(&pd->mtx);
+	nni_posix_pipedesc_doclose(pd);
+	nni_mtx_unlock(&pd->mtx);
+}
+
+
+// Cancellation hook registered with nni_aio_start for pipe aios: takes
+// the aio off whichever queue it is on, under the pipedesc mutex.
+static void
+nni_posix_pipedesc_cancel(nni_aio *aio)
+{
+	nni_posix_pipedesc *pd = aio->a_prov_data;
+
+	nni_mtx_lock(&pd->mtx);
+	nni_aio_list_remove(aio);
+	nni_mtx_unlock(&pd->mtx);
+}
+
+
+// Queue an asynchronous receive on the pipe.
+//
+// The aio is appended to the readq.  If the node is not already asking
+// for POLLIN, the bit is set and the node is submitted to the pollq.  If
+// the pipe has been closed (fd < 0) the aio fails with NNG_ECLOSED; if
+// the submit fails the aio fails with that error.
+void
+nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+	int rv;
+
+	nni_mtx_lock(&pd->mtx);
+
+	// If we can't start, it means that the AIO was stopped.
+	if (nni_aio_start(aio, nni_posix_pipedesc_cancel, pd) != 0) {
+		nni_mtx_unlock(&pd->mtx);
+		return;
+	}
+	if (pd->fd < 0) {
+		nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+		nni_mtx_unlock(&pd->mtx);
+		return;
+	}
+
+	nni_aio_list_append(&pd->readq, aio);
+	if ((pd->node.events & POLLIN) == 0) {
+		pd->node.events |= POLLIN;
+		rv = nni_posix_pollq_submit(pd->pq, &pd->node);
+		if (rv != 0) {
+			// The node was not registered.  Take the bit back
+			// out, otherwise the next receive would see POLLIN
+			// already set and skip the submit.
+			pd->node.events &= ~POLLIN;
+			nni_posix_pipedesc_finish(aio, rv);
+		}
+	}
+	nni_mtx_unlock(&pd->mtx);
+}
+
+
+// Queue an asynchronous send on the pipe.
+//
+// The aio is appended to the writeq.  If the node is not already asking
+// for POLLOUT, the bit is set and the node is submitted to the pollq.  If
+// the pipe has been closed (fd < 0) the aio fails with NNG_ECLOSED; if
+// the submit fails the aio fails with that error.
+void
+nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
+{
+	int rv;
+
+	nni_mtx_lock(&pd->mtx);
+
+	// If we can't start, it means that the AIO was stopped.
+	if (nni_aio_start(aio, nni_posix_pipedesc_cancel, pd) != 0) {
+		nni_mtx_unlock(&pd->mtx);
+		return;
+	}
+	if (pd->fd < 0) {
+		nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+		nni_mtx_unlock(&pd->mtx);
+		return;
+	}
+
+	nni_aio_list_append(&pd->writeq, aio);
+	if ((pd->node.events & POLLOUT) == 0) {
+		pd->node.events |= POLLOUT;
+		rv = nni_posix_pollq_submit(pd->pq, &pd->node);
+		if (rv != 0) {
+			// The node was not registered.  Take the bit back
+			// out, otherwise the next send would see POLLOUT
+			// already set and skip the submit.
+			pd->node.events &= ~POLLOUT;
+			nni_posix_pipedesc_finish(aio, rv);
+		}
+	}
+	nni_mtx_unlock(&pd->mtx);
+}
+
+
+// Allocate and initialise a pipedesc wrapping fd.
+//
+// On success stores the new pipedesc in *pdp and returns 0.  Returns
+// NNG_ENOMEM if allocation fails, or the error from nni_mtx_init.  The
+// descriptor is switched to non-blocking mode.
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
+{
+	nni_posix_pipedesc *pd;
+	int rv;
+
+	pd = NNI_ALLOC_STRUCT(pd);
+	if (pd == NULL) {
+		return (NNG_ENOMEM);
+	}
+	memset(pd, 0, sizeof (*pd));
+
+	rv = nni_mtx_init(&pd->mtx);
+	if (rv != 0) {
+		NNI_FREE_STRUCT(pd);
+		return (rv);
+	}
+
+	nni_aio_list_init(&pd->readq);
+	nni_aio_list_init(&pd->writeq);
+
+	// A pollq could be picked at random, or by taking the descriptor
+	// number modulo the number of pollqs.  For now there is only the
+	// global one.  Keeping a pd on one pollq may also help cache warmth.
+	pd->fd = fd;
+	pd->pq = nni_posix_pollq_get(fd);
+
+	// The node is what actually gets handed to the poller.
+	pd->node.data = pd;
+	pd->node.cb = nni_posix_pipedesc_cb;
+	pd->node.fd = fd;
+
+	(void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+	*pdp = pd;
+	return (0);
+}
+
+
+// Destroy a pipedesc created by nni_posix_pipedesc_init.  The pipe is
+// closed first (which cancels its pollq node and fails queued aios), then
+// the mutex is destroyed and the structure freed.
+// NOTE(review): pd->fd itself is not close()d here -- confirm the owner
+// of the descriptor closes it elsewhere.
+void
+nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
+{
+ // Make sure no other polling activity is pending.
+ nni_posix_pipedesc_close(pd);
+
+ nni_mtx_fini(&pd->mtx);
+
+ NNI_FREE_STRUCT(pd);
+}
+
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_pipedesc_not_used = 0;
+
+#endif // PLATFORM_POSIX_PIPEDESC
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
new file mode 100644
index 00000000..74b39704
--- /dev/null
+++ b/src/platform/posix/posix_pollq.h
@@ -0,0 +1,43 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef PLATFORM_POSIX_POLLQ_H
+#define PLATFORM_POSIX_POLLQ_H
+
+// This file defines structures we will use for emulating asynchronous I/O
+// on POSIX. POSIX lacks the support for callback-based asynchronous I/O
+// that we have on Windows, although it has an aio layer that is not widely
+// supported and is not very performant on many systems. So we emulate this
+// using one of several possible backends.
+
+#include "core/nng_impl.h"
+#include <poll.h>
+
+typedef struct nni_posix_pollq_node nni_posix_pollq_node;
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+// nni_posix_pollq_node is the unit of registration with a pollq.  A
+// consumer embeds one, fills in fd, events, cb and data, and submits it;
+// the poller fills in revents before invoking cb(data).
+struct nni_posix_pollq_node {
+ nni_posix_pollq * pq; // associated pollq
+ nni_list_node node; // linkage into the pollq list
+ int index; // used by the poller impl (poll backend: slot in the pollfd array, 0 if none)
+ int armed; // used by the poller impl (poll backend: events currently registered, 0 if not queued)
+ int fd; // file descriptor to poll
+ int events; // events to watch for
+ int revents; // events received
+ void * data; // user data
+ nni_cb cb; // user callback on event
+};
+
+// nni_posix_pollq_get returns the pollq that should service the given
+// file descriptor.
+extern nni_posix_pollq *nni_posix_pollq_get(int);
+// nni_posix_pollq_submit registers the node's events with the pollq.  It
+// returns 0 on success, or an error code if the node could not be added.
+extern int nni_posix_pollq_submit(nni_posix_pollq *, nni_posix_pollq_node *);
+// nni_posix_pollq_cancel withdraws a node from the pollq, waiting first
+// if the node's callback is currently running.
+extern void nni_posix_pollq_cancel(nni_posix_pollq *, nni_posix_pollq_node *);
+// nni_posix_pollq_sysinit and nni_posix_pollq_sysfini set up and tear
+// down the poller subsystem.
+extern int nni_posix_pollq_sysinit(void);
+extern void nni_posix_pollq_sysfini(void);
+
+#endif // PLATFORM_POSIX_POLLQ_H
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
new file mode 100644
index 00000000..a70c902d
--- /dev/null
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -0,0 +1,367 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
+
+#ifdef NNG_USE_POSIX_POLLQ_POLL
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <poll.h>
+
+
+// POSIX AIO using poll(). We use a single poll thread to perform
+// I/O operations for the entire system. This isn't entirely scalable,
+// and it might be a good idea to create a few threads and group the
+// I/O operations into separate pollers to limit the amount of work each
+// thread does, and to scale across multiple cores. For now we don't
+// worry about it.
+
+// nni_posix_pollq is a work structure used by the poller thread, that keeps
+// track of all the underlying pipe handles and so forth being used by poll().
+struct nni_posix_pollq {
+ nni_mtx mtx; // protects every field below
+ nni_cv cv; // cancelers wait here for a running callback
+ struct pollfd * fds; // array currently handed to poll()
+ struct pollfd * newfds; // larger replacement array, adopted by the worker; NULL if none
+ int nfds; // capacity (in entries) of fds
+ int nnewfds; // capacity (in entries) of the most recent newfds
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ int close; // request for worker to exit
+ int started; // set once the worker thread has been launched
+ nni_thr thr; // worker thread
+ nni_list nodes; // poll list
+ nni_list notify; // notify list
+ int nnodes; // num of nodes in nodes list
+ int cancel; // waiters for cancellation
+ int inpoll; // poller asleep in poll
+
+ nni_posix_pollq_node * active; // active node (in callback)
+};
+
+
+
+// Make sure there will be room in the pollfd array for one more node.
+//
+// The worker thread owns pq->fds while it is polling, so the array is
+// never resized in place.  A larger array is staged in pq->newfds and the
+// worker switches over to it on its next pass.  Returns 0 on success or
+// NNG_ENOMEM if the allocation fails.
+static int
+nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
+{
+	struct pollfd *fresh;
+	struct pollfd *stale;
+	int nstale;
+	int need;
+
+	// One slot for the node being added, one for the waker.
+	need = pq->nnodes + 2;
+	if ((need < pq->nfds) || (need < pq->nnewfds)) {
+		return (0);
+	}
+
+	// Over-allocate so that we don't have to do this too often.
+	need += 128;
+
+	fresh = nni_alloc(need * sizeof (struct pollfd));
+	if (fresh == NULL) {
+		return (NNG_ENOMEM);
+	}
+
+	// We may be asked to grow several times before the poller gets
+	// scheduled.  In that case a previously staged array is still
+	// sitting in newfds and has to be discarded.
+	stale = pq->newfds;
+	nstale = pq->nnewfds;
+
+	pq->newfds = fresh;
+	pq->nnewfds = need;
+
+	if (nstale != 0) {
+		nni_free(stale, nstale * sizeof (struct pollfd));
+	}
+	return (0);
+}
+
+
+// Worker thread body for a pollq; arg is the nni_posix_pollq.
+//
+// Each pass rebuilds the pollfd array from the nodes list, sleeps in
+// poll() with the lock dropped, then moves every node that took part in
+// that poll onto the notify list and runs its callback (again with the
+// lock dropped).  The loop exits once pollq->close is set.
+static void
+nni_posix_poll_thr(void *arg)
+{
+ nni_posix_pollq *pollq = arg;
+ nni_posix_pollq_node *node, *nextnode;
+
+ nni_mtx_lock(&pollq->mtx);
+ for (;;) {
+ int rv;
+ int nfds;
+ struct pollfd *fds;
+
+ if (pollq->close) {
+ break;
+ }
+
+ if (pollq->newfds != NULL) {
+ // We have "grown" by the caller. Free up the old
+ // space, and start using the new.
+ nni_free(pollq->fds,
+ pollq->nfds * sizeof (struct pollfd));
+ pollq->fds = pollq->newfds;
+ pollq->nfds = pollq->nnewfds;
+ pollq->newfds = NULL;
+ }
+ fds = pollq->fds;
+ nfds = 0;
+
+ // The waker pipe is set up so that we will be woken
+ // when it is written (this allows us to be signaled).
+ // It always occupies slot 0.
+ fds[nfds].fd = pollq->wakerfd;
+ fds[nfds].events = POLLIN;
+ fds[nfds].revents = 0;
+ nfds++;
+
+ // Set up the poll list. Each node remembers its slot (always
+ // >= 1) so that its results can be found after poll().
+ NNI_LIST_FOREACH (&pollq->nodes, node) {
+ fds[nfds].fd = node->fd;
+ fds[nfds].events = node->armed;
+ fds[nfds].revents = 0;
+ node->index = nfds;
+ nfds++;
+ }
+
+ // Now poll it. We block indefinitely, since we use separate
+ // timeouts to wake and remove the elements from the list.
+ // inpoll tells submitters that they need to raise the waker.
+ pollq->inpoll = 1;
+ nni_mtx_unlock(&pollq->mtx);
+ rv = poll(fds, nfds, -1);
+ nni_mtx_lock(&pollq->mtx);
+ pollq->inpoll = 0;
+
+ if (rv < 0) {
+ // This shouldn't happen really. If it does, we
+ // just try again. (EINTR is probably the only
+ // reasonable failure here, unless internal memory
+ // allocations fail in the kernel, leading to EAGAIN.)
+ continue;
+ }
+
+
+ // If the waker pipe was signaled, read from it.
+ if (fds[0].revents & POLLIN) {
+ NNI_ASSERT(fds[0].fd == pollq->wakerfd);
+ nni_plat_pipe_clear(pollq->wakerfd);
+ }
+
+ // Now we iterate through all the nodes. Note that one
+ // may have been added or removed while we were in poll().
+ // Nodes added since the array was built still have index 0.
+ // Removed ones will just be absent.
+ // Note that we may remove the node from the list, so we
+ // have to use a custom iterator.
+ nextnode = nni_list_first(&pollq->nodes);
+ while ((node = nextnode) != NULL) {
+ int index;
+
+ // Save the next node for our next iteration. This
+ // way we can remove the node from the list without
+ // breaking the iteration.
+
+ nextnode = nni_list_next(&pollq->nodes, node);
+
+ // If index is less than 1, then we have just added
+ // this and there is no FD for it in the pollfds.
+ if ((index = node->index) < 1) {
+ continue;
+ }
+
+ // Clear the index for the next time around.
+ node->index = 0;
+
+ node->revents = fds[index].revents;
+
+ // Now we move this node to the callback list. Note that
+ // this happens for every polled node, whether or not it
+ // has any revents; the callback must resubmit the node if
+ // it still wants events.
+ node->armed = 0;
+ nni_list_remove(&pollq->nodes, node);
+ nni_list_append(&pollq->notify, node);
+ pollq->nnodes--;
+ }
+
+ // Finally we can call the callbacks. We record the
+ // active callback so any attempt to cancel blocks until
+ // the callback is finished.
+ while ((node = nni_list_first(&pollq->notify)) != NULL) {
+ nni_list_remove(&pollq->notify, node);
+ if (node->cb != NULL) {
+ pollq->active = node;
+ nni_mtx_unlock(&pollq->mtx);
+ node->cb(node->data);
+ nni_mtx_lock(&pollq->mtx);
+ pollq->active = NULL;
+ }
+ }
+
+ // Wake any cancelers. This happens only after the whole
+ // notify list has been drained, not after each callback.
+ if (pollq->cancel != 0) {
+ pollq->cancel = 0;
+ nni_cv_wake(&pollq->cv);
+ }
+ }
+ nni_mtx_unlock(&pollq->mtx);
+}
+
+
+// Withdraw a node from the pollq.
+//
+// If the node's callback is running right now, this blocks until the
+// poller signals that it is done.  Afterwards the node is unlinked and
+// disarmed so that it will not be included in future poll sets.
+//
+// NOTE(review): a node that has already been moved to the notify list but
+// whose callback has not yet started is not the active node; whether the
+// nni_list_active() test below also catches that case depends on the list
+// implementation -- confirm.
+void
+nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+{
+	nni_mtx_lock(&pq->mtx);
+	while (pq->active == node) {
+		pq->cancel++;
+		nni_cv_wait(&pq->cv);
+	}
+	if (nni_list_active(&pq->nodes, node)) {
+		// A node is armed exactly while it is counted in nnodes:
+		// submit arms it when adding it to the nodes list, and the
+		// poller disarms it when moving it off that list.  Keep the
+		// count in step, otherwise it only ever grows and the pollfd
+		// array is enlarged for nodes that no longer exist.
+		if (node->armed != 0) {
+			pq->nnodes--;
+		}
+		node->armed = 0;
+		nni_list_remove(&pq->nodes, node);
+	}
+	// Since we're not removing the fd from the outstanding poll, we
+	// may get an event.  In that case, we'll wake and rebuild the
+	// pollset without it, with no further action.  Otherwise having the
+	// poll present does no harm beyond the "spurious" wake of the poller
+	// thread.  (If we had port_disassociate or somesuch, this would be
+	// a great time for that.)
+	nni_mtx_unlock(&pq->mtx);
+}
+
+
+// Register the events in node->events with the pollq.
+//
+// A node that is not currently armed is appended to the poll list (after
+// making sure the pollfd array is large enough); a node that is already
+// armed simply has the new events merged in.  Returns 0 on success, or
+// the error from growing the pollfd array, in which case the node has
+// not been added.
+int
+nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+{
+	int rv;
+
+	nni_mtx_lock(&pq->mtx);
+
+	if (node->events == 0) {
+		// Nothing to schedule?
+		nni_mtx_unlock(&pq->mtx);
+		return (0);
+	}
+
+	if (node->armed == 0) {
+		NNI_ASSERT(!nni_list_active(&pq->nodes, node));
+
+		rv = nni_posix_pollq_poll_grow(pq);
+		if (rv != 0) {
+			nni_mtx_unlock(&pq->mtx);
+			return (rv);
+		}
+
+		nni_list_append(&pq->nodes, node);
+		pq->nnodes++;
+	}
+
+	node->armed |= node->events;
+
+	// Wake up the poller since we're adding a new poll, but only bother
+	// if it's already asleep.  (Frequently it will *not* be.)
+	if (pq->inpoll) {
+		nni_plat_pipe_raise(pq->wakewfd);
+	}
+	nni_mtx_unlock(&pq->mtx);
+	return (0);
+}
+
+
+// Tear down a pollq.  Also used to unwind a partially completed
+// nni_posix_pollq_init, so it has to cope with resources that were never
+// created.
+// NOTE(review): pq->cv is initialised in nni_posix_pollq_init but is not
+// torn down here -- confirm whether a cv fini call is missing.
+static void
+nni_posix_pollq_fini(nni_posix_pollq *pq)
+{
+ if (pq->started) {
+ // Ask the worker to exit and kick it out of poll().
+ nni_mtx_lock(&pq->mtx);
+ pq->close = 1;
+ pq->started = 0;
+ nni_plat_pipe_raise(pq->wakewfd);
+
+ // All pipes should have been closed before this is called.
+ NNI_ASSERT(nni_list_empty(&pq->nodes));
+ nni_mtx_unlock(&pq->mtx);
+ }
+
+ // The thread is finalized before the waker pipe and the pollfd
+ // arrays it uses are released.
+ nni_thr_fini(&pq->thr);
+ if (pq->wakewfd >= 0) {
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ pq->wakewfd = pq->wakerfd = -1;
+ }
+ nni_free(pq->newfds, pq->nnewfds * sizeof (struct pollfd));
+ nni_free(pq->fds, pq->nfds * sizeof (struct pollfd));
+ nni_mtx_fini(&pq->mtx);
+}
+
+
+// Initialise a pollq and start its worker thread.
+//
+// Resources are created in order (mutex, condition variable, initial
+// pollfd array, waker pipe, thread); the first failure stops the sequence,
+// unwinds through nni_posix_pollq_fini and is returned to the caller.
+// Returns 0 on success.
+static int
+nni_posix_pollq_init(nni_posix_pollq *pq)
+{
+	int rv;
+
+	NNI_LIST_INIT(&pq->nodes, nni_posix_pollq_node, node);
+	NNI_LIST_INIT(&pq->notify, nni_posix_pollq_node, node);
+	pq->wakewfd = -1;
+	pq->wakerfd = -1;
+	pq->close = 0;
+
+	rv = nni_mtx_init(&pq->mtx);
+	if (rv == 0) {
+		rv = nni_cv_init(&pq->cv, &pq->mtx);
+	}
+	if (rv == 0) {
+		rv = nni_posix_pollq_poll_grow(pq);
+	}
+	if (rv == 0) {
+		rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd);
+	}
+	if (rv == 0) {
+		rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq);
+	}
+	if (rv != 0) {
+		nni_posix_pollq_fini(pq);
+		return (rv);
+	}
+
+	pq->started = 1;
+	nni_thr_run(&pq->thr);
+	return (0);
+}
+
+
+// We use a single pollq for the entire system, which means only a single
+// thread is polling. This may be somewhat less than optimally efficient,
+// and it may be worth investigating having multiple threads to improve
+// efficiency and scalability. (This would shorten the linked lists,
+// improving C10K scalability, and also allow us to engage multiple cores.)
+// It's not entirely clear how many threads are "optimal".
+static nni_posix_pollq nni_posix_global_pollq;
+
+// Return the pollq that should service fd.  The argument is currently
+// ignored: every descriptor is served by the single global pollq.
+nni_posix_pollq *
+nni_posix_pollq_get(int fd)
+{
+ // This is the point where we could choose a pollq based on FD.
+ return (&nni_posix_global_pollq);
+}
+
+
+// Bring up the poller subsystem by initialising the global pollq.
+// Returns 0 on success or the initialisation error.
+int
+nni_posix_pollq_sysinit(void)
+{
+	return (nni_posix_pollq_init(&nni_posix_global_pollq));
+}
+
+
+// Shut down the poller subsystem by tearing down the global pollq.
+void
+nni_posix_pollq_sysfini(void)
+{
+ nni_posix_pollq_fini(&nni_posix_global_pollq);
+}
+
+
+#else
+
+// Suppress empty symbols warnings in ranlib.
+int nni_posix_pollq_poll_used = 0;
+
+#endif // NNG_USE_POSIX_POLLQ_POLL
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index b052ed53..171a87b9 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -299,7 +299,7 @@ nni_plat_init(int (*helper)(void))
// as scalable / thrifty with our use of VM.
(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
- if ((rv = nni_posix_pipedesc_sysinit()) != 0) {
+ if ((rv = nni_posix_pollq_sysinit()) != 0) {
pthread_mutex_unlock(&nni_plat_lock);
(void) close(nni_plat_devnull);
pthread_mutexattr_destroy(&nni_mxattr);
@@ -310,7 +310,7 @@ nni_plat_init(int (*helper)(void))
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
pthread_mutex_unlock(&nni_plat_lock);
- nni_posix_pipedesc_sysfini();
+ nni_posix_pollq_sysfini();
(void) close(devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
@@ -332,7 +332,7 @@ nni_plat_fini(void)
{
pthread_mutex_lock(&nni_plat_lock);
if (nni_plat_inited) {
- nni_posix_pipedesc_sysfini();
+ nni_posix_pollq_sysfini();
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
pthread_attr_destroy(&nni_pthread_attr);