Diffstat (limited to 'src/platform/posix')
-rw-r--r--  src/platform/posix/posix_aio.h           |   3
-rw-r--r--  src/platform/posix/posix_epdesc.c        | 476
-rw-r--r--  src/platform/posix/posix_pipedesc.c      | 212
-rw-r--r--  src/platform/posix/posix_pollq.h         |  38
-rw-r--r--  src/platform/posix/posix_pollq_epoll.c   | 445
-rw-r--r--  src/platform/posix/posix_pollq_kqueue.c  | 432
-rw-r--r--  src/platform/posix/posix_pollq_poll.c    | 532
-rw-r--r--  src/platform/posix/posix_pollq_port.c    | 288
-rw-r--r--  src/platform/posix/posix_resolv_gai.c    | 136
-rw-r--r--  src/platform/posix/posix_thread.c        |   6
-rw-r--r--  src/platform/posix/posix_udp.c           |  87
11 files changed, 1323 insertions(+), 1332 deletions(-)
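The diff below replaces the old embedded nni_posix_pollq_node with an opaque nni_posix_pfd handle plus a callback. Before reading the hunks, here is a minimal sketch of the consumer pattern this change enables, inferred from the declarations added to posix_pollq.h and the usage in posix_pipedesc.c; the names my_conn, my_cb, and my_conn_start are hypothetical, and error handling is trimmed:

    #include <poll.h>

    #include "core/nng_impl.h"
    #include "platform/posix/posix_pollq.h"

    typedef struct {
            nni_posix_pfd *pfd; // opaque poller handle wrapping the fd
    } my_conn;

    // Callback runs on the pollq thread with the ready events (POLLIN, etc.).
    static void
    my_cb(nni_posix_pfd *pfd, int events, void *arg)
    {
            my_conn *c = arg;

            if (events & (POLLHUP | POLLERR | POLLNVAL)) {
                    // Shut the descriptor down; nni_posix_pfd_fini reaps it.
                    nni_posix_pfd_close(c->pfd);
                    return;
            }
            if (events & POLLIN) {
                    // Read from nni_posix_pfd_fd(c->pfd) until EAGAIN, then
                    // re-arm; the pollers here are effectively oneshot.
                    (void) nni_posix_pfd_arm(c->pfd, POLLIN);
            }
    }

    static int
    my_conn_start(my_conn *c, int fd)
    {
            int rv;

            // Registers fd with the global pollq (and marks it
            // CLOEXEC/NONBLOCK in the backends shown below).
            if ((rv = nni_posix_pfd_init(&c->pfd, fd)) != 0) {
                    return (rv);
            }
            nni_posix_pfd_set_cb(c->pfd, my_cb, c);     // install callback first
            return (nni_posix_pfd_arm(c->pfd, POLLIN)); // then request events
    }

The key design point visible throughout the diff is that consumers no longer touch pollq internals: they init a pfd, set a callback, arm the events they care about, and re-arm from the callback when work remains.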
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 15d91db2..08ae99ca 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -18,11 +18,12 @@ // one of several possible different backends. #include "core/nng_impl.h" +#include "posix_pollq.h" typedef struct nni_posix_pipedesc nni_posix_pipedesc; typedef struct nni_posix_epdesc nni_posix_epdesc; -extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int); +extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, nni_posix_pfd *); extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *); extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *); diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 7431dedf..0065806d 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -39,7 +39,7 @@ #endif struct nni_posix_epdesc { - nni_posix_pollq_node node; + nni_posix_pfd * pfd; nni_list connectq; nni_list acceptq; bool closed; @@ -53,10 +53,14 @@ struct nni_posix_epdesc { nni_mtx mtx; }; +static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *); +static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *); + static void -nni_posix_epdesc_cancel(nni_aio *aio, int rv) +nni_epdesc_cancel(nni_aio *aio, int rv) { - nni_posix_epdesc *ed = nni_aio_get_prov_data(aio); + nni_posix_epdesc *ed = nni_aio_get_prov_data(aio); + nni_posix_pfd * pfd = NULL; NNI_ASSERT(rv != 0); nni_mtx_lock(&ed->mtx); @@ -64,200 +68,136 @@ nni_posix_epdesc_cancel(nni_aio *aio, int rv) nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } + if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) && + ((pfd = ed->pfd) != NULL)) { + nni_posix_pfd_close(pfd); + } nni_mtx_unlock(&ed->mtx); } static void -nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) +nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd) { nni_posix_pipedesc *pd = NULL; // acceptq or connectq. nni_aio_list_remove(aio); - if (rv == 0) { - if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) { - (void) close(newfd); - } - } if (rv != 0) { + NNI_ASSERT(newpfd == NULL); nni_aio_finish_error(aio, rv); - } else { - nni_aio_set_output(aio, 0, pd); - nni_aio_finish(aio, 0, 0); + return; } -} - -static void -nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) -{ - nni_aio * aio; - socklen_t sz; - int rv; - - // Note that normally there will only be a single connect AIO... - // A socket that is here will have *initiated* with a connect() - // call, which returned EINPROGRESS. When the connection attempt - // is done, either way, the descriptor will be noted as writable. - // getsockopt() with SOL_SOCKET, SO_ERROR to determine the actual - // status of the connection attempt... - while ((aio = nni_list_first(&ed->connectq)) != NULL) { - rv = -1; - sz = sizeof(rv); - if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < - 0) { - rv = errno; - } - switch (rv) { - case 0: - // Success! - nni_posix_pollq_remove(&ed->node); - nni_posix_epdesc_finish(aio, 0, ed->node.fd); - ed->node.fd = -1; - continue; - - case EINPROGRESS: - // Still in progress... 
keep trying - return; - default: - if (rv == ENOENT) { - rv = ECONNREFUSED; - } - nni_posix_pollq_remove(&ed->node); - nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); - (void) close(ed->node.fd); - ed->node.fd = -1; - continue; - } + NNI_ASSERT(newpfd != NULL); + if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) { + nni_posix_pfd_fini(newpfd); + nni_aio_finish_error(aio, rv); + return; } + nni_aio_set_output(aio, 0, pd); + nni_aio_finish(aio, 0, 0); } static void -nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) +nni_epdesc_doaccept(nni_posix_epdesc *ed) { nni_aio *aio; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - int newfd; + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + + fd = nni_posix_pfd_fd(ed->pfd); #ifdef NNG_USE_ACCEPT4 - newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC); + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - newfd = accept(ed->node.fd, NULL, NULL); + newfd = accept(fd, NULL, NULL); } #else - newfd = accept(ed->node.fd, NULL, NULL); + newfd = accept(fd, NULL, NULL); #endif - - if (newfd >= 0) { - // successful connection request! - nni_posix_epdesc_finish(aio, 0, newfd); - continue; - } - - if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) { - // Well, let's try later. Note that EWOULDBLOCK - // is required by standards, but some platforms may - // use EAGAIN. The values may be the same, so we - // can't use switch. - return; + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(ed->pfd, POLLIN); + if (rv != 0) { + nni_epdesc_finish(aio, rv, NULL); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + nni_epdesc_finish(aio, rv, NULL); + continue; + } } - if ((errno == ECONNABORTED) || (errno == ECONNRESET)) { - // Let's just eat this one. Perhaps it may be - // better to report it to the application, but we - // think most applications don't want to see this. - // Only someone with a packet trace is going to - // notice this. 
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_epdesc_finish(aio, rv, NULL); continue; } - nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); - } -} - -static void -nni_posix_epdesc_doerror(nni_posix_epdesc *ed) -{ - nni_aio * aio; - int rv = 1; - socklen_t sz = sizeof(rv); - - if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { - rv = errno; - } - if (rv == 0) { - return; - } - rv = nni_plat_errno(rv); - - while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - nni_posix_epdesc_finish(aio, rv, 0); - } - while ((aio = nni_list_first(&ed->connectq)) != NULL) { - nni_posix_epdesc_finish(aio, rv, 0); + nni_epdesc_finish(aio, 0, pfd); } } static void -nni_posix_epdesc_doclose(nni_posix_epdesc *ed) +nni_epdesc_doclose(nni_posix_epdesc *ed) { nni_aio *aio; - int fd; ed->closed = true; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + nni_epdesc_finish(aio, NNG_ECLOSED, 0); } while ((aio = nni_list_first(&ed->connectq)) != NULL) { - nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + nni_epdesc_finish(aio, NNG_ECLOSED, 0); } - nni_posix_pollq_remove(&ed->node); + if (ed->pfd != NULL) { - if ((fd = ed->node.fd) != -1) { + nni_posix_pfd_close(ed->pfd); + } + + // clean up stale UNIX socket when closing the server. + if ((ed->mode == NNI_EP_MODE_LISTEN) && (ed->loclen != 0) && + (ed->locaddr.ss_family == AF_UNIX)) { struct sockaddr_un *sun = (void *) &ed->locaddr; - ed->node.fd = -1; - (void) shutdown(fd, SHUT_RDWR); - (void) close(fd); - if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { - (void) unlink(sun->sun_path); - } + (void) unlink(sun->sun_path); } } static void -nni_posix_epdesc_cb(void *arg) +nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_epdesc *ed = arg; - int events; nni_mtx_lock(&ed->mtx); - - if (ed->node.revents & POLLIN) { - nni_posix_epdesc_doaccept(ed); - } - if (ed->node.revents & POLLOUT) { - nni_posix_epdesc_doconnect(ed); - } - if (ed->node.revents & (POLLERR | POLLHUP)) { - nni_posix_epdesc_doerror(ed); - } - if (ed->node.revents & POLLNVAL) { - nni_posix_epdesc_doclose(ed); + if (events & POLLNVAL) { + nni_epdesc_doclose(ed); + nni_mtx_unlock(&ed->mtx); + return; } + NNI_ASSERT(pfd == ed->pfd); - events = 0; - if (!nni_list_empty(&ed->connectq)) { - events |= POLLOUT; - } - if (!nni_list_empty(&ed->acceptq)) { - events |= POLLIN; - } - if ((!ed->closed) && (events != 0)) { - nni_posix_pollq_arm(&ed->node, events); - } + // Anything else will turn up in accept. + nni_epdesc_doaccept(ed); nni_mtx_unlock(&ed->mtx); } @@ -265,7 +205,7 @@ void nni_posix_epdesc_close(nni_posix_epdesc *ed) { nni_mtx_lock(&ed->mtx); - nni_posix_epdesc_doclose(ed); + nni_epdesc_doclose(ed); nni_mtx_unlock(&ed->mtx); } @@ -276,9 +216,23 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) struct sockaddr_storage *ss; int rv; int fd; + nni_posix_pfd * pfd; nni_mtx_lock(&ed->mtx); + if (ed->started) { + nni_mtx_unlock(&ed->mtx); + return (NNG_ESTATE); + } + if (ed->closed) { + nni_mtx_unlock(&ed->mtx); + return (NNG_ECLOSED); + } + if ((len = ed->loclen) == 0) { + nni_mtx_unlock(&ed->mtx); + return (NNG_EADDRINVAL); + } + ss = &ed->locaddr; len = ed->loclen; @@ -286,18 +240,17 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) nni_mtx_unlock(&ed->mtx); return (nni_plat_errno(errno)); } - (void) fcntl(fd, F_SETFD, FD_CLOEXEC); -#ifdef SO_NOSIGPIPE - // Darwin lacks MSG_NOSIGNAL, but has a socket option. 
- int one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); -#endif + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + nni_mtx_unlock(&ed->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } if (bind(fd, (struct sockaddr *) ss, len) < 0) { rv = nni_plat_errno(errno); nni_mtx_unlock(&ed->mtx); - (void) close(fd); + nni_posix_pfd_fini(pfd); return (rv); } @@ -314,7 +267,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) if ((rv = chmod(sun->sun_path, perms)) != 0) { rv = nni_plat_errno(errno); nni_mtx_unlock(&ed->mtx); - close(fd); + nni_posix_pfd_fini(pfd); return (rv); } } @@ -324,27 +277,24 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) if (listen(fd, 128) != 0) { rv = nni_plat_errno(errno); nni_mtx_unlock(&ed->mtx); - (void) close(fd); + nni_posix_pfd_fini(pfd); return (rv); } - (void) fcntl(fd, F_SETFL, O_NONBLOCK); + nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed); - ed->node.fd = fd; - if ((rv = nni_posix_pollq_add(&ed->node)) != 0) { - (void) close(fd); - ed->node.fd = -1; - nni_mtx_unlock(&ed->mtx); - return (rv); - } + ed->pfd = pfd; ed->started = true; nni_mtx_unlock(&ed->mtx); + return (0); } void nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) { + int rv; + // Accept is simpler than the connect case. With accept we just // need to wait for the socket to be readable to indicate an incoming // connection is ready for us. There isn't anything else for us to @@ -354,14 +304,25 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) } nni_mtx_lock(&ed->mtx); + if (!ed->started) { + nni_mtx_unlock(&ed->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } if (ed->closed) { nni_mtx_unlock(&ed->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); return; } + if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) { + nni_mtx_unlock(&ed->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&ed->acceptq, aio); - nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed); - nni_posix_pollq_arm(&ed->node, POLLIN); + if (nni_list_first(&ed->acceptq) == aio) { + nni_epdesc_doaccept(ed); + } nni_mtx_unlock(&ed->mtx); } @@ -370,79 +331,171 @@ nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); + int fd = -1; + + nni_mtx_lock(&ed->mtx); + if (ed->pfd != NULL) { + fd = nni_posix_pfd_fd(ed->pfd); + } + nni_mtx_unlock(&ed->mtx); - if (getsockname(ed->node.fd, (void *) &ss, &sslen) != 0) { + if (getsockname(fd, (void *) &ss, &sslen) != 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss)); } -void -nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) +static void +nni_epdesc_connect_start(nni_posix_epdesc *ed) { - // NB: We assume that the FD is already set to nonblocking mode. - int rv; - int fd; + nni_posix_pfd *pfd; + int fd; + int rv; + nni_aio * aio; - if (nni_aio_begin(aio) != 0) { +loop: + if ((aio = nni_list_first(&ed->connectq)) == NULL) { return; } - nni_mtx_lock(&ed->mtx); + + NNI_ASSERT(ed->pfd == NULL); + if (ed->closed) { + nni_epdesc_finish(aio, NNG_ECLOSED, NULL); + goto loop; + } + ed->started = true; if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, rv); - return; + nni_epdesc_finish(aio, rv, NULL); + goto loop; } + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_epdesc_finish(aio, rv, NULL); + goto loop; + } // Possibly bind. 
if ((ed->loclen != 0) && (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) { rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - (void) close(fd); - nni_aio_finish_error(aio, rv); - return; + nni_epdesc_finish(aio, rv, NULL); + nni_posix_pfd_fini(pfd); + goto loop; } - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. - ed->started = true; - nni_posix_epdesc_finish(aio, 0, fd); - nni_mtx_unlock(&ed->mtx); - return; + nni_epdesc_finish(aio, 0, pfd); + goto loop; } if (errno != EINPROGRESS) { // Some immediate failure occurred. if (errno == ENOENT) { // For UNIX domain sockets - errno = ECONNREFUSED; + rv = NNG_ECONNREFUSED; + } else { + rv = nni_plat_errno(errno); } - rv = nni_plat_errno(errno); + nni_epdesc_finish(aio, rv, NULL); + nni_posix_pfd_fini(pfd); + goto loop; + } + nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed); + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + nni_epdesc_finish(aio, rv, NULL); + nni_posix_pfd_fini(pfd); + goto loop; + } + ed->pfd = pfd; + // all done... wait for this to signal via callback +} + +void +nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ed->mtx); + if (ed->closed) { + nni_mtx_unlock(&ed->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) { nni_mtx_unlock(&ed->mtx); - (void) close(fd); nni_aio_finish_error(aio, rv); return; } - // We have to submit to the pollq, because the connection is pending. - ed->node.fd = fd; ed->started = true; - if ((rv = nni_posix_pollq_add(&ed->node)) != 0) { - ed->node.fd = -1; + nni_list_append(&ed->connectq, aio); + if (nni_list_first(&ed->connectq) == aio) { + // If there was a stale pfd (probably from an aborted or + // canceled connect attempt), discard it so we start fresh. + if (ed->pfd != NULL) { + nni_posix_pfd_fini(ed->pfd); + ed->pfd = NULL; + } + nni_epdesc_connect_start(ed); + } + nni_mtx_unlock(&ed->mtx); +} + +static void +nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_posix_epdesc *ed = arg; + nni_aio * aio; + socklen_t sz; + int rv; + int fd; + + nni_mtx_lock(&ed->mtx); + if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) || + (pfd != ed->pfd)) { + // Spurious completion. Ignore it, but discard the PFD. + if (ed->pfd == pfd) { + ed->pfd = NULL; + } + nni_posix_pfd_fini(pfd); + nni_mtx_unlock(&ed->mtx); + return; + } + + fd = nni_posix_pfd_fd(pfd); + sz = sizeof(rv); + + if ((events & POLLNVAL) != 0) { + rv = EBADF; + + } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + + switch (rv) { + case 0: + // Good connect! + ed->pfd = NULL; + nni_epdesc_finish(aio, 0, pfd); + break; + case EINPROGRESS: // still connecting... come back later nni_mtx_unlock(&ed->mtx); - (void) close(fd); - nni_aio_finish_error(aio, rv); return; + default: + ed->pfd = NULL; + nni_epdesc_finish(aio, nni_plat_errno(rv), NULL); + nni_posix_pfd_fini(pfd); + break; } - nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed); - nni_aio_list_append(&ed->connectq, aio); - nni_posix_pollq_arm(&ed->node, POLLOUT); + // Start another connect running, if any is waiting. 
+ nni_epdesc_connect_start(ed); nni_mtx_unlock(&ed->mtx); } @@ -450,7 +503,6 @@ int nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode) { nni_posix_epdesc *ed; - int rv; if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { return (NNG_ENOMEM); @@ -458,28 +510,14 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode) nni_mtx_init(&ed->mtx); - // We could randomly choose a different pollq, or for efficiencies - // sake we could take a modulo of the file desc number to choose - // one. For now we just have a global pollq. Note that by tying - // the ed to a single pollq we may get some kind of cache warmth. - - ed->node.index = 0; - ed->node.cb = nni_posix_epdesc_cb; - ed->node.data = ed; - ed->node.fd = -1; - ed->closed = false; - ed->started = false; - ed->perms = 0; // zero means use default (no change) - ed->mode = mode; + ed->pfd = NULL; + ed->closed = false; + ed->started = false; + ed->perms = 0; // zero means use default (no change) + ed->mode = mode; nni_aio_list_init(&ed->connectq); nni_aio_list_init(&ed->acceptq); - - if ((rv = nni_posix_pollq_init(&ed->node)) != 0) { - nni_mtx_fini(&ed->mtx); - NNI_FREE_STRUCT(ed); - return (rv); - } *edp = ed; return (0); } @@ -532,14 +570,16 @@ nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode) void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { - int fd; + nni_posix_pfd *pfd; + nni_mtx_lock(&ed->mtx); - if ((fd = ed->node.fd) != -1) { - (void) close(ed->node.fd); - nni_posix_epdesc_doclose(ed); - } + nni_epdesc_doclose(ed); + pfd = ed->pfd; nni_mtx_unlock(&ed->mtx); - nni_posix_pollq_fini(&ed->node); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); } diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index e7225395..b11036ea 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -40,11 +40,11 @@ // file descriptor for TCP socket, etc.) This contains the list of pending // aios for that underlying socket, as well as the socket itself. struct nni_posix_pipedesc { - nni_posix_pollq_node node; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; + nni_posix_pfd *pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; }; static void @@ -66,16 +66,19 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) while ((aio = nni_list_first(&pd->writeq)) != NULL) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); } - if (pd->node.fd != -1) { - // Let any peer know we are closing. - (void) shutdown(pd->node.fd, SHUT_RDWR); - } + nni_posix_pfd_close(pd->pfd); } static void nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) { nni_aio *aio; + int fd; + + fd = nni_posix_pfd_fd(pd->pfd); + if ((fd < 0) || (pd->closed)) { + return; + } while ((aio = nni_list_first(&pd->writeq)) != NULL) { unsigned i; @@ -122,20 +125,28 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) hdr.msg_iovlen = niov; hdr.msg_iov = iovec; - n = sendmsg(pd->node.fd, &hdr, MSG_NOSIGNAL); - if (n < 0) { - if ((errno == EAGAIN) || (errno == EINTR)) { - // Can't write more right now. We're done - // on this fd for now. 
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_posix_pipedesc_finish( + aio, nni_plat_errno(errno)); + nni_posix_pipedesc_doclose(pd); return; } - nni_posix_pipedesc_finish(aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; } nni_aio_bump_count(aio, n); // We completed the entire operation on this aioq. + // (Sendmsg never returns a partial result.) nni_posix_pipedesc_finish(aio, 0); // Go back to start of loop to see if there is another @@ -147,6 +158,12 @@ static void nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) { nni_aio *aio; + int fd; + + fd = nni_posix_pfd_fd(pd->pfd); + if ((fd < 0) || (pd->closed)) { + return; + } while ((aio = nni_list_first(&pd->readq)) != NULL) { unsigned i; @@ -181,16 +198,18 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) } } - n = readv(pd->node.fd, iovec, niov); - if (n < 0) { - if ((errno == EAGAIN) || (errno == EINTR)) { - // Can't write more right now. We're done - // on this fd for now. + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_posix_pipedesc_finish( + aio, nni_plat_errno(errno)); + nni_posix_pipedesc_doclose(pd); return; } - nni_posix_pipedesc_finish(aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; } if (n == 0) { @@ -211,21 +230,21 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) } static void -nni_posix_pipedesc_cb(void *arg) +nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_pipedesc *pd = arg; nni_mtx_lock(&pd->mtx); - if (pd->node.revents & POLLIN) { + if (events & POLLIN) { nni_posix_pipedesc_doread(pd); } - if (pd->node.revents & POLLOUT) { + if (events & POLLOUT) { nni_posix_pipedesc_dowrite(pd); } - if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) { + if (events & (POLLHUP | POLLERR | POLLNVAL)) { nni_posix_pipedesc_doclose(pd); } else { - int events = 0; + events = 0; if (!nni_list_empty(&pd->writeq)) { events |= POLLOUT; } @@ -233,7 +252,7 @@ nni_posix_pipedesc_cb(void *arg) events |= POLLIN; } if ((!pd->closed) && (events != 0)) { - nni_posix_pollq_arm(&pd->node, events); + nni_posix_pfd_arm(pfd, events); } } nni_mtx_unlock(&pd->mtx); @@ -242,8 +261,7 @@ nni_posix_pipedesc_cb(void *arg) void nni_posix_pipedesc_close(nni_posix_pipedesc *pd) { - nni_posix_pollq_remove(&pd->node); - + // NB: Events may still occur. nni_mtx_lock(&pd->mtx); nni_posix_pipedesc_doclose(pd); nni_mtx_unlock(&pd->mtx); @@ -265,6 +283,8 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv) void nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -276,18 +296,24 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) return; } + if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { + nni_mtx_unlock(&pd->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&pd->readq, aio); - nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd); - // If we are only job on the list, go ahead and try to do an immediate - // transfer. This allows for faster completions in many cases. We - // also need not arm a list if it was already armed. + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. 
We also need not arm a list if it was already + // armed. if (nni_list_first(&pd->readq) == aio) { nni_posix_pipedesc_doread(pd); - // If we are still the first thing on the list, that means we - // didn't finish the job, so arm the poller to complete us. + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. if (nni_list_first(&pd->readq) == aio) { - nni_posix_pollq_arm(&pd->node, POLLIN); + nni_posix_pfd_arm(pd->pfd, POLLIN); } } nni_mtx_unlock(&pd->mtx); @@ -296,6 +322,8 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) void nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -307,15 +335,20 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) return; } + if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { + nni_mtx_unlock(&pd->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&pd->writeq, aio); - nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd); if (nni_list_first(&pd->writeq) == aio) { nni_posix_pipedesc_dowrite(pd); - // If we are still the first thing on the list, that means we - // didn't finish the job, so arm the poller to complete us. + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. if (nni_list_first(&pd->writeq) == aio) { - nni_posix_pollq_arm(&pd->node, POLLOUT); + nni_posix_pfd_arm(pd->pfd, POLLOUT); } } nni_mtx_unlock(&pd->mtx); @@ -326,8 +359,9 @@ nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(pd->pfd); - if (getpeername(pd->node.fd, (void *) &ss, &sslen) != 0) { + if (getpeername(fd, (void *) &ss, &sslen) != 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss)); @@ -338,8 +372,9 @@ nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(pd->pfd); - if (getsockname(pd->node.fd, (void *) &ss, &sslen) != 0) { + if (getsockname(fd, (void *) &ss, &sslen) != 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss)); @@ -349,9 +384,9 @@ int nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay) { int val = nodelay ? 1 : 0; + int fd = nni_posix_pfd_fd(pd->pfd); - if (setsockopt(pd->node.fd, IPPROTO_TCP, TCP_NODELAY, &val, - sizeof(val)) != 0) { + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { return (nni_plat_errno(errno)); } return (0); @@ -361,61 +396,19 @@ int nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep) { int val = keep ? 1 : 0; + int fd = nni_posix_pfd_fd(pd->pfd); - if (setsockopt(pd->node.fd, SOL_SOCKET, SO_KEEPALIVE, &val, - sizeof(val)) != 0) { + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { return (nni_plat_errno(errno)); } return (0); } int -nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) -{ - nni_posix_pipedesc *pd; - int rv; - - if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { - return (NNG_ENOMEM); - } - - // We could randomly choose a different pollq, or for efficiencies - // sake we could take a modulo of the file desc number to choose - // one. For now we just have a global pollq. Note that by tying - // the pd to a single pollq we may get some kind of cache warmth. 
- - pd->closed = false; - pd->node.fd = fd; - pd->node.cb = nni_posix_pipedesc_cb; - pd->node.data = pd; - - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - -#ifdef SO_NOSIGPIPE - // Darwin lacks MSG_NOSIGNAL, but has a socket option. - int one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); -#endif - - nni_mtx_init(&pd->mtx); - nni_aio_list_init(&pd->readq); - nni_aio_list_init(&pd->writeq); - - if (((rv = nni_posix_pollq_init(&pd->node)) != 0) || - ((rv = nni_posix_pollq_add(&pd->node)) != 0)) { - nni_mtx_fini(&pd->mtx); - NNI_FREE_STRUCT(pd); - return (rv); - } - *pdp = pd; - return (0); -} - -int nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, uint64_t *egid, uint64_t *prid, uint64_t *znid) { - int fd = pd->node.fd; + int fd = nni_posix_pfd_fd(pd->pfd); #if defined(NNG_HAVE_GETPEEREID) uid_t uid; gid_t gid; @@ -458,7 +451,8 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, } *euid = xu.cr_uid; *egid = xu.cr_gid; - *prid = (uint64_t) -1; // XXX: macOS has undocumented LOCAL_PEERPID... + *prid = (uint64_t) -1; // XXX: macOS has undocumented + // LOCAL_PEERPID... *znid = (uint64_t) -1; return (0); #else @@ -473,16 +467,34 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, #endif } +int +nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd) +{ + nni_posix_pipedesc *pd; + + if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { + return (NNG_ENOMEM); + } + + pd->closed = false; + pd->pfd = pfd; + + nni_mtx_init(&pd->mtx); + nni_aio_list_init(&pd->readq); + nni_aio_list_init(&pd->writeq); + + nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd); + + *pdp = pd; + return (0); +} + void nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) { - // Make sure no other polling activity is pending. 
nni_posix_pipedesc_close(pd); - nni_posix_pollq_fini(&pd->node); - if (pd->node.fd >= 0) { - (void) close(pd->node.fd); - } - + nni_posix_pfd_fini(pd->pfd); + pd->pfd = NULL; nni_mtx_fini(&pd->mtx); NNI_FREE_STRUCT(pd); diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index 2c855da1..b9786330 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -22,32 +22,18 @@ #include "core/nng_impl.h" #include <poll.h> -typedef struct nni_posix_pollq_node nni_posix_pollq_node; -typedef struct nni_posix_pollq nni_posix_pollq; - -struct nni_posix_pollq_node { - nni_list_node node; // linkage into the pollq list - nni_posix_pollq *pq; // associated pollq - int index; // used by the poller impl - int armed; // used by the poller impl - int fd; // file descriptor to poll - int events; // events to watch for - int revents; // events received - void * data; // user data - nni_cb cb; // user callback on event - nni_mtx mx; - nni_cv cv; -}; - -extern nni_posix_pollq *nni_posix_pollq_get(int); -extern int nni_posix_pollq_sysinit(void); -extern void nni_posix_pollq_sysfini(void); - -extern int nni_posix_pollq_init(nni_posix_pollq_node *); -extern void nni_posix_pollq_fini(nni_posix_pollq_node *); -extern int nni_posix_pollq_add(nni_posix_pollq_node *); -extern void nni_posix_pollq_remove(nni_posix_pollq_node *); -extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int); +typedef struct nni_posix_pfd nni_posix_pfd; +typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, int, void *); + +extern int nni_posix_pollq_sysinit(void); +extern void nni_posix_pollq_sysfini(void); + +extern int nni_posix_pfd_init(nni_posix_pfd **, int); +extern void nni_posix_pfd_fini(nni_posix_pfd *); +extern int nni_posix_pfd_arm(nni_posix_pfd *, int); +extern int nni_posix_pfd_fd(nni_posix_pfd *); +extern void nni_posix_pfd_close(nni_posix_pfd *); +extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *); #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index 0f6867da..a8d8693a 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -12,6 +12,7 @@ #ifdef NNG_HAVE_EPOLL #include <errno.h> +#include <fcntl.h> #include <stdbool.h> #include <stdio.h> #include <string.h> /* for strerror() */ @@ -22,179 +23,215 @@ #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" +typedef struct nni_posix_pollq nni_posix_pollq; + +#ifndef EFD_CLOEXEC +#define EFD_CLOEXEC 0 +#endif +#ifndef EFD_NONBLOCK +#define EFD_NONBLOCK 0 +#endif + #define NNI_MAX_EPOLL_EVENTS 64 // flags we always want enabled as long as at least one event is active #define NNI_EPOLL_FLAGS (EPOLLONESHOT | EPOLLERR | EPOLLHUP) +// Locking strategy: +// +// The pollq mutex protects its own reapq, close state, and the close +// state of the individual pfds. It also protects the pfd cv, which is +// only signaled when the pfd is closed. This mutex is only acquired +// when shutting down the pollq, or closing a pfd. For normal hot-path +// operations we don't need it. +// +// The pfd mutex protects the pfd's own "closing" flag (test and set), +// the callback and arg, and its event mask. This mutex is used a lot, +// but it should be uncontended excepting possibly when closing. 
+ // nni_posix_pollq is a work structure that manages state for the epoll-based // pollq implementation struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - int epfd; // epoll handle - int evfd; // event fd - bool close; // request for worker to exit - bool started; - nni_idhash * nodes; - nni_thr thr; // worker thread - nni_posix_pollq_node *wait; // cancel waiting on this - nni_posix_pollq_node *active; // active node (in callback) + nni_mtx mtx; + int epfd; // epoll handle + int evfd; // event fd (to wake us for other stuff) + bool close; // request for worker to exit + nni_thr thr; // worker thread + nni_list reapq; +}; + +struct nni_posix_pfd { + nni_posix_pollq *pq; + nni_list_node node; + int fd; + nni_posix_pfd_cb cb; + void * arg; + bool closed; + bool closing; + bool reap; + int events; + nni_mtx mtx; + nni_cv cv; }; +// single global instance for now. +static nni_posix_pollq nni_posix_global_pollq; + int -nni_posix_pollq_add(nni_posix_pollq_node *node) +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) { - int rv; + nni_posix_pfd * pfd; nni_posix_pollq * pq; struct epoll_event ev; - uint64_t id; - - pq = nni_posix_pollq_get(node->fd); - if (pq == NULL) { - return (NNG_EINVAL); - } + int rv; - // ensure node was not previously associated with a pollq - if (node->pq != NULL) { - return (NNG_ESTATE); - } + pq = &nni_posix_global_pollq; - nni_mtx_lock(&pq->mtx); - if (pq->close) { - // This shouldn't happen! - nni_mtx_unlock(&pq->mtx); - return (NNG_ECLOSED); - } + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if ((rv = nni_idhash_alloc(pq->nodes, &id, node)) != 0) { - nni_mtx_unlock(&pq->mtx); - return (rv); + if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { + return (NNG_ENOMEM); } - node->index = (int) id; - node->pq = pq; - node->events = 0; + pfd->pq = pq; + pfd->fd = fd; + pfd->cb = NULL; + pfd->arg = NULL; + pfd->events = 0; + pfd->closing = false; + pfd->closed = false; + + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pq->mtx); + NNI_LIST_NODE_INIT(&pfd->node); // notifications disabled to begin with ev.events = 0; - ev.data.u64 = id; + ev.data.ptr = pfd; - rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, node->fd, &ev); - if (rv != 0) { + if ((rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev)) != 0) { rv = nni_plat_errno(errno); - nni_idhash_remove(pq->nodes, id); - node->index = 0; - node->pq = NULL; + nni_cv_fini(&pfd->cv); + NNI_FREE_STRUCT(pfd); + return (rv); } - nni_mtx_unlock(&pq->mtx); - return (rv); + *pfdp = pfd; + return (0); } -// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini() -// called while pq's lock is held -static void -nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node) +int +nni_posix_pfd_arm(nni_posix_pfd *pfd, int events) { - int rv; - struct epoll_event ev; - - node->events = 0; - node->pq = NULL; - - ev.events = 0; - ev.data.u64 = (uint64_t) node->index; - - if (node->index != 0) { - // This deregisters the node. If the poller was blocked - // then this keeps it from coming back in to find us. - nni_idhash_remove(pq->nodes, (uint64_t) node->index); - } - - // NB: EPOLL_CTL_DEL actually *ignores* the event, but older Linux - // versions need it to be non-NULL. - rv = epoll_ctl(pq->epfd, EPOLL_CTL_DEL, node->fd, &ev); - if (rv != 0) { - NNI_ASSERT(errno == EBADF || errno == ENOENT); + nni_posix_pollq *pq = pfd->pq; + + // NB: We depend on epoll event flags being the same as their POLLIN + // equivalents. I.e. POLLIN == EPOLLIN, POLLOUT == EPOLLOUT, and so + // forth. 
This turns out to be true both for Linux and the illumos + // epoll implementation. + + nni_mtx_lock(&pfd->mtx); + if (!pfd->closing) { + struct epoll_event ev; + pfd->events |= events; + events = pfd->events; + + ev.events = events | NNI_EPOLL_FLAGS; + ev.data.ptr = pfd; + + if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) { + int rv = nni_plat_errno(errno); + nni_mtx_unlock(&pfd->mtx); + return (rv); + } } + nni_mtx_unlock(&pfd->mtx); + return (0); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). -void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +int +nni_posix_pfd_fd(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = node->pq; - - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - nni_posix_pollq_remove_helper(pq, node); - - if (pq->close) { - nni_cv_wake(&pq->cv); - } - nni_mtx_unlock(&pq->mtx); + return (pfd->fd); } -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. -int -nni_posix_pollq_init(nni_posix_pollq_node *node) +void +nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) { - node->index = 0; - return (0); + nni_mtx_lock(&pfd->mtx); + pfd->cb = cb; + pfd->arg = arg; + nni_mtx_unlock(&pfd->mtx); } -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the callback is not active, so that the node -// may be deallocated. This function must not be called in a callback. void -nni_posix_pollq_fini(nni_posix_pollq_node *node) +nni_posix_pfd_close(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - while (pq->active == node) { - pq->wait = node; - nni_cv_wait(&pq->cv); - } - - nni_posix_pollq_remove_helper(pq, node); - - if (pq->close) { - nni_cv_wake(&pq->cv); + nni_mtx_lock(&pfd->mtx); + if (!pfd->closing) { + nni_posix_pollq * pq = pfd->pq; + struct epoll_event ev; // Not actually used. + pfd->closing = true; + + (void) shutdown(pfd->fd, SHUT_RDWR); + (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev); } - nni_mtx_unlock(&pq->mtx); + nni_mtx_unlock(&pfd->mtx); } void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +nni_posix_pfd_fini(nni_posix_pfd *pfd) { - int rv; - struct epoll_event ev; - nni_posix_pollq * pq = node->pq; + nni_posix_pollq *pq = pfd->pq; + + nni_posix_pfd_close(pfd); - NNI_ASSERT(pq != NULL); - if (events == 0) { - return; + // We have to synchronize with the pollq thread (unless we are + // on that thread!) + if (!nni_thr_is_self(&pq->thr)) { + + uint64_t one = 1; + + nni_mtx_lock(&pq->mtx); + nni_list_append(&pq->reapq, pfd); + + // Wake the remote side. For now we assume this always + // succeeds. The only failure modes here occur when we + // have already excessively signaled this (2^64 times + // with no read!!), or when the evfd is closed, or some + // kernel bug occurs. Those errors would manifest as + // a hang waiting for the poller to reap the pfd in fini, + // if it were possible for them to occur. (Barring other + // bugs, it isn't.) + (void) write(pq->evfd, &one, sizeof(one)); + + while (!pfd->closed) { + nni_cv_wait(&pfd->cv); + } + nni_mtx_unlock(&pq->mtx); } - nni_mtx_lock(&pq->mtx); + // We're exclusive now. 
- node->events |= events; - ev.events = node->events | NNI_EPOLL_FLAGS; - ev.data.u64 = (uint64_t) node->index; + (void) close(pfd->fd); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); +} - rv = epoll_ctl(pq->epfd, EPOLL_CTL_MOD, node->fd, &ev); - NNI_ASSERT(rv == 0); +static void +nni_posix_pollq_reap(nni_posix_pollq *pq) +{ + nni_posix_pfd *pfd; + nni_mtx_lock(&pq->mtx); + while ((pfd = nni_list_first(&pq->reapq)) != NULL) { + nni_list_remove(&pq->reapq, pfd); + // Let fini know we're done with it, and it's safe to + // remove. + pfd->closed = true; + nni_cv_wake(&pfd->cv); + } nni_mtx_unlock(&pq->mtx); } @@ -204,101 +241,74 @@ nni_posix_poll_thr(void *arg) nni_posix_pollq * pq = arg; struct epoll_event events[NNI_MAX_EPOLL_EVENTS]; - nni_mtx_lock(&pq->mtx); - - while (!pq->close) { - int i; - int nevents; - - // block indefinitely, timers are handled separately - nni_mtx_unlock(&pq->mtx); - - nevents = - epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1); + for (;;) { + int n; + bool reap = false; - nni_mtx_lock(&pq->mtx); - - if (nevents <= 0) { - continue; + n = epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1); + if ((n < 0) && (errno == EBADF)) { + // Epoll fd closed, bail. + return; } // dispatch events - for (i = 0; i < nevents; ++i) { + for (int i = 0; i < n; ++i) { const struct epoll_event *ev; - nni_posix_pollq_node * node; ev = &events[i]; // If the waker pipe was signaled, read from it. - if ((ev->data.u64 == 0) && (ev->events & POLLIN)) { - int rv; + if ((ev->data.ptr == NULL) && (ev->events & POLLIN)) { uint64_t clear; - rv = read(pq->evfd, &clear, sizeof(clear)); - NNI_ASSERT(rv == sizeof(clear)); - continue; - } - - if (nni_idhash_find(pq->nodes, ev->data.u64, - (void **) &node) != 0) { - // node was removed while we were blocking - continue; + (void) read(pq->evfd, &clear, sizeof(clear)); + reap = true; + } else { + nni_posix_pfd * pfd = ev->data.ptr; + nni_posix_pfd_cb cb; + void * arg; + int events; + + events = ev->events & + (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP); + + nni_mtx_lock(&pfd->mtx); + pfd->events &= ~events; + cb = pfd->cb; + arg = pfd->arg; + nni_mtx_unlock(&pfd->mtx); + + // Execute the callback with lock released + if (cb != NULL) { + cb(pfd, events, arg); + } } + } - node->revents = ev->events & - (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP); - - // mark events as cleared - node->events &= ~node->revents; - - // Save the active node; we can notice this way - // when it is busy, and avoid freeing it until - // we are sure that it is not in use. - pq->active = node; - - // Execute the callback with lock released - nni_mtx_unlock(&pq->mtx); - node->cb(node->data); + if (reap) { + nni_posix_pollq_reap(pq); nni_mtx_lock(&pq->mtx); - - // We finished with this node. If something - // was blocked waiting for that, wake it up. 
- pq->active = NULL; - if (pq->wait == node) { - pq->wait = NULL; - nni_cv_wake(&pq->cv); + if (pq->close) { + nni_mtx_unlock(&pq->mtx); + return; } + nni_mtx_unlock(&pq->mtx); } } - - nni_mtx_unlock(&pq->mtx); } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - if (pq->started) { - int rv; - uint64_t wakeval = 1; + uint64_t one = 1; - nni_mtx_lock(&pq->mtx); - pq->close = true; - pq->started = false; - rv = write(pq->evfd, &wakeval, sizeof(wakeval)); - NNI_ASSERT(rv == sizeof(wakeval)); - nni_mtx_unlock(&pq->mtx); - } - nni_thr_fini(&pq->thr); + nni_mtx_lock(&pq->mtx); + pq->close = true; + (void) write(pq->evfd, &one, sizeof(one)); + nni_mtx_unlock(&pq->mtx); - if (pq->evfd >= 0) { - close(pq->evfd); - pq->evfd = -1; - } + nni_thr_fini(&pq->thr); + close(pq->evfd); close(pq->epfd); - pq->epfd = -1; - - if (pq->nodes != NULL) { - nni_idhash_fini(pq->nodes); - } nni_mtx_fini(&pq->mtx); } @@ -308,22 +318,25 @@ nni_posix_pollq_add_eventfd(nni_posix_pollq *pq) { // add event fd so we can wake ourself on exit struct epoll_event ev; - int rv; + int fd; memset(&ev, 0, sizeof(ev)); - pq->evfd = eventfd(0, EFD_NONBLOCK); - if (pq->evfd == -1) { + if ((fd = eventfd(0, EFD_NONBLOCK)) < 0) { return (nni_plat_errno(errno)); } + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + // This is *NOT* one shot. We want to wake EVERY single time. ev.events = EPOLLIN; - ev.data.u64 = 0; + ev.data.ptr = 0; - rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, pq->evfd, &ev); - if (rv != 0) { + if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { + (void) close(fd); return (nni_plat_errno(errno)); } + pq->evfd = fd; return (0); } @@ -332,40 +345,30 @@ nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; - if ((pq->epfd = epoll_create1(0)) < 0) { + if ((pq->epfd = epoll_create1(EPOLL_CLOEXEC)) < 0) { return (nni_plat_errno(errno)); } - pq->evfd = -1; pq->close = false; + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, &pq->mtx); - if (((rv = nni_idhash_init(&pq->nodes)) != 0) || - ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || - ((rv = nni_posix_pollq_add_eventfd(pq)) != 0)) { - nni_posix_pollq_destroy(pq); + if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) { + (void) close(pq->epfd); + nni_mtx_fini(&pq->mtx); + return (rv); + } + if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + (void) close(pq->epfd); + (void) close(pq->evfd); + nni_mtx_fini(&pq->mtx); return (rv); } - - // Positive values only for node indices. (0 is reserved for eventfd). - nni_idhash_set_limits(pq->nodes, 1, 0x7FFFFFFFu, 1); - pq->started = true; nni_thr_run(&pq->thr); return (0); } -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) -{ - NNI_ARG_UNUSED(fd); - return (&nni_posix_global_pollq); -} - int nni_posix_pollq_sysinit(void) { diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index 0f312170..36ced3ff 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -12,196 +12,203 @@ #ifdef NNG_HAVE_KQUEUE #include <errno.h> +#include <fcntl.h> #include <stdbool.h> #include <stdio.h> #include <string.h> /* for strerror() */ #include <sys/event.h> +#include <sys/socket.h> #include <unistd.h> #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" -// TODO: can this be feature detected in cmake, -// rather than relying on platform? 
-#if defined NNG_PLATFORM_NETBSD -#define kevent_udata_t intptr_t -#else -#define kevent_udata_t void * -#endif - -#define NNI_MAX_KQUEUE_EVENTS 64 - -// user event id used to shutdown the polling thread -#define NNI_KQ_EV_EXIT_ID 0xF +typedef struct nni_posix_pollq nni_posix_pollq; // nni_posix_pollq is a work structure that manages state for the kqueue-based // pollq implementation struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - int kq; // kqueue handle - bool close; // request for worker to exit - bool started; - nni_thr thr; // worker thread - nni_posix_pollq_node *wait; // cancel waiting on this - nni_posix_pollq_node *active; // active node (in callback) + nni_mtx mtx; + int kq; // kqueue handle + nni_thr thr; // worker thread + nni_list reapq; // items to reap }; +struct nni_posix_pfd { + nni_list_node node; // linkage into the reap list + nni_posix_pollq *pq; // associated pollq + int fd; // file descriptor to poll + void * data; // user data + nni_posix_pfd_cb cb; // user callback on event + nni_cv cv; // signaled when poller has unregistered + nni_mtx mtx; + int events; + bool closing; + bool closed; +}; + +#define NNI_MAX_KQUEUE_EVENTS 64 + +// single global instance for now +static nni_posix_pollq nni_posix_global_pollq; + int -nni_posix_pollq_add(nni_posix_pollq_node *node) +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) { + nni_posix_pfd * pf; nni_posix_pollq *pq; - struct kevent kevents[2]; + struct kevent ev[2]; + unsigned flags = EV_ADD | EV_DISABLE; + + // Set this is as soon as possible (narrow the close-exec race as + // much as we can; better options are system calls that suppress + // this behavior from descriptor creation.) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); +#ifdef SO_NOSIGPIPE + // Darwin lacks MSG_NOSIGNAL, but has a socket option. + int one = 1; + (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); +#endif - pq = nni_posix_pollq_get(node->fd); - if (pq == NULL) { - return (NNG_EINVAL); - } + pq = &nni_posix_global_pollq; - // ensure node was not previously associated with a pollq - if (node->pq != NULL) { - return (NNG_ESTATE); + if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) { + return (NNG_ENOMEM); } - nni_mtx_lock(&pq->mtx); - if (pq->close) { - // This shouldn't happen! - nni_mtx_unlock(&pq->mtx); - return (NNG_ECLOSED); - } - - node->pq = pq; - node->events = 0; - - EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, - EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node); + // Create entries in the kevent queue, without enabling them. + EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf); + EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf); - EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, - EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node); - - if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) { - nni_mtx_unlock(&pq->mtx); + // We update the kqueue list, without polling for events. 
+ if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) { + NNI_FREE_STRUCT(pf); return (nni_plat_errno(errno)); } + pf->fd = fd; + pf->cb = NULL; + pf->pq = pq; + nni_mtx_init(&pf->mtx); + nni_cv_init(&pf->cv, &pq->mtx); + NNI_LIST_NODE_INIT(&pf->node); + *pfdp = pf; - nni_mtx_unlock(&pq->mtx); return (0); } -// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini() -// called while pq's lock is held -static void -nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node) +void +nni_posix_pfd_close(nni_posix_pfd *pf) { - struct kevent kevents[2]; - - node->events = 0; - node->pq = NULL; + nni_posix_pollq *pq = pf->pq; - EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0, - (kevent_udata_t) node); - - EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0, - 0, (kevent_udata_t) node); - - // So it turns out that we can get EBADF, ENOENT, and apparently - // also EINPROGRESS (new on macOS Sierra). Frankly, we're deleting - // an event, and its harmless if the event removal fails (worst - // case would be a spurious wakeup), so lets ignore it. - (void) kevent(pq->kq, kevents, 2, NULL, 0, NULL); + nni_mtx_lock(&pq->mtx); + if (!pf->closing) { + struct kevent ev[2]; + pf->closing = true; + EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); + EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); + (void) shutdown(pf->fd, SHUT_RDWR); + // This should never fail -- no allocations, just deletion. + (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); + } + nni_mtx_unlock(&pq->mtx); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +nni_posix_pfd_fini(nni_posix_pfd *pf) { - nni_posix_pollq *pq = node->pq; + nni_posix_pollq *pq; - if (pq == NULL) { - return; - } + pq = pf->pq; - nni_mtx_lock(&pq->mtx); - nni_posix_pollq_remove_helper(pq, node); + nni_posix_pfd_close(pf); - if (pq->close) { - nni_cv_wake(&pq->cv); + if (!nni_thr_is_self(&pq->thr)) { + struct kevent ev; + nni_mtx_lock(&pq->mtx); + nni_list_append(&pq->reapq, pf); + EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL); + + // If this fails, the cleanup will stall. That should + // only occur in a memory pressure situation, and it + // will self-heal when the next event comes in. + (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); + while (!pf->closed) { + nni_cv_wait(&pf->cv); + } + nni_mtx_unlock(&pq->mtx); } - nni_mtx_unlock(&pq->mtx); + + (void) close(pf->fd); + nni_cv_fini(&pf->cv); + nni_mtx_fini(&pf->mtx); + NNI_FREE_STRUCT(pf); } -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. int -nni_posix_pollq_init(nni_posix_pollq_node *node) +nni_posix_pfd_fd(nni_posix_pfd *pf) { - NNI_ARG_UNUSED(node); - return (0); + return (pf->fd); } -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the callback is not active, so that the node -// may be deallocated. This function must not be called in a callback. 
void -nni_posix_pollq_fini(nni_posix_pollq_node *node) +nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - while (pq->active == node) { - pq->wait = node; - nni_cv_wait(&pq->cv); - } - - nni_posix_pollq_remove_helper(pq, node); - - if (pq->close) { - nni_cv_wake(&pq->cv); - } - nni_mtx_unlock(&pq->mtx); + NNI_ASSERT(cb != NULL); // must not be null when established. + nni_mtx_lock(&pf->mtx); + pf->cb = cb; + pf->data = arg; + nni_mtx_unlock(&pf->mtx); } -void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +int +nni_posix_pfd_arm(nni_posix_pfd *pf, int events) { - nni_posix_pollq *pq = node->pq; - struct kevent kevents[2]; - int nevents = 0; + struct kevent ev[2]; + int nev = 0; + unsigned flags = EV_ENABLE | EV_DISPATCH; + nni_posix_pollq *pq = pf->pq; + + nni_mtx_lock(&pf->mtx); + if (pf->closing) { + events = 0; + } else { + pf->events |= events; + events = pf->events; + } + nni_mtx_unlock(&pf->mtx); - NNI_ASSERT(pq != NULL); if (events == 0) { - return; + // No events, and kqueue is oneshot, so nothing to do. + return (0); } - nni_mtx_lock(&pq->mtx); - - if (!(node->events & POLLIN) && (events & POLLIN)) { - EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ, - EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node); + if (events & POLLIN) { + EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf); } - - if (!(node->events & POLLOUT) && (events & POLLOUT)) { - EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE, - EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node); + if (events & POLLOUT) { + EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf); } - - if (nevents > 0) { - // This call should never fail, really. The only possible - // legitimate failure would be ENOMEM, but in that case - // lots of other things are going to be failing, or ENOENT - // or ESRCH, indicating we already lost interest; the - // only consequence of ignoring these errors is that a given - // descriptor might appear "stuck". This beats the alternative - // of just blithely crashing the application with an assertion. 
- (void) kevent(pq->kq, kevents, nevents, NULL, 0, NULL); - node->events |= events; + while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) { + if (errno == EINTR) { + continue; + } + return (nni_plat_errno(errno)); } + return (0); +} +static void +nni_posix_pollq_reap(nni_posix_pollq *pq) +{ + nni_posix_pfd *pf; + nni_mtx_lock(&pq->mtx); + while ((pf = nni_list_first(&pq->reapq)) != NULL) { + nni_list_remove(&pq->reapq, pf); + pf->closed = true; + nni_cv_wake(&pf->cv); + } nni_mtx_unlock(&pq->mtx); } @@ -209,117 +216,71 @@ static void nni_posix_poll_thr(void *arg) { nni_posix_pollq *pq = arg; - struct kevent kevents[NNI_MAX_KQUEUE_EVENTS]; - - nni_mtx_lock(&pq->mtx); - while (!pq->close) { - int i; - int nevents; - - // block indefinitely, timers are handled separately - nni_mtx_unlock(&pq->mtx); - nevents = kevent( - pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL); - nni_mtx_lock(&pq->mtx); - - if (nevents < 0) { - continue; + for (;;) { + int n; + struct kevent evs[NNI_MAX_KQUEUE_EVENTS]; + nni_posix_pfd * pf; + nni_posix_pfd_cb cb; + void * cbarg; + int revents; + bool reap = false; + + n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); + if (n < 0) { + if (errno == EBADF) { + nni_posix_pollq_reap(pq); + return; + } + reap = true; } - // dispatch events - for (i = 0; i < nevents; ++i) { - struct kevent ev_disable; - const struct kevent * ev; - nni_posix_pollq_node *node; + for (int i = 0; i < n; i++) { + struct kevent *ev = &evs[i]; - ev = &kevents[i]; - if (ev->filter == EVFILT_USER && - ev->ident == NNI_KQ_EV_EXIT_ID) { - // we've woken up to exit the polling thread + switch (ev->filter) { + case EVFILT_READ: + revents = POLLIN; break; - } - - node = (nni_posix_pollq_node *) ev->udata; - if (node->pq == NULL) { - // node was removed while we were blocking + case EVFILT_WRITE: + revents = POLLOUT; + break; + case EVFILT_USER: + default: + reap = true; continue; } - node->revents = 0; - + pf = (void *) ev->udata; if (ev->flags & (EV_ERROR | EV_EOF)) { - node->revents |= POLLHUP; - } - if (ev->filter == EVFILT_WRITE) { - node->revents |= POLLOUT; - } else if (ev->filter == EVFILT_READ) { - node->revents |= POLLIN; - } else { - NNI_ASSERT(false); // unhandled filter - break; + revents |= POLLHUP; } - // explicitly disable this event. we'd ideally rely on - // the behavior of EV_DISPATCH to do this, - // but that only happens once we've acknowledged the - // event by reading/or writing the fd. because there - // can currently be some latency between the time we - // receive this event and the time we read/write in - // response, disable the event in the meantime to avoid - // needless wakeups. - // revisit if we're able to reduce/remove this latency. - EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter, - EV_DISABLE, 0, 0, NULL); - // this will only fail if the fd is already - // closed/invalid which we don't mind anyway, - // so ignore return value. - (void) kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL); - - // mark events as cleared - node->events &= ~node->revents; - - // Save the active node; we can notice this way - // when it is busy, and avoid freeing it until - // we are sure that it is not in use. - pq->active = node; - - // Execute the callback with lock released - nni_mtx_unlock(&pq->mtx); - node->cb(node->data); - nni_mtx_lock(&pq->mtx); - - // We finished with this node. If something - // was blocked waiting for that, wake it up. 
- pq->active = NULL; - if (pq->wait == node) { - pq->wait = NULL; - nni_cv_wake(&pq->cv); + nni_mtx_lock(&pf->mtx); + cb = pf->cb; + cbarg = pf->data; + pf->events &= ~(revents); + nni_mtx_unlock(&pf->mtx); + + if (cb != NULL) { + cb(pf, revents, cbarg); } } + if (reap) { + nni_posix_pollq_reap(pq); + } } - - nni_mtx_unlock(&pq->mtx); } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - if (pq->started) { - struct kevent ev; - EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE, - NOTE_TRIGGER, 0, NULL); - nni_mtx_lock(&pq->mtx); - pq->close = true; - pq->started = false; - (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); - nni_mtx_unlock(&pq->mtx); - } - nni_thr_fini(&pq->thr); - if (pq->kq >= 0) { close(pq->kq); pq->kq = -1; } + nni_thr_fini(&pq->thr); + + nni_posix_pollq_reap(pq); nni_mtx_fini(&pq->mtx); } @@ -327,10 +288,17 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq) static int nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq) { - // add user event so we can wake ourself on exit + int rv; struct kevent ev; - EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL); - return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL))); + + EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL); + while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { + if (errno == EINTR) { + continue; + } + return (nni_plat_errno(errno)); + } + return (0); } static int @@ -342,10 +310,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (nni_plat_errno(errno)); } - pq->close = false; - nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, &pq->mtx); + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) { @@ -353,24 +319,14 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (rv); } - pq->started = true; nni_thr_run(&pq->thr); return (0); } -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) -{ - NNI_ARG_UNUSED(fd); - return (&nni_posix_global_pollq); -} - int nni_posix_pollq_sysinit(void) { + return (nni_posix_pollq_create(&nni_posix_global_pollq)); } diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index efc9ff48..26753df6 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -11,8 +11,6 @@ #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" -#ifdef NNG_USE_POSIX_POLLQ_POLL - #include <errno.h> #include <fcntl.h> #include <poll.h> @@ -32,306 +30,285 @@ // nni_posix_pollq is a work structure used by the poller thread, that keeps // track of all the underlying pipe handles and so forth being used by poll(). + +// Locking strategy: We use the pollq lock to guard the lists on the pollq, +// the nfds (which counts the number of items in the pollq), the pollq +// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We +// use a lock on the pfd only to protect the the events field (which we treat +// as an atomic bitfield), and the cb and arg pointers. Note that the pfd +// lock is therefore a leaf lock, which is sometimes acquired while holding +// the pq lock. Every reasonable effort is made to minimize holding locks. +// (Btw, pfd->fd is not guarded, because it is set at pfd creation and +// persists until the pfd is destroyed.) 
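To make the leaf-lock rule above concrete, here is an illustrative (non-shipping) walk over the pollq, using the structures declared just below. It shows the only permitted ordering: the pq lock may be held while the pfd lock is taken, never the reverse, mirroring what the poller thread later in this file does.

// Illustrative only: pq->mtx guards the lists and counters, while the
// pfd->mtx leaf lock guards just events/cb/arg.
static void
example_scan(nni_posix_pollq *pq)
{
	nni_posix_pfd *pfd;

	nni_mtx_lock(&pq->mtx);
	NNI_LIST_FOREACH (&pq->pollq, pfd) {
		int events;

		nni_mtx_lock(&pfd->mtx); // leaf lock, taken under pq->mtx
		events = pfd->events;
		nni_mtx_unlock(&pfd->mtx);

		if (events != 0) {
			// e.g. add pfd->fd to the pollfd array
		}
	}
	nni_mtx_unlock(&pq->mtx);
}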
+ +typedef struct nni_posix_pollq nni_posix_pollq; + struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - struct pollfd * fds; - int nfds; - int wakewfd; // write side of waker pipe - int wakerfd; // read side of waker pipe - int close; // request for worker to exit - int started; - nni_thr thr; // worker thread - nni_list polled; // polled nodes - nni_list armed; // armed nodes - nni_list idle; // idle nodes - int nnodes; // num of nodes in nodes list - int inpoll; // poller asleep in poll - nni_posix_pollq_node *wait; // cancel waiting on this - nni_posix_pollq_node *active; // active node (in callback) + nni_mtx mtx; + int nfds; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + bool closing; // request for worker to exit + bool closed; + nni_thr thr; // worker thread + nni_list pollq; // armed nodes + nni_list reapq; }; -static int -nni_posix_pollq_poll_grow(nni_posix_pollq *pq) -{ - int grow = pq->nnodes + 2; // one for us, one for waker - struct pollfd *newfds; +struct nni_posix_pfd { + nni_posix_pollq *pq; + int fd; + nni_list_node node; + nni_cv cv; + nni_mtx mtx; + int events; + nni_posix_pfd_cb cb; + void * arg; +}; + +static nni_posix_pollq nni_posix_global_pollq; - if (grow < pq->nfds) { - return (0); +int +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +{ + nni_posix_pfd * pfd; + nni_posix_pollq *pq = &nni_posix_global_pollq; + + // Set this is as soon as possible (narrow the close-exec race as + // much as we can; better options are system calls that suppress + // this behavior from descriptor creation.) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); +#ifdef SO_NOSIGPIPE + // Darwin lacks MSG_NOSIGNAL, but has a socket option. + // If this code is getting used, you really should be using the + // kqueue poller, or you need to upgrade your older system. + int one = 1; + (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); +#endif + + if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { + return (NNG_ENOMEM); } + NNI_LIST_NODE_INIT(&pfd->node); + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pq->mtx); + pfd->fd = fd; + pfd->events = 0; + pfd->cb = NULL; + pfd->arg = NULL; + pfd->pq = pq; + nni_mtx_lock(&pq->mtx); + if (pq->closing) { + nni_mtx_unlock(&pq->mtx); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); + return (NNG_ECLOSED); + } + nni_list_append(&pq->pollq, pfd); + pq->nfds++; + nni_mtx_unlock(&pq->mtx); + *pfdp = pfd; + return (0); +} - grow = grow + 128; +void +nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) +{ + nni_mtx_lock(&pfd->mtx); + pfd->cb = cb; + pfd->arg = arg; + nni_mtx_unlock(&pfd->mtx); +} - if ((newfds = NNI_ALLOC_STRUCTS(newfds, grow)) == NULL) { - return (NNG_ENOMEM); +int +nni_posix_pfd_fd(nni_posix_pfd *pfd) +{ + return (pfd->fd); +} + +void +nni_posix_pfd_close(nni_posix_pfd *pfd) +{ + (void) shutdown(pfd->fd, SHUT_RDWR); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; + + nni_posix_pfd_close(pfd); + + nni_mtx_lock(&pq->mtx); + if (nni_list_active(&pq->pollq, pfd)) { + nni_list_remove(&pq->pollq, pfd); + pq->nfds--; } - if (pq->nfds != 0) { - NNI_FREE_STRUCTS(pq->fds, pq->nfds); + if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + nni_list_append(&pq->reapq, pfd); + nni_plat_pipe_raise(pq->wakewfd); + while (nni_list_active(&pq->reapq, pfd)) { + nni_cv_wait(&pfd->cv); + } } - pq->fds = newfds; - pq->nfds = grow; + nni_mtx_unlock(&pq->mtx); + + // We're exclusive now. 
+ (void) close(pfd->fd); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); +} + +int +nni_posix_pfd_arm(nni_posix_pfd *pfd, int events) +{ + nni_posix_pollq *pq = pfd->pq; + nni_mtx_lock(&pfd->mtx); + pfd->events |= events; + nni_mtx_unlock(&pfd->mtx); + + // If we're running on the callback, then don't bother to kick + // the pollq again. This is necessary because we cannot modify + // the poller while it is polling. + if (!nni_thr_is_self(&pq->thr)) { + nni_plat_pipe_raise(pq->wakewfd); + } return (0); } static void nni_posix_poll_thr(void *arg) { - nni_posix_pollq * pollq = arg; - nni_posix_pollq_node *node; + nni_posix_pollq *pq = arg; + int nalloc = 0; + struct pollfd * fds = NULL; + nni_posix_pfd ** pfds = NULL; - nni_mtx_lock(&pollq->mtx); for (;;) { - int rv; int nfds; - struct pollfd *fds; + int events; + nni_posix_pfd *pfd; - if (pollq->close) { - break; + nni_mtx_lock(&pq->mtx); + while (nalloc < (pq->nfds + 1)) { + int n = pq->nfds + 128; + + // Drop the lock while we sleep or allocate. This + // allows additional items to be added or removed (!) + // while we wait. + nni_mtx_unlock(&pq->mtx); + + // Toss the old ones first; avoids *doubling* memory + // consumption during alloc. + NNI_FREE_STRUCTS(fds, nalloc); + NNI_FREE_STRUCTS(pfds, nalloc); + nalloc = 0; + + if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) { + nni_msleep(10); // sleep for a bit, try later + } else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) { + NNI_FREE_STRUCTS(pfds, n); + nni_msleep(10); + } else { + nalloc = n; + } + nni_mtx_lock(&pq->mtx); } - fds = pollq->fds; - nfds = 0; - // The waker pipe is set up so that we will be woken // when it is written (this allows us to be signaled). - fds[nfds].fd = pollq->wakerfd; - fds[nfds].events = POLLIN; - fds[nfds].revents = 0; - nfds++; + fds[0].fd = pq->wakerfd; + fds[0].events = POLLIN; + fds[0].revents = 0; + pfds[0] = NULL; + nfds = 1; + + // Also lets reap anything that was in the reaplist! + while ((pfd = nni_list_first(&pq->reapq)) != NULL) { + nni_list_remove(&pq->reapq, pfd); + nni_cv_wake(&pfd->cv); + } - // Set up the poll list. - while ((node = nni_list_first(&pollq->armed)) != NULL) { - nni_list_remove(&pollq->armed, node); - nni_list_append(&pollq->polled, node); - fds[nfds].fd = node->fd; - fds[nfds].events = node->events; - fds[nfds].revents = 0; - node->index = nfds; - nfds++; + // If we're closing down, bail now. This is done *after* we + // have ensured that the reapq is empty. Anything still in + // the pollq is not going to receive further callbacks. + if (pq->closing) { + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + break; } - // Now poll it. We block indefinitely, since we use separate - // timeouts to wake and remove the elements from the list. - pollq->inpoll = 1; - nni_mtx_unlock(&pollq->mtx); - rv = poll(fds, nfds, -1); - nni_mtx_lock(&pollq->mtx); - pollq->inpoll = 0; - - if (rv < 0) { - // This shouldn't happen really. If it does, we - // just try again. (EINTR is probably the only - // reasonable failure here, unless internal memory - // allocations fail in the kernel, leading to EAGAIN.) - continue; + // Set up the poll list. 
+ NNI_LIST_FOREACH (&pq->pollq, pfd) { + + nni_mtx_lock(&pfd->mtx); + events = pfd->events; + nni_mtx_unlock(&pfd->mtx); + + if (events != 0) { + fds[nfds].fd = pfd->fd; + fds[nfds].events = events; + fds[nfds].revents = 0; + pfds[nfds] = pfd; + nfds++; + } } + nni_mtx_unlock(&pq->mtx); + + // We could get the result from poll, and avoid iterating + // over the entire set of pollfds, but since on average we + // will be walking half the list, doubling the work we do + // (the condition with a potential pipeline stall) seems like + // adding complexity with no real benefit. It also makes the + // worst case even worse. + (void) poll(fds, nfds, -1); // If the waker pipe was signaled, read from it. if (fds[0].revents & POLLIN) { - NNI_ASSERT(fds[0].fd == pollq->wakerfd); - nni_plat_pipe_clear(pollq->wakerfd); + NNI_ASSERT(fds[0].fd == pq->wakerfd); + nni_plat_pipe_clear(pq->wakerfd); } - while ((node = nni_list_first(&pollq->polled)) != NULL) { - int index = node->index; - - // We remove ourselves from the polled list, and - // then put it on either the idle or armed list - // depending on whether it remains armed. - node->index = 0; - nni_list_remove(&pollq->polled, node); - NNI_ASSERT(index > 0); - if (fds[index].revents == 0) { - // If still watching for events, return it - // to the armed list. - if (node->events) { - nni_list_append(&pollq->armed, node); - } else { - nni_list_append(&pollq->idle, node); - } - continue; - } + for (int i = 1; i < nfds; i++) { + if ((events = fds[i].revents) != 0) { + nni_posix_pfd_cb cb; + void * arg; - // We are calling the callback, so disarm - // all events; the node can rearm them in its - // callback. - node->revents = fds[index].revents; - node->events &= ~node->revents; - if (node->events == 0) { - nni_list_append(&pollq->idle, node); - } else { - nni_list_append(&pollq->armed, node); - } + pfd = pfds[i]; + + nni_mtx_lock(&pfd->mtx); + cb = pfd->cb; + arg = pfd->arg; + pfd->events &= ~events; + nni_mtx_unlock(&pfd->mtx); - // Save the active node; we can notice this way - // when it is busy, and avoid freeing it until - // we are sure that it is not in use. - pollq->active = node; - - // Execute the callback -- without locks held. - nni_mtx_unlock(&pollq->mtx); - node->cb(node->data); - nni_mtx_lock(&pollq->mtx); - - // We finished with this node. If something - // was blocked waiting for that, wake it up. - pollq->active = NULL; - if (pollq->wait == node) { - pollq->wait = NULL; - nni_cv_wake(&pollq->cv); + if (cb) { + cb(pfd, events, arg); + } } } } - nni_mtx_unlock(&pollq->mtx); -} - -int -nni_posix_pollq_add(nni_posix_pollq_node *node) -{ - int rv; - nni_posix_pollq *pq; - - NNI_ASSERT(!nni_list_node_active(&node->node)); - pq = nni_posix_pollq_get(node->fd); - if (node->pq != NULL) { - return (NNG_ESTATE); - } - - nni_mtx_lock(&pq->mtx); - if (pq->close) { - // This shouldn't happen! - nni_mtx_unlock(&pq->mtx); - return (NNG_ECLOSED); - } - node->pq = pq; - if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) { - nni_mtx_unlock(&pq->mtx); - return (rv); - } - pq->nnodes++; - nni_list_append(&pq->idle, node); - nni_mtx_unlock(&pq->mtx); - return (0); + NNI_FREE_STRUCTS(fds, nalloc); + NNI_FREE_STRUCTS(pfds, nalloc); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). 
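Because the poller clears a descriptor's interest bits before dispatching, callbacks are effectively oneshot and must re-arm. A hypothetical consumer of the nni_posix_pfd API introduced in this diff might look like the sketch below (my_read_cb and my_start are made-up names; the usual <poll.h> and <unistd.h> includes are assumed):

static void
my_read_cb(nni_posix_pfd *pfd, int events, void *arg)
{
	char buf[512];

	NNI_ARG_UNUSED(arg);
	if (events & (POLLHUP | POLLERR)) {
		nni_posix_pfd_close(pfd);
		return;
	}
	if (events & POLLIN) {
		(void) read(nni_posix_pfd_fd(pfd), buf, sizeof(buf));
	}
	// Interest was cleared before we were called; re-arm for more.
	(void) nni_posix_pfd_arm(pfd, POLLIN);
}

static int
my_start(int fd, nni_posix_pfd **pfdp)
{
	int rv;

	if ((rv = nni_posix_pfd_init(pfdp, fd)) != 0) {
		return (rv);
	}
	nni_posix_pfd_set_cb(*pfdp, my_read_cb, NULL);
	return (nni_posix_pfd_arm(*pfdp, POLLIN));
}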
-void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +static void +nni_posix_pollq_destroy(nni_posix_pollq *pq) { - nni_posix_pollq *pq = node->pq; - - if (pq == NULL) { - return; - } - node->pq = NULL; nni_mtx_lock(&pq->mtx); - if (nni_list_node_active(&node->node)) { - nni_list_node_remove(&node->node); - pq->nnodes--; - } - if (pq->close) { - nni_cv_wake(&pq->cv); - } + pq->closing = 1; nni_mtx_unlock(&pq->mtx); -} - -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. -int -nni_posix_pollq_init(nni_posix_pollq_node *node) -{ - NNI_LIST_NODE_INIT(&node->node); - return (0); -} - -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the callback is not active, so that the node -// may be deallocated. This function must not be called in a callback. -void -nni_posix_pollq_fini(nni_posix_pollq_node *node) -{ - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - node->pq = NULL; - nni_mtx_lock(&pq->mtx); - while (pq->active == node) { - pq->wait = node; - nni_cv_wait(&pq->cv); - } - if (nni_list_node_active(&node->node)) { - nni_list_node_remove(&node->node); - pq->nnodes--; - } - if (pq->close) { - nni_cv_wake(&pq->cv); - } - nni_mtx_unlock(&pq->mtx); -} + nni_plat_pipe_raise(pq->wakewfd); -void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) -{ - nni_posix_pollq *pq = node->pq; - int oevents; - - NNI_ASSERT(pq != NULL); - - nni_mtx_lock(&pq->mtx); - oevents = node->events; - node->events |= events; - - // We move this to the armed list if its not armed, or already - // on the polled list. The polled list would be the case where - // the index is set to a positive value. - if ((oevents == 0) && (events != 0) && (node->index < 1)) { - nni_list_node_remove(&node->node); - nni_list_append(&pq->armed, node); - } - if ((events != 0) && (oevents != events)) { - // Possibly wake up poller since we're looking for new events. - if (pq->inpoll) { - nni_plat_pipe_raise(pq->wakewfd); - } - } - nni_mtx_unlock(&pq->mtx); -} - -static void -nni_posix_pollq_destroy(nni_posix_pollq *pq) -{ - if (pq->started) { - nni_mtx_lock(&pq->mtx); - pq->close = 1; - pq->started = 0; - nni_plat_pipe_raise(pq->wakewfd); - nni_mtx_unlock(&pq->mtx); - } nni_thr_fini(&pq->thr); - - // All pipes should have been closed before this is called. 
- NNI_ASSERT(nni_list_empty(&pq->polled)); - NNI_ASSERT(nni_list_empty(&pq->armed)); - NNI_ASSERT(nni_list_empty(&pq->idle)); - NNI_ASSERT(pq->nnodes == 0); - - if (pq->wakewfd >= 0) { - nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); - pq->wakewfd = pq->wakerfd = -1; - } - if (pq->nfds != 0) { - NNI_FREE_STRUCTS(pq->fds, pq->nfds); - pq->fds = NULL; - pq->nfds = 0; - } + nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); nni_mtx_fini(&pq->mtx); } @@ -340,50 +317,27 @@ nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; - NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node); - NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node); - NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node); - pq->wakewfd = -1; - pq->wakerfd = -1; - pq->close = 0; - - nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, &pq->mtx); + NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node); + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); + pq->closing = false; + pq->closed = false; - if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) || - ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) || - ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) { - nni_posix_pollq_destroy(pq); + if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) { return (rv); } - pq->started = 1; + if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); + return (rv); + } + nni_mtx_init(&pq->mtx); nni_thr_run(&pq->thr); return (0); } -// We use a single pollq for the entire system, which means only a single -// thread is polling. This may be somewhat less than optimally efficient, -// and it may be worth investigating having multiple threads to improve -// efficiency and scalability. (This would shorten the linked lists, -// improving C10K scalability, and also allow us to engage multiple cores.) -// It's not entirely clear how many threads are "optimal". -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) -{ - NNI_ARG_UNUSED(fd); - // This is the point where we could choose a pollq based on FD. - return (&nni_posix_global_pollq); -} - int nni_posix_pollq_sysinit(void) { - int rv; - - rv = nni_posix_pollq_create(&nni_posix_global_pollq); - return (rv); + return (nni_posix_pollq_create(&nni_posix_global_pollq)); } void @@ -391,5 +345,3 @@ nni_posix_pollq_sysfini(void) { nni_posix_pollq_destroy(&nni_posix_global_pollq); } - -#endif // NNG_USE_POSIX_POLLQ_POLL diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c index 7231cb27..63c1d1d1 100644 --- a/src/platform/posix/posix_pollq_port.c +++ b/src/platform/posix/posix_pollq_port.c @@ -1,7 +1,6 @@ // // Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Liam Staskawicz <liam@stask.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -12,7 +11,9 @@ #ifdef NNG_HAVE_PORT_CREATE #include <errno.h> +#include <fcntl.h> #include <port.h> +#include <sched.h> #include <stdbool.h> #include <stdio.h> #include <string.h> /* for strerror() */ @@ -21,6 +22,9 @@ #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" +#define NNI_MAX_PORTEV 64 +typedef struct nni_posix_pollq nni_posix_pollq; + // nni_posix_pollq is a work structure that manages state for the port-event // based pollq implementation. 
We only really need to keep track of the // single thread, and the associated port itself. @@ -29,190 +33,178 @@ struct nni_posix_pollq { nni_thr thr; // worker thread }; +struct nni_posix_pfd { + nni_posix_pollq *pq; + int fd; + nni_mtx mtx; + nni_cv cv; + int events; + bool closed; + bool closing; + nni_posix_pfd_cb cb; + void * data; +}; + +// single global instance for now +static nni_posix_pollq nni_posix_global_pollq; + int -nni_posix_pollq_add(nni_posix_pollq_node *node) +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) { nni_posix_pollq *pq; + nni_posix_pfd * pfd; - pq = nni_posix_pollq_get(node->fd); - if (pq == NULL) { - return (NNG_EINVAL); - } + pq = &nni_posix_global_pollq; - nni_mtx_lock(&node->mx); - // ensure node was not previously associated with a pollq - if (node->pq != NULL) { - nni_mtx_unlock(&node->mx); - return (NNG_ESTATE); + if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { + return (NNG_ENOMEM); } - - node->pq = pq; - node->events = 0; - node->armed = false; - nni_mtx_unlock(&node->mx); - + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pfd->mtx); + pfd->closed = false; + pfd->closing = false; + pfd->fd = fd; + pfd->pq = pq; + pfd->cb = NULL; + pfd->data = NULL; + *pfdp = pfd; return (0); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). -void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +int +nni_posix_pfd_fd(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = node->pq; + return (pfd->fd); +} - if (pq == NULL) { - return; - } +void +nni_posix_pfd_close(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; - nni_mtx_lock(&node->mx); - node->events = 0; - if (node->armed) { - // Failure modes that can occur are uninteresting. - (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd); - node->armed = false; + nni_mtx_lock(&pfd->mtx); + if (!pfd->closing) { + pfd->closing = true; + (void) shutdown(pfd->fd, SHUT_RDWR); + port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd); } - nni_mtx_unlock(&node->mx); -} + nni_mtx_unlock(&pfd->mtx); -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. -int -nni_posix_pollq_init(nni_posix_pollq_node *node) -{ - nni_mtx_init(&node->mx); - nni_cv_init(&node->cv, &node->mx); - node->pq = NULL; - node->armed = false; - NNI_LIST_NODE_INIT(&node->node); - return (0); + // Send the wake event to the poller to synchronize with it. + // Note that port_send should only really fail if out of memory + // or we run into a resource limit. } -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the node is removed properly. void -nni_posix_pollq_fini(nni_posix_pollq_node *node) +nni_posix_pfd_fini(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = node->pq; + nni_posix_pollq *pq = pfd->pq; - nni_mtx_lock(&node->mx); - if ((pq = node->pq) != NULL) { - // Dissociate the port; if it isn't already associated we - // don't care. (An extra syscall, but it should not matter.) - (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd); - node->armed = false; + nni_posix_pfd_close(pfd); - for (;;) { - if (port_send(pq->port, 0, node) == 0) { - break; - } - switch (errno) { - case EAGAIN: - case ENOMEM: - // Resource exhaustion. - // Best bet in these cases is to sleep it off. 
- // This may appear like a total application - // hang, but by sleeping here maybe we give - // a chance for things to clear up. - nni_mtx_unlock(&node->mx); - nni_msleep(5000); - nni_mtx_lock(&node->mx); - continue; - case EBADFD: - case EBADF: - // Most likely these indicate that the pollq - // itself has been closed. That's ok. + if (!nni_thr_is_self(&pq->thr)) { + + while (port_send(pq->port, 1, pfd) != 0) { + if ((errno == EBADF) || (errno == EBADFD)) { + pfd->closed = true; break; } + sched_yield(); // try again later... } - // Wait for the pollq thread to tell us with certainty that - // they are done. This is needed to ensure that the pollq - // thread isn't executing (or about to execute) the callback - // before we destroy it. - while (node->pq != NULL) { - nni_cv_wait(&node->cv); + + nni_mtx_lock(&pfd->mtx); + while (!pfd->closed) { + nni_cv_wait(&pfd->cv); } + nni_mtx_unlock(&pfd->mtx); } - nni_mtx_unlock(&node->mx); - nni_cv_fini(&node->cv); - nni_mtx_fini(&node->mx); + + // We're exclusive now. + (void) close(pfd->fd); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); } -void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +int +nni_posix_pfd_arm(nni_posix_pfd *pfd, int events) { - nni_posix_pollq *pq = node->pq; - - NNI_ASSERT(pq != NULL); - if (events == 0) { - return; + nni_posix_pollq *pq = pfd->pq; + + nni_mtx_lock(&pfd->mtx); + if (!pfd->closing) { + pfd->events |= events; + if (port_associate(pq->port, PORT_SOURCE_FD, pfd->fd, + pfd->events, pfd) != 0) { + int rv = nni_plat_errno(errno); + nni_mtx_unlock(&pfd->mtx); + return (rv); + } } - - nni_mtx_lock(&node->mx); - node->events |= events; - node->armed = true; - (void) port_associate( - pq->port, PORT_SOURCE_FD, node->fd, node->events, node); - - // Possible errors here are: - // - // EBADF -- programming error on our part - // EBADFD -- programming error on our part - // ENOMEM -- not much we can do here - // EAGAIN -- too many port events registered (65K!!) - // - // For now we ignore them all. (We need to be able to return - // errors to our caller.) Effect on the application will appear - // to be a stalled file descriptor (no notifications). - nni_mtx_unlock(&node->mx); + nni_mtx_unlock(&pfd->mtx); + return (0); } static void nni_posix_poll_thr(void *arg) { - for (;;) { - nni_posix_pollq * pq = arg; - port_event_t ev; - nni_posix_pollq_node *node; - - if (port_get(pq->port, &ev, NULL) != 0) { + nni_posix_pollq *pq = arg; + port_event_t ev[NNI_MAX_PORTEV]; + nni_posix_pfd * pfd; + int events; + nni_posix_pfd_cb cb; + void * arg; + unsigned n; + + n = 1; // wake us even on just one event + if (port_getn(pq->port, ev, NNI_MAX_PORTEV, &n, NULL) != 0) { if (errno == EINTR) { continue; } return; } - switch (ev.portev_source) { - case PORT_SOURCE_ALERT: - return; - - case PORT_SOURCE_FD: - node = ev.portev_user; + // We run through the returned ports twice. First we + // get the callbacks. Then we do the reaps. This way + // we ensure that we only reap *after* callbacks have run. 
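The two-pass dispatch above leans on how event ports deliver notifications: an fd association is consumed by delivery and must be re-established with port_associate(). A small standalone sketch of that oneshot behavior (Solaris/illumos only, illustrative, not part of this change):

#include <sys/types.h>
#include <poll.h>
#include <port.h>

// Wait until fd is readable, returning the delivered poll events,
// or -1 on error.  The association is gone once port_getn() returns
// it, so a caller wanting further events must re-associate.
static int
wait_readable(int port, int fd)
{
	port_event_t ev;
	uint_t       n = 1; // wake us even on just one event

	if (port_associate(port, PORT_SOURCE_FD, fd, POLLIN, NULL) != 0) {
		return (-1);
	}
	if (port_getn(port, &ev, 1, &n, NULL) != 0) {
		return (-1);
	}
	return (ev.portev_events);
}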
+ for (unsigned i = 0; i < n; i++) { + if (ev[i].portev_source != PORT_SOURCE_FD) { + continue; + } + pfd = ev[i].portev_user; + events = ev[i].portev_events; - nni_mtx_lock(&node->mx); - node->revents = ev.portev_events; - // mark events as cleared - node->events &= ~node->revents; - node->armed = false; - nni_mtx_unlock(&node->mx); + nni_mtx_lock(&pfd->mtx); + cb = pfd->cb; + arg = pfd->data; + pfd->events &= ~events; + nni_mtx_unlock(&pfd->mtx); - node->cb(node->data); - break; + if (cb != NULL) { + cb(pfd, events, arg); + } + } + for (unsigned i = 0; i < n; i++) { + if (ev[i].portev_source != PORT_SOURCE_USER) { + continue; + } - case PORT_SOURCE_USER: // User event telling us to stop doing things. - // We signal back to use this as a coordination event - // between the pollq and the thread handler. - // NOTE: It is absolutely critical that there is only - // a single thread per pollq. Otherwise we cannot - // be sure that we are blocked completely, - node = ev.portev_user; - nni_mtx_lock(&node->mx); - node->pq = NULL; - nni_cv_wake(&node->cv); - nni_mtx_unlock(&node->mx); + // We signal back to use this as a coordination + // event between the pollq and the thread + // handler. NOTE: It is absolutely critical + // that there is only a single thread per + // pollq. Otherwise we cannot be sure that we + // are blocked completely, + pfd = ev[i].portev_user; + nni_mtx_lock(&pfd->mtx); + pfd->closed = true; + nni_cv_wake(&pfd->cv); + nni_mtx_unlock(&pfd->mtx); } } } @@ -220,7 +212,6 @@ nni_posix_poll_thr(void *arg) static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - port_alert(pq->port, PORT_ALERT_SET, 1, NULL); (void) close(pq->port); nni_thr_fini(&pq->thr); } @@ -243,14 +234,15 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (0); } -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) +void +nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) { - NNI_ARG_UNUSED(fd); - return (&nni_posix_global_pollq); + NNI_ASSERT(cb != NULL); // must not be null when established. 
+ + nni_mtx_lock(&pfd->mtx); + pfd->cb = cb; + pfd->data = arg; + nni_mtx_unlock(&pfd->mtx); } int diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index d86a2008..96d8debd 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -37,50 +37,51 @@ #define NNG_POSIX_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *nni_posix_resolv_tq = NULL; -static nni_mtx nni_posix_resolv_mtx; - -typedef struct nni_posix_resolv_item nni_posix_resolv_item; -struct nni_posix_resolv_item { - int family; - int passive; - const char *name; - const char *serv; - int proto; - nni_aio * aio; - nni_task task; +static nni_taskq *resolv_tq = NULL; +static nni_mtx resolv_mtx; + +typedef struct resolv_item resolv_item; +struct resolv_item { + int family; + int passive; + const char * name; + const char * serv; + int proto; + nni_aio * aio; + nni_task * task; + nng_sockaddr sa; }; static void -nni_posix_resolv_finish(nni_posix_resolv_item *item, int rv) +resolv_finish(resolv_item *item, int rv) { nni_aio *aio; - if ((aio = item->aio) != NULL) { - if (nni_aio_get_prov_data(aio) == item) { - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_aio_finish(aio, rv, 0); - NNI_FREE_STRUCT(item); - } + if (((aio = item->aio) != NULL) && + (nni_aio_get_prov_data(aio) == item)) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + nni_task_fini(item->task); + NNI_FREE_STRUCT(item); } } static void -nni_posix_resolv_cancel(nni_aio *aio, int rv) +resolv_cancel(nni_aio *aio, int rv) { - nni_posix_resolv_item *item; + resolv_item *item; - nni_mtx_lock(&nni_posix_resolv_mtx); + nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { - nni_mtx_unlock(&nni_posix_resolv_mtx); + nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); item->aio = NULL; - nni_mtx_unlock(&nni_posix_resolv_mtx); - nni_task_cancel(&item->task); - NNI_FREE_STRUCT(item); + nni_mtx_unlock(&resolv_mtx); nni_aio_finish_error(aio, rv); } @@ -116,14 +117,23 @@ nni_posix_gai_errno(int rv) } static void -nni_posix_resolv_task(void *arg) +resolv_task(void *arg) { - nni_posix_resolv_item *item = arg; - nni_aio * aio = item->aio; - struct addrinfo hints; - struct addrinfo * results; - struct addrinfo * probe; - int rv; + resolv_item * item = arg; + struct addrinfo hints; + struct addrinfo *results; + struct addrinfo *probe; + int rv; + + nni_mtx_lock(&resolv_mtx); + if (item->aio == NULL) { + nni_mtx_unlock(&resolv_mtx); + // Caller canceled, and no longer cares about this. 
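The task body that follows performs the actual lookup through getaddrinfo() on a task queue thread and copies the first usable result aside. A trimmed standalone version of that lookup, for reference only (assumptions: TCP, first result taken, EAI_* mapping and IPv6-specific handling elided):

#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>

static int
lookup_first(const char *host, const char *serv, int passive,
    struct sockaddr_storage *out)
{
	struct addrinfo  hints;
	struct addrinfo *results;
	int              rv;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family   = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = IPPROTO_TCP;
	hints.ai_flags    = passive ? AI_PASSIVE : 0;

	if ((rv = getaddrinfo(host, serv, &hints, &results)) != 0) {
		return (rv); // an EAI_* code; map it to an NNG error
	}
	memcpy(out, results->ai_addr, results->ai_addrlen);
	freeaddrinfo(results);
	return (0);
}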
+ nni_task_fini(item->task); + NNI_FREE_STRUCT(item); + return; + } + nni_mtx_unlock(&resolv_mtx); results = NULL; @@ -170,7 +180,7 @@ nni_posix_resolv_task(void *arg) if (probe != NULL) { struct sockaddr_in * sin; struct sockaddr_in6 *sin6; - nng_sockaddr * sa = nni_aio_get_input(aio, 0); + nng_sockaddr * sa = &item->sa; switch (probe->ai_addr->sa_family) { case AF_INET: @@ -196,17 +206,18 @@ done: freeaddrinfo(results); } - nni_mtx_lock(&nni_posix_resolv_mtx); - nni_posix_resolv_finish(item, rv); - nni_mtx_unlock(&nni_posix_resolv_mtx); + nni_mtx_lock(&resolv_mtx); + resolv_finish(item, rv); + nni_mtx_unlock(&resolv_mtx); } static void -nni_posix_resolv_ip(const char *host, const char *serv, int passive, - int family, int proto, nni_aio *aio) +resolv_ip(const char *host, const char *serv, int passive, int family, + int proto, nni_aio *aio) { - nni_posix_resolv_item *item; - sa_family_t fam; + resolv_item *item; + sa_family_t fam; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -231,10 +242,15 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive, return; } - nni_task_init( - nni_posix_resolv_tq, &item->task, nni_posix_resolv_task, item); + if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) != + 0) { + NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); + return; + }; // NB: host and serv must remain valid until this is completed. + memset(&item->sa, 0, sizeof(item->sa)); item->passive = passive; item->name = host; item->serv = serv; @@ -242,24 +258,30 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive, item->aio = aio; item->family = fam; - nni_mtx_lock(&nni_posix_resolv_mtx); - nni_aio_schedule(aio, nni_posix_resolv_cancel, item); - nni_task_dispatch(&item->task); - nni_mtx_unlock(&nni_posix_resolv_mtx); + nni_mtx_lock(&resolv_mtx); + if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) { + nni_mtx_unlock(&resolv_mtx); + nni_task_fini(item->task); + NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); + return; + } + nni_task_dispatch(item->task); + nni_mtx_unlock(&resolv_mtx); } void nni_plat_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void nni_plat_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } int @@ -267,10 +289,10 @@ nni_posix_resolv_sysinit(void) { int rv; - nni_mtx_init(&nni_posix_resolv_mtx); + nni_mtx_init(&resolv_mtx); - if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) { - nni_mtx_fini(&nni_posix_resolv_mtx); + if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) { + nni_mtx_fini(&resolv_mtx); return (rv); } return (0); @@ -279,11 +301,11 @@ nni_posix_resolv_sysinit(void) void nni_posix_resolv_sysfini(void) { - if (nni_posix_resolv_tq != NULL) { - nni_taskq_fini(nni_posix_resolv_tq); - nni_posix_resolv_tq = NULL; + if (resolv_tq != NULL) { + nni_taskq_fini(resolv_tq); + resolv_tq = NULL; } - nni_mtx_fini(&nni_posix_resolv_mtx); + nni_mtx_fini(&resolv_mtx); } #endif // NNG_USE_POSIX_RESOLV_GAI diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index cca8165c..0a521513 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -422,6 +422,12 @@ nni_plat_thr_fini(nni_plat_thr *thr) } } +bool 
+nni_plat_thr_is_self(nni_plat_thr *thr) +{ + return (pthread_self() == thr->tid); +} + void nni_atfork_child(void) { diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 654d31e3..759bdb96 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -36,24 +36,29 @@ #endif struct nni_plat_udp { - nni_posix_pollq_node udp_pitem; - int udp_fd; - nni_list udp_recvq; - nni_list udp_sendq; - nni_mtx udp_mtx; + nni_posix_pfd *udp_pfd; + int udp_fd; + nni_list udp_recvq; + nni_list udp_sendq; + nni_mtx udp_mtx; }; static void -nni_posix_udp_doclose(nni_plat_udp *udp) +nni_posix_udp_doerror(nni_plat_udp *udp, int rv) { nni_aio *aio; while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) || ((aio = nni_list_first(&udp->udp_sendq)) != NULL)) { nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_aio_finish_error(aio, rv); } - // Underlying socket left open until close API called. +} + +static void +nni_posix_udp_doclose(nni_plat_udp *udp) +{ + nni_posix_udp_doerror(udp, NNG_ECLOSED); } static void @@ -169,23 +174,22 @@ nni_posix_udp_dosend(nni_plat_udp *udp) // This function is called by the poller on activity on the FD. static void -nni_posix_udp_cb(void *arg) +nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_plat_udp *udp = arg; - int revents; + NNI_ASSERT(pfd == udp->udp_pfd); nni_mtx_lock(&udp->udp_mtx); - revents = udp->udp_pitem.revents; - if (revents & POLLIN) { + if (events & POLLIN) { nni_posix_udp_dorecv(udp); } - if (revents & POLLOUT) { + if (events & POLLOUT) { nni_posix_udp_dosend(udp); } - if (revents & (POLLHUP | POLLERR | POLLNVAL)) { + if (events & (POLLHUP | POLLERR | POLLNVAL)) { nni_posix_udp_doclose(udp); } else { - int events = 0; + events = 0; if (!nni_list_empty(&udp->udp_sendq)) { events |= POLLOUT; } @@ -193,7 +197,11 @@ nni_posix_udp_cb(void *arg) events |= POLLIN; } if (events) { - nni_posix_pollq_arm(&udp->udp_pitem, events); + int rv; + rv = nni_posix_pfd_arm(udp->udp_pfd, events); + if (rv != 0) { + nni_posix_udp_doerror(udp, rv); + } } } nni_mtx_unlock(&udp->udp_mtx); @@ -232,22 +240,16 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) NNI_FREE_STRUCT(udp); return (rv); } - udp->udp_pitem.fd = udp->udp_fd; - udp->udp_pitem.cb = nni_posix_udp_cb; - udp->udp_pitem.data = udp; - - (void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK); - - nni_aio_list_init(&udp->udp_recvq); - nni_aio_list_init(&udp->udp_sendq); - - if (((rv = nni_posix_pollq_init(&udp->udp_pitem)) != 0) || - ((rv = nni_posix_pollq_add(&udp->udp_pitem)) != 0)) { + if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) { (void) close(udp->udp_fd); nni_mtx_fini(&udp->udp_mtx); NNI_FREE_STRUCT(udp); return (rv); } + nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp); + + nni_aio_list_init(&udp->udp_recvq); + nni_aio_list_init(&udp->udp_sendq); *upp = udp; return (0); @@ -256,8 +258,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) void nni_plat_udp_close(nni_plat_udp *udp) { - // We're no longer interested in events. 
- nni_posix_pollq_fini(&udp->udp_pitem); + nni_posix_pfd_fini(udp->udp_pfd); nni_mtx_lock(&udp->udp_mtx); nni_posix_udp_doclose(udp); @@ -284,26 +285,46 @@ nni_plat_udp_cancel(nni_aio *aio, int rv) void nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) { + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&udp->udp_mtx); - nni_aio_schedule(aio, nni_plat_udp_cancel, udp); + if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) { + nni_mtx_unlock(&udp->udp_mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&udp->udp_recvq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); + if (nni_list_first(&udp->udp_recvq) == aio) { + if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + } nni_mtx_unlock(&udp->udp_mtx); } void nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) { + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&udp->udp_mtx); - nni_aio_schedule(aio, nni_plat_udp_cancel, udp); + if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) { + nni_mtx_unlock(&udp->udp_mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&udp->udp_sendq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT); + if (nni_list_first(&udp->udp_sendq) == aio) { + if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + } nni_mtx_unlock(&udp->udp_mtx); } |
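The send and receive paths above only arm the poller when an aio is appended to an empty queue; later aios ride on the arm already outstanding, and the callback re-arms for whichever directions still have work queued. A hypothetical helper condensing that pattern (queue_and_arm is not in the source; it must be called with udp->udp_mtx held):

static int
queue_and_arm(nni_plat_udp *udp, nni_list *q, nni_aio *aio, int events)
{
	nni_list_append(q, aio);
	if (nni_list_first(q) == aio) {
		// Queue went from empty to non-empty: ask for a wakeup.
		return (nni_posix_pfd_arm(udp->udp_pfd, events));
	}
	return (0);
}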
