aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_pipedesc.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-18 19:52:08 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-18 19:52:08 -0700
commit5fb832e06fd4ded6ccc45f943837fd374a9cea7a (patch)
tree41c306c297911d740e92f38b98685207f77758c6 /src/platform/posix/posix_pipedesc.c
parent3eb60946ae8b5ad7d8a95233ffe946432acdb837 (diff)
downloadnng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.gz
nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.bz2
nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.zip
Fixes most of the raaces in posix; but at least one remains outstanding.
Apparently there are circumstances when a pipedesc may get orphaned form the pollq. This triggers an assertion failure when it occurs. I am still trying to understand how this can occur. Stay tuned.
Diffstat (limited to 'src/platform/posix/posix_pipedesc.c')
-rw-r--r--src/platform/posix/posix_pipedesc.c120
1 files changed, 54 insertions, 66 deletions
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index faab91ad..4e61c2c4 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -27,11 +27,10 @@
// file descriptor for TCP socket, etc.) This contains the list of pending
// aios for that underlying socket, as well as the socket itself.
struct nni_posix_pipedesc {
- nni_posix_pollq * pq;
- int fd;
+ nni_posix_pollq_node node;
nni_list readq;
nni_list writeq;
- nni_posix_pollq_node node;
+ int closed;
nni_mtx mtx;
};
@@ -39,7 +38,25 @@ static void
nni_posix_pipedesc_finish(nni_aio *aio, int rv)
{
nni_aio_list_remove(aio);
- nni_aio_finish(aio, rv, aio->a_count);
+ (void) nni_aio_finish(aio, rv, aio->a_count);
+}
+
+static void
+nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
+{
+ nni_aio *aio;
+
+ pd->closed = 1;
+ if (pd->node.fd != -1) {
+ // Let any peer know we are closing.
+ (void) shutdown(pd->node.fd, SHUT_RDWR);
+ }
+ while ((aio = nni_list_first(&pd->readq)) != NULL) {
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+ }
+ while ((aio = nni_list_first(&pd->writeq)) != NULL) {
+ nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
+ }
}
static void
@@ -60,7 +77,7 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
iovp = &iovec[0];
rv = 0;
- n = writev(pd->fd, iovp, aio->a_niov);
+ n = writev(pd->node.fd, iovp, aio->a_niov);
if (n < 0) {
if ((errno == EAGAIN) || (errno == EINTR)) {
// Can't write more right now. We're done
@@ -70,6 +87,7 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
rv = nni_plat_errno(errno);
nni_posix_pipedesc_finish(aio, rv);
+ nni_posix_pipedesc_doclose(pd);
return;
}
@@ -121,7 +139,7 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
iovp = &iovec[0];
rv = 0;
- n = readv(pd->fd, iovp, aio->a_niov);
+ n = readv(pd->node.fd, iovp, aio->a_niov);
if (n < 0) {
if ((errno == EAGAIN) || (errno == EINTR)) {
// Can't write more right now. We're done
@@ -131,6 +149,7 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
rv = nni_plat_errno(errno);
nni_posix_pipedesc_finish(aio, rv);
+ nni_posix_pipedesc_doclose(pd);
return;
}
@@ -171,28 +190,10 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
static void
-nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
-{
- nni_aio *aio;
-
- if (pd->fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->fd, SHUT_RDWR);
- close(pd->fd);
- pd->fd = -1;
- }
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
-}
-
-static void
nni_posix_pipedesc_cb(void *arg)
{
- nni_posix_pipedesc *pd = arg;
+ nni_posix_pipedesc *pd = arg;
+ int events = 0;
nni_mtx_lock(&pd->mtx);
if (pd->node.revents & POLLIN) {
@@ -203,21 +204,16 @@ nni_posix_pipedesc_cb(void *arg)
}
if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_pipedesc_doclose(pd);
- }
-
- pd->node.revents = 0;
- pd->node.events = 0;
-
- if (!nni_list_empty(&pd->writeq)) {
- pd->node.events |= POLLOUT;
- }
- if (!nni_list_empty(&pd->readq)) {
- pd->node.events |= POLLIN;
- }
-
- // If we still have uncompleted operations, resubmit us.
- if (pd->node.events != 0) {
- nni_posix_pollq_submit(pd->pq, &pd->node);
+ } else {
+ if (!nni_list_empty(&pd->writeq)) {
+ events |= POLLOUT;
+ }
+ if (!nni_list_empty(&pd->readq)) {
+ events |= POLLIN;
+ }
+ if (events) {
+ nni_posix_pollq_arm(&pd->node, events);
+ }
}
nni_mtx_unlock(&pd->mtx);
}
@@ -225,7 +221,7 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq_cancel(pd->pq, &pd->node);
+ nni_posix_pollq_disarm(&pd->node, POLLIN | POLLOUT);
nni_mtx_lock(&pd->mtx);
nni_posix_pipedesc_doclose(pd);
@@ -252,22 +248,14 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
nni_mtx_unlock(&pd->mtx);
return;
}
- if (pd->fd < 0) {
+ if (pd->closed) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
return;
}
nni_aio_list_append(&pd->readq, aio);
- if ((pd->node.events & POLLIN) == 0) {
- pd->node.events |= POLLIN;
- rv = nni_posix_pollq_submit(pd->pq, &pd->node);
- if (rv != 0) {
- nni_posix_pipedesc_finish(aio, rv);
- nni_mtx_unlock(&pd->mtx);
- return;
- }
- }
+ nni_posix_pollq_arm(&pd->node, POLLIN);
nni_mtx_unlock(&pd->mtx);
}
@@ -281,22 +269,14 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
nni_mtx_unlock(&pd->mtx);
return;
}
- if (pd->fd < 0) {
+ if (pd->closed < 0) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
return;
}
nni_aio_list_append(&pd->writeq, aio);
- if ((pd->node.events & POLLOUT) == 0) {
- pd->node.events |= POLLOUT;
- rv = nni_posix_pollq_submit(pd->pq, &pd->node);
- if (rv != 0) {
- nni_posix_pipedesc_finish(aio, rv);
- nni_mtx_unlock(&pd->mtx);
- return;
- }
- }
+ nni_posix_pollq_arm(&pd->node, POLLOUT);
nni_mtx_unlock(&pd->mtx);
}
@@ -309,7 +289,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
return (NNG_ENOMEM);
}
- memset(pd, 0, sizeof(*pd));
// We could randomly choose a different pollq, or for efficiencies
// sake we could take a modulo of the file desc number to choose
@@ -320,17 +299,22 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
NNI_FREE_STRUCT(pd);
return (rv);
}
- pd->pq = nni_posix_pollq_get(fd);
- pd->fd = fd;
+ pd->closed = 0;
pd->node.fd = fd;
pd->node.cb = nni_posix_pipedesc_cb;
pd->node.data = pd;
- (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
nni_aio_list_init(&pd->readq);
nni_aio_list_init(&pd->writeq);
+ rv = nni_posix_pollq_add(nni_posix_pollq_get(fd), &pd->node);
+ if (rv != 0) {
+ nni_mtx_fini(&pd->mtx);
+ NNI_FREE_STRUCT(pd);
+ return (rv);
+ }
*pdp = pd;
return (0);
}
@@ -340,6 +324,10 @@ nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
{
// Make sure no other polling activity is pending.
nni_posix_pipedesc_close(pd);
+ nni_posix_pollq_remove(&pd->node);
+ if (pd->node.fd >= 0) {
+ (void) close(pd->node.fd);
+ }
nni_mtx_fini(&pd->mtx);