| author | Alexander M Pickering <amp215@pitt.edu> | 2025-02-01 02:24:13 -0600 |
|---|---|---|
| committer | Alexander M Pickering <amp215@pitt.edu> | 2025-02-01 02:24:13 -0600 |
| commit | 61bdb4fef88c1e83787dbb023b51d8d200844e3a (patch) | |
| tree | 6d905b6f61a0e932b1ace9771c714a80e0388af0 /a1.scala | |
Diffstat (limited to 'a1.scala')
| -rw-r--r-- | a1.scala | 99 |
|---|---|---|

1 file changed, 99 insertions, 0 deletions
```diff
diff --git a/a1.scala b/a1.scala
new file mode 100644
index 0000000..3817c64
--- /dev/null
+++ b/a1.scala
@@ -0,0 +1,99 @@
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd._
+import org.bdgenomics.adam.rdd.ADAMContext._
+import collection.JavaConverters._
+import scala.io.Source
+import scala.util.Random
+
+object Assign1 {
+  def main(args: Array[String]): Unit =
+  {
+    val panelfile = args(0)
+    val adamfile = args(1)
+    val conf = new SparkConf().setAppName("Assign1")
+    val sc = new SparkContext(conf)
+
+    // Populations with more than 90 individuals, and the individuals that belong to them.
+    val biggroups = Source.fromFile(panelfile).getLines().drop(1).map((str)=>str.drop(8).take(3)).toList.groupBy(identity).mapValues(_.size).filter(_._2>90)
+    val individualpeople = Source.fromFile(panelfile).getLines().drop(1).map((str)=>(str.take(7),str.drop(8).take(3))).toMap.filter(biggroups isDefinedAt _._2).keySet
+
+    println("Populations with more than 90 individuals: " + biggroups.size)
+    println("Individuals from these populations: " + individualpeople.size)
+
+    val data = sc.loadGenotypes(adamfile)
+
+    def convertAlleles
+      (x: java.util.List[org.bdgenomics.formats.avro.GenotypeAllele]) = {
+      x.asScala.map(_.toString)
+    }
+
+    def distance(a: Iterable[Double], b: Iterable[Double]) = {
+      Math.sqrt(a.zip(b).map(r=>(r._1-r._2)*(r._1-r._2)).fold(0.0)(_+_))
+    }
+
+    def sumof(a: Iterable[Double], b: Iterable[Double]) = {
+      a.zip(b).map(r=>(r._1+r._2)).fold(0.0)(_+_)
+    }
+
+    def addlists(a: Iterable[Double], b: Iterable[Double]) = {
+      a.zip(b).map(r=>r._1+r._2)}
+
+    def centeroid(a: Iterable[Iterable[Double]]): Iterable[Double] = {
+      val numelements = a.size
+      a.tail.fold(a.head)((c,d)=>addlists(c,d)).map(r=>r/numelements)
+    }
+
+    val cdata = data.rdd.map(r=>(r.contigName,r.start,r.end,r.sampleId,convertAlleles(r.alleles)))
+
+    val varients = data.rdd.map(r=>(r.contigName,r.start,r.end))
+
+    val ids = data.rdd.map(r=>(r.sampleId,convertAlleles(r.alleles)))
+
+    val copies = varients.zip(ids).groupBy(_._1).map(r=>(r._1->r._2.size))
+
+    val tpeople = data.rdd.map(r=>(r.sampleId)).distinct()
+
+    val npeople = tpeople.count
+
+    val gvarients = copies.filter(_._2 == npeople)
+
+    val indbypeople = cdata.map(r=>((r._4)->(r._5->(r._1,r._2,r._3))))
+
+    val dcc = gvarients.count()
+
+    println("Total variants: " + varients.distinct().count())
+    println("Variants with right number of samples: " + dcc)
+
+    val idsg = ids.groupBy(_._1)
+
+    val people = idsg.map(r=>(r._1->r._2.map(c=>c._2.count(_=="Alt").toDouble))) // per-sample feature vector: "Alt" allele count at each variant
+
+    var partitions = people.takeSample(false,21,0).map(r=>r._2) // 21 starting centroids, sampled with a fixed seed
+
+    var i = 0
+    var distancetoclusters = people.map(r=>r->partitions.map(t=>t->distance(r._2,t))) // distance from every sample to every centroid
+    var closestclusterto = distancetoclusters.map(r=>r._1->r._2.minBy(_._2)) // nearest centroid for each sample
+    var npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._1._2)).map(r=>centeroid(r._2)).collect() // recompute each centroid as the mean of its members
+
+    while(i < 10){
+      var ndistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t)))
+      closestclusterto = ndistancetoclusters.map(r=>r._1->r._2.reduce((a,b)=>if(a._2 < b._2) a else b))
+      npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._1._2)).map(r=>centeroid(r._2)).collect()
+      i = i + 1
+    }
+
+    // One last assignment to put each sample in its final cluster
+    val finaldistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t)))
+    val finalclosestclusterto = finaldistancetoclusters.map(r=>r._1->r._2.reduce((l,r)=>if(l._2 < r._2) l else r))
+    val finalclusters = finalclosestclusterto.map(r=>r._2._1->r._1._1).groupBy(_._1).map(r=>r._1->r._2.map(t=>t._2))
+
+    println("Clusters:")
+    finalclusters.foreach(r=>println(r._2.fold("")(_+" "+_)))
+
+    println("Number of final clusters: "+finalclusters.count())
+
+    System.exit(0) // you shouldn't need this, but sometimes Spark hangs without it
+  }
+}
```
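The `take(7)` / `drop(8).take(3)` slicing in `a1.scala` assumes every panel line starts with a seven-character sample ID followed by a three-letter population code, presumably the layout of the 1000 Genomes phase 3 `.panel` file. Below is a minimal standalone sketch of the same filtering step that splits on whitespace instead of fixed offsets; the object name `PanelParseSketch` and the assumed column layout are illustrative, not part of the assignment.

```scala
import scala.io.Source

object PanelParseSketch {
  def main(args: Array[String]): Unit = {
    val panelfile = args(0)

    // Assumed layout of each data line: "HG00096<TAB>GBR<TAB>EUR<TAB>male" —
    // sample ID, population code, super-population code, gender.
    val samplePop: Map[String, String] = Source.fromFile(panelfile)
      .getLines()
      .drop(1)                                   // skip the header line
      .map(_.trim.split("\\s+"))
      .collect { case Array(sample, pop, _*) => sample -> pop }
      .toMap

    // Populations with more than 90 samples, and the samples that belong to them.
    val bigPops = samplePop.values.groupBy(identity).collect {
      case (pop, members) if members.size > 90 => pop
    }.toSet
    val keptIds = samplePop.collect { case (sample, pop) if bigPops(pop) => sample }.toSet

    println("Populations with more than 90 individuals: " + bigPops.size)
    println("Individuals from these populations: " + keptIds.size)
  }
}
```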

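The block from `var partitions` down to `finalclusters` is a hand-rolled k-means (Lloyd's algorithm) over each sample's Alt-allele count vector: sample starting centroids, assign every sample to its nearest centroid, recompute each centroid as the mean of its members, and repeat for ten rounds before a final assignment. Below is a plain-collections sketch of that assign/update cycle; `KMeansSketch`, `kMeans`, and the toy data in `main` are illustrative names only, not part of the assignment.

```scala
import scala.util.Random

object KMeansSketch {

  // Euclidean distance between two equal-length vectors.
  def distance(a: Seq[Double], b: Seq[Double]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // Component-wise mean of a non-empty collection of vectors.
  def centroid(points: Iterable[Seq[Double]]): Seq[Double] = {
    val n = points.size.toDouble
    points.reduce((u, v) => u.zip(v).map { case (x, y) => x + y }).map(_ / n)
  }

  def kMeans(points: Seq[Seq[Double]], k: Int, iterations: Int,
             seed: Long = 0L): Map[Seq[Double], Seq[Seq[Double]]] = {
    // Start from k points sampled from the data (fixed seed for reproducibility).
    var centres: Seq[Seq[Double]] = new Random(seed).shuffle(points).take(k)
    for (_ <- 0 until iterations) {
      // Assignment step: each point joins the cluster of its nearest centre.
      val clusters = points.groupBy(p => centres.minBy(c => distance(p, c)))
      // Update step: each centre moves to the mean of its assigned points.
      centres = clusters.values.map(centroid).toSeq
    }
    // Final assignment with the settled centres.
    points.groupBy(p => centres.minBy(c => distance(p, c)))
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(Seq(0.0, 0.0), Seq(0.2, 0.1), Seq(5.0, 5.0), Seq(5.1, 4.9))
    kMeans(data, k = 2, iterations = 10).zipWithIndex.foreach {
      case ((_, members), i) => println(s"cluster $i: ${members.mkString("; ")}")
    }
  }
}
```

The Spark version in the diff expresses the same two steps as RDD transformations, pulling the recomputed centroids back to the driver each round so they can be reused inside the next `map` over `people`.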