Re: Spark Streaming Json file groupby function
Hi TD,

We have a requirement to maintain user session state in our Streaming job and to maintain/update metrics for each session at minute, hour, and day granularities. Can I keep those granularities in the state and recalculate them each time there is a change? How would the performance be impacted?

Thanks,
Swetha

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p24041.html
Re: Spark Streaming Json file groupby function
If you are trying to keep such long-term state, it will be more robust to use a dedicated data store (Cassandra, HBase, etc.) that is designed for long-term storage.

On Tue, Jul 28, 2015 at 4:37 PM, swetha swethakasire...@gmail.com wrote:
> Hi TD,
>
> We have a requirement to maintain user session state in our Streaming job and to maintain/update metrics for each session at minute, hour, and day granularities. Can I keep those granularities in the state and recalculate them each time there is a change? How would the performance be impacted?
>
> Thanks,
> Swetha
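A minimal sketch of what this could look like with updateStateByKey, keeping only the live counters in Spark state and pushing the long-term history to the external store TD suggests. This is not code from the thread; the events DStream and the time-bucket encoding are assumptions of mine.

import org.apache.spark.streaming.StreamingContext._  // pair-DStream operations (Spark 1.x)

// events: DStream[((String, String), Long)] of ((sessionId, bucket), count),
// where the bucket string encodes the granularity, e.g. "minute:2015-07-28T16:37",
// "hour:2015-07-28T16" or "day:2015-07-28". Building `events` from the raw
// session stream is omitted here.
val updateCount = (newCounts: Seq[Long], state: Option[Long]) =>
  Some(state.getOrElse(0L) + newCounts.sum)

// requires ssc.checkpoint(...) to be set
val metrics = events.updateStateByKey[Long](updateCount)

// write the current counters out every batch; Cassandra/HBase keeps the history
metrics.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // open a connection to the external store here (client code omitted)
    partition.foreach { case ((sessionId, bucket), count) =>
      // upsert (sessionId, bucket) -> count into the store
    }
  }
}

Note that updateStateByKey touches every key held in state on each batch, so keeping many long-lived buckets in state gets steadily more expensive; that is part of why offloading history to an external store is the more robust choice.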
Re: Spark Streaming Json file groupby function
Hi,

I am able to save to a local file the RDDs produced by Spark SQL on data coming from Spark Streaming. If I set the streaming batch interval to 10 seconds, only the data arriving in that 10-second window is processed by my SQL query and stored in the location I specified, and for the next batch it errors out because the output folder already exists. So I increased the batch duration to 100 seconds; now the data that arrives in the 100-second window is processed at once, but it is written out as several files in that folder, around 10 different files (part-00001, part-00002, ...), each having only one or two records. I want to save the output as a single file instead. Please let me know if there is any workaround for this.

The code that I am using:

case class Record(ID: String, name: String, score: Int, school: String)
case class OutPut(name: String, score: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val datenow = new Date()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(100))
    val sqlContext = new SQLContext(sc)
    val timer = Time(10)
    ssc.remember(Seconds(100))
    //val timenow = new java.util.Date
    import sqlContext._

    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("ID").toString, data("name").toString, data("score").toString.toInt, data("school").toString))
    fields.print()
    //fields.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + timenow)

    val results = fields.foreachRDD((recrdd, timer) => {
      recrdd.registerAsTable("table1")
      val sqlreport = sqlContext.sql("select max(score) from table1 where ID = 'math' and score > 50")
      sqlreport.map(t => OutPut(t(0).toString, t(1).toString)).collect().foreach(println)
      //println(sqlreport)
      //sqlreport.foreach(println)
      sqlreport.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + datenow)
    })
    //results.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks,
-Srinivas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10170.html
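One workaround that might help with both problems (a sketch adapted from the code above, not taken from the thread itself): write each batch into its own output directory derived from the batch time, so the "folder already exists" error cannot occur, and coalesce to a single partition so each batch produces a single part file. The output prefix below is a made-up example path.

fields.foreachRDD((recrdd, batchTime) => {
  recrdd.registerAsTable("table1")   // needs import sqlContext._, as in the code above
  val sqlreport = sqlContext.sql(
    "select max(score) from table1 where ID = 'math' and score > 50")
  sqlreport
    .coalesce(1)                                                          // one part file per batch
    .saveAsTextFile("/home/ubuntu/output/report-" + batchTime.milliseconds)  // fresh directory per batch
})

Each batch is still a separate job, so this produces one file per batch rather than one file overall; to get a single consolidated result you would merge the directories afterwards (for example with hadoop fs -getmerge) or append to an external store instead.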
Re: Spark Streaming Json file groupby function
Hi TD,

Thanks for the solutions to my previous post. I am running into another issue: I am getting data from a JSON file, parsing it, and trying to map it to a record as below:

val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))

case class Record(ID: String, name: String, score: Int, school: String)

When I try to do this I get an error:

[error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36: value toInt is not a member of Any
[error] lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))
[error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36: value toInt is not a member of Any

I tried declaring immutable.Map[Any,Int] and converting the Int to String; my application compiled, but I get an exception when I run it:

14/07/17 17:11:30 ERROR Executor: Exception in task ID 6
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)

Basically I am trying to do a max operation in my Spark SQL. Please let me know if there is any workaround for this.

Thanks,
-Srinivas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10060.html
Re: Spark Streaming Json file groupby function
This is a basic Scala problem. You cannot apply toInt to Any. Try doing toString.toInt.

For such Scala issues, I recommend trying things out in the Scala shell. For example, you could have tried this as follows.

[tdas @ Xion streaming] scala
Welcome to Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45).
Type in expressions to have them evaluated.
Type :help for more information.

scala> 12.asInstanceOf[Any].toInt
<console>:8: error: value toInt is not a member of Any
              12.asInstanceOf[Any].toInt
                                   ^

scala> 12.asInstanceOf[Any].toString.toInt
res1: Int = 12

scala>

On Thu, Jul 17, 2014 at 10:32 AM, srinivas kusamsrini...@gmail.com wrote:
> Hi TD,
>
> Thanks for the solutions to my previous post. I am running into another issue: I am getting data from a JSON file, parsing it, and trying to map it to a record as below:
>
> val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))
>
> case class Record(ID: String, name: String, score: Int, school: String)
>
> When I try to do this I get an error:
>
> [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36: value toInt is not a member of Any
> [error] lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))
> [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36: value toInt is not a member of Any
>
> I tried declaring immutable.Map[Any,Int] and converting the Int to String; my application compiled, but I get an exception when I run it:
>
> 14/07/17 17:11:30 ERROR Executor: Exception in task ID 6
> java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
>
> Basically I am trying to do a max operation in my Spark SQL. Please let me know if there is any workaround for this.
>
> Thanks,
> -Srinivas
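One caveat worth adding (my note, not part of TD's reply): scala.util.parsing.json.JSON returns JSON numbers as Double by default, so a value sent as 10 may come back as 10.0 and toString.toInt can still throw. A small helper that matches on the runtime type is a more defensive way to get an Int out of Any:

def anyToInt(v: Any): Int = v match {
  case i: Int    => i
  case d: Double => d.toInt                 // JSON numbers usually arrive as Double
  case s: String => s.trim.toDouble.toInt   // handles both "10" and "10.0"
  case other     => sys.error("unexpected numeric value: " + other)
}

// usage in the record mapping: Record(..., anyToInt(data("score")), ...)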
Re: Spark Streaming Json file groupby function
Hi TD,

It worked. Thank you so much for all your help.

Thanks,
-Srinivas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10132.html
Re: Spark Streaming Json file groupby function
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I am sending data to Kafka like {"type":"math","name":"srinivas","score":10,"school":"lfs"}. I think something is wrong with the input RDD. Please let me know what is causing this error.

Thanks,
-Srinivas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9933.html
Re: Spark Streaming Json file groupby function
I see you have the code to convert to the Record class but commented it out. That is the right way to go. When you convert it to a 4-tuple with (data("type"), data("name"), data("score"), data("school")), it is of type (Any, Any, Any, Any), since data(xyz) returns Any. And registerAsTable probably doesn't work well with Any as the columns. @michael, any insights?

TD

On Mon, Jul 14, 2014 at 10:07 PM, srinivas kusamsrini...@gmail.com wrote:
> Hi TD,
>
> Thanks for your help. I am able to convert the map to records using a case class. I am left with doing some aggregations. I am trying to do some SQL-type operations on my record set. My code looks like:
>
> case class Record(ID: Int, name: String, score: Int, school: String)
> //val records = jsonf.map(m => Record(m(0), m(1), m(2), m(3)))
> val fields = jsonf.map(data => (data("type"), data("name"), data("score"), data("school")))
> val results = fields.transform((rdd, time) => {
>   rdd.registerAsTable("table1")
>   sqlc.sql("select * from table1")
> })
>
> When I try to compile my code it gives me:
>
> jsonfile.scala:30: value registerAsTable is not a member of org.apache.spark.rdd.RDD[(Any, Any, Any, Any)]
>
> Please let me know if I am missing anything. And with Spark Streaming, can I really use SQL-like operations on DStreams?
Re: Spark Streaming Json file groupby function
I am still getting the error, even after converting it to a Record:

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")
    //import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])

    case class Record(ID: String, name: String, score: String, school: String)

    val fields = jsonf.map(data => Record(data("type").toString, data("name").toString, data("score").toString, data("school").toString))
    val results = fields.transform((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sql("select * from table1")
      println(results)
      results.foreach(println)
    })
    //results.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

I am getting the error:

[error] /home/ubuntu/spark-1.0.0/external/jsonfile/src/main/scala/jsonfile.scala:36: value registerAsTable is not a member of org.apache.spark.rdd.RDD[Record]
[error] recrdd.registerAsTable("table1")
[error]        ^
[error] one error found
[error] (compile:compile) Compilation failed

Please look into this and let me know if I am missing anything.

Thanks,
-Srinivas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9816.html
Re: Spark Streaming Json file groupby function
You need to have import sqlContext._, so just uncomment that and it should work.

TD

On Tue, Jul 15, 2014 at 1:40 PM, srinivas kusamsrini...@gmail.com wrote:
> I am still getting the error, even after converting it to a Record:
>
> object KafkaWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
>       System.exit(1)
>     }
>
>     //StreamingExamples.setStreamingLogLevels()
>     val Array(zkQuorum, group, topics, numThreads) = args
>     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>     val sql = new SparkContext(sparkConf)
>     val sqlContext = new SQLContext(sql)
>     val timer = Time(1)
>     // ssc.checkpoint("checkpoint")
>     //import sqlContext._
>     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
>     val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>
>     case class Record(ID: String, name: String, score: String, school: String)
>
>     val fields = jsonf.map(data => Record(data("type").toString, data("name").toString, data("score").toString, data("school").toString))
>     val results = fields.transform((recrdd, tt) => {
>       recrdd.registerAsTable("table1")
>       val results = sql("select * from table1")
>       println(results)
>       results.foreach(println)
>     })
>     //results.print()
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> I am getting the error:
>
> [error] /home/ubuntu/spark-1.0.0/external/jsonfile/src/main/scala/jsonfile.scala:36: value registerAsTable is not a member of org.apache.spark.rdd.RDD[Record]
> [error] recrdd.registerAsTable("table1")
> [error]        ^
> [error] one error found
> [error] (compile:compile) Compilation failed
>
> Please look into this and let me know if I am missing anything.
>
> Thanks,
> -Srinivas
Re: Spark Streaming Json file groupby function
Hi TD,

I uncommented import sqlContext._ and tried to compile the code:

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")
    import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])

    case class Record(ID: String, name: String, score: String, school: String)

    val fields = jsonf.map(data => Record(data("type").toString, data("name").toString, data("score").toString, data("school").toString))
    val results = fields.transform((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sql("select * from table1")
      println(results)
      results.foreach(println)
    })
    //results.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

but received the error:

[error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:38: No TypeTag available for Record
[error] recrdd.registerAsTable("table1")
[error]        ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 17 s, completed Jul 16, 2014 3:11:53 AM

Please advise me on how to proceed.

Thanks,
-Srinivas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9868.html
Re: Spark Streaming Json file groupby function
Can you try defining the case class outside the main function, in fact outside the object?

TD

On Tue, Jul 15, 2014 at 8:20 PM, srinivas kusamsrini...@gmail.com wrote:
> Hi TD,
>
> I uncommented import sqlContext._ and tried to compile the code:
>
> import java.util.Properties
> import kafka.producer._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.SparkConf
> import scala.util.parsing.json.JSON
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
>
> object KafkaWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
>       System.exit(1)
>     }
>
>     //StreamingExamples.setStreamingLogLevels()
>     val Array(zkQuorum, group, topics, numThreads) = args
>     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>     val sql = new SparkContext(sparkConf)
>     val sqlContext = new SQLContext(sql)
>     val timer = Time(1)
>     // ssc.checkpoint("checkpoint")
>     import sqlContext._
>     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
>     val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>
>     case class Record(ID: String, name: String, score: String, school: String)
>
>     val fields = jsonf.map(data => Record(data("type").toString, data("name").toString, data("score").toString, data("school").toString))
>     val results = fields.transform((recrdd, tt) => {
>       recrdd.registerAsTable("table1")
>       val results = sql("select * from table1")
>       println(results)
>       results.foreach(println)
>     })
>     //results.print()
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> but received the error:
>
> [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:38: No TypeTag available for Record
> [error] recrdd.registerAsTable("table1")
> [error]        ^
> [error] one error found
> [error] (compile:compile) Compilation failed
> [error] Total time: 17 s, completed Jul 16, 2014 3:11:53 AM
>
> Please advise me on how to proceed.
>
> Thanks,
> -Srinivas
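For reference, a condensed sketch of the fixed layout (my own condensation of the code in this thread, Spark 1.0-era API, not an exact copy of anyone's program): the case class sits at the top level so the compiler can generate a TypeTag for it, and import sqlContext._ provides the implicit conversion that gives RDD[Record] the registerAsTable method.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.util.parsing.json.JSON

// top-level case class: a TypeTag can be generated for it here
case class Record(ID: String, name: String, score: String, school: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))   // reuse the same SparkContext
    val sqlContext = new SQLContext(sc)
    import sqlContext._                               // implicit RDD[Record] -> SchemaRDD

    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("type").toString, data("name").toString,
      data("score").toString, data("school").toString))

    fields.foreachRDD { recrdd =>
      recrdd.registerAsTable("table1")
      sqlContext.sql("select * from table1").collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}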
Spark Streaming Json file groupby function
Hi,

I am new to Spark and Scala, and I am trying to do some aggregations on a JSON file stream using Spark Streaming. I am able to parse the JSON string, and it is converted to a map:

Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math)

Now I want to use a GROUP BY function on each student's map data and do some aggregations on the scores. Here is my main function:

val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
jsonf.print()
ssc.start()
ssc.awaitTermination()
}

Can anyone please let me know how to use the groupBy function? Thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618.html
Re: Spark Streaming Json file groupby function
You have to import StreamingContext._ to enable groupByKey operations on DStreams. After importing that, you can apply groupByKey on any DStream of key-value pairs (e.g. DStream[(String, Int)]). The data in each of the pair RDDs will be grouped by the first element of the tuple as the grouping key.

TD

On Mon, Jul 14, 2014 at 10:59 AM, srinivas kusamsrini...@gmail.com wrote:
> Hi,
>
> I am new to Spark and Scala, and I am trying to do some aggregations on a JSON file stream using Spark Streaming. I am able to parse the JSON string, and it is converted to a map:
>
> Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math)
>
> Now I want to use a GROUP BY function on each student's map data and do some aggregations on the scores. Here is my main function:
>
> val Array(zkQuorum, group, topics, numThreads) = args
> val sparkConf = new SparkConf().setAppName("KafkaWordCount")
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> // ssc.checkpoint("checkpoint")
> val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
> val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
> jsonf.print()
> ssc.start()
> ssc.awaitTermination()
> }
>
> Can anyone please let me know how to use the groupBy function? Thanks.
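As a small illustration (my sketch, using the field names from the poster's data, not code from this reply): turn the parsed maps into (key, value) pairs first, then group or reduce them.

import org.apache.spark.streaming.StreamingContext._   // enables pair-DStream operations

// jsonf: DStream[Map[String, Any]] as built in the poster's code
val scoresByType = jsonf.map { data =>
  (data("test_type").toString, data("score").toString.toDouble)
}

// group all scores in a batch by test_type ...
val grouped = scoresByType.groupByKey()

// ... or reduce directly, e.g. the maximum score per test_type in each batch
val maxByType = scoresByType.reduceByKey(math.max(_, _))
maxByType.print()

When only an aggregate is needed, reduceByKey is usually preferable to groupByKey, since it combines values before shuffling instead of shipping every record per key.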
Re: Spark Streaming Json file groupby function
Hi,

Thanks for your reply. I imported StreamingContext._ and right now I am getting my DStream as something like:

Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math)
Map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type -> sci)
Map(id -> 432, name -> , mobile -> 423141234, score -> 322, test_type -> math)

Each map collection comes from a JSON string. Now, if I want to aggregate the scores for math only, or find out who got the highest score in math (showing both name and score), what transformation should I apply to my existing DStream? I am very new to dealing with maps and DStream transformations, so please advise on how to proceed from here.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9656.html
Re: Spark Streaming Json file groupby function
In general it may be a better idea to actually convert the records from hashmaps to a specific data structure. Say

case class Record(id: Int, name: String, mobile: String, score: Int, test_type: String, ...)

Then you should be able to do something like

val records = jsonf.map(m => convertMapToRecord(m))

Then, to filter only math results, you can do records.filter(r => r.test_type == "math").

If you have to do aggregations (sum, max, etc.), you have to figure out whether you want to aggregate in every batch or aggregate over a window of time.

If you want to aggregate each batch, then:

filteredRecords.foreachRDD(rdd => {
  // get aggregates for each batch
})

If you want to aggregate across a window of time (say 1 minute), then:

filteredRecords.window(Minutes(1)).foreachRDD(rdd => {
  // get aggregates over the last 1 minute, every 10 seconds (since 10 seconds is the batch interval)
})

On Mon, Jul 14, 2014 at 3:06 PM, srinivas kusamsrini...@gmail.com wrote:
> Hi,
>
> Thanks for your reply. I imported StreamingContext._ and right now I am getting my DStream as something like:
>
> Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math)
> Map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type -> sci)
> Map(id -> 432, name -> , mobile -> 423141234, score -> 322, test_type -> math)
>
> Each map collection comes from a JSON string. Now, if I want to aggregate the scores for math only, or find out who got the highest score in math (showing both name and score), what transformation should I apply to my existing DStream? I am very new to dealing with maps and DStream transformations, so please advise on how to proceed from here.
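A hedged, runnable version of that outline follows; the convertMapToRecord helper and the windowed max are my own filling-in (not TD's literal code), under the assumption that scores arrive as JSON numbers, which Scala's JSON parser returns as Double.

import org.apache.spark.streaming.Minutes

case class Record(id: Int, name: String, mobile: String, score: Int, test_type: String)

def convertMapToRecord(m: Map[String, Any]): Record =
  Record(m("id").toString.toDouble.toInt,
         m("name").toString,
         m("mobile").toString,
         m("score").toString.toDouble.toInt,
         m("test_type").toString)

// jsonf: DStream[Map[String, Any]] as in the earlier messages
val records = jsonf.map(convertMapToRecord)
val mathRecords = records.filter(_.test_type == "math")

// highest math score (with the student's name) over the last minute,
// recomputed on every batch
mathRecords.window(Minutes(1)).foreachRDD { rdd =>
  val top = rdd.map(r => (r.score, r.name)).top(1)   // default tuple ordering: by score first
  top.foreach { case (score, name) =>
    println("max math score in last minute: " + name + " " + score)
  }
}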
Re: Spark Streaming Json file groupby function
Hi TD,

Thanks for your help. I am able to convert the map to records using a case class. I am left with doing some aggregations. I am trying to do some SQL-type operations on my record set. My code looks like:

case class Record(ID: Int, name: String, score: Int, school: String)
//val records = jsonf.map(m => Record(m(0), m(1), m(2), m(3)))
val fields = jsonf.map(data => (data("type"), data("name"), data("score"), data("school")))
val results = fields.transform((rdd, time) => {
  rdd.registerAsTable("table1")
  sqlc.sql("select * from table1")
})

When I try to compile my code it gives me:

jsonfile.scala:30: value registerAsTable is not a member of org.apache.spark.rdd.RDD[(Any, Any, Any, Any)]

Please let me know if I am missing anything. And with Spark Streaming, can I really use SQL-like operations on DStreams?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9714.html