Could you elaborate on the problem you are facing? A compile error? A runtime
error? A class-not-found error? Not receiving any data from Kafka? Receiving
data but the SQL command throwing an error? No errors but no output either?
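For reference, here is a rough sketch of the usual pattern for running SQL over each batch of a DStream with the Spark 1.0-era API. The `Record` case class and the "#"-delimited input format mirror your code below and are assumptions, not a fixed API; the socket source is just a stand-in for any DStream, including a Kafka one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed shape of one parsed record (two "#"-separated fields).
case class Record(key: String, value: String)

object StreamingSqlSketch {
  def main(args: Array[String]): Unit = {
    // Only one SparkContext may be active per JVM: create it once and
    // build both the StreamingContext and the SQLContext from it.
    val sc  = new SparkContext(new SparkConf().setAppName("StreamingSqlSketch"))
    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)
    import sqlContext._  // brings in the implicit RDD[Record] -> SchemaRDD conversion

    // Any DStream[String] works here; a Kafka stream would be mapped the same way.
    val lines = ssc.socketTextStream("localhost", 9999)
    val records = lines.map(_.split("#")).map(a => Record(a(0), a(1)))

    records.foreachRDD { rdd =>
      rdd.registerAsTable("records")   // via the implicit SchemaRDD conversion
      val out = sql("select * from records")
      // Collect to the driver before printing; otherwise the output
      // appears in the executor logs, not in the driver console.
      out.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```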

TD


On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Hi All,
>
> A couple of days ago, I tried to integrate Spark SQL and Spark Streaming. My
> understanding is that I can transform each RDD of a DStream into a SchemaRDD
> and execute SQL on it, but I've had no luck so far.
> Would you guys help me take a look at my code?  Thank you very much!
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> // Record holds the two "#"-separated fields of each Kafka message.
> case class Record(key: String, value: String)
>
> object KafkaSpark {
>
>   def main(args: Array[String]): Unit = {
>     if (args.length < 4) {
>       System.err.println(
>         "Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>       System.exit(1)
>     }
>
>
>     val Array(zkQuorum, group, topics, numThreads) = args
>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>     // Only one SparkContext may be active per JVM, so create it once and
>     // build both the StreamingContext and the SQLContext from it.
>     val sc = new SparkContext(sparkConf)
>     val ssc = new StreamingContext(sc, Seconds(10))
>     val sqlContext = new SQLContext(sc)
> //    ssc.checkpoint("checkpoint")
>
>     // Importing the SQL context gives access to all the SQL functions
>     // and implicit conversions.
>     import sqlContext._
>
>
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>       topicMap).map(t => getRecord(t._2.split("#")))
>
>     recordsStream.foreachRDD((recRDD, time) => {
>       recRDD.registerAsTable("records")
>       val result = sql("select * from records")
>       // Collect to the driver before printing; result.foreach(println)
>       // would print on the executors, not in the driver console.
>       result.collect().foreach(println)
>     })
>
>     ssc.start()
>     ssc.awaitTermination()
>
>   }
>
>   def getRecord(l: Array[String]): Record = {
>     println("Getting the record")
>     Record(l(0), l(1))
>   }
> }
>
>
