summaryrefslogtreecommitdiff
path: root/a1.scala
blob: 3817c6428bf3041a34eac308d0e77c76304aeb98 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd._
import org.bdgenomics.adam.rdd.ADAMContext._
import collection.JavaConverters._
import scala.io.Source
import scala.util.Random

object Assign1 {
    def main(args: Array[String])
    {
        val panelfile = args(0)
        val adamfile = args(1)
        val conf = new SparkConf().setAppName("Assign1")
        val sc = new SparkContext(conf)

        //... means "put something here"
        val biggroups = Source.fromFile(panelfile).getLines().drop(1).map((str)=>str.drop(8).take(3)).toList.groupBy(identity).mapValues(_.size).filter(_._2>90)
        val individualpeople = Source.fromFile(panelfile).getLines().drop(1).map((str)=>(str.take(7),str.drop(8).take(3))).toMap.filter(biggroups isDefinedAt _._2).keySet

        println("Populations with more than 90 individuals: " +biggroups.size )
        println("Individuals from these populations: " + individualpeople.size)

        val data = sc.loadGenotypes(adamfile)

        def convertAlleles
                (x: java.util.List[org.bdgenomics.formats.avro.GenotypeAllele])={
                        x.asScala.map(_.toString)
                }

        def distance(a: Iterable[Double], b: Iterable[Double])={
            Math.sqrt(a.zip(b).map(r=>(r._1-r._2)*(r._1-r._2)).fold(0.0)(_+_))
            }

        def sumof(a: Iterable[Double], b: Iterable[Double]) = {
                a.zip(b).map(r=>(r._1+r._2)).fold(0.0)(_+_)
                }

        def addlists(a: Iterable[Double], b: Iterable[Double]) = {
                a.zip(b).map(r=>r._1+r._2)}

        def centeroid(a: Iterable[Iterable[Double]]): Iterable[Double] = {
                val numelements = a.size
                a.tail.fold(a.head)((c,d)=>addlists(c,d)).map(r=>r/numelements)
                }

        val cdata = data.rdd.map(r=>(r.contigName,r.start,r.end,r.sampleId,convertAlleles(r.alleles)))

        val varients = data.rdd.map(r=>(r.contigName,r.start,r.end))

        val ids = data.rdd.map(r=>(r.sampleId,convertAlleles(r.alleles)))

        val copies = varients.zip(ids).groupBy(_._1).map(r=>(r._1->r._2.size))

        val tpeople = data.rdd.map(r=>(r.sampleId)).distinct()

        val npeople = tpeople.count

        val gvarients = copies.filter(_._2 == npeople)

        val indbypeople = cdata.map(r=>((r._4)->(r._5->(r._1,r._2,r._3))))

        val dcc = gvarients.count()

        println("Total variants: " + varients.distinct().count())
        println("Variants with right number of samples: " + dcc)

        val idsg = ids.groupBy(_._1)

        val people = idsg.map(r=>(r._1->r._2.map(c=>c._2.count(_=="Alt").toDouble)))

        var partitions = people.takeSample(false,21,0).map(r=>r._2)

        var i = 0
        var distancetoclusters = people.map(r=>r->partitions.map(t=>t->distance(r._2,t)))
        var closestclusterto = distancetoclusters.map(r=>r._1->r._2.minBy(_._2))
        var npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._2)).map(r=>centeroid(r._2))

        while(i < 10){
            var ndistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t)))
            closestclusterto = ndistancetoclusters.map(r=>r._1->r._2.reduce((a,b)=>if(a._2 < b._2) a else b))
            npartitions = closestclusterto.map(r=>r._1->r._2._1).groupBy(_._2).map(r=>r._1->r._2.map(c=>c._2)).map(r=>centeroid(r._2))
            i = i + 1
        }

        //One last clustering to put things in their final place
        val finaldistancetoclusters = people.map(r=>r->npartitions.map(t=>t->distance(r._2,t)))
        val finalclosestclusterto = distancetoclusters.map(r=>r._1->r._2.reduce((l,r)=>if(l._2 < r._2) l else r))
        val finalclusters = finalclosestclusterto.map(r=>r._2._1->r._1._1).groupBy(_._1).map(r=>r._1->r._2.map(t=>t._2))

        println("Clusters:")
        finalclusters.foreach(r=>println(r._2.fold("")(_+" "+_)))

        println("Number of final clusters:"+finalclusters.count())

        System.exit(0) //you shouldn't need this, but sometimes spark hangs without it
    }
}