diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-18 19:52:08 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-18 19:52:08 -0700 |
| commit | 5fb832e06fd4ded6ccc45f943837fd374a9cea7a (patch) | |
| tree | 41c306c297911d740e92f38b98685207f77758c6 /src | |
| parent | 3eb60946ae8b5ad7d8a95233ffe946432acdb837 (diff) | |
| download | nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.gz nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.bz2 nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.zip | |
Fixes most of the races in posix, but at least one remains outstanding.
Apparently there are circumstances when a pipedesc may get orphaned from the
pollq. This triggers an assertion failure when it occurs. I am still
trying to understand how this can occur. Stay tuned.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 26 | ||||
| -rw-r--r-- | src/core/aio.h | 1 | ||||
| -rw-r--r-- | src/core/endpt.c | 12 | ||||
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 127 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 120 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq.h | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 144 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 9 | ||||
| -rw-r--r-- | src/platform/windows/win_net.c | 3 |
9 files changed, 239 insertions, 208 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index f9927e9d..238522b0 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -228,6 +228,32 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) return (0); } +int +nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe) +{ + nni_mtx_lock(&aio->a_lk); + if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) { + // Operation already done (canceled or timed out?) + nni_mtx_unlock(&aio->a_lk); + return (NNG_ESTATE); + } + aio->a_flags |= NNI_AIO_DONE; + aio->a_result = result; + aio->a_count = 0; + aio->a_prov_cancel = NULL; + aio->a_prov_data = NULL; + aio->a_pipe = pipe; + + // This is guaranteed to just be a list operation at this point, + // because done wasn't set. + nni_aio_expire_remove(aio); + aio->a_expire = NNI_TIME_NEVER; + + nni_taskq_dispatch(NULL, &aio->a_tqe); + nni_mtx_unlock(&aio->a_lk); + return (0); +} + void nni_aio_list_init(nni_list *list) { diff --git a/src/core/aio.h b/src/core/aio.h index 4f190aa1..ad5b560e 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -110,6 +110,7 @@ extern int nni_aio_list_active(nni_aio *); // prevent resources (new pipes for example) from accidentally leaking // during close operations. extern int nni_aio_finish(nni_aio *, int, size_t); +extern int nni_aio_finish_pipe(nni_aio *, int, void *); // nni_aio_cancel is used to cancel an operation. Any pending I/O or // timeouts are canceled if possible, and the callback will be returned diff --git a/src/core/endpt.c b/src/core/endpt.c index 8048de2b..596329ab 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -59,16 +59,22 @@ nni_ep_destroy(nni_ep *ep) if (ep == NULL) { return; } + + // Remove us form the table so we cannot be found. 
+ if (ep->ep_id != 0) { + nni_idhash_remove(nni_eps, ep->ep_id); + } + nni_aio_fini(&ep->ep_acc_aio); nni_aio_fini(&ep->ep_con_aio); nni_aio_fini(&ep->ep_con_syn); nni_aio_fini(&ep->ep_backoff); + + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } - if (ep->ep_id != 0) { - nni_idhash_remove(nni_eps, ep->ep_id); - } + nni_mtx_unlock(&ep->ep_mtx); nni_cv_fini(&ep->ep_cv); nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 8cae2565..b89af982 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -33,11 +33,10 @@ #endif struct nni_posix_epdesc { - int fd; + nni_posix_pollq_node node; nni_list connectq; nni_list acceptq; - nni_posix_pollq_node node; - nni_posix_pollq * pq; + int closed; struct sockaddr_storage locaddr; struct sockaddr_storage remaddr; socklen_t loclen; @@ -53,6 +52,7 @@ nni_posix_epdesc_cancel(nni_aio *aio) nni_mtx_lock(&ed->mtx); nni_aio_list_remove(aio); + NNI_ASSERT(aio->a_pipe == NULL); nni_mtx_unlock(&ed->mtx); } @@ -60,20 +60,17 @@ static void nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) { nni_posix_epdesc * ed = aio->a_prov_data; - nni_posix_pipedesc *pd; + nni_posix_pipedesc *pd = NULL; // acceptq or connectq. 
nni_aio_list_remove(aio); if (rv == 0) { - rv = nni_posix_pipedesc_init(&pd, newfd); - if (rv != 0) { + if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) { (void) close(newfd); - } else { - aio->a_pipe = pd; } } - if ((nni_aio_finish(aio, rv, 0) != 0) && (rv == 0)) { + if ((nni_aio_finish_pipe(aio, rv, pd) != 0) && (pd != NULL)) { nni_posix_pipedesc_fini(pd); } } @@ -94,14 +91,15 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) while ((aio = nni_list_first(&ed->connectq)) != NULL) { rv = -1; sz = sizeof(rv); - if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < + 0) { rv = errno; } switch (rv) { case 0: // Success! - nni_posix_epdesc_finish(aio, 0, ed->fd); - ed->fd = -1; + nni_posix_epdesc_finish(aio, 0, ed->node.fd); + ed->node.fd = -1; continue; case EINPROGRESS: @@ -113,8 +111,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) rv = ECONNREFUSED; } nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); - close(ed->fd); - ed->fd = -1; + (void) close(ed->node.fd); + ed->node.fd = -1; continue; } } @@ -134,12 +132,12 @@ nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) // do getpeername(). 
#ifdef NNG_USE_ACCEPT4 - newfd = accept4(ed->fd, NULL, NULL, SOCK_CLOEXEC); + newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC); if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - newfd = accept(ed->fd, NULL, NULL); + newfd = accept(ed->node.fd, NULL, NULL); } #else - newfd = accept(ed->fd, NULL, NULL); + newfd = accept(ed->node.fd, NULL, NULL); #endif if (newfd >= 0) { @@ -176,7 +174,7 @@ nni_posix_epdesc_doerror(nni_posix_epdesc *ed) int rv = 1; socklen_t sz = sizeof(rv); - if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { rv = errno; } if (rv == 0) { @@ -198,27 +196,30 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed) nni_aio * aio; struct sockaddr_un *sun; - if (ed->fd != -1) { - (void) shutdown(ed->fd, SHUT_RDWR); - sun = (void *) &ed->locaddr; - if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { - (void) unlink(sun->sun_path); - } - (void) close(ed->fd); - ed->fd = -1; - } + ed->closed = 1; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); } while ((aio = nni_list_first(&ed->connectq)) != NULL) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); } + + if (ed->node.fd != -1) { + (void) shutdown(ed->node.fd, SHUT_RDWR); + sun = (void *) &ed->locaddr; + if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { + (void) unlink(sun->sun_path); + } + (void) close(ed->node.fd); + ed->node.fd = -1; + } } static void nni_posix_epdesc_cb(void *arg) { nni_posix_epdesc *ed = arg; + int events; nni_mtx_lock(&ed->mtx); @@ -234,22 +235,22 @@ nni_posix_epdesc_cb(void *arg) if (ed->node.revents & POLLNVAL) { nni_posix_epdesc_doclose(ed); } - ed->node.revents = 0; - ed->node.events = 0; + events = 0; if (!nni_list_empty(&ed->connectq)) { - ed->node.events |= POLLOUT; + events |= POLLOUT; } if (!nni_list_empty(&ed->acceptq)) { - ed->node.events |= POLLIN; + events |= POLLIN; } + nni_posix_pollq_arm(&ed->node, events); 
nni_mtx_unlock(&ed->mtx); } void nni_posix_epdesc_close(nni_posix_epdesc *ed) { - nni_posix_pollq_cancel(ed->pq, &ed->node); + nni_posix_pollq_disarm(&ed->node, POLLIN | POLLOUT); nni_mtx_lock(&ed->mtx); nni_posix_epdesc_doclose(ed); @@ -343,7 +344,6 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) (void) fcntl(fd, F_SETFL, O_NONBLOCK); - ed->fd = fd; ed->node.fd = fd; nni_mtx_unlock(&ed->mtx); return (0); @@ -365,22 +365,14 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) return; } - if (ed->fd < 0) { + if (ed->closed) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); nni_mtx_unlock(&ed->mtx); return; } nni_aio_list_append(&ed->acceptq, aio); - if ((ed->node.events & POLLIN) == 0) { - ed->node.events |= POLLIN; - rv = nni_posix_pollq_submit(ed->pq, &ed->node); - if (rv != 0) { - nni_posix_epdesc_finish(aio, rv, 0); - nni_mtx_unlock(&ed->mtx); - return; - } - } + nni_posix_pollq_arm(&ed->node, POLLIN); nni_mtx_unlock(&ed->mtx); } @@ -389,6 +381,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) { // NB: We assume that the FD is already set to nonblocking mode. int rv; + int fd; nni_mtx_lock(&ed->mtx); // If we can't start, it means that the AIO was stopped. @@ -397,35 +390,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) return; } - ed->fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0); - if (ed->fd < 0) { + fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0); + if (fd < 0) { nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_unlock(&ed->mtx); return; } - ed->node.fd = ed->fd; // Possibly bind. 
if (ed->loclen != 0) { - rv = bind(ed->fd, (void *) &ed->locaddr, ed->loclen); + rv = bind(fd, (void *) &ed->locaddr, ed->loclen); if (rv != 0) { - (void) close(ed->fd); - ed->fd = -1; + (void) close(fd); nni_posix_epdesc_finish(aio, rv, 0); nni_mtx_unlock(&ed->mtx); return; } } - (void) fcntl(ed->fd, F_SETFL, O_NONBLOCK); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); - rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); + rv = connect(fd, (void *) &ed->remaddr, ed->remlen); if (rv == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. - nni_posix_epdesc_finish(aio, 0, ed->fd); - ed->fd = -1; + nni_posix_epdesc_finish(aio, 0, fd); nni_mtx_unlock(&ed->mtx); return; } @@ -435,26 +426,16 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) if (errno == ENOENT) { errno = ECONNREFUSED; } - (void) close(ed->fd); - ed->fd = -1; + (void) close(fd); nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); nni_mtx_unlock(&ed->mtx); return; } // We have to submit to the pollq, because the connection is pending. + ed->node.fd = fd; nni_aio_list_append(&ed->connectq, aio); - if ((ed->node.events & POLLOUT) == 0) { - ed->node.events |= POLLOUT; - rv = nni_posix_pollq_submit(ed->pq, &ed->node); - if (rv != 0) { - (void) close(ed->fd); - ed->fd = -1; - nni_posix_epdesc_finish(aio, rv, 0); - nni_mtx_unlock(&ed->mtx); - return; - } - } + nni_posix_pollq_arm(&ed->node, POLLOUT); nni_mtx_unlock(&ed->mtx); } @@ -462,6 +443,7 @@ int nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) { nni_posix_epdesc *ed; + nni_posix_pollq * pq; int rv; if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { @@ -478,8 +460,6 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) // one. For now we just have a global pollq. Note that by tying // the ed to a single pollq we may get some kind of cache warmth. 
- ed->pq = nni_posix_pollq_get((int) nni_random()); - ed->fd = -1; ed->node.index = 0; ed->node.cb = nni_posix_epdesc_cb; ed->node.data = ed; @@ -488,6 +468,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) nni_aio_list_init(&ed->connectq); nni_aio_list_init(&ed->acceptq); + pq = nni_posix_pollq_get(nni_random() % 0xffff); + if ((rv = nni_posix_pollq_add(pq, &ed->node)) != 0) { + nni_mtx_fini(&ed->mtx); + NNI_FREE_STRUCT(ed); + return (rv); + } *edp = ed; return (0); } @@ -525,9 +511,10 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len) void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { - if (ed->fd >= 0) { - (void) close(ed->fd); + if (ed->node.fd >= 0) { + (void) close(ed->node.fd); } + nni_posix_pollq_remove(&ed->node); nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); } diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index faab91ad..4e61c2c4 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -27,11 +27,10 @@ // file descriptor for TCP socket, etc.) This contains the list of pending // aios for that underlying socket, as well as the socket itself. struct nni_posix_pipedesc { - nni_posix_pollq * pq; - int fd; + nni_posix_pollq_node node; nni_list readq; nni_list writeq; - nni_posix_pollq_node node; + int closed; nni_mtx mtx; }; @@ -39,7 +38,25 @@ static void nni_posix_pipedesc_finish(nni_aio *aio, int rv) { nni_aio_list_remove(aio); - nni_aio_finish(aio, rv, aio->a_count); + (void) nni_aio_finish(aio, rv, aio->a_count); +} + +static void +nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) +{ + nni_aio *aio; + + pd->closed = 1; + if (pd->node.fd != -1) { + // Let any peer know we are closing. 
+ (void) shutdown(pd->node.fd, SHUT_RDWR); + } + while ((aio = nni_list_first(&pd->readq)) != NULL) { + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); + } + while ((aio = nni_list_first(&pd->writeq)) != NULL) { + nni_posix_pipedesc_finish(aio, NNG_ECLOSED); + } } static void @@ -60,7 +77,7 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) iovp = &iovec[0]; rv = 0; - n = writev(pd->fd, iovp, aio->a_niov); + n = writev(pd->node.fd, iovp, aio->a_niov); if (n < 0) { if ((errno == EAGAIN) || (errno == EINTR)) { // Can't write more right now. We're done @@ -70,6 +87,7 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) rv = nni_plat_errno(errno); nni_posix_pipedesc_finish(aio, rv); + nni_posix_pipedesc_doclose(pd); return; } @@ -121,7 +139,7 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) iovp = &iovec[0]; rv = 0; - n = readv(pd->fd, iovp, aio->a_niov); + n = readv(pd->node.fd, iovp, aio->a_niov); if (n < 0) { if ((errno == EAGAIN) || (errno == EINTR)) { // Can't write more right now. We're done @@ -131,6 +149,7 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) rv = nni_plat_errno(errno); nni_posix_pipedesc_finish(aio, rv); + nni_posix_pipedesc_doclose(pd); return; } @@ -171,28 +190,10 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) } static void -nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) -{ - nni_aio *aio; - - if (pd->fd != -1) { - // Let any peer know we are closing. 
- (void) shutdown(pd->fd, SHUT_RDWR); - close(pd->fd); - pd->fd = -1; - } - while ((aio = nni_list_first(&pd->readq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } - while ((aio = nni_list_first(&pd->writeq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } -} - -static void nni_posix_pipedesc_cb(void *arg) { - nni_posix_pipedesc *pd = arg; + nni_posix_pipedesc *pd = arg; + int events = 0; nni_mtx_lock(&pd->mtx); if (pd->node.revents & POLLIN) { @@ -203,21 +204,16 @@ nni_posix_pipedesc_cb(void *arg) } if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) { nni_posix_pipedesc_doclose(pd); - } - - pd->node.revents = 0; - pd->node.events = 0; - - if (!nni_list_empty(&pd->writeq)) { - pd->node.events |= POLLOUT; - } - if (!nni_list_empty(&pd->readq)) { - pd->node.events |= POLLIN; - } - - // If we still have uncompleted operations, resubmit us. - if (pd->node.events != 0) { - nni_posix_pollq_submit(pd->pq, &pd->node); + } else { + if (!nni_list_empty(&pd->writeq)) { + events |= POLLOUT; + } + if (!nni_list_empty(&pd->readq)) { + events |= POLLIN; + } + if (events) { + nni_posix_pollq_arm(&pd->node, events); + } } nni_mtx_unlock(&pd->mtx); } @@ -225,7 +221,7 @@ nni_posix_pipedesc_cb(void *arg) void nni_posix_pipedesc_close(nni_posix_pipedesc *pd) { - nni_posix_pollq_cancel(pd->pq, &pd->node); + nni_posix_pollq_disarm(&pd->node, POLLIN | POLLOUT); nni_mtx_lock(&pd->mtx); nni_posix_pipedesc_doclose(pd); @@ -252,22 +248,14 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) nni_mtx_unlock(&pd->mtx); return; } - if (pd->fd < 0) { + if (pd->closed) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); nni_mtx_unlock(&pd->mtx); return; } nni_aio_list_append(&pd->readq, aio); - if ((pd->node.events & POLLIN) == 0) { - pd->node.events |= POLLIN; - rv = nni_posix_pollq_submit(pd->pq, &pd->node); - if (rv != 0) { - nni_posix_pipedesc_finish(aio, rv); - nni_mtx_unlock(&pd->mtx); - return; - } - } + nni_posix_pollq_arm(&pd->node, POLLIN); 
nni_mtx_unlock(&pd->mtx); } @@ -281,22 +269,14 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) nni_mtx_unlock(&pd->mtx); return; } - if (pd->fd < 0) { + if (pd->closed < 0) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); nni_mtx_unlock(&pd->mtx); return; } nni_aio_list_append(&pd->writeq, aio); - if ((pd->node.events & POLLOUT) == 0) { - pd->node.events |= POLLOUT; - rv = nni_posix_pollq_submit(pd->pq, &pd->node); - if (rv != 0) { - nni_posix_pipedesc_finish(aio, rv); - nni_mtx_unlock(&pd->mtx); - return; - } - } + nni_posix_pollq_arm(&pd->node, POLLOUT); nni_mtx_unlock(&pd->mtx); } @@ -309,7 +289,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { return (NNG_ENOMEM); } - memset(pd, 0, sizeof(*pd)); // We could randomly choose a different pollq, or for efficiencies // sake we could take a modulo of the file desc number to choose @@ -320,17 +299,22 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) NNI_FREE_STRUCT(pd); return (rv); } - pd->pq = nni_posix_pollq_get(fd); - pd->fd = fd; + pd->closed = 0; pd->node.fd = fd; pd->node.cb = nni_posix_pipedesc_cb; pd->node.data = pd; - (void) fcntl(pd->fd, F_SETFL, O_NONBLOCK); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); nni_aio_list_init(&pd->readq); nni_aio_list_init(&pd->writeq); + rv = nni_posix_pollq_add(nni_posix_pollq_get(fd), &pd->node); + if (rv != 0) { + nni_mtx_fini(&pd->mtx); + NNI_FREE_STRUCT(pd); + return (rv); + } *pdp = pd; return (0); } @@ -340,6 +324,10 @@ nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) { // Make sure no other polling activity is pending. 
nni_posix_pipedesc_close(pd); + nni_posix_pollq_remove(&pd->node); + if (pd->node.fd >= 0) { + (void) close(pd->node.fd); + } nni_mtx_fini(&pd->mtx); diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index 9fa7c92e..acffb975 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -32,9 +32,8 @@ struct nni_posix_pollq_node { int fd; // file descriptor to poll int events; // events to watch for int revents; // events received - nni_taskq_ent task; - void * data; // user data - nni_cb cb; // user callback on event + void * data; // user data + nni_cb cb; // user callback on event }; extern nni_posix_pollq *nni_posix_pollq_get(int); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index d378c45a..1bc814bf 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -32,20 +32,21 @@ // nni_posix_pollq is a work structure used by the poller thread, that keeps // track of all the underlying pipe handles and so forth being used by poll(). 
struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - struct pollfd *fds; - int nfds; - int wakewfd; // write side of waker pipe - int wakerfd; // read side of waker pipe - int close; // request for worker to exit - int started; - nni_thr thr; // worker thread - nni_list armed; // armed nodes - nni_list idle; // idle nodes - int nnodes; // num of nodes in nodes list - int inpoll; // poller asleep in poll - + nni_mtx mtx; + nni_cv cv; + struct pollfd * fds; + int nfds; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + int close; // request for worker to exit + int started; + nni_thr thr; // worker thread + nni_list polled; // polled nodes + nni_list armed; // armed nodes + nni_list idle; // idle nodes + int nnodes; // num of nodes in nodes list + int inpoll; // poller asleep in poll + nni_posix_pollq_node *wait; // cancel waiting on this nni_posix_pollq_node *active; // active node (in callback) }; @@ -78,7 +79,7 @@ static void nni_posix_poll_thr(void *arg) { nni_posix_pollq * pollq = arg; - nni_posix_pollq_node *node, *nextnode; + nni_posix_pollq_node *node; nni_mtx_lock(&pollq->mtx); for (;;) { @@ -101,7 +102,9 @@ nni_posix_poll_thr(void *arg) nfds++; // Set up the poll list. - NNI_LIST_FOREACH (&pollq->armed, node) { + while ((node = nni_list_first(&pollq->armed)) != NULL) { + nni_list_remove(&pollq->armed, node); + nni_list_append(&pollq->polled, node); fds[nfds].fd = node->fd; fds[nfds].events = node->events; fds[nfds].revents = 0; @@ -131,48 +134,53 @@ nni_posix_poll_thr(void *arg) nni_plat_pipe_clear(pollq->wakerfd); } - // Now we iterate through all the nodes. Note that one - // may have been added or removed. New pipedescs will have - // their index set to -1. Removed ones will just be absent. - // Note that we may remove the pipedesc from the list, so we - // have to use a custom iterator. 
- nextnode = nni_list_first(&pollq->armed); - while ((node = nextnode) != NULL) { - int index; - - // Save the next node, so that we can remove this - // one if needed. - nextnode = nni_list_next(&pollq->armed, node); - - // If index is less than 1, then we have just added - // this and there is no FD for it in the pollfds. - if ((index = node->index) < 1) { - continue; - } - // Was there any activity? + while ((node = nni_list_first(&pollq->polled)) != NULL) { + int index = node->index; + + // We remove ourselves from the polled list, and + // then put it on either the idle or armed list + // depending on whether it remains armed. + node->index = 0; + nni_list_remove(&pollq->polled, node); + NNI_ASSERT(index > 0); if (fds[index].revents == 0) { + // If still watching for events, return it + // to the armed list. + if (node->events) { + nni_list_append(&pollq->armed, node); + } else { + nni_list_append(&pollq->idle, node); + } continue; } - // Clear the index for the next time around. - node->index = 0; + // We are calling the callback, so disarm + // all events; the node can rearm them in its + // callback. node->revents = fds[index].revents; - - // Execute callbacks. Note that these occur with - // the lock held. - if (node->cb != NULL) { - node->cb(node->data); + node->events &= ~node->revents; + if (node->events == 0) { + nni_list_append(&pollq->idle, node); } else { - // No further events for you! - node->events = 0; + nni_list_append(&pollq->armed, node); } - // Callback should clear events. If none were - // rearmed, then move to the idle list so we won't - // keep looking at it. - if (node->events == 0) { - nni_list_remove(&pollq->armed, node); - nni_list_append(&pollq->idle, node); + // Save the active node; we can notice this way + // when it is busy, and avoid freeing it until + // we are sure that it is not in use. + pollq->active = node; + + // Execute the callback -- without locks held. 
+ nni_mtx_unlock(&pollq->mtx); + node->cb(node->data); + nni_mtx_lock(&pollq->mtx); + + // We finished with this node. If something + // was blocked waiting for that, wake it up. + pollq->active = NULL; + if (pollq->wait == node) { + pollq->wait = NULL; + nni_cv_wake(&pollq->cv); } } } @@ -188,6 +196,10 @@ nni_posix_pollq_add_cb(nni_posix_pollq *pq, nni_posix_pollq_node *node) int rv; NNI_ASSERT(!nni_list_node_active(&node->node)); + if (pq->close) { + // This shouldn't happen! + return (NNG_ECLOSED); + } node->pq = pq; if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) { return (rv); @@ -221,9 +233,17 @@ nni_posix_pollq_remove(nni_posix_pollq_node *node) return; } nni_mtx_lock(&pq->mtx); - NNI_ASSERT(nni_list_node_active(&node->node)); - nni_list_node_remove(&node->node); - pq->nnodes--; + while (pq->active == node) { + pq->wait = node; + nni_cv_wait(&pq->cv); + } + if (nni_list_node_active(&node->node)) { + nni_list_node_remove(&node->node); + pq->nnodes--; + } + if (pq->close) { + nni_cv_wake(&pq->cv); + } nni_mtx_unlock(&pq->mtx); } @@ -241,7 +261,10 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) oevents = node->events; node->events |= events; - if ((oevents == 0) && (events != 0)) { + // We move this to the armed list if its not armed, or already + // on the polled list. The polled list would be the case where + // the index is set to a positive value. + if ((oevents == 0) && (events != 0) && (node->index < 1)) { if (nni_list_node_active(&node->node)) { nni_list_node_remove(&node->node); } @@ -289,15 +312,17 @@ nni_posix_pollq_fini(nni_posix_pollq *pq) pq->close = 1; pq->started = 0; nni_plat_pipe_raise(pq->wakewfd); - - // All pipes should have been closed before this is called. - NNI_ASSERT(nni_list_empty(&pq->armed)); - NNI_ASSERT(nni_list_empty(&pq->idle)); - NNI_ASSERT(pq->nnodes == 0); nni_mtx_unlock(&pq->mtx); } nni_thr_fini(&pq->thr); + + // All pipes should have been closed before this is called. 
+ NNI_ASSERT(nni_list_empty(&pq->polled)); + NNI_ASSERT(nni_list_empty(&pq->armed)); + NNI_ASSERT(nni_list_empty(&pq->idle)); + NNI_ASSERT(pq->nnodes == 0); + if (pq->wakewfd >= 0) { nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); pq->wakewfd = pq->wakerfd = -1; @@ -313,6 +338,7 @@ nni_posix_pollq_init(nni_posix_pollq *pq) { int rv; + NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node); NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node); NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node); pq->wakewfd = -1; diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index 85e3dfca..bd9ce26d 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -333,9 +333,8 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) return; } - aio->a_pipe = pipe; // What if the pipe is already finished? - if (nni_aio_finish(aio, 0, 0) != 0) { + if (nni_aio_finish_pipe(aio, 0, pipe) != 0) { nni_plat_ipc_pipe_fini(pipe); } } @@ -391,7 +390,6 @@ nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) void nni_plat_ipc_ep_accept(nni_plat_ipc_ep *ep, nni_aio *aio) { - aio->a_pipe = NULL; nni_win_event_submit(&ep->acc_ev, aio); } @@ -470,8 +468,9 @@ nni_win_ipc_conn_thr(void *arg) ((rv = nni_win_iocp_register(p)) != 0)) { goto fail; } - aio->a_pipe = pipe; - nni_aio_finish(aio, 0, 0); + if (rv = nni_aio_finish_pipe(aio, 0, pipe) != 0) { + nni_plat_ipc_pipe_fini(pipe); + } continue; fail: diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 59fc0986..633dd256 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -544,8 +544,7 @@ nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) return; } - aio->a_pipe = pipe; - if (nni_aio_finish(aio, 0, 0) != 0) { + if (nni_aio_finish_pipe(aio, 0, pipe) != 0) { nni_plat_tcp_pipe_fini(pipe); } } |
