aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-18 19:52:08 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-18 19:52:08 -0700
commit5fb832e06fd4ded6ccc45f943837fd374a9cea7a (patch)
tree41c306c297911d740e92f38b98685207f77758c6 /src/platform/posix
parent3eb60946ae8b5ad7d8a95233ffe946432acdb837 (diff)
downloadnng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.gz
nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.bz2
nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.zip
Fixes most of the raaces in posix; but at least one remains outstanding.
Apparently there are circumstances when a pipedesc may get orphaned form the pollq. This triggers an assertion failure when it occurs. I am still trying to understand how this can occur. Stay tuned.
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_epdesc.c127
-rw-r--r--src/platform/posix/posix_pipedesc.c120
-rw-r--r--src/platform/posix/posix_pollq.h5
-rw-r--r--src/platform/posix/posix_pollq_poll.c144
4 files changed, 198 insertions, 198 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 8cae2565..b89af982 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -33,11 +33,10 @@
#endif
struct nni_posix_epdesc {
- int fd;
+ nni_posix_pollq_node node;
nni_list connectq;
nni_list acceptq;
- nni_posix_pollq_node node;
- nni_posix_pollq * pq;
+ int closed;
struct sockaddr_storage locaddr;
struct sockaddr_storage remaddr;
socklen_t loclen;
@@ -53,6 +52,7 @@ nni_posix_epdesc_cancel(nni_aio *aio)
nni_mtx_lock(&ed->mtx);
nni_aio_list_remove(aio);
+ NNI_ASSERT(aio->a_pipe == NULL);
nni_mtx_unlock(&ed->mtx);
}
@@ -60,20 +60,17 @@ static void
nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
{
nni_posix_epdesc * ed = aio->a_prov_data;
- nni_posix_pipedesc *pd;
+ nni_posix_pipedesc *pd = NULL;
// acceptq or connectq.
nni_aio_list_remove(aio);
if (rv == 0) {
- rv = nni_posix_pipedesc_init(&pd, newfd);
- if (rv != 0) {
+ if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) {
(void) close(newfd);
- } else {
- aio->a_pipe = pd;
}
}
- if ((nni_aio_finish(aio, rv, 0) != 0) && (rv == 0)) {
+ if ((nni_aio_finish_pipe(aio, rv, pd) != 0) && (pd != NULL)) {
nni_posix_pipedesc_fini(pd);
}
}
@@ -94,14 +91,15 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
while ((aio = nni_list_first(&ed->connectq)) != NULL) {
rv = -1;
sz = sizeof(rv);
- if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) <
+ 0) {
rv = errno;
}
switch (rv) {
case 0:
// Success!
- nni_posix_epdesc_finish(aio, 0, ed->fd);
- ed->fd = -1;
+ nni_posix_epdesc_finish(aio, 0, ed->node.fd);
+ ed->node.fd = -1;
continue;
case EINPROGRESS:
@@ -113,8 +111,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
rv = ECONNREFUSED;
}
nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
- close(ed->fd);
- ed->fd = -1;
+ (void) close(ed->node.fd);
+ ed->node.fd = -1;
continue;
}
}
@@ -134,12 +132,12 @@ nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
// do getpeername().
#ifdef NNG_USE_ACCEPT4
- newfd = accept4(ed->fd, NULL, NULL, SOCK_CLOEXEC);
+ newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC);
if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(ed->fd, NULL, NULL);
+ newfd = accept(ed->node.fd, NULL, NULL);
}
#else
- newfd = accept(ed->fd, NULL, NULL);
+ newfd = accept(ed->node.fd, NULL, NULL);
#endif
if (newfd >= 0) {
@@ -176,7 +174,7 @@ nni_posix_epdesc_doerror(nni_posix_epdesc *ed)
int rv = 1;
socklen_t sz = sizeof(rv);
- if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
rv = errno;
}
if (rv == 0) {
@@ -198,27 +196,30 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
nni_aio * aio;
struct sockaddr_un *sun;
- if (ed->fd != -1) {
- (void) shutdown(ed->fd, SHUT_RDWR);
- sun = (void *) &ed->locaddr;
- if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
- (void) unlink(sun->sun_path);
- }
- (void) close(ed->fd);
- ed->fd = -1;
- }
+ ed->closed = 1;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
while ((aio = nni_list_first(&ed->connectq)) != NULL) {
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
+
+ if (ed->node.fd != -1) {
+ (void) shutdown(ed->node.fd, SHUT_RDWR);
+ sun = (void *) &ed->locaddr;
+ if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
+ (void) unlink(sun->sun_path);
+ }
+ (void) close(ed->node.fd);
+ ed->node.fd = -1;
+ }
}
static void
nni_posix_epdesc_cb(void *arg)
{
nni_posix_epdesc *ed = arg;
+ int events;
nni_mtx_lock(&ed->mtx);
@@ -234,22 +235,22 @@ nni_posix_epdesc_cb(void *arg)
if (ed->node.revents & POLLNVAL) {
nni_posix_epdesc_doclose(ed);
}
- ed->node.revents = 0;
- ed->node.events = 0;
+ events = 0;
if (!nni_list_empty(&ed->connectq)) {
- ed->node.events |= POLLOUT;
+ events |= POLLOUT;
}
if (!nni_list_empty(&ed->acceptq)) {
- ed->node.events |= POLLIN;
+ events |= POLLIN;
}
+ nni_posix_pollq_arm(&ed->node, events);
nni_mtx_unlock(&ed->mtx);
}
void
nni_posix_epdesc_close(nni_posix_epdesc *ed)
{
- nni_posix_pollq_cancel(ed->pq, &ed->node);
+ nni_posix_pollq_disarm(&ed->node, POLLIN | POLLOUT);
nni_mtx_lock(&ed->mtx);
nni_posix_epdesc_doclose(ed);
@@ -343,7 +344,6 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- ed->fd = fd;
ed->node.fd = fd;
nni_mtx_unlock(&ed->mtx);
return (0);
@@ -365,22 +365,14 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
return;
}
- if (ed->fd < 0) {
+ if (ed->closed) {
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&ed->mtx);
return;
}
nni_aio_list_append(&ed->acceptq, aio);
- if ((ed->node.events & POLLIN) == 0) {
- ed->node.events |= POLLIN;
- rv = nni_posix_pollq_submit(ed->pq, &ed->node);
- if (rv != 0) {
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
- }
+ nni_posix_pollq_arm(&ed->node, POLLIN);
nni_mtx_unlock(&ed->mtx);
}
@@ -389,6 +381,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
{
// NB: We assume that the FD is already set to nonblocking mode.
int rv;
+ int fd;
nni_mtx_lock(&ed->mtx);
// If we can't start, it means that the AIO was stopped.
@@ -397,35 +390,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
return;
}
- ed->fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
- if (ed->fd < 0) {
+ fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
+ if (fd < 0) {
nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_unlock(&ed->mtx);
return;
}
- ed->node.fd = ed->fd;
// Possibly bind.
if (ed->loclen != 0) {
- rv = bind(ed->fd, (void *) &ed->locaddr, ed->loclen);
+ rv = bind(fd, (void *) &ed->locaddr, ed->loclen);
if (rv != 0) {
- (void) close(ed->fd);
- ed->fd = -1;
+ (void) close(fd);
nni_posix_epdesc_finish(aio, rv, 0);
nni_mtx_unlock(&ed->mtx);
return;
}
}
- (void) fcntl(ed->fd, F_SETFL, O_NONBLOCK);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen);
+ rv = connect(fd, (void *) &ed->remaddr, ed->remlen);
if (rv == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
- nni_posix_epdesc_finish(aio, 0, ed->fd);
- ed->fd = -1;
+ nni_posix_epdesc_finish(aio, 0, fd);
nni_mtx_unlock(&ed->mtx);
return;
}
@@ -435,26 +426,16 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno == ENOENT) {
errno = ECONNREFUSED;
}
- (void) close(ed->fd);
- ed->fd = -1;
+ (void) close(fd);
nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
nni_mtx_unlock(&ed->mtx);
return;
}
// We have to submit to the pollq, because the connection is pending.
+ ed->node.fd = fd;
nni_aio_list_append(&ed->connectq, aio);
- if ((ed->node.events & POLLOUT) == 0) {
- ed->node.events |= POLLOUT;
- rv = nni_posix_pollq_submit(ed->pq, &ed->node);
- if (rv != 0) {
- (void) close(ed->fd);
- ed->fd = -1;
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
- }
+ nni_posix_pollq_arm(&ed->node, POLLOUT);
nni_mtx_unlock(&ed->mtx);
}
@@ -462,6 +443,7 @@ int
nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
{
nni_posix_epdesc *ed;
+ nni_posix_pollq * pq;
int rv;
if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
@@ -478,8 +460,6 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
// one. For now we just have a global pollq. Note that by tying
// the ed to a single pollq we may get some kind of cache warmth.
- ed->pq = nni_posix_pollq_get((int) nni_random());
- ed->fd = -1;
ed->node.index = 0;
ed->node.cb = nni_posix_epdesc_cb;
ed->node.data = ed;
@@ -488,6 +468,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
+ pq = nni_posix_pollq_get(nni_random() % 0xffff);
+ if ((rv = nni_posix_pollq_add(pq, &ed->node)) != 0) {
+ nni_mtx_fini(&ed->mtx);
+ NNI_FREE_STRUCT(ed);
+ return (rv);
+ }
*edp = ed;
return (0);
}
@@ -525,9 +511,10 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len)
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
- if (ed->fd >= 0) {
- (void) close(ed->fd);
+ if (ed->node.fd >= 0) {
+ (void) close(ed->node.fd);
}
+ nni_posix_pollq_remove(&ed->node);
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
}
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index faab91ad..4e61c2c4 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -27,11 +27,10 @@
// file descriptor for TCP socket, etc.) This contains the list of pending
// aios for that underlying socket, as well as the socket itself.
struct nni_posix_pipedesc {
- nni_posix_pollq * pq;
- int fd;
+ nni_posix_pollq_node node;
nni_list readq;
nni_list writeq;
- nni_posix_pollq_node node;
+ int closed;
nni_mtx mtx;
};
@@ -39,7 +38,25 @@ static void
nni_posix_pipedesc_finish(nni_aio *aio, int rv)
{
nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, aio->a_count);
+ (void) nni_aio_finish(aio, rv, aio->a_count);
+}
+
+static void
+nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
+{
+ nni_aio *aio;
+
+ pd->closed = 1;
+ if (pd->node.fd != -1) {
+ // Let any peer know we are closing.
+ (void) shutdown(pd->node.fd, SHUT_RDWR);
+ }
+ while ((aio = nni_list_first(&pd->readq)) != NULL) {
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+ }
+ while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+ }
}
static void
@@ -60,7 +77,7 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
iovp = &iovec[0];
rv = 0;
- n = writev(pd->fd, iovp, aio->a_niov);
+ n = writev(pd->node.fd, iovp, aio->a_niov);
if (n < 0) {
if ((errno == EAGAIN) || (errno == EINTR)) {
// Can't write more right now. We're done
@@ -70,6 +87,7 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
rv = nni_plat_errno(errno);
nni_posix_pipedesc_finish(aio, rv);
+ nni_posix_pipedesc_doclose(pd);
return;
}
@@ -121,7 +139,7 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
iovp = &iovec[0];
rv = 0;
- n = readv(pd->fd, iovp, aio->a_niov);
+ n = readv(pd->node.fd, iovp, aio->a_niov);
if (n < 0) {
if ((errno == EAGAIN) || (errno == EINTR)) {
// Can't write more right now. We're done
@@ -131,6 +149,7 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
rv = nni_plat_errno(errno);
nni_posix_pipedesc_finish(aio, rv);
+ nni_posix_pipedesc_doclose(pd);
return;
}
@@ -171,28 +190,10 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
static void
-nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
-
- if (pd->fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->fd, SHUT_RDWR);
- close(pd->fd);
- pd->fd = -1;
- }
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
-}
-
-static void
nni_posix_pipedesc_cb(void *arg)
{
- nni_posix_pipedesc *pd = arg;
+ nni_posix_pipedesc *pd = arg;
+ int events = 0;
nni_mtx_lock(&pd->mtx);
if (pd->node.revents & POLLIN) {
@@ -203,21 +204,16 @@ nni_posix_pipedesc_cb(void *arg)
}
if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_pipedesc_doclose(pd);
- }
-
- pd->node.revents = 0;
- pd->node.events = 0;
-
- if (!nni_list_empty(&pd->writeq)) {
- pd->node.events |= POLLOUT;
- }
- if (!nni_list_empty(&pd->readq)) {
- pd->node.events |= POLLIN;
- }
-
- // If we still have uncompleted operations, resubmit us.
- if (pd->node.events != 0) {
- nni_posix_pollq_submit(pd->pq, &pd->node);
+ } else {
+ if (!nni_list_empty(&pd->writeq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&pd->readq)) {
+ events |= POLLIN;
+ }
+ if (events) {
+ nni_posix_pollq_arm(&pd->node, events);
+ }
}
nni_mtx_unlock(&pd->mtx);
}
@@ -225,7 +221,7 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq_cancel(pd->pq, &pd->node);
+ nni_posix_pollq_disarm(&pd->node, POLLIN | POLLOUT);
nni_mtx_lock(&pd->mtx);
nni_posix_pipedesc_doclose(pd);
@@ -252,22 +248,14 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
nni_mtx_unlock(&pd->mtx);
return;
}
- if (pd->fd < 0) {
+ if (pd->closed) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
return;
}
nni_aio_list_append(&pd->readq, aio);
- if ((pd->node.events & POLLIN) == 0) {
- pd->node.events |= POLLIN;
- rv = nni_posix_pollq_submit(pd->pq, &pd->node);
- if (rv != 0) {
- nni_posix_pipedesc_finish(aio, rv);
- nni_mtx_unlock(&pd->mtx);
- return;
- }
- }
+ nni_posix_pollq_arm(&pd->node, POLLIN);
nni_mtx_unlock(&pd->mtx);
}
@@ -281,22 +269,14 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
nni_mtx_unlock(&pd->mtx);
return;
}
- if (pd->fd < 0) {
+ if (pd->closed < 0) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
return;
}
nni_aio_list_append(&pd->writeq, aio);
- if ((pd->node.events & POLLOUT) == 0) {
- pd->node.events |= POLLOUT;
- rv = nni_posix_pollq_submit(pd->pq, &pd->node);
- if (rv != 0) {
- nni_posix_pipedesc_finish(aio, rv);
- nni_mtx_unlock(&pd->mtx);
- return;
- }
- }
+ nni_posix_pollq_arm(&pd->node, POLLOUT);
nni_mtx_unlock(&pd->mtx);
}
@@ -309,7 +289,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
return (NNG_ENOMEM);
}
- memset(pd, 0, sizeof(*pd));
// We could randomly choose a different pollq, or for efficiencies
// sake we could take a modulo of the file desc number to choose
@@ -320,17 +299,22 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
NNI_FREE_STRUCT(pd);
return (rv);
}
- pd->pq = nni_posix_pollq_get(fd);
- pd->fd = fd;
+ pd->closed = 0;
pd->node.fd = fd;
pd->node.cb = nni_posix_pipedesc_cb;
pd->node.data = pd;
- (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
nni_aio_list_init(&pd->readq);
nni_aio_list_init(&pd->writeq);
+ rv = nni_posix_pollq_add(nni_posix_pollq_get(fd), &pd->node);
+ if (rv != 0) {
+ nni_mtx_fini(&pd->mtx);
+ NNI_FREE_STRUCT(pd);
+ return (rv);
+ }
*pdp = pd;
return (0);
}
@@ -340,6 +324,10 @@ nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
{
// Make sure no other polling activity is pending.
nni_posix_pipedesc_close(pd);
+ nni_posix_pollq_remove(&pd->node);
+ if (pd->node.fd >= 0) {
+ (void) close(pd->node.fd);
+ }
nni_mtx_fini(&pd->mtx);
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index 9fa7c92e..acffb975 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -32,9 +32,8 @@ struct nni_posix_pollq_node {
int fd; // file descriptor to poll
int events; // events to watch for
int revents; // events received
- nni_taskq_ent task;
- void * data; // user data
- nni_cb cb; // user callback on event
+ void * data; // user data
+ nni_cb cb; // user callback on event
};
extern nni_posix_pollq *nni_posix_pollq_get(int);
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index d378c45a..1bc814bf 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -32,20 +32,21 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- struct pollfd *fds;
- int nfds;
- int wakewfd; // write side of waker pipe
- int wakerfd; // read side of waker pipe
- int close; // request for worker to exit
- int started;
- nni_thr thr; // worker thread
- nni_list armed; // armed nodes
- nni_list idle; // idle nodes
- int nnodes; // num of nodes in nodes list
- int inpoll; // poller asleep in poll
-
+ nni_mtx mtx;
+ nni_cv cv;
+ struct pollfd * fds;
+ int nfds;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ int close; // request for worker to exit
+ int started;
+ nni_thr thr; // worker thread
+ nni_list polled; // polled nodes
+ nni_list armed; // armed nodes
+ nni_list idle; // idle nodes
+ int nnodes; // num of nodes in nodes list
+ int inpoll; // poller asleep in poll
+ nni_posix_pollq_node *wait; // cancel waiting on this
nni_posix_pollq_node *active; // active node (in callback)
};
@@ -78,7 +79,7 @@ static void
nni_posix_poll_thr(void *arg)
{
nni_posix_pollq * pollq = arg;
- nni_posix_pollq_node *node, *nextnode;
+ nni_posix_pollq_node *node;
nni_mtx_lock(&pollq->mtx);
for (;;) {
@@ -101,7 +102,9 @@ nni_posix_poll_thr(void *arg)
nfds++;
// Set up the poll list.
- NNI_LIST_FOREACH (&pollq->armed, node) {
+ while ((node = nni_list_first(&pollq->armed)) != NULL) {
+ nni_list_remove(&pollq->armed, node);
+ nni_list_append(&pollq->polled, node);
fds[nfds].fd = node->fd;
fds[nfds].events = node->events;
fds[nfds].revents = 0;
@@ -131,48 +134,53 @@ nni_posix_poll_thr(void *arg)
nni_plat_pipe_clear(pollq->wakerfd);
}
- // Now we iterate through all the nodes. Note that one
- // may have been added or removed. New pipedescs will have
- // their index set to -1. Removed ones will just be absent.
- // Note that we may remove the pipedesc from the list, so we
- // have to use a custom iterator.
- nextnode = nni_list_first(&pollq->armed);
- while ((node = nextnode) != NULL) {
- int index;
-
- // Save the next node, so that we can remove this
- // one if needed.
- nextnode = nni_list_next(&pollq->armed, node);
-
- // If index is less than 1, then we have just added
- // this and there is no FD for it in the pollfds.
- if ((index = node->index) < 1) {
- continue;
- }
- // Was there any activity?
+ while ((node = nni_list_first(&pollq->polled)) != NULL) {
+ int index = node->index;
+
+ // We remove ourselves from the polled list, and
+ // then put it on either the idle or armed list
+ // depending on whether it remains armed.
+ node->index = 0;
+ nni_list_remove(&pollq->polled, node);
+ NNI_ASSERT(index > 0);
if (fds[index].revents == 0) {
+ // If still watching for events, return it
+ // to the armed list.
+ if (node->events) {
+ nni_list_append(&pollq->armed, node);
+ } else {
+ nni_list_append(&pollq->idle, node);
+ }
continue;
}
- // Clear the index for the next time around.
- node->index = 0;
+ // We are calling the callback, so disarm
+ // all events; the node can rearm them in its
+ // callback.
node->revents = fds[index].revents;
-
- // Execute callbacks. Note that these occur with
- // the lock held.
- if (node->cb != NULL) {
- node->cb(node->data);
+ node->events &= ~node->revents;
+ if (node->events == 0) {
+ nni_list_append(&pollq->idle, node);
} else {
- // No further events for you!
- node->events = 0;
+ nni_list_append(&pollq->armed, node);
}
- // Callback should clear events. If none were
- // rearmed, then move to the idle list so we won't
- // keep looking at it.
- if (node->events == 0) {
- nni_list_remove(&pollq->armed, node);
- nni_list_append(&pollq->idle, node);
+ // Save the active node; we can notice this way
+ // when it is busy, and avoid freeing it until
+ // we are sure that it is not in use.
+ pollq->active = node;
+
+ // Execute the callback -- without locks held.
+ nni_mtx_unlock(&pollq->mtx);
+ node->cb(node->data);
+ nni_mtx_lock(&pollq->mtx);
+
+ // We finished with this node. If something
+ // was blocked waiting for that, wake it up.
+ pollq->active = NULL;
+ if (pollq->wait == node) {
+ pollq->wait = NULL;
+ nni_cv_wake(&pollq->cv);
}
}
}
@@ -188,6 +196,10 @@ nni_posix_pollq_add_cb(nni_posix_pollq *pq, nni_posix_pollq_node *node)
int rv;
NNI_ASSERT(!nni_list_node_active(&node->node));
+ if (pq->close) {
+ // This shouldn't happen!
+ return (NNG_ECLOSED);
+ }
node->pq = pq;
if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) {
return (rv);
@@ -221,9 +233,17 @@ nni_posix_pollq_remove(nni_posix_pollq_node *node)
return;
}
nni_mtx_lock(&pq->mtx);
- NNI_ASSERT(nni_list_node_active(&node->node));
- nni_list_node_remove(&node->node);
- pq->nnodes--;
+ while (pq->active == node) {
+ pq->wait = node;
+ nni_cv_wait(&pq->cv);
+ }
+ if (nni_list_node_active(&node->node)) {
+ nni_list_node_remove(&node->node);
+ pq->nnodes--;
+ }
+ if (pq->close) {
+ nni_cv_wake(&pq->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
@@ -241,7 +261,10 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
oevents = node->events;
node->events |= events;
- if ((oevents == 0) && (events != 0)) {
+ // We move this to the armed list if its not armed, or already
+ // on the polled list. The polled list would be the case where
+ // the index is set to a positive value.
+ if ((oevents == 0) && (events != 0) && (node->index < 1)) {
if (nni_list_node_active(&node->node)) {
nni_list_node_remove(&node->node);
}
@@ -289,15 +312,17 @@ nni_posix_pollq_fini(nni_posix_pollq *pq)
pq->close = 1;
pq->started = 0;
nni_plat_pipe_raise(pq->wakewfd);
-
- // All pipes should have been closed before this is called.
- NNI_ASSERT(nni_list_empty(&pq->armed));
- NNI_ASSERT(nni_list_empty(&pq->idle));
- NNI_ASSERT(pq->nnodes == 0);
nni_mtx_unlock(&pq->mtx);
}
nni_thr_fini(&pq->thr);
+
+ // All pipes should have been closed before this is called.
+ NNI_ASSERT(nni_list_empty(&pq->polled));
+ NNI_ASSERT(nni_list_empty(&pq->armed));
+ NNI_ASSERT(nni_list_empty(&pq->idle));
+ NNI_ASSERT(pq->nnodes == 0);
+
if (pq->wakewfd >= 0) {
nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
pq->wakewfd = pq->wakerfd = -1;
@@ -313,6 +338,7 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
{
int rv;
+ NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node);
NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node);
NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node);
pq->wakewfd = -1;