aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-18 17:29:39 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-18 17:29:39 -0700
commit3eb60946ae8b5ad7d8a95233ffe946432acdb837 (patch)
tree6bf6c678411a0e63731a624929fcb979781a9758 /src
parentab7772be3e3c208a48408b67924d3b58fca7f474 (diff)
downloadnng-3eb60946ae8b5ad7d8a95233ffe946432acdb837.tar.gz
nng-3eb60946ae8b5ad7d8a95233ffe946432acdb837.tar.bz2
nng-3eb60946ae8b5ad7d8a95233ffe946432acdb837.zip
Sometimes providers don't clear the prov data details. (Backoff).
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c3
-rw-r--r--src/platform/posix/posix_pollq.h18
-rw-r--r--src/platform/posix/posix_pollq_poll.c218
3 files changed, 127 insertions, 112 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index c6512eb4..f9927e9d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -137,9 +137,6 @@ nni_aio_wait(nni_aio *aio)
int
nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
{
- NNI_ASSERT(aio->a_prov_data == NULL);
- NNI_ASSERT(aio->a_prov_cancel == NULL);
-
nni_mtx_lock(&aio->a_lk);
aio->a_flags &= ~(NNI_AIO_DONE | NNI_AIO_WAKE);
if (aio->a_flags & NNI_AIO_FINI) {
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index e42b120d..9fa7c92e 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -25,22 +25,26 @@ typedef struct nni_posix_pollq_node nni_posix_pollq_node;
typedef struct nni_posix_pollq nni_posix_pollq;
struct nni_posix_pollq_node {
- nni_posix_pollq *pq; // associated pollq
nni_list_node node; // linkage into the pollq list
+ nni_posix_pollq *pq; // associated pollq
int index; // used by the poller impl
int armed; // used by the poller impl
int fd; // file descriptor to poll
int events; // events to watch for
int revents; // events received
- void * data; // user data
- nni_cb cb; // user callback on event
+ nni_taskq_ent task;
+ void * data; // user data
+ nni_cb cb; // user callback on event
};
extern nni_posix_pollq *nni_posix_pollq_get(int);
-extern int nni_posix_pollq_submit(nni_posix_pollq *, nni_posix_pollq_node *);
-extern void nni_posix_pollq_cancel(nni_posix_pollq *, nni_posix_pollq_node *);
-extern int nni_posix_pollq_sysinit(void);
-extern void nni_posix_pollq_sysfini(void);
+extern int nni_posix_pollq_sysinit(void);
+extern void nni_posix_pollq_sysfini(void);
+
+extern int nni_posix_pollq_add(nni_posix_pollq *, nni_posix_pollq_node *);
+extern void nni_posix_pollq_remove(nni_posix_pollq_node *);
+extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int);
+extern void nni_posix_pollq_disarm(nni_posix_pollq_node *, int);
#endif // PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index fe1359ec..d378c45a 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -35,18 +35,15 @@ struct nni_posix_pollq {
nni_mtx mtx;
nni_cv cv;
struct pollfd *fds;
- struct pollfd *newfds;
int nfds;
- int nnewfds;
int wakewfd; // write side of waker pipe
int wakerfd; // read side of waker pipe
int close; // request for worker to exit
int started;
nni_thr thr; // worker thread
- nni_list nodes; // poll list
- nni_list notify; // notify list
+ nni_list armed; // armed nodes
+ nni_list idle; // idle nodes
int nnodes; // num of nodes in nodes list
- int cancel; // waiters for cancellation
int inpoll; // poller asleep in poll
nni_posix_pollq_node *active; // active node (in callback)
@@ -56,32 +53,24 @@ static int
nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
{
int grow = pq->nnodes + 2; // one for us, one for waker
- int noldfds;
- struct pollfd *oldfds;
struct pollfd *newfds;
- if ((grow < pq->nfds) || (grow < pq->nnewfds)) {
+ if (grow < pq->nfds) {
return (0);
}
grow = grow + 128;
- // Maybe we are adding a *lot* of pipes at once, and have to grow
- // multiple times before the poller gets scheduled. In that case
- // toss the old array before we finish.
- oldfds = pq->newfds;
- noldfds = pq->nnewfds;
-
- if ((newfds = nni_alloc(grow * sizeof(struct pollfd))) == NULL) {
+ if ((newfds = NNI_ALLOC_STRUCTS(newfds, grow)) == NULL) {
return (NNG_ENOMEM);
}
- pq->newfds = newfds;
- pq->nnewfds = grow;
-
- if (noldfds != 0) {
- nni_free(oldfds, noldfds * sizeof(struct pollfd));
+ if (pq->nfds != 0) {
+ NNI_FREE_STRUCTS(pq->fds, pq->nfds);
}
+ pq->fds = newfds;
+ pq->nfds = grow;
+
return (0);
}
@@ -101,15 +90,6 @@ nni_posix_poll_thr(void *arg)
break;
}
- if (pollq->newfds != NULL) {
- // We have "grown" by the caller. Free up the old
- // space, and start using the new.
- nni_free(
- pollq->fds, pollq->nfds * sizeof(struct pollfd));
- pollq->fds = pollq->newfds;
- pollq->nfds = pollq->nnewfds;
- pollq->newfds = NULL;
- }
fds = pollq->fds;
nfds = 0;
@@ -121,9 +101,9 @@ nni_posix_poll_thr(void *arg)
nfds++;
// Set up the poll list.
- NNI_LIST_FOREACH (&pollq->nodes, node) {
+ NNI_LIST_FOREACH (&pollq->armed, node) {
fds[nfds].fd = node->fd;
- fds[nfds].events = node->armed;
+ fds[nfds].events = node->events;
fds[nfds].revents = 0;
node->index = nfds;
nfds++;
@@ -156,22 +136,20 @@ nni_posix_poll_thr(void *arg)
// their index set to -1. Removed ones will just be absent.
// Note that we may remove the pipedesc from the list, so we
// have to use a custom iterator.
- nextnode = nni_list_first(&pollq->nodes);
+ nextnode = nni_list_first(&pollq->armed);
while ((node = nextnode) != NULL) {
int index;
- // Save the nextpd for our next iteration. This
- // way we can remove the PD from the list without
- // breaking the iteration.
-
- nextnode = nni_list_next(&pollq->nodes, node);
+ // Save the next node, so that we can remove this
+ // one if needed.
+ nextnode = nni_list_next(&pollq->armed, node);
// If index is less than 1, then we have just added
// this and there is no FD for it in the pollfds.
if ((index = node->index) < 1) {
continue;
}
-
+ // Was there any activity?
if (fds[index].revents == 0) {
continue;
}
@@ -180,94 +158,127 @@ nni_posix_poll_thr(void *arg)
node->index = 0;
node->revents = fds[index].revents;
- // Now we move this node to the callback list.
- node->armed = 0;
- nni_list_remove(&pollq->nodes, node);
- nni_list_append(&pollq->notify, node);
- pollq->nnodes--;
- }
-
- // Finally we can call the callbacks. We record the
- // active callback so any attempt to cancel blocks until
- // the callback is finished.
- while ((node = nni_list_first(&pollq->notify)) != NULL) {
- nni_list_remove(&pollq->notify, node);
+ // Execute callbacks. Note that these occur with
+ // the lock held.
if (node->cb != NULL) {
- pollq->active = node;
- nni_mtx_unlock(&pollq->mtx);
node->cb(node->data);
- nni_mtx_lock(&pollq->mtx);
- pollq->active = NULL;
+ } else {
+ // No further events for you!
+ node->events = 0;
}
- }
- // Wake any cancelers.
- if (pollq->cancel != 0) {
- pollq->cancel = 0;
- nni_cv_wake(&pollq->cv);
+ // Callback should clear events. If none were
+ // rearmed, then move to the idle list so we won't
+ // keep looking at it.
+ if (node->events == 0) {
+ nni_list_remove(&pollq->armed, node);
+ nni_list_append(&pollq->idle, node);
+ }
}
}
nni_mtx_unlock(&pollq->mtx);
}
-void
-nni_posix_pollq_cancel(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+// nni_posix_pollq_add_cb is intended to be uesd during endpoint
+// operations that create new pollq nodes in callbacks. The lock
+// for the pollq must be held.
+int
+nni_posix_pollq_add_cb(nni_posix_pollq *pq, nni_posix_pollq_node *node)
{
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->cancel++;
- nni_cv_wait(&pq->cv);
+ int rv;
+ NNI_ASSERT(!nni_list_node_active(&node->node));
+
+ node->pq = pq;
+ if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) {
+ return (rv);
}
- if (nni_list_active(&pq->nodes, node)) {
- node->armed = 0;
- nni_list_remove(&pq->nodes, node);
+ pq->nnodes++;
+ if (node->events != 0) {
+ nni_list_append(&pq->armed, node);
+ } else {
+ nni_list_append(&pq->idle, node);
}
- // Since we're not removing the fd from the outstanding poll, we
- // may get an event. In that case, we'll wake and rebuild the
- // pollset without it, with no further action. Otherwise having the
- // poll present does no harm beyond the "spurious" wake of the poller
- // thread. (If we had port_disassociate or somesuch, this would be
- // a great time for that.)
- nni_mtx_unlock(&pq->mtx);
+ return (0);
}
int
-nni_posix_pollq_submit(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+nni_posix_pollq_add(nni_posix_pollq *pq, nni_posix_pollq_node *node)
{
- int wake;
int rv;
- int evs;
nni_mtx_lock(&pq->mtx);
+ rv = nni_posix_pollq_add_cb(pq, node);
+ nni_mtx_unlock(&pq->mtx);
+ return (rv);
+}
- if (node->events == 0) {
- // Nothing to schedule?
- nni_mtx_unlock(&pq->mtx);
- return (0);
+void
+nni_posix_pollq_remove(nni_posix_pollq_node *node)
+{
+ nni_posix_pollq *pq = node->pq;
+
+ if (pq == NULL) {
+ return;
}
+ nni_mtx_lock(&pq->mtx);
+ NNI_ASSERT(nni_list_node_active(&node->node));
+ nni_list_node_remove(&node->node);
+ pq->nnodes--;
+ nni_mtx_unlock(&pq->mtx);
+}
- if (node->armed == 0) {
- NNI_ASSERT(!nni_list_active(&pq->nodes, node));
+void
+nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+{
+ nni_posix_pollq *pq = node->pq;
+ int oevents;
- rv = nni_posix_pollq_poll_grow(pq);
- if (rv != 0) {
- nni_mtx_unlock(&pq->mtx);
- return (rv);
- }
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_mtx_lock(&pq->mtx);
+ oevents = node->events;
+ node->events |= events;
- nni_list_append(&pq->nodes, node);
- pq->nnodes++;
+ if ((oevents == 0) && (events != 0)) {
+ if (nni_list_node_active(&node->node)) {
+ nni_list_node_remove(&node->node);
+ }
+ nni_list_append(&pq->armed, node);
}
+ if ((events != 0) && (oevents != events)) {
+ // Possibly wake up the poller since we're looking for
+ // new events.
+ if (pq->inpoll) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
- node->armed |= node->events;
+void
+nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events)
+{
+ nni_posix_pollq *pq = node->pq;
+ int oevents;
- // Wake up the poller since we're adding a new poll, but only bother
- // if it's already asleep. (Frequently it will *not* be.)
- if (pq->inpoll) {
- nni_plat_pipe_raise(pq->wakewfd);
+ if (pq == NULL) {
+ return;
}
+
+ nni_mtx_lock(&pq->mtx);
+ oevents = node->events;
+ node->events &= ~events;
+ if ((node->events == 0) && (oevents != 0)) {
+ if (nni_list_node_active(&node->node)) {
+ nni_list_node_remove(&node->node);
+ }
+ nni_list_append(&pq->idle, node);
+ }
+ // No need to wake anything, we might get a spurious wake up but
+ // that's harmless.
nni_mtx_unlock(&pq->mtx);
- return (0);
}
static void
@@ -280,7 +291,9 @@ nni_posix_pollq_fini(nni_posix_pollq *pq)
nni_plat_pipe_raise(pq->wakewfd);
// All pipes should have been closed before this is called.
- NNI_ASSERT(nni_list_empty(&pq->nodes));
+ NNI_ASSERT(nni_list_empty(&pq->armed));
+ NNI_ASSERT(nni_list_empty(&pq->idle));
+ NNI_ASSERT(pq->nnodes == 0);
nni_mtx_unlock(&pq->mtx);
}
@@ -289,8 +302,9 @@ nni_posix_pollq_fini(nni_posix_pollq *pq)
nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
pq->wakewfd = pq->wakerfd = -1;
}
- nni_free(pq->newfds, pq->nnewfds * sizeof(struct pollfd));
- nni_free(pq->fds, pq->nfds * sizeof(struct pollfd));
+ if (pq->nfds != 0) {
+ NNI_FREE_STRUCTS(pq->fds, pq->nfds);
+ }
nni_mtx_fini(&pq->mtx);
}
@@ -299,8 +313,8 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
{
int rv;
- NNI_LIST_INIT(&pq->nodes, nni_posix_pollq_node, node);
- NNI_LIST_INIT(&pq->notify, nni_posix_pollq_node, node);
+ NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node);
+ NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node);
pq->wakewfd = -1;
pq->wakerfd = -1;
pq->close = 0;