diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-18 19:52:08 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-18 19:52:08 -0700 |
| commit | 5fb832e06fd4ded6ccc45f943837fd374a9cea7a (patch) | |
| tree | 41c306c297911d740e92f38b98685207f77758c6 /src/platform/posix/posix_pipedesc.c | |
| parent | 3eb60946ae8b5ad7d8a95233ffe946432acdb837 (diff) | |
| download | nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.gz nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.bz2 nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.zip | |
Fixes most of the raaces in posix; but at least one remains outstanding.
Apparently there are circumstances when a pipedesc may get orphaned form the
pollq. This triggers an assertion failure when it occurs. I am still
trying to understand how this can occur. Stay tuned.
Diffstat (limited to 'src/platform/posix/posix_pipedesc.c')
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 120 |
1 files changed, 54 insertions, 66 deletions
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index faab91ad..4e61c2c4 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -27,11 +27,10 @@ // file descriptor for TCP socket, etc.) This contains the list of pending // aios for that underlying socket, as well as the socket itself. struct nni_posix_pipedesc { - nni_posix_pollq * pq; - int fd; + nni_posix_pollq_node node; nni_list readq; nni_list writeq; - nni_posix_pollq_node node; + int closed; nni_mtx mtx; }; @@ -39,7 +38,25 @@ static void nni_posix_pipedesc_finish(nni_aio *aio, int rv) { nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, aio->a_count); + (void) nni_aio_finish(aio, rv, aio->a_count); +} + +static void +nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) +{ + nni_aio *aio; + + pd->closed = 1; + if (pd->node.fd != -1) { + // Let any peer know we are closing. + (void) shutdown(pd->node.fd, SHUT_RDWR); + } + while ((aio = nni_list_first(&pd->readq)) != NULL) { + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); + } + while ((aio = nni_list_first(&pd->writeq)) != NULL) { + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); + } } static void @@ -60,7 +77,7 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) iovp = &iovec[0]; rv = 0; - n = writev(pd->fd, iovp, aio->a_niov); + n = writev(pd->node.fd, iovp, aio->a_niov); if (n < 0) { if ((errno == EAGAIN) || (errno == EINTR)) { // Can't write more right now. We're done @@ -70,6 +87,7 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) rv = nni_plat_errno(errno); nni_posix_pipedesc_finish(aio, rv); + nni_posix_pipedesc_doclose(pd); return; } @@ -121,7 +139,7 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) iovp = &iovec[0]; rv = 0; - n = readv(pd->fd, iovp, aio->a_niov); + n = readv(pd->node.fd, iovp, aio->a_niov); if (n < 0) { if ((errno == EAGAIN) || (errno == EINTR)) { // Can't write more right now. We're done @@ -131,6 +149,7 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) rv = nni_plat_errno(errno); nni_posix_pipedesc_finish(aio, rv); + nni_posix_pipedesc_doclose(pd); return; } @@ -171,28 +190,10 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) } static void -nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) -{ - nni_aio *aio; - - if (pd->fd != -1) { - // Let any peer know we are closing. - (void) shutdown(pd->fd, SHUT_RDWR); - close(pd->fd); - pd->fd = -1; - } - while ((aio = nni_list_first(&pd->readq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } - while ((aio = nni_list_first(&pd->writeq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } -} - -static void nni_posix_pipedesc_cb(void *arg) { - nni_posix_pipedesc *pd = arg; + nni_posix_pipedesc *pd = arg; + int events = 0; nni_mtx_lock(&pd->mtx); if (pd->node.revents & POLLIN) { @@ -203,21 +204,16 @@ nni_posix_pipedesc_cb(void *arg) } if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) { nni_posix_pipedesc_doclose(pd); - } - - pd->node.revents = 0; - pd->node.events = 0; - - if (!nni_list_empty(&pd->writeq)) { - pd->node.events |= POLLOUT; - } - if (!nni_list_empty(&pd->readq)) { - pd->node.events |= POLLIN; - } - - // If we still have uncompleted operations, resubmit us. - if (pd->node.events != 0) { - nni_posix_pollq_submit(pd->pq, &pd->node); + } else { + if (!nni_list_empty(&pd->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&pd->readq)) { + events |= POLLIN; + } + if (events) { + nni_posix_pollq_arm(&pd->node, events); + } } nni_mtx_unlock(&pd->mtx); } @@ -225,7 +221,7 @@ nni_posix_pipedesc_cb(void *arg) void nni_posix_pipedesc_close(nni_posix_pipedesc *pd) { - nni_posix_pollq_cancel(pd->pq, &pd->node); + nni_posix_pollq_disarm(&pd->node, POLLIN | POLLOUT); nni_mtx_lock(&pd->mtx); nni_posix_pipedesc_doclose(pd); @@ -252,22 +248,14 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) nni_mtx_unlock(&pd->mtx); return; } - if (pd->fd < 0) { + if (pd->closed) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); nni_mtx_unlock(&pd->mtx); return; } nni_aio_list_append(&pd->readq, aio); - if ((pd->node.events & POLLIN) == 0) { - pd->node.events |= POLLIN; - rv = nni_posix_pollq_submit(pd->pq, &pd->node); - if (rv != 0) { - nni_posix_pipedesc_finish(aio, rv); - nni_mtx_unlock(&pd->mtx); - return; - } - } + nni_posix_pollq_arm(&pd->node, POLLIN); nni_mtx_unlock(&pd->mtx); } @@ -281,22 +269,14 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) nni_mtx_unlock(&pd->mtx); return; } - if (pd->fd < 0) { + if (pd->closed < 0) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); nni_mtx_unlock(&pd->mtx); return; } nni_aio_list_append(&pd->writeq, aio); - if ((pd->node.events & POLLOUT) == 0) { - pd->node.events |= POLLOUT; - rv = nni_posix_pollq_submit(pd->pq, &pd->node); - if (rv != 0) { - nni_posix_pipedesc_finish(aio, rv); - nni_mtx_unlock(&pd->mtx); - return; - } - } + nni_posix_pollq_arm(&pd->node, POLLOUT); nni_mtx_unlock(&pd->mtx); } @@ -309,7 +289,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { return (NNG_ENOMEM); } - memset(pd, 0, sizeof(*pd)); // We could randomly choose a different pollq, or for efficiencies // sake we could take a modulo of the file desc number to choose @@ -320,17 +299,22 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) NNI_FREE_STRUCT(pd); return (rv); } - pd->pq = nni_posix_pollq_get(fd); - pd->fd = fd; + pd->closed = 0; pd->node.fd = fd; pd->node.cb = nni_posix_pipedesc_cb; pd->node.data = pd; - (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); nni_aio_list_init(&pd->readq); nni_aio_list_init(&pd->writeq); + rv = nni_posix_pollq_add(nni_posix_pollq_get(fd), &pd->node); + if (rv != 0) { + nni_mtx_fini(&pd->mtx); + NNI_FREE_STRUCT(pd); + return (rv); + } *pdp = pd; return (0); } @@ -340,6 +324,10 @@ nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) { // Make sure no other polling activity is pending. nni_posix_pipedesc_close(pd); + nni_posix_pollq_remove(&pd->node); + if (pd->node.fd >= 0) { + (void) close(pd->node.fd); + } nni_mtx_fini(&pd->mtx); |
