aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt1
-rw-r--r--src/CMakeLists.txt6
-rw-r--r--src/platform/posix/posix_pipedesc.c2
-rw-r--r--src/platform/posix/posix_pollq.h7
-rw-r--r--src/platform/posix/posix_pollq_epoll.c30
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c34
-rw-r--r--src/platform/posix/posix_pollq_poll.c22
-rw-r--r--src/platform/posix/posix_pollq_port.c268
8 files changed, 281 insertions, 89 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c91e6a34..f277c653 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -390,6 +390,7 @@ else ()
nng_check_sym (alloca alloca.h NNG_HAVE_ALLOCA)
nng_check_struct_member(msghdr msg_control sys/socket.h NNG_HAVE_MSG_CONTROL)
nng_check_sym (kqueue sys/event.h NNG_HAVE_KQUEUE)
+ nng_check_sym (port_create port.h NNG_HAVE_PORT_CREATE)
nng_check_sym (epoll_wait sys/epoll.h NNG_HAVE_EPOLL)
nng_check_sym (getpeereid unistd.h NNG_HAVE_GETPEEREID)
nng_check_sym (SO_PEERCRED sys/socket.h NNG_HAVE_SOPEERCRED)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a3e36ee8..96747bc3 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -106,7 +106,11 @@ if (NNG_PLATFORM_POSIX)
)
endif()
-if (NNG_HAVE_KQUEUE)
+if (NNG_HAVE_PORT_CREATE)
+ set (NNG_SOURCES ${NNG_SOURCES}
+ platform/posix/posix_pollq_port.c
+ )
+elseif (NNG_HAVE_KQUEUE)
set (NNG_SOURCES ${NNG_SOURCES}
platform/posix/posix_pollq_kqueue.c
)
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index 3745f11f..e7225395 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -242,7 +242,7 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq_disarm(&pd->node, POLLIN | POLLOUT);
+ nni_posix_pollq_remove(&pd->node);
nni_mtx_lock(&pd->mtx);
nni_posix_pipedesc_doclose(pd);
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index bb441c7b..2c855da1 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -35,6 +35,8 @@ struct nni_posix_pollq_node {
int revents; // events received
void * data; // user data
nni_cb cb; // user callback on event
+ nni_mtx mx;
+ nni_cv cv;
};
extern nni_posix_pollq *nni_posix_pollq_get(int);
@@ -46,7 +48,6 @@ extern void nni_posix_pollq_fini(nni_posix_pollq_node *);
extern int nni_posix_pollq_add(nni_posix_pollq_node *);
extern void nni_posix_pollq_remove(nni_posix_pollq_node *);
extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int);
-extern void nni_posix_pollq_disarm(nni_posix_pollq_node *, int);
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index fe831ec1..0f6867da 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -84,6 +84,7 @@ nni_posix_pollq_add(nni_posix_pollq_node *node)
rv = nni_plat_errno(errno);
nni_idhash_remove(pq->nodes, id);
node->index = 0;
+ node->pq = NULL;
}
nni_mtx_unlock(&pq->mtx);
@@ -105,7 +106,7 @@ nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
ev.data.u64 = (uint64_t) node->index;
if (node->index != 0) {
- // This dereegisters the node. If the poller was blocked
+ // This deregisters the node. If the poller was blocked
// then this keeps it from coming back in to find us.
nni_idhash_remove(pq->nodes, (uint64_t) node->index);
}
@@ -197,33 +198,6 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
nni_mtx_unlock(&pq->mtx);
}
-void
-nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events)
-{
- struct epoll_event ev;
-
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
-
- node->events &= ~events;
- if (node->events == 0) {
- ev.events = 0;
- } else {
- ev.events = node->events | NNI_EPOLL_FLAGS;
- }
- ev.data.u64 = (uint64_t) node->index;
-
- if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, node->fd, &ev) != 0) {
- NNI_ASSERT(errno == EBADF || errno == ENOENT);
- }
-
- nni_mtx_unlock(&pq->mtx);
-}
-
static void
nni_posix_poll_thr(void *arg)
{
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index cff0dcf4..0f312170 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -205,40 +205,6 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
nni_mtx_unlock(&pq->mtx);
}
-void
-nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events)
-{
- struct kevent kevents[2];
- int nevents = 0;
-
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
-
- if ((node->events & POLLIN) && (events & POLLIN)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ,
- EV_DISABLE, 0, 0, (kevent_udata_t) node);
- }
-
- if ((node->events & POLLOUT) && (events & POLLOUT)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_DISABLE, 0, 0, (kevent_udata_t) node);
- }
-
- if (nevents > 0) {
- int rv = kevent(pq->kq, kevents, nevents, NULL, 0, NULL);
- if (rv < 0 && errno != ENOENT && errno != EBADF) {
- NNI_ASSERT(false);
- }
- node->events &= ~events;
- }
-
- nni_mtx_unlock(&pq->mtx);
-}
-
static void
nni_posix_poll_thr(void *arg)
{
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index 8e3bc741..efc9ff48 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -305,28 +305,6 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
nni_mtx_unlock(&pq->mtx);
}
-void
-nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events)
-{
- nni_posix_pollq *pq = node->pq;
- int oevents;
-
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- oevents = node->events;
- node->events &= ~events;
- if ((node->events == 0) && (oevents != 0)) {
- nni_list_node_remove(&node->node);
- nni_list_append(&pq->idle, node);
- }
- // No need to wake anything, we might get a spurious wake up but
- // that's harmless.
- nni_mtx_unlock(&pq->mtx);
-}
-
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c
new file mode 100644
index 00000000..2afeb651
--- /dev/null
+++ b/src/platform/posix/posix_pollq_port.c
@@ -0,0 +1,268 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Liam Staskawicz <liam@stask.net>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifdef NNG_HAVE_PORT_CREATE
+
+#include <errno.h>
+#include <port.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h> /* for strerror() */
+#include <unistd.h>
+
+#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
+
+// nni_posix_pollq is a work structure that manages state for the port-event
+// based pollq implementation. We only really need to keep track of the
+// single thread, and the associated port itself.
+struct nni_posix_pollq {
+ int port; // port id (from port_create)
+ nni_thr thr; // worker thread
+};
+
+int
+nni_posix_pollq_add(nni_posix_pollq_node *node)
+{
+ nni_posix_pollq *pq;
+
+ pq = nni_posix_pollq_get(node->fd);
+ if (pq == NULL) {
+ return (NNG_EINVAL);
+ }
+
+ nni_mtx_lock(&node->mx);
+ // ensure node was not previously associated with a pollq
+ if (node->pq != NULL) {
+ nni_mtx_unlock(&node->mx);
+ return (NNG_ESTATE);
+ }
+
+ node->pq = pq;
+ node->events = 0;
+ node->armed = false;
+ nni_mtx_unlock(&node->mx);
+
+ return (0);
+}
+
+// nni_posix_pollq_remove removes the node from the pollq, but
+// does not ensure that the pollq node is safe to destroy. In particular,
+// this function can be called from a callback (the callback may be active).
+void
+nni_posix_pollq_remove(nni_posix_pollq_node *node)
+{
+ nni_posix_pollq *pq = node->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_mtx_lock(&node->mx);
+ node->events = 0;
+ if (node->armed) {
+ // Failure modes that can occur are uninteresting.
+ (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd);
+ node->armed = false;
+ }
+ nni_mtx_unlock(&node->mx);
+}
+
+// nni_posix_pollq_init merely ensures that the node is ready for use.
+// It does not register the node with any pollq in particular.
+int
+nni_posix_pollq_init(nni_posix_pollq_node *node)
+{
+ nni_mtx_init(&node->mx);
+ nni_cv_init(&node->cv, &node->mx);
+ node->pq = NULL;
+ node->armed = false;
+ NNI_LIST_NODE_INIT(&node->node);
+ return (0);
+}
+
+// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
+// but it also ensures that the node is removed properly.
+void
+nni_posix_pollq_fini(nni_posix_pollq_node *node)
+{
+ nni_posix_pollq *pq = node->pq;
+
+ nni_mtx_lock(&node->mx);
+ if ((pq = node->pq) != NULL) {
+ // Dissociate the port; if it isn't already associated we
+ // don't care. (An extra syscall, but it should not matter.)
+ (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd);
+ node->armed = false;
+
+ for (;;) {
+ if (port_send(pq->port, 0, node) == 0) {
+ break;
+ }
+ switch (errno) {
+ case EAGAIN:
+ case ENOMEM:
+ // Resource exhaustion.
+ // Best bet in these cases is to sleep it off.
+ // This may appear like a total application
+ // hang, but by sleeping here maybe we give
+ // a chance for things to clear up.
+ nni_mtx_unlock(&node->mx);
+ nni_msleep(5000);
+ nni_mtx_unlock(&node->mx);
+ continue;
+ case EBADFD:
+ case EBADF:
+ // Most likely these indicate that the pollq
+ // itself has been closed. That's ok.
+ break;
+ }
+ }
+ // Wait for the pollq thread to tell us with certainty that
+ // they are done. This is needed to ensure that the pollq
+ // thread isn't executing (or about to execute) the callback
+ // before we destroy it.
+ while (node->pq != NULL) {
+ nni_cv_wait(&node->cv);
+ }
+ }
+ nni_mtx_unlock(&node->mx);
+ nni_cv_fini(&node->cv);
+ nni_mtx_fini(&node->mx);
+}
+
+void
+nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+{
+ nni_posix_pollq *pq = node->pq;
+
+ NNI_ASSERT(pq != NULL);
+ if (events == 0) {
+ return;
+ }
+
+ nni_mtx_lock(&node->mx);
+ node->events |= events;
+ node->armed = true;
+ (void) port_associate(
+ pq->port, PORT_SOURCE_FD, node->fd, node->events, node);
+
+ // Possible errors here are:
+ //
+ // EBADF -- programming error on our part
+ // EBADFD -- programming error on our part
+ // ENOMEM -- not much we can do here
+ // EAGAIN -- too many port events registered (65K!!)
+ //
+ // For now we ignore them all. (We need to be able to return
+ // errors to our caller.) Effect on the application will appear
+ // to be a stalled file descriptor (no notifications).
+ nni_mtx_unlock(&node->mx);
+}
+
+static void
+nni_posix_poll_thr(void *arg)
+{
+
+ for (;;) {
+ nni_posix_pollq * pq = arg;
+ port_event_t ev;
+ nni_posix_pollq_node *node;
+
+ if (port_get(pq->port, &ev, NULL) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return;
+ }
+
+ switch (ev.portev_source) {
+ case PORT_SOURCE_ALERT:
+ return;
+
+ case PORT_SOURCE_FD:
+ node = ev.portev_user;
+
+ nni_mtx_lock(&node->mx);
+ node->revents = ev.portev_events;
+ // mark events as cleared
+ node->events &= ~node->revents;
+ node->armed = false;
+ nni_mtx_unlock(&node->mx);
+
+ node->cb(node->data);
+ break;
+
+ case PORT_SOURCE_USER:
+ // User event telling us to stop doing things.
+ // We signal back to use this as a coordination event
+ // between the pollq and the thread handler.
+ // NOTE: It is absolutely critical that there is only
+ // a single thread per pollq. Otherwise we cannot
+ // be sure that we are blocked completely,
+ node = ev.portev_user;
+ nni_mtx_lock(&node->mx);
+ node->pq = NULL;
+ nni_cv_wake(&node->cv);
+ nni_mtx_unlock(&node->mx);
+ }
+ }
+}
+
+static void
+nni_posix_pollq_destroy(nni_posix_pollq *pq)
+{
+ port_alert(pq->port, PORT_ALERT_SET, 1, NULL);
+ (void) close(pq->port);
+ nni_thr_fini(&pq->thr);
+}
+
+static int
+nni_posix_pollq_create(nni_posix_pollq *pq)
+{
+ int rv;
+
+ if ((pq->port = port_create()) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ nni_posix_pollq_destroy(pq);
+ return (rv);
+ }
+
+ nni_thr_run(&pq->thr);
+ return (0);
+}
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
+nni_posix_pollq *
+nni_posix_pollq_get(int fd)
+{
+ NNI_ARG_UNUSED(fd);
+ return (&nni_posix_global_pollq);
+}
+
+int
+nni_posix_pollq_sysinit(void)
+{
+ return (nni_posix_pollq_create(&nni_posix_global_pollq));
+}
+
+void
+nni_posix_pollq_sysfini(void)
+{
+ nni_posix_pollq_destroy(&nni_posix_global_pollq);
+}
+
+#endif // NNG_HAVE_PORT_CREATE