From 5fb832e06fd4ded6ccc45f943837fd374a9cea7a Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Tue, 18 Jul 2017 19:52:08 -0700 Subject: Fixes most of the races in posix; but at least one remains outstanding. Apparently there are circumstances when a pipedesc may get orphaned from the pollq. This triggers an assertion failure when it occurs. I am still trying to understand how this can occur. Stay tuned. --- src/platform/posix/posix_epdesc.c | 127 +++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 70 deletions(-) (limited to 'src/platform/posix/posix_epdesc.c') diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 8cae2565..b89af982 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -33,11 +33,10 @@ #endif struct nni_posix_epdesc { - int fd; + nni_posix_pollq_node node; nni_list connectq; nni_list acceptq; - nni_posix_pollq_node node; - nni_posix_pollq * pq; + int closed; struct sockaddr_storage locaddr; struct sockaddr_storage remaddr; socklen_t loclen; @@ -53,6 +52,7 @@ nni_posix_epdesc_cancel(nni_aio *aio) nni_mtx_lock(&ed->mtx); nni_aio_list_remove(aio); + NNI_ASSERT(aio->a_pipe == NULL); nni_mtx_unlock(&ed->mtx); } @@ -60,20 +60,17 @@ static void nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) { nni_posix_epdesc * ed = aio->a_prov_data; - nni_posix_pipedesc *pd; + nni_posix_pipedesc *pd = NULL; // acceptq or connectq. 
nni_aio_list_remove(aio); if (rv == 0) { - rv = nni_posix_pipedesc_init(&pd, newfd); - if (rv != 0) { + if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) { (void) close(newfd); - } else { - aio->a_pipe = pd; } } - if ((nni_aio_finish(aio, rv, 0) != 0) && (rv == 0)) { + if ((nni_aio_finish_pipe(aio, rv, pd) != 0) && (pd != NULL)) { nni_posix_pipedesc_fini(pd); } } @@ -94,14 +91,15 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) while ((aio = nni_list_first(&ed->connectq)) != NULL) { rv = -1; sz = sizeof(rv); - if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < + 0) { rv = errno; } switch (rv) { case 0: // Success! - nni_posix_epdesc_finish(aio, 0, ed->fd); - ed->fd = -1; + nni_posix_epdesc_finish(aio, 0, ed->node.fd); + ed->node.fd = -1; continue; case EINPROGRESS: @@ -113,8 +111,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) rv = ECONNREFUSED; } nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); - close(ed->fd); - ed->fd = -1; + (void) close(ed->node.fd); + ed->node.fd = -1; continue; } } @@ -134,12 +132,12 @@ nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) // do getpeername(). 
#ifdef NNG_USE_ACCEPT4 - newfd = accept4(ed->fd, NULL, NULL, SOCK_CLOEXEC); + newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC); if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - newfd = accept(ed->fd, NULL, NULL); + newfd = accept(ed->node.fd, NULL, NULL); } #else - newfd = accept(ed->fd, NULL, NULL); + newfd = accept(ed->node.fd, NULL, NULL); #endif if (newfd >= 0) { @@ -176,7 +174,7 @@ nni_posix_epdesc_doerror(nni_posix_epdesc *ed) int rv = 1; socklen_t sz = sizeof(rv); - if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { rv = errno; } if (rv == 0) { @@ -198,27 +196,30 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed) nni_aio * aio; struct sockaddr_un *sun; - if (ed->fd != -1) { - (void) shutdown(ed->fd, SHUT_RDWR); - sun = (void *) &ed->locaddr; - if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { - (void) unlink(sun->sun_path); - } - (void) close(ed->fd); - ed->fd = -1; - } + ed->closed = 1; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); } while ((aio = nni_list_first(&ed->connectq)) != NULL) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); } + + if (ed->node.fd != -1) { + (void) shutdown(ed->node.fd, SHUT_RDWR); + sun = (void *) &ed->locaddr; + if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { + (void) unlink(sun->sun_path); + } + (void) close(ed->node.fd); + ed->node.fd = -1; + } } static void nni_posix_epdesc_cb(void *arg) { nni_posix_epdesc *ed = arg; + int events; nni_mtx_lock(&ed->mtx); @@ -234,22 +235,22 @@ nni_posix_epdesc_cb(void *arg) if (ed->node.revents & POLLNVAL) { nni_posix_epdesc_doclose(ed); } - ed->node.revents = 0; - ed->node.events = 0; + events = 0; if (!nni_list_empty(&ed->connectq)) { - ed->node.events |= POLLOUT; + events |= POLLOUT; } if (!nni_list_empty(&ed->acceptq)) { - ed->node.events |= POLLIN; + events |= POLLIN; } + nni_posix_pollq_arm(&ed->node, events); 
nni_mtx_unlock(&ed->mtx); } void nni_posix_epdesc_close(nni_posix_epdesc *ed) { - nni_posix_pollq_cancel(ed->pq, &ed->node); + nni_posix_pollq_disarm(&ed->node, POLLIN | POLLOUT); nni_mtx_lock(&ed->mtx); nni_posix_epdesc_doclose(ed); @@ -343,7 +344,6 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) (void) fcntl(fd, F_SETFL, O_NONBLOCK); - ed->fd = fd; ed->node.fd = fd; nni_mtx_unlock(&ed->mtx); return (0); @@ -365,22 +365,14 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) return; } - if (ed->fd < 0) { + if (ed->closed) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); nni_mtx_unlock(&ed->mtx); return; } nni_aio_list_append(&ed->acceptq, aio); - if ((ed->node.events & POLLIN) == 0) { - ed->node.events |= POLLIN; - rv = nni_posix_pollq_submit(ed->pq, &ed->node); - if (rv != 0) { - nni_posix_epdesc_finish(aio, rv, 0); - nni_mtx_unlock(&ed->mtx); - return; - } - } + nni_posix_pollq_arm(&ed->node, POLLIN); nni_mtx_unlock(&ed->mtx); } @@ -389,6 +381,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) { // NB: We assume that the FD is already set to nonblocking mode. int rv; + int fd; nni_mtx_lock(&ed->mtx); // If we can't start, it means that the AIO was stopped. @@ -397,35 +390,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) return; } - ed->fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0); - if (ed->fd < 0) { + fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0); + if (fd < 0) { nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_unlock(&ed->mtx); return; } - ed->node.fd = ed->fd; // Possibly bind. 
if (ed->loclen != 0) { - rv = bind(ed->fd, (void *) &ed->locaddr, ed->loclen); + rv = bind(fd, (void *) &ed->locaddr, ed->loclen); if (rv != 0) { - (void) close(ed->fd); - ed->fd = -1; + (void) close(fd); nni_posix_epdesc_finish(aio, rv, 0); nni_mtx_unlock(&ed->mtx); return; } } - (void) fcntl(ed->fd, F_SETFL, O_NONBLOCK); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); - rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); + rv = connect(fd, (void *) &ed->remaddr, ed->remlen); if (rv == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. - nni_posix_epdesc_finish(aio, 0, ed->fd); - ed->fd = -1; + nni_posix_epdesc_finish(aio, 0, fd); nni_mtx_unlock(&ed->mtx); return; } @@ -435,26 +426,16 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (errno == ENOENT) { errno = ECONNREFUSED; } - (void) close(ed->fd); - ed->fd = -1; + (void) close(fd); nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); nni_mtx_unlock(&ed->mtx); return; } // We have to submit to the pollq, because the connection is pending. + ed->node.fd = fd; nni_aio_list_append(&ed->connectq, aio); - if ((ed->node.events & POLLOUT) == 0) { - ed->node.events |= POLLOUT; - rv = nni_posix_pollq_submit(ed->pq, &ed->node); - if (rv != 0) { - (void) close(ed->fd); - ed->fd = -1; - nni_posix_epdesc_finish(aio, rv, 0); - nni_mtx_unlock(&ed->mtx); - return; - } - } + nni_posix_pollq_arm(&ed->node, POLLOUT); nni_mtx_unlock(&ed->mtx); } @@ -462,6 +443,7 @@ int nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) { nni_posix_epdesc *ed; + nni_posix_pollq * pq; int rv; if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { @@ -478,8 +460,6 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) // one. For now we just have a global pollq. Note that by tying // the ed to a single pollq we may get some kind of cache warmth. 
- ed->pq = nni_posix_pollq_get((int) nni_random()); - ed->fd = -1; ed->node.index = 0; ed->node.cb = nni_posix_epdesc_cb; ed->node.data = ed; @@ -488,6 +468,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) nni_aio_list_init(&ed->connectq); nni_aio_list_init(&ed->acceptq); + pq = nni_posix_pollq_get(nni_random() % 0xffff); + if ((rv = nni_posix_pollq_add(pq, &ed->node)) != 0) { + nni_mtx_fini(&ed->mtx); + NNI_FREE_STRUCT(ed); + return (rv); + } *edp = ed; return (0); } @@ -525,9 +511,10 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len) void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { - if (ed->fd >= 0) { - (void) close(ed->fd); + if (ed->node.fd >= 0) { + (void) close(ed->node.fd); } + nni_posix_pollq_remove(&ed->node); nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); } -- cgit v1.2.3-70-g09d2