diff options
Diffstat (limited to 'src/platform/posix/posix_pipedesc.c')
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 212 |
1 files changed, 112 insertions, 100 deletions
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index e7225395..b11036ea 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -40,11 +40,11 @@ // file descriptor for TCP socket, etc.) This contains the list of pending // aios for that underlying socket, as well as the socket itself. struct nni_posix_pipedesc { - nni_posix_pollq_node node; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; + nni_posix_pfd *pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; }; static void @@ -66,16 +66,19 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) while ((aio = nni_list_first(&pd->writeq)) != NULL) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); } - if (pd->node.fd != -1) { - // Let any peer know we are closing. - (void) shutdown(pd->node.fd, SHUT_RDWR); - } + nni_posix_pfd_close(pd->pfd); } static void nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) { nni_aio *aio; + int fd; + + fd = nni_posix_pfd_fd(pd->pfd); + if ((fd < 0) || (pd->closed)) { + return; + } while ((aio = nni_list_first(&pd->writeq)) != NULL) { unsigned i; @@ -122,20 +125,28 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) hdr.msg_iovlen = niov; hdr.msg_iov = iovec; - n = sendmsg(pd->node.fd, &hdr, MSG_NOSIGNAL); - if (n < 0) { - if ((errno == EAGAIN) || (errno == EINTR)) { - // Can't write more right now. We're done - // on this fd for now. + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_posix_pipedesc_finish( + aio, nni_plat_errno(errno)); + nni_posix_pipedesc_doclose(pd); return; } - nni_posix_pipedesc_finish(aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; } nni_aio_bump_count(aio, n); // We completed the entire operation on this aioq. + // (Sendmsg never returns a partial result.) nni_posix_pipedesc_finish(aio, 0); // Go back to start of loop to see if there is another @@ -147,6 +158,12 @@ static void nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) { nni_aio *aio; + int fd; + + fd = nni_posix_pfd_fd(pd->pfd); + if ((fd < 0) || (pd->closed)) { + return; + } while ((aio = nni_list_first(&pd->readq)) != NULL) { unsigned i; @@ -181,16 +198,18 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) } } - n = readv(pd->node.fd, iovec, niov); - if (n < 0) { - if ((errno == EAGAIN) || (errno == EINTR)) { - // Can't write more right now. We're done - // on this fd for now. + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_posix_pipedesc_finish( + aio, nni_plat_errno(errno)); + nni_posix_pipedesc_doclose(pd); return; } - nni_posix_pipedesc_finish(aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; } if (n == 0) { @@ -211,21 +230,21 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) } static void -nni_posix_pipedesc_cb(void *arg) +nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_pipedesc *pd = arg; nni_mtx_lock(&pd->mtx); - if (pd->node.revents & POLLIN) { + if (events & POLLIN) { nni_posix_pipedesc_doread(pd); } - if (pd->node.revents & POLLOUT) { + if (events & POLLOUT) { nni_posix_pipedesc_dowrite(pd); } - if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) { + if (events & (POLLHUP | POLLERR | POLLNVAL)) { nni_posix_pipedesc_doclose(pd); } else { - int events = 0; + events = 0; if (!nni_list_empty(&pd->writeq)) { events |= POLLOUT; } @@ -233,7 +252,7 @@ nni_posix_pipedesc_cb(void *arg) events |= POLLIN; } if ((!pd->closed) && (events != 0)) { - nni_posix_pollq_arm(&pd->node, events); + nni_posix_pfd_arm(pfd, events); } } nni_mtx_unlock(&pd->mtx); @@ -242,8 +261,7 @@ nni_posix_pipedesc_cb(void *arg) void nni_posix_pipedesc_close(nni_posix_pipedesc *pd) { - nni_posix_pollq_remove(&pd->node); - + // NB: Events may still occur. nni_mtx_lock(&pd->mtx); nni_posix_pipedesc_doclose(pd); nni_mtx_unlock(&pd->mtx); @@ -265,6 +283,8 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv) void nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -276,18 +296,24 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) return; } + if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { + nni_mtx_unlock(&pd->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&pd->readq, aio); - nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd); - // If we are only job on the list, go ahead and try to do an immediate - // transfer. This allows for faster completions in many cases. We - // also need not arm a list if it was already armed. + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. if (nni_list_first(&pd->readq) == aio) { nni_posix_pipedesc_doread(pd); - // If we are still the first thing on the list, that means we - // didn't finish the job, so arm the poller to complete us. + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. if (nni_list_first(&pd->readq) == aio) { - nni_posix_pollq_arm(&pd->node, POLLIN); + nni_posix_pfd_arm(pd->pfd, POLLIN); } } nni_mtx_unlock(&pd->mtx); @@ -296,6 +322,8 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) void nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -307,15 +335,20 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) return; } + if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { + nni_mtx_unlock(&pd->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&pd->writeq, aio); - nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd); if (nni_list_first(&pd->writeq) == aio) { nni_posix_pipedesc_dowrite(pd); - // If we are still the first thing on the list, that means we - // didn't finish the job, so arm the poller to complete us. + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. if (nni_list_first(&pd->writeq) == aio) { - nni_posix_pollq_arm(&pd->node, POLLOUT); + nni_posix_pfd_arm(pd->pfd, POLLOUT); } } nni_mtx_unlock(&pd->mtx); @@ -326,8 +359,9 @@ nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(pd->pfd); - if (getpeername(pd->node.fd, (void *) &ss, &sslen) != 0) { + if (getpeername(fd, (void *) &ss, &sslen) != 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss)); @@ -338,8 +372,9 @@ nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(pd->pfd); - if (getsockname(pd->node.fd, (void *) &ss, &sslen) != 0) { + if (getsockname(fd, (void *) &ss, &sslen) != 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss)); @@ -349,9 +384,9 @@ int nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay) { int val = nodelay ? 1 : 0; + int fd = nni_posix_pfd_fd(pd->pfd); - if (setsockopt(pd->node.fd, IPPROTO_TCP, TCP_NODELAY, &val, - sizeof(val)) != 0) { + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { return (nni_plat_errno(errno)); } return (0); @@ -361,61 +396,19 @@ int nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep) { int val = keep ? 1 : 0; + int fd = nni_posix_pfd_fd(pd->pfd); - if (setsockopt(pd->node.fd, SOL_SOCKET, SO_KEEPALIVE, &val, - sizeof(val)) != 0) { + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { return (nni_plat_errno(errno)); } return (0); } int -nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) -{ - nni_posix_pipedesc *pd; - int rv; - - if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { - return (NNG_ENOMEM); - } - - // We could randomly choose a different pollq, or for efficiencies - // sake we could take a modulo of the file desc number to choose - // one. For now we just have a global pollq. Note that by tying - // the pd to a single pollq we may get some kind of cache warmth. - - pd->closed = false; - pd->node.fd = fd; - pd->node.cb = nni_posix_pipedesc_cb; - pd->node.data = pd; - - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - -#ifdef SO_NOSIGPIPE - // Darwin lacks MSG_NOSIGNAL, but has a socket option. - int one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); -#endif - - nni_mtx_init(&pd->mtx); - nni_aio_list_init(&pd->readq); - nni_aio_list_init(&pd->writeq); - - if (((rv = nni_posix_pollq_init(&pd->node)) != 0) || - ((rv = nni_posix_pollq_add(&pd->node)) != 0)) { - nni_mtx_fini(&pd->mtx); - NNI_FREE_STRUCT(pd); - return (rv); - } - *pdp = pd; - return (0); -} - -int nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, uint64_t *egid, uint64_t *prid, uint64_t *znid) { - int fd = pd->node.fd; + int fd = nni_posix_pfd_fd(pd->pfd); #if defined(NNG_HAVE_GETPEEREID) uid_t uid; gid_t gid; @@ -458,7 +451,8 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, } *euid = xu.cr_uid; *egid = xu.cr_gid; - *prid = (uint64_t) -1; // XXX: macOS has undocumented LOCAL_PEERPID... + *prid = (uint64_t) -1; // XXX: macOS has undocumented + // LOCAL_PEERPID... *znid = (uint64_t) -1; return (0); #else @@ -473,16 +467,34 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, #endif } +int +nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd) +{ + nni_posix_pipedesc *pd; + + if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { + return (NNG_ENOMEM); + } + + pd->closed = false; + pd->pfd = pfd; + + nni_mtx_init(&pd->mtx); + nni_aio_list_init(&pd->readq); + nni_aio_list_init(&pd->writeq); + + nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd); + + *pdp = pd; + return (0); +} + void nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) { - // Make sure no other polling activity is pending. nni_posix_pipedesc_close(pd); - nni_posix_pollq_fini(&pd->node); - if (pd->node.fd >= 0) { - (void) close(pd->node.fd); - } - + nni_posix_pfd_fini(pd->pfd); + pd->pfd = NULL; nni_mtx_fini(&pd->mtx); NNI_FREE_STRUCT(pd); |
