path: root/a1.scala
author     Alexander M Pickering <amp215@pitt.edu>   2025-02-01 02:24:13 -0600
committer  Alexander M Pickering <amp215@pitt.edu>   2025-02-01 02:24:13 -0600
commit     61bdb4fef88c1e83787dbb023b51d8d200844e3a (patch)
tree       6d905b6f61a0e932b1ace9771c714a80e0388af0 /a1.scala
Inital commit (HEAD, master)
Diffstat (limited to 'a1.scala')
-rw-r--r--  a1.scala  99
1 file changed, 99 insertions, 0 deletions
diff --git a/a1.scala b/a1.scala
new file mode 100644
index 0000000..3817c64
--- /dev/null
+++ b/a1.scala
@@ -0,0 +1,99 @@
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd._
+import org.bdgenomics.adam.rdd.ADAMContext._
+import collection.JavaConverters._
+import scala.io.Source
+import scala.util.Random
+
+object Assign1 {
+ def main(args: Array[String]): Unit = {
+ val panelfile = args(0)
+ val adamfile = args(1)
+ val conf = new SparkConf().setAppName("Assign1")
+ val sc = new SparkContext(conf)
+
+ // Parse the panel file: keep populations with more than 90 individuals and the samples that belong to them
+ val biggroups = Source.fromFile(panelfile).getLines().drop(1).map((str)=>str.drop(8).take(3)).toList.groupBy(identity).mapValues(_.size).filter(_._2>90)
+ val individualpeople = Source.fromFile(panelfile).getLines().drop(1).map((str)=>(str.take(7),str.drop(8).take(3))).toMap.filter(biggroups isDefinedAt _._2).keySet
+
+ println("Populations with more than 90 individuals: " +biggroups.size )
+ println("Individuals from these populations: " + individualpeople.size)
+
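+ // Load the genotype records with ADAM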
+ val data = sc.loadGenotypes(adamfile)
+
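+ // Convert the Java allele list to Scala strings so alleles can be compared as plain text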
+ def convertAlleles(x: java.util.List[org.bdgenomics.formats.avro.GenotypeAllele]) = {
+ x.asScala.map(_.toString)
+ }
+
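+ // Euclidean distance between two vectors of equal length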
+ def distance(a: Iterable[Double], b: Iterable[Double])={
+ Math.sqrt(a.zip(b).map(r=>(r._1-r._2)*(r._1-r._2)).fold(0.0)(_+_))
+ }
+
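+ // Total of the element-wise sums of two vectors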
+ def sumof(a: Iterable[Double], b: Iterable[Double]) = {
+ a.zip(b).map(r=>(r._1+r._2)).fold(0.0)(_+_)
+ }
+
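+ // Element-wise sum of two vectors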
+ def addlists(a: Iterable[Double], b: Iterable[Double]) = {
+ a.zip(b).map(r=>r._1+r._2)
+ }
+
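+ // Element-wise mean of a collection of vectors (used as the cluster centroid)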
+ def centeroid(a: Iterable[Iterable[Double]]): Iterable[Double] = {
+ val numelements = a.size
+ a.tail.fold(a.head)((c,d)=>addlists(c,d)).map(r=>r/numelements)
+ }
+
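+ // One (contig, start, end, sample, alleles) tuple per genotype record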
+ val cdata = data.rdd.map(r=>(r.contigName,r.start,r.end,r.sampleId,convertAlleles(r.alleles)))
+
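+ // Variant coordinates, one entry per genotype record (not yet distinct)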
+ val varients = data.rdd.map(r=>(r.contigName,r.start,r.end))
+
+ val ids = data.rdd.map(r=>(r.sampleId,convertAlleles(r.alleles)))
+
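+ // Number of genotype records observed for each variant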
+ val copies = varients.zip(ids).groupBy(_._1).map(r=>(r._1->r._2.size))
+
+ val tpeople = data.rdd.map(r=>(r.sampleId)).distinct()
+
+ val npeople = tpeople.count
+
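+ // Keep only the variants that were genotyped in every sample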
+ val gvarients = copies.filter(_._2 == npeople)
+
+ val indbypeople = cdata.map(r=>((r._4)->(r._5->(r._1,r._2,r._3))))
+
+ val dcc = gvarients.count()
+
+ println("Total variants: " + varients.distinct().count())
+ println("Variants with right number of samples: " + dcc)
+
+ val idsg = ids.groupBy(_._1)
+
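+ // For each sample, the number of "Alt" alleles at each of its genotyped sites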
+ val people = idsg.map(r=>(r._1->r._2.map(c=>c._2.count(_=="Alt").toDouble)))
+
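+ // Initial centroids: 21 count vectors sampled without replacement (fixed seed)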
+ var partitions = people.takeSample(false,21,0).map(r=>r._2)
+
+ var i = 0
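+ // One assignment/update step: map each sample to its nearest centroid, then recompute the centroids;
+ // centroids are collected to the driver so they can be reused inside later map closures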
+ var distancetoclusters = people.map(r=>r->partitions.map(t=>t->distance(r._2,t)))
+ var closestclusterto = distancetoclusters.map(r=>r._1->r._2.minBy(_._2))
+ var npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._1._2)).map(r=>centeroid(r._2)).collect()
+
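+ // Repeat the assignment and centroid-update steps for a fixed number of iterations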
+ while(i < 10){
+ var ndistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t)))
+ closestclusterto = ndistancetoclusters.map(r=>r._1->r._2.reduce((a,b)=>if(a._2 < b._2) a else b))
+ npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._1._2)).map(r=>centeroid(r._2)).collect()
+ i = i + 1
+ }
+
+ //One last clustering to put things in their final place
+ val finaldistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t)))
+ val finalclosestclusterto = finaldistancetoclusters.map(r=>r._1->r._2.reduce((l,r)=>if(l._2 < r._2) l else r))
+ val finalclusters = finalclosestclusterto.map(r=>r._2._1->r._1._1).groupBy(_._1).map(r=>r._1->r._2.map(t=>t._2))
+
+ println("Clusters:")
+ finalclusters.collect().foreach(r=>println(r._2.fold("")(_+" "+_)))
+
+ println("Number of final clusters:"+finalclusters.count())
+
+ System.exit(0) //you shouldn't need this, but sometimes spark hangs without it
+ }
+}