import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.rdd._ import org.bdgenomics.adam.rdd.ADAMContext._ import collection.JavaConverters._ import scala.io.Source import scala.util.Random object Assign1 { def main(args: Array[String]) { val panelfile = args(0) val adamfile = args(1) val conf = new SparkConf().setAppName("Assign1") val sc = new SparkContext(conf) //... means "put something here" val biggroups = Source.fromFile(panelfile).getLines().drop(1).map((str)=>str.drop(8).take(3)).toList.groupBy(identity).mapValues(_.size).filter(_._2>90) val individualpeople = Source.fromFile(panelfile).getLines().drop(1).map((str)=>(str.take(7),str.drop(8).take(3))).toMap.filter(biggroups isDefinedAt _._2).keySet println("Populations with more than 90 individuals: " +biggroups.size ) println("Individuals from these populations: " + individualpeople.size) val data = sc.loadGenotypes(adamfile) def convertAlleles (x: java.util.List[org.bdgenomics.formats.avro.GenotypeAllele])={ x.asScala.map(_.toString) } def distance(a: Iterable[Double], b: Iterable[Double])={ Math.sqrt(a.zip(b).map(r=>(r._1-r._2)*(r._1-r._2)).fold(0.0)(_+_)) } def sumof(a: Iterable[Double], b: Iterable[Double]) = { a.zip(b).map(r=>(r._1+r._2)).fold(0.0)(_+_) } def addlists(a: Iterable[Double], b: Iterable[Double]) = { a.zip(b).map(r=>r._1+r._2)} def centeroid(a: Iterable[Iterable[Double]]): Iterable[Double] = { val numelements = a.size a.tail.fold(a.head)((c,d)=>addlists(c,d)).map(r=>r/numelements) } val cdata = data.rdd.map(r=>(r.contigName,r.start,r.end,r.sampleId,convertAlleles(r.alleles))) val varients = data.rdd.map(r=>(r.contigName,r.start,r.end)) val ids = data.rdd.map(r=>(r.sampleId,convertAlleles(r.alleles))) val copies = varients.zip(ids).groupBy(_._1).map(r=>(r._1->r._2.size)) val tpeople = data.rdd.map(r=>(r.sampleId)).distinct() val npeople = tpeople.count val gvarients = copies.filter(_._2 == npeople) val indbypeople = cdata.map(r=>((r._4)->(r._5->(r._1,r._2,r._3)))) val dcc = gvarients.count() println("Total variants: " + varients.distinct().count()) println("Variants with right number of samples: " + dcc) val idsg = ids.groupBy(_._1) val people = idsg.map(r=>(r._1->r._2.map(c=>c._2.count(_=="Alt").toDouble))) var partitions = people.takeSample(false,21,0).map(r=>r._2) var i = 0 var distancetoclusters = people.map(r=>r->partitions.map(t=>t->distance(r._2,t))) var closestclusterto = distancetoclusters.map(r=>r._1->r._2.minBy(_._2)) var npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._2)).map(r=>centeroid(r._2)) while(i < 10){ var ndistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t))) closestclusterto = ndistancetoclusters.map(r=>r._1->r._2.reduce((a,b)=>if(a._2 < b._2) a else b)) npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._2)).map(r=>centeroid(r._2)) i = i + 1 } //One last clustering to put things in their final place val finaldistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t))) val finalclosestclusterto = distancetoclusters.map(r=>r._1->r._2.reduce((l,r)=>if(l._2 < r._2) l else r)) val finalclusters = finalclosestclusterto.map(r=>r._2._1->r._1._1).groupBy(_._1).map(r=>r._1->r._2.map(t=>t._2)) println("Clusters:") finalclusters.foreach(r=>println(r._2.fold("")(_+" "+_))) println("Number of final clusters:"+finalclusters.count()) System.exit(0) //you shouldn't need this, but sometimes spark hangs without it } }