Hi, I am able to save to a local file the RDDs that Spark SQL produces from data arriving through Spark Streaming. If I set the StreamingContext batch interval to 10 seconds, only the data that arrives in that 10-second window is processed by my SQL and written to the location I specified, and for the next batch it errors because the save folder already exists. So I increased the batch duration to 100 seconds; now all the data arriving in the 100-second window is processed at once, but it is written out as several files in that folder, something like 10 different part files (part-00001, part-00002, ...), each holding only one or two records. I want that output saved as a single file. Please let me know if there is any workaround for this.
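One thing I was wondering about (just a rough sketch, not sure if it is the right approach) is whether coalescing the result down to a single partition before saving would give me one file per batch. Here resultRDD just stands for whatever RDD I end up saving, and this only seems reasonable when the per-batch result is small:

  // rough sketch: force the RDD down to one partition so that
  // saveAsTextFile writes a single part-00000 file
  // (only reasonable when the per-batch result is small)
  resultRDD.coalesce(1)
    .saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/single-output")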
The code that I am using:

import java.util.Date

import scala.util.parsing.json.JSON

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

case class Record(ID: String, name: String, score: Int, school: String)
case class OutPut(name: String, score: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()

    val datenow = new Date()  // computed once at startup, so every batch uses the same value
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(100))
    val sqlContext = new SQLContext(sc)
    val timer = Time(100000)
    ssc.remember(Seconds(100))
    // val timenow = new java.util.Date
    import sqlContext._

    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)

    // parse each Kafka message as JSON and map it onto the Record case class
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("ID").toString, data("name").toString,
      data("score").toString.toInt, data("school").toString))
    fields.print()
    //fields.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/`+timenow`")

    val results = fields.foreachRDD((recrdd, timer) => {
      recrdd.registerAsTable("table1")
      val sqlreport = sqlContext.sql("select max(score) from table1 where ID = 'math' and score > 50")
      sqlreport.map(t => OutPut(t(0).toString, t(1).toString)).collect().foreach(println)
      //println(sqlreport)
      //sqlreport.foreach(println)
      sqlreport.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + datenow)
    })
    //results.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks,
-Srinivas
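P.S. For the "folder already exists" error, I was also wondering whether I should build the output path from the batch time that foreachRDD passes in, instead of from datenow (which I compute only once at startup, so every batch tries to write to the same directory). Roughly like this, combined with the coalesce(1) idea above:

  // rough sketch: use the Time argument of foreachRDD so each batch gets
  // its own output directory, and coalesce so each batch writes one part file
  fields.foreachRDD((recrdd, batchTime) => {
    recrdd.registerAsTable("table1")
    val sqlreport = sqlContext.sql("select max(score) from table1 where ID = 'math' and score > 50")
    sqlreport.coalesce(1)
      .saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + batchTime.milliseconds)
  })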