aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_pipedesc.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-09 17:21:27 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-14 17:09:20 -0700
commit16b4c4019c7b7904de171c588ed8c72ca732d2cf (patch)
tree9e5a8416470631cfb48f5a6ebdd4b16e4b1be3d6 /src/platform/posix/posix_pipedesc.c
parente0beb13b066d27ce32347a1c18c9d441828dc553 (diff)
downloadnng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.gz
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.tar.bz2
nng-16b4c4019c7b7904de171c588ed8c72ca732d2cf.zip
fixes #352 aio lock is burning hot
fixes #326 consider nni_taskq_exec_synch() fixes #410 kqueue implementation could be smarter fixes #411 epoll implementation could be smarter fixes #426 synchronous completion can lead to panic fixes #421 pipe close race condition/duplicate destroy This is a major refactoring of two significant parts of the code base, which are closely interrelated. First the aio and taskq framework have undergone a number of simplifications, and improvements. We have ditched a few parts of the internal API (for example tasks no longer support cancellation) that weren't terribly useful but added a lot of complexity, and we've made aio_schedule something that now checks for cancellation or other "premature" completions. The aio framework now uses the tasks more tightly, so that aio wait can devolve into just nni_task_wait(). We did have to add a "task_prep()" step to prevent race conditions. Second, the entire POSIX poller framework has been simplified, and made more robust, and more scalable. There were some fairly inherent race conditions around the shutdown/close code, where we *thought* we were synchronizing against the other thread, but weren't doing so adequately. With a cleaner design, we've been able to tighten up the implementation to remove these race conditions, while substantially reducing the chance for lock contention, thereby improving scalability. The illumos poller also got a performance boost by polling for multiple events. In highly "busy" systems, we expect to see vast reductions in lock contention, and therefore greater scalability, in addition to overall improved reliability. One area where we currently can do better is that there is still only a single poller thread run. Scaling this out is a task that has to be done differently for each poller, and carefully to ensure that close conditions are safe on all pollers, and that no chance for deadlock/livelock waiting for pfd finalizers can occur.
Diffstat (limited to 'src/platform/posix/posix_pipedesc.c')
-rw-r--r--src/platform/posix/posix_pipedesc.c212
1 files changed, 112 insertions, 100 deletions
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index e7225395..b11036ea 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -40,11 +40,11 @@
// file descriptor for TCP socket, etc.) This contains the list of pending
// aios for that underlying socket, as well as the socket itself.
struct nni_posix_pipedesc {
- nni_posix_pollq_node node;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
+ nni_posix_pfd *pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
};
static void
@@ -66,16 +66,19 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
- if (pd->node.fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->node.fd, SHUT_RDWR);
- }
+ nni_posix_pfd_close(pd->pfd);
}
static void
nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
unsigned i;
@@ -122,20 +125,28 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
hdr.msg_iovlen = niov;
hdr.msg_iov = iovec;
- n = sendmsg(pd->node.fd, &hdr, MSG_NOSIGNAL);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
nni_aio_bump_count(aio, n);
// We completed the entire operation on this aioq.
+ // (Sendmsg never returns a partial result.)
nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
@@ -147,6 +158,12 @@ static void
nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->readq)) != NULL) {
unsigned i;
@@ -181,16 +198,18 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
}
- n = readv(pd->node.fd, iovec, niov);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
if (n == 0) {
@@ -211,21 +230,21 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
static void
-nni_posix_pipedesc_cb(void *arg)
+nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_pipedesc *pd = arg;
nni_mtx_lock(&pd->mtx);
- if (pd->node.revents & POLLIN) {
+ if (events & POLLIN) {
nni_posix_pipedesc_doread(pd);
}
- if (pd->node.revents & POLLOUT) {
+ if (events & POLLOUT) {
nni_posix_pipedesc_dowrite(pd);
}
- if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_pipedesc_doclose(pd);
} else {
- int events = 0;
+ events = 0;
if (!nni_list_empty(&pd->writeq)) {
events |= POLLOUT;
}
@@ -233,7 +252,7 @@ nni_posix_pipedesc_cb(void *arg)
events |= POLLIN;
}
if ((!pd->closed) && (events != 0)) {
- nni_posix_pollq_arm(&pd->node, events);
+ nni_posix_pfd_arm(pfd, events);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -242,8 +261,7 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq_remove(&pd->node);
-
+ // NB: Events may still occur.
nni_mtx_lock(&pd->mtx);
nni_posix_pipedesc_doclose(pd);
nni_mtx_unlock(&pd->mtx);
@@ -265,6 +283,8 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -276,18 +296,24 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->readq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
- // If we are only job on the list, go ahead and try to do an immediate
- // transfer. This allows for faster completions in many cases. We
- // also need not arm a list if it was already armed.
+ // If we are only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
if (nni_list_first(&pd->readq) == aio) {
nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLIN);
+ nni_posix_pfd_arm(pd->pfd, POLLIN);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -296,6 +322,8 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -307,15 +335,20 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->writeq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLOUT);
+ nni_posix_pfd_arm(pd->pfd, POLLOUT);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -326,8 +359,9 @@ nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getpeername(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getpeername(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -338,8 +372,9 @@ nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getsockname(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -349,9 +384,9 @@ int
nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
{
int val = nodelay ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, IPPROTO_TCP, TCP_NODELAY, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
@@ -361,61 +396,19 @@ int
nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
{
int val = keep ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, SOL_SOCKET, SO_KEEPALIVE, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
}
int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
-{
- nni_posix_pipedesc *pd;
- int rv;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // We could randomly choose a different pollq, or for efficiencies
- // sake we could take a modulo of the file desc number to choose
- // one. For now we just have a global pollq. Note that by tying
- // the pd to a single pollq we may get some kind of cache warmth.
-
- pd->closed = false;
- pd->node.fd = fd;
- pd->node.cb = nni_posix_pipedesc_cb;
- pd->node.data = pd;
-
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
-
-#ifdef SO_NOSIGPIPE
- // Darwin lacks MSG_NOSIGNAL, but has a socket option.
- int one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
-#endif
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- if (((rv = nni_posix_pollq_init(&pd->node)) != 0) ||
- ((rv = nni_posix_pollq_add(&pd->node)) != 0)) {
- nni_mtx_fini(&pd->mtx);
- NNI_FREE_STRUCT(pd);
- return (rv);
- }
- *pdp = pd;
- return (0);
-}
-
-int
nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
uint64_t *egid, uint64_t *prid, uint64_t *znid)
{
- int fd = pd->node.fd;
+ int fd = nni_posix_pfd_fd(pd->pfd);
#if defined(NNG_HAVE_GETPEEREID)
uid_t uid;
gid_t gid;
@@ -458,7 +451,8 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
}
*euid = xu.cr_uid;
*egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented LOCAL_PEERPID...
+ *prid = (uint64_t) -1; // XXX: macOS has undocumented
+ // LOCAL_PEERPID...
*znid = (uint64_t) -1;
return (0);
#else
@@ -473,16 +467,34 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
#endif
}
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
+{
+ nni_posix_pipedesc *pd;
+
+ if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ pd->closed = false;
+ pd->pfd = pfd;
+
+ nni_mtx_init(&pd->mtx);
+ nni_aio_list_init(&pd->readq);
+ nni_aio_list_init(&pd->writeq);
+
+ nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
+
+ *pdp = pd;
+ return (0);
+}
+
void
nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
{
- // Make sure no other polling activity is pending.
nni_posix_pipedesc_close(pd);
- nni_posix_pollq_fini(&pd->node);
- if (pd->node.fd >= 0) {
- (void) close(pd->node.fd);
- }
-
+ nni_posix_pfd_fini(pd->pfd);
+ pd->pfd = NULL;
nni_mtx_fini(&pd->mtx);
NNI_FREE_STRUCT(pd);