1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd._
import org.bdgenomics.adam.rdd.ADAMContext._
import collection.JavaConverters._
import scala.io.Source
import scala.util.Random
object Assign1 {
def main(args: Array[String])
{
val panelfile = args(0)
val adamfile = args(1)
val conf = new SparkConf().setAppName("Assign1")
val sc = new SparkContext(conf)
//... means "put something here"
val biggroups = Source.fromFile(panelfile).getLines().drop(1).map((str)=>str.drop(8).take(3)).toList.groupBy(identity).mapValues(_.size).filter(_._2>90)
val individualpeople = Source.fromFile(panelfile).getLines().drop(1).map((str)=>(str.take(7),str.drop(8).take(3))).toMap.filter(biggroups isDefinedAt _._2).keySet
println("Populations with more than 90 individuals: " +biggroups.size )
println("Individuals from these populations: " + individualpeople.size)
val data = sc.loadGenotypes(adamfile)
def convertAlleles
(x: java.util.List[org.bdgenomics.formats.avro.GenotypeAllele])={
x.asScala.map(_.toString)
}
def distance(a: Iterable[Double], b: Iterable[Double])={
Math.sqrt(a.zip(b).map(r=>(r._1-r._2)*(r._1-r._2)).fold(0.0)(_+_))
}
def sumof(a: Iterable[Double], b: Iterable[Double]) = {
a.zip(b).map(r=>(r._1+r._2)).fold(0.0)(_+_)
}
def addlists(a: Iterable[Double], b: Iterable[Double]) = {
a.zip(b).map(r=>r._1+r._2)}
def centeroid(a: Iterable[Iterable[Double]]): Iterable[Double] = {
val numelements = a.size
a.tail.fold(a.head)((c,d)=>addlists(c,d)).map(r=>r/numelements)
}
val cdata = data.rdd.map(r=>(r.contigName,r.start,r.end,r.sampleId,convertAlleles(r.alleles)))
val varients = data.rdd.map(r=>(r.contigName,r.start,r.end))
val ids = data.rdd.map(r=>(r.sampleId,convertAlleles(r.alleles)))
val copies = varients.zip(ids).groupBy(_._1).map(r=>(r._1->r._2.size))
val tpeople = data.rdd.map(r=>(r.sampleId)).distinct()
val npeople = tpeople.count
val gvarients = copies.filter(_._2 == npeople)
val indbypeople = cdata.map(r=>((r._4)->(r._5->(r._1,r._2,r._3))))
val dcc = gvarients.count()
println("Total variants: " + varients.distinct().count())
println("Variants with right number of samples: " + dcc)
val idsg = ids.groupBy(_._1)
val people = idsg.map(r=>(r._1->r._2.map(c=>c._2.count(_=="Alt").toDouble)))
var partitions = people.takeSample(false,21,0).map(r=>r._2)
var i = 0
var distancetoclusters = people.map(r=>r->partitions.map(t=>t->distance(r._2,t)))
var closestclusterto = distancetoclusters.map(r=>r._1->r._2.minBy(_._2))
var npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._2)).map(r=>centeroid(r._2))
while(i < 10){
var ndistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t)))
closestclusterto = ndistancetoclusters.map(r=>r._1->r._2.reduce((a,b)=>if(a._2 < b._2) a else b))
npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._2)).map(r=>centeroid(r._2))
i = i + 1
}
//One last clustering to put things in their final place
val finaldistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t)))
val finalclosestclusterto = distancetoclusters.map(r=>r._1->r._2.reduce((l,r)=>if(l._2 < r._2) l else r))
val finalclusters = finalclosestclusterto.map(r=>r._2._1->r._1._1).groupBy(_._1).map(r=>r._1->r._2.map(t=>t._2))
println("Clusters:")
finalclusters.foreach(r=>println(r._2.fold("")(_+" "+_)))
println("Number of final clusters:"+finalclusters.count())
System.exit(0) //you shouldn't need this, but sometimes spark hangs without it
}
}
|