package org.apache.spark.examples

import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark.storage.StorageLevel
object SimpleApp001 {

  def main(args: Array[String]): Unit = {
    // Use the FAIR scheduler so the jobs submitted below from separate threads
    // share cluster resources instead of running strictly first-in, first-out.
    val conf = new SparkConf().setAppName("SimpleApp")   //.setMaster("yarn-client")
    conf.set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)

    // Cache the input with two in-memory replicas so all five jobs reuse it.
    val logData = sc.textFile("hdfs:/in1/file").persist(StorageLevel.MEMORY_ONLY_2)
    // val parts = logData.repartition(10)
    // println(parts.partitions.length)
    // Each thread submits an independent Spark job: a filter over the cached
    // input followed by two actions (saveAsTextFile and count).
    val threadA = new Thread(new Runnable {
      def run(): Unit = {
        val numAs = logData.filter(line => line.contains("a"))
        numAs.saveAsTextFile("hdfs:/t1")
        println("Lines with a: %s".format(numAs.count))
      }
    })

    val threadB = new Thread(new Runnable {
      def run(): Unit = {
        val numBs = logData.filter(line => line.contains("b"))
        numBs.saveAsTextFile("hdfs:/t2")
        println("Lines with b: %s".format(numBs.count))
      }
    })

    val threadC = new Thread(new Runnable {
      def run(): Unit = {
        val numCs = logData.filter(line => line.contains("c"))
        numCs.saveAsTextFile("hdfs:/t3")
        println("Lines with c: %s".format(numCs.count))
      }
    })

    val threadD = new Thread(new Runnable {
      def run(): Unit = {
        val numDs = logData.filter(line => line.contains("d"))
        numDs.saveAsTextFile("hdfs:/t4")
        println("Lines with d: %s".format(numDs.count))
      }
    })

    val threadE = new Thread(new Runnable {
      def run(): Unit = {
        val numEs = logData.filter(line => line.contains("e"))
        numEs.saveAsTextFile("hdfs:/t5")
        println("Lines with e: %s".format(numEs.count))
      }
    })
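
    // Optional sketch (assumption, not in the original): with the FAIR scheduler
    // enabled above, each thread could route its job to a named pool before any
    // action runs, e.g. at the top of run():
    //   sc.setLocalProperty("spark.scheduler.pool", "poolA")
    // "poolA" is an illustrative name; pools would be defined in fairscheduler.xml.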

    // Launch the five jobs concurrently; the FAIR scheduler interleaves their tasks.
    threadA.start()
    threadB.start()
    threadC.start()
    threadD.start()
    threadE.start()

    // Wait for every job to finish before shutting the context down; stopping
    // the SparkContext while the threads are still running would fail their jobs.
    Seq(threadA, threadB, threadC, threadD, threadE).foreach(_.join())
    sc.stop()
  }

}
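
// Example submission (sketch; the jar name is illustrative and the yarn client
// mode mirrors the commented-out setMaster hint above):
//   spark-submit --class org.apache.spark.examples.SimpleApp001 \
//     --master yarn --deploy-mode client simple-app.jar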
