aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_pipedesc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix/posix_pipedesc.c')
-rw-r--r--src/platform/posix/posix_pipedesc.c212
1 files changed, 112 insertions, 100 deletions
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index e7225395..b11036ea 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -40,11 +40,11 @@
// file descriptor for TCP socket, etc.) This contains the list of pending
// aios for that underlying socket, as well as the socket itself.
struct nni_posix_pipedesc {
- nni_posix_pollq_node node;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
+ nni_posix_pfd *pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
};
static void
@@ -66,16 +66,19 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
- if (pd->node.fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->node.fd, SHUT_RDWR);
- }
+ nni_posix_pfd_close(pd->pfd);
}
static void
nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
unsigned i;
@@ -122,20 +125,28 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
hdr.msg_iovlen = niov;
hdr.msg_iov = iovec;
- n = sendmsg(pd->node.fd, &hdr, MSG_NOSIGNAL);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
nni_aio_bump_count(aio, n);
// We completed the entire operation on this aioq.
+ // (Sendmsg never returns a partial result.)
nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
@@ -147,6 +158,12 @@ static void
nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->readq)) != NULL) {
unsigned i;
@@ -181,16 +198,18 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
}
- n = readv(pd->node.fd, iovec, niov);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
if (n == 0) {
@@ -211,21 +230,21 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
static void
-nni_posix_pipedesc_cb(void *arg)
+nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_pipedesc *pd = arg;
nni_mtx_lock(&pd->mtx);
- if (pd->node.revents & POLLIN) {
+ if (events & POLLIN) {
nni_posix_pipedesc_doread(pd);
}
- if (pd->node.revents & POLLOUT) {
+ if (events & POLLOUT) {
nni_posix_pipedesc_dowrite(pd);
}
- if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_pipedesc_doclose(pd);
} else {
- int events = 0;
+ events = 0;
if (!nni_list_empty(&pd->writeq)) {
events |= POLLOUT;
}
@@ -233,7 +252,7 @@ nni_posix_pipedesc_cb(void *arg)
events |= POLLIN;
}
if ((!pd->closed) && (events != 0)) {
- nni_posix_pollq_arm(&pd->node, events);
+ nni_posix_pfd_arm(pfd, events);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -242,8 +261,7 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq_remove(&pd->node);
-
+ // NB: Events may still occur.
nni_mtx_lock(&pd->mtx);
nni_posix_pipedesc_doclose(pd);
nni_mtx_unlock(&pd->mtx);
@@ -265,6 +283,8 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -276,18 +296,24 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->readq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
- // If we are only job on the list, go ahead and try to do an immediate
- // transfer. This allows for faster completions in many cases. We
- // also need not arm a list if it was already armed.
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
if (nni_list_first(&pd->readq) == aio) {
nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLIN);
+ nni_posix_pfd_arm(pd->pfd, POLLIN);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -296,6 +322,8 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -307,15 +335,20 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->writeq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLOUT);
+ nni_posix_pfd_arm(pd->pfd, POLLOUT);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -326,8 +359,9 @@ nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getpeername(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getpeername(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -338,8 +372,9 @@ nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getsockname(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -349,9 +384,9 @@ int
nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
{
int val = nodelay ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, IPPROTO_TCP, TCP_NODELAY, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
@@ -361,61 +396,19 @@ int
nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
{
int val = keep ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, SOL_SOCKET, SO_KEEPALIVE, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
}
int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
-{
- nni_posix_pipedesc *pd;
- int rv;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // We could randomly choose a different pollq, or for efficiencies
- // sake we could take a modulo of the file desc number to choose
- // one. For now we just have a global pollq. Note that by tying
- // the pd to a single pollq we may get some kind of cache warmth.
-
- pd->closed = false;
- pd->node.fd = fd;
- pd->node.cb = nni_posix_pipedesc_cb;
- pd->node.data = pd;
-
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
-
-#ifdef SO_NOSIGPIPE
- // Darwin lacks MSG_NOSIGNAL, but has a socket option.
- int one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
-#endif
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- if (((rv = nni_posix_pollq_init(&pd->node)) != 0) ||
- ((rv = nni_posix_pollq_add(&pd->node)) != 0)) {
- nni_mtx_fini(&pd->mtx);
- NNI_FREE_STRUCT(pd);
- return (rv);
- }
- *pdp = pd;
- return (0);
-}
-
-int
nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
uint64_t *egid, uint64_t *prid, uint64_t *znid)
{
- int fd = pd->node.fd;
+ int fd = nni_posix_pfd_fd(pd->pfd);
#if defined(NNG_HAVE_GETPEEREID)
uid_t uid;
gid_t gid;
@@ -458,7 +451,8 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
}
*euid = xu.cr_uid;
*egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented LOCAL_PEERPID...
+ *prid = (uint64_t) -1; // XXX: macOS has undocumented
+ // LOCAL_PEERPID...
*znid = (uint64_t) -1;
return (0);
#else
@@ -473,16 +467,34 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
#endif
}
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
+{
+ nni_posix_pipedesc *pd;
+
+ if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ pd->closed = false;
+ pd->pfd = pfd;
+
+ nni_mtx_init(&pd->mtx);
+ nni_aio_list_init(&pd->readq);
+ nni_aio_list_init(&pd->writeq);
+
+ nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
+
+ *pdp = pd;
+ return (0);
+}
+
void
nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
{
- // Make sure no other polling activity is pending.
nni_posix_pipedesc_close(pd);
- nni_posix_pollq_fini(&pd->node);
- if (pd->node.fd >= 0) {
- (void) close(pd->node.fd);
- }
-
+ nni_posix_pfd_fini(pd->pfd);
+ pd->pfd = NULL;
nni_mtx_fini(&pd->mtx);
NNI_FREE_STRUCT(pd);