Re: Spark Streaming Json file groupby function

2015-07-28 Thread swetha


Hi TD, 

We have a requirement to maintain the user session state and to
maintain/update the metrics at minute, day, and hour granularities for a
user session in our Streaming job. Can I keep those granularities in the
state and recalculate each time there is a change? How would the performance
be impacted?


Thanks, 
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p24041.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming Json file groupby function

2015-07-28 Thread Tathagata Das
If you are trying to keep such long-term state, it will be more robust in
the long run to use a dedicated data store (Cassandra/HBase/etc.) that is
designed for long-term storage.
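For illustration, here is a minimal sketch of the pattern being discussed: keep small per-session counters in Spark Streaming state with updateStateByKey and push each batch's snapshot to an external store. The field names, batch logic, and the write step are assumptions, not something from this thread.

import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations
import org.apache.spark.streaming.dstream.DStream

// Hypothetical per-session metrics kept in streaming state.
case class SessionMetrics(eventsLastBatch: Long, eventsTotal: Long)

def updateMetrics(newEvents: Seq[Long], state: Option[SessionMetrics]): Option[SessionMetrics] = {
  val prev  = state.getOrElse(SessionMetrics(0L, 0L))
  val added = newEvents.sum
  // Recomputing on every batch is cheap while the state per key stays small;
  // minute/hour/day roll-ups would need their own windowing or reset logic.
  Some(SessionMetrics(added, prev.eventsTotal + added))
}

// events: DStream[(sessionId, count)] built from the parsed input.
// updateStateByKey requires ssc.checkpoint(...) to be set.
def persistSessions(events: DStream[(String, Long)]): Unit = {
  val sessions = events.updateStateByKey(updateMetrics _)
  sessions.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // Open a connection to Cassandra/HBase/etc. here, then upsert each record.
      partition.foreach { case (sessionId, metrics) =>
        println(s"$sessionId -> $metrics")   // placeholder for store.upsert(sessionId, metrics)
      }
    }
  }
}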



Re: Spark Streaming Json file groupby function

2014-07-18 Thread srinivas
Hi,

I am able to save the RDDs generated by Spark SQL (which in turn gets its data
from Spark Streaming) to a local file. If I set the streaming context batch
interval to 10 seconds, the data that arrives in that 10-second window is
processed by my SQL and stored in the location I specified, but the next batch
fails with an error saying the output folder already exists. So I increased the
batch duration to 100 seconds; the data that arrives in that 100-second window
is then processed at once, but it is written out as several files in that
folder, e.g. 10 different part files (part-00001, part-00002, ...), each holding
only one or two records. I want that output in a single file.
Please let me know if there is any workaround for this.

The code that I am using:

case class Record(ID: String, name: String, score: Int, school: String)
case class OutPut(name: String, score: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val datenow = new Date()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(100))
    val sqlContext = new SQLContext(sc)
    val timer = Time(10)
    ssc.remember(Seconds(100))
    //val timenow = new java.util.Date
    import sqlContext._

    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("ID").toString, data("name").toString,
      data("score").toString.toInt, data("school").toString))
    fields.print()
    //fields.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + timenow)
    val results = fields.foreachRDD((recrdd, timer) => {
      recrdd.registerAsTable("table1")
      val sqlreport = sqlContext.sql("select max(score) from table1 where ID = 'math' and score > 50")
      sqlreport.map(t => OutPut(t(0).toString, t(1).toString)).collect().foreach(println)
      //println(sqlreport)
      //sqlreport.foreach(println)
      sqlreport.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + datenow)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
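A possible workaround, sketched here as an assumption rather than something tested on this exact setup: make the output path unique per batch and coalesce each batch to a single partition before writing, so each batch produces exactly one part file and never collides with an existing folder.

fields.foreachRDD((rdd, time) => {
  if (rdd.take(1).nonEmpty) {            // skip empty batches
    rdd.coalesce(1)                      // one partition => one part-00000 file
       .saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/batch-" + time.milliseconds)
  }
})

The per-batch directories can be merged downstream if a single global file is ultimately required.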
Thanks,
-Srinivas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-17 Thread srinivas
Hi TD,

  Thanks for the solutions to my previous post. I am running into another
issue: I am getting data from a JSON file, and I am trying to parse it and
map it to the record given below.

 val jsonf = lines.map(JSON.parseFull(_))
   .map(_.get.asInstanceOf[scala.collection.immutable.Map[Any, Any]])
   .map(data => Record(data("ID").toString, data("name").toString,
                       data("score").toInt, data("school").toString))



case class Record(ID:String,name:String,score:Int,school:String)


When I try to do this, I get an error:

[error]
/home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36:
value toInt is not a member of Any
[error]
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))
[error]
/home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36:
value toInt is not a member of Any

I tried declaring the map as immutable.Map[Any, Int], and I also tried
converting the Int to a String; the application compiled, but I get an
exception when I run it:

14/07/17 17:11:30 ERROR Executor: Exception in task ID 6
java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)

Basically I am trying to do a max operation in my Spark SQL.
Please let me know if there is any workaround for this.

Thanks,
-Srinivas.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-17 Thread Tathagata Das
This is a basic Scala problem. You cannot apply toInt to Any. Try doing
toString.toInt instead.

For such Scala issues, I recommend trying it out in the Scala shell. For
example, you could have tried this out as follows.

[tdas @ Xion streaming] scala
Welcome to Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java
1.7.0_45).
Type in expressions to have them evaluated.
Type :help for more information.

scala> 12.asInstanceOf[Any].toInt
<console>:8: error: value toInt is not a member of Any
              12.asInstanceOf[Any].toInt
                                   ^

scala> 12.asInstanceOf[Any].toString.toInt
res1: Int = 12

scala>
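Since scala.util.parsing.json.JSON.parseFull types every value as Any (and parses JSON numbers as Double), a small helper can avoid the blind casts; this is only a sketch, not code from the thread:

import scala.util.Try

def anyToInt(v: Any): Option[Int] = v match {
  case i: Int    => Some(i)
  case d: Double => Some(d.toInt)              // JSON numbers come back as Double
  case s: String => Try(s.trim.toInt).toOption // quoted numbers like "10"
  case _         => None
}

// e.g. anyToInt(data("score")).getOrElse(0) instead of data("score").toString.toInt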






Re: Spark Streaming Json file groupby function

2014-07-17 Thread srinivas
Hi TD,
It worked. Thank you so much for all your help.

Thanks,
-Srinivas.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10132.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-16 Thread srinivas
)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I am trying to enter data into Kafka like:
{"type":"math","name":"srinivas","score":10,"school":"lfs"}

I am thinking something is wrong with the input RDD. Please let me know what is
causing this error.

Thanks,
-Srinivas.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9933.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-16 Thread Yin Huai
)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 I am trying to enter data into Kafka like:
 {"type":"math","name":"srinivas","score":10,"school":"lfs"}

 I am thinking something is wrong with the input RDD. Please let me know what is
 causing this error.

 Thanks,
 -Srinivas.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9933.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark Streaming Json file groupby function

2014-07-16 Thread Tathagata Das
)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)


 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)


 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at

 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 I am trying to enter data into Kafka like:
 {"type":"math","name":"srinivas","score":10,"school":"lfs"}

 I am thinking something is wrong with the input RDD. Please let me know what is
 causing this error.

 Thanks,
 -Srinivas.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9933.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark Streaming Json file groupby function

2014-07-15 Thread Tathagata Das
I see you have the code to convert to the Record class, but commented it out.
That is the right way to go. When you convert it to a 4-tuple with
(data("type"), data("name"), data("score"), data("school")) ... it is of type
(Any, Any, Any, Any), since data("xyz") returns Any. And registerAsTable
probably doesn't work well with Any as the column types.

@michael any insights?

TD
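A minimal sketch of what this suggests, using the Spark 1.0-era SQL API (field names, table name, and the query are assumptions): map the parsed Map[String, Any] into a typed case class before registering it as a table, so the columns are concrete types rather than Any.

// Defined at the top level so Spark SQL can derive the schema from the case class.
case class ScoreRecord(test_type: String, name: String, score: Int, school: String)

// jsonf: DStream[Map[String, Any]]
// val typed = jsonf.map { data =>
//   ScoreRecord(data("type").toString, data("name").toString,
//               data("score").toString.toInt, data("school").toString)
// }
// typed.foreachRDD { rdd =>
//   import sqlContext._                      // implicit RDD[ScoreRecord] => SchemaRDD
//   rdd.registerAsTable("scores")
//   sqlContext.sql("SELECT test_type, MAX(score) FROM scores GROUP BY test_type")
//     .collect().foreach(println)
// }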





Re: Spark Streaming Json file groupby function

2014-07-15 Thread srinivas
I am still getting the error, even after converting it to a Record:

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")

    //import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    case class Record(ID: String, name: String, score: String, school: String)
    val fields = jsonf.map(data => Record(data("type").toString, data("name").toString,
      data("score").toString, data("school").toString))
    val results = fields.transform((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sql("select * from table1")
      println(results)
      results.foreach(println)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

I am getting the error:

[error]
/home/ubuntu/spark-1.0.0/external/jsonfile/src/main/scala/jsonfile.scala:36:
value registerAsTable is not a member of org.apache.spark.rdd.RDD[Record]
[error]  recrdd.registerAsTable("table1")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed


Please look into this and let me know if I am missing anything.

Thanks,
-Srinivas.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9816.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-15 Thread Tathagata Das
You need to have

import sqlContext._

so just uncomment that and it should work.

TD





Re: Spark Streaming Json file groupby function

2014-07-15 Thread srinivas
Hi TD,

I uncommented import sqlContext._ and tried to compile the code:

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")

    import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    case class Record(ID: String, name: String, score: String, school: String)
    val fields = jsonf.map(data => Record(data("type").toString, data("name").toString,
      data("score").toString, data("school").toString))
    val results = fields.transform((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sql("select * from table1")
      println(results)
      results.foreach(println)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

but received the error:

[error]
/home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:38:
No TypeTag available for Record
[error]  recrdd.registerAsTable("table1")
[error]  ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 17 s, completed Jul 16, 2014 3:11:53 AM


Please advise me on how to proceed.

Thanks,
-Srinivas.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9868.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-15 Thread Tathagata Das
Can you try defining the case class outside the main function, in fact
outside the object?

TD
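For reference, a short sketch of the layout being suggested: the case class sits at the top level, outside main and outside the object, which gives the compiler the TypeTag that Spark SQL's schema inference needs.

// Top-level definition avoids the "No TypeTag available for Record" error.
case class Record(ID: String, name: String, score: String, school: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    // ... create ssc and sqlContext, parse the Kafka stream into Record values ...
    // import sqlContext._              // implicit RDD[Record] => SchemaRDD
    // recrdd.registerAsTable("table1")
  }
}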



On Tue, Jul 15, 2014 at 8:20 PM, srinivas kusamsrini...@gmail.com wrote:

 Hi TD,

 I uncomment import sqlContext._ and tried to compile the code

 import java.util.Properties
 import kafka.producer._
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.SparkConf
 import scala.util.parsing.json.JSON
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 object KafkaWordCount {
   def main(args: Array[String]) {
 if (args.length  4) {
   System.err.println(Usage: KafkaWordCount zkQuorum group topics
 numThreads)
   System.exit(1)
 }

//StreamingExamples.setStreamingLogLevels()

 val Array(zkQuorum, group, topics, numThreads) = args
 val sparkConf = new SparkConf().setAppName(KafkaWordCount)
 val ssc = new StreamingContext(sparkConf, Seconds(10))
 val sql = new SparkContext(sparkConf)
 val sqlContext = new SQLContext(sql)
 val timer = Time(1)
// ssc.checkpoint(checkpoint)

 import sqlContext._
 val topicpMap = topics.split(,).map((_,numThreads.toInt)).toMap
  val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
 topicpMap).map(_._2)
  val jsonf =

 lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,Any]])
  case class Record(ID:String,name:String,score:String,school:String)
 val fields =

 jsonf.map(data=Record(data(type).toString,data(name).toString,data(score).toString,data(school).toString))
 val results = fields.transform((recrdd,tt) = {
  recrdd.registerAsTable(table1)
  val results = sql(select * from table1)
  println(results)
  results.foreach(println)
 })
 //results.print()
 ssc.start()
 ssc.awaitTermination()
   }
 }

 but received the error

 [error]

 /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:38:
 No TypeTag available for Record
 [error]  recrdd.registerAsTable(table1)
 [error]  ^
 [error] one error found
 [error] (compile:compile) Compilation failed
 [error] Total time: 17 s, completed Jul 16, 2014 3:11:53 AM


 Please advice me on how to proceed

 Thanks,
 -Srinivas.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9868.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi,
  I am new to Spark and Scala, and I am trying to do some aggregations on a
JSON file stream using Spark Streaming. I am able to parse the JSON string,
and it is converted to Map(id -> 123, name -> srini, mobile -> 12324214,
score -> 123, test_type -> math). Now I want to use a GROUPBY function on each
student's map data and do some aggregations on the scores. Here is my
main function:
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// ssc.checkpoint("checkpoint")

val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val jsonf = lines.map(JSON.parseFull(_))
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])

jsonf.print()

ssc.start()
ssc.awaitTermination()
  }

Can anyone please let me know how to use the groupby function? Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-14 Thread Tathagata Das
You have to import StreamingContext._ to enable groupByKey operations on
DStreams. After importing that, you can apply groupByKey on any DStream of
key-value pairs (e.g. DStream[(String, Int)]). The data in each of the pair
RDDs will then be grouped by the first element of the tuple, which acts as
the grouping key.

TD
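As a sketch of that pattern (field names assumed from earlier messages in this thread), the parsed maps can be turned into key-value pairs and then grouped or reduced by key:

import org.apache.spark.streaming.StreamingContext._  // enables groupByKey on pair DStreams

// jsonf: DStream[Map[String, Any]]
// val byTestType = jsonf.map(m => (m("test_type").toString, m("score").toString.toInt))
// val grouped    = byTestType.groupByKey()                 // all scores per test_type
// val maxScores  = byTestType.reduceByKey(math.max(_, _))  // highest score per test_type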





Re: Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi,
  Thanks for your reply. I imported StreamingContext, and right now I am
getting my DStream as something like:

 Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math)
 Map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type -> sci)
 Map(id -> 432, name -> , mobile -> 423141234, score -> 322, test_type -> math)

Each map collection comes from a JSON string. Now, if I want to aggregate the
scores for math only, or if I want to find out who got the highest score in
math (showing both name and score), what transformation should I apply to my
existing DStream? I am very new to dealing with maps and DStream
transformations, so please advise on how to proceed from here.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9656.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark Streaming Json file groupby function

2014-07-14 Thread Tathagata Das
In general it may be a better idea to actually convert the records from
hashmaps to a specific data structure. Say

case class Record(id: Int, name: String, mobile: String, score: Int,
test_type: String ... )

Then you should be able to do something like

val records = jsonf.map(m => convertMapToRecord(m))

Then to filter only math results you can do records.filter(r => r.test_type
== "math"). ...

If you have to do aggregations (sum, max, etc.), you have to figure out
whether you want to aggregate in every batch, or aggregate over a window of
time.

If you want to do it for each batch, then

filteredRecords.foreachRDD(rdd => {
   // get aggregates for each batch
})

If you want to do it across a window of time (say 1 minute), then

filteredRecords.window(Minutes(1)).foreachRDD(rdd => {
   // get aggregates over the last 1 minute, every 10 seconds (since 10 seconds
   // is the batch interval)
})
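A short sketch of both variants, assuming the 10-second batch interval used earlier in the thread and the Record fields named above:

import org.apache.spark.streaming.{Minutes, Seconds}

// records: DStream[Record]; keep only math results, keyed by student name
// val mathScores = records.filter(_.test_type == "math").map(r => (r.name, r.score))

// Per-batch aggregate: the top scorer in each 10-second batch
// mathScores.foreachRDD { rdd =>
//   if (rdd.take(1).nonEmpty) {
//     val (name, score) = rdd.reduce((a, b) => if (a._2 >= b._2) a else b)
//     println(s"best this batch: $name ($score)")
//   }
// }

// Sliding-window aggregate: the highest score per student over the last minute,
// recomputed every 10 seconds
// mathScores.reduceByKeyAndWindow(math.max(_, _), Minutes(1), Seconds(10)).print()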







Re: Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi TD,
  Thanks for your help; I am able to convert the map to records using a case
class. I am left with doing some aggregations: I am trying to do some SQL-type
operations on my record set. My code looks like:

 case class Record(ID: Int, name: String, score: Int, school: String)
//val records = jsonf.map(m => Record(m(0), m(1), m(2), m(3)))
val fields = jsonf.map(data =>
  (data("type"), data("name"), data("score"), data("school")))
val results = fields.transform((rdd, time) => {
  rdd.registerAsTable("table1")
  sqlc.sql("select * from table1")
})

When I try to compile my code, it gives me:
jsonfile.scala:30: value registerAsTable is not a member of
org.apache.spark.rdd.RDD[(Any, Any, Any, Any)]

Please let me know if I am missing anything.
And using Spark Streaming, can I really use SQL-like operations on
DStreams?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9714.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.