Hi,

I am able to save to a local file the RDDs that Spark SQL produces from my Spark Streaming input. If I set the streaming batch interval to 10 seconds, only the data arriving in that 10-second window is processed by my SQL query and written to the location I specified, and the next batch then fails because the output folder already exists. So I increased the batch duration to 100 seconds; now all the data arriving in the 100-second window is processed at once, but the output is spread across several files in that folder, e.g. ten different part files (part-00001, part-00002, ...) each holding only one or two records. What I want is to save the output of each batch as a single file.

Please let me know if there is any workaround for this.
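
The direction I am thinking of as a workaround is to write each batch into its own timestamped sub-folder (so the save never hits an existing folder) and to coalesce the result down to one partition first (so only a single part file comes out). A rough, untested sketch of that idea, reusing the fields DStream and sqlContext from my code below; the output base path is just a placeholder:

// Sketch only (untested): one output folder per batch, one part file per folder.
fields.foreachRDD((rdd, time) => {
  rdd.registerAsTable("table1")
  val report = sqlContext.sql(
    "select name, max(score) from table1 where ID = 'math' and score > 50 group by name")
  report
    .map(t => t(0) + "," + t(1))          // flatten each Row to a plain CSV line
    .coalesce(1)                          // single partition => single part-00000 file
    .saveAsTextFile("/home/ubuntu/output/" + time.milliseconds)  // unique folder per batch
})

coalesce(1) funnels everything through one partition, which should be fine here since each batch only produces a handful of records.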

Here is the code I am using:

import java.util.Date

import scala.util.parsing.json.JSON

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

case class Record(ID: String, name: String, score: Int, school: String)
case class OutPut(name: String, score: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    // StreamingExamples.setStreamingLogLevels()
    val datenow = new Date()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(100))
    val sqlContext = new SQLContext(sc)
    ssc.remember(Seconds(100))
    // val timenow = new java.util.Date
    import sqlContext._

    // One Kafka receiver over the given topics; keep only the message value.
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)

    // Parse each message as JSON and map it onto the Record case class.
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("ID").toString, data("name").toString,
      data("score").toString.toInt, data("school").toString))
    fields.print()
    // fields.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + timenow)
    fields.foreachRDD((recrdd, time) => {
      recrdd.registerAsTable("table1")
      // Select name alongside max(score) so both fields of OutPut can be filled in.
      val sqlreport = sqlContext.sql(
        "select name, max(score) from table1 where ID = 'math' and score > 50 group by name")
      sqlreport.map(t => OutPut(t(0).toString, t(1).toString)).collect().foreach(println)
      // println(sqlreport)
      // sqlreport.foreach(println)
      // datenow is set once at startup, so every batch tries to write to the same folder.
      sqlreport.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + datenow)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
Thanks,
-Srinivas


