Could you elaborate on what problem you are facing? Compiler error? Runtime error? Class-not-found error? Not receiving any data from Kafka? Receiving data but the SQL command throwing an error? No errors but no output either?
TD

On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
> Hi All,
>
> A couple of days ago, I tried to integrate SQL and streaming together. My
> understanding is I can transform each RDD from the DStream to a SchemaRDD and
> execute SQL on each RDD. But I got no luck.
> Would you guys help me take a look at my code? Thank you very much!
>
> object KafkaSpark {
>
>   def main(args: Array[String]): Unit = {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>       System.exit(1)
>     }
>
>     val Array(zkQuorum, group, topics, numThreads) = args
>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>     val sc = new SparkContext(sparkConf)
>     val sqlContext = new SQLContext(sc);
>     // ssc.checkpoint("checkpoint")
>
>     // Importing the SQL context gives access to all the SQL functions and
>     // implicit conversions.
>     import sqlContext._
>
>     val tt = Time(10000)
>     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>       .map(t => getRecord(t._2.split("#")))
>
>     val result = recordsStream.foreachRDD((recRDD, tt) => {
>       recRDD.registerAsTable("records")
>       val result = sql("select * from records")
>       println(result)
>       result.foreach(println)
>     })
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>   def getRecord(l: Array[String]): Record = {
>     println("Getting the record")
>     Record(l(0), l(1))
>   }
> }
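For reference, the pattern the question is going for is usually structured as below. This is only a sketch against the Spark 1.0-era API used in the original code, not a tested program (it needs a live Kafka/ZooKeeper setup to run); `Record` is the case class implied by the question, and the main change from the original is that only one SparkContext exists: the SQLContext is built from `ssc.sparkContext` rather than from a second `new SparkContext(sparkConf)`, which would conflict with the one the StreamingContext already owns.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed shape of the record from the question: two "#"-separated fields.
case class Record(field1: String, field2: String)

object KafkaSparkSketch {
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaSpark")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Reuse the StreamingContext's SparkContext instead of creating a
    // second one from the same SparkConf.
    val sqlContext = new SQLContext(ssc.sparkContext)
    // Brings sql(...) and the implicit RDD-to-SchemaRDD conversion into scope.
    import sqlContext._

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map(t => { val l = t._2.split("#"); Record(l(0), l(1)) })

    recordsStream.foreachRDD { rdd =>
      // Each batch's RDD of case-class objects is implicitly converted to a
      // SchemaRDD, registered as a table, and queried.
      rdd.registerAsTable("records")
      val result = sql("SELECT * FROM records")
      result.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that `foreachRDD` returns `Unit`, so assigning its result to a `val` (as in the original) does nothing, and the driver-side `println(result)` only prints the SchemaRDD's plan, not its rows; collecting (or `foreach` on the RDD, which prints on the executors) is what actually materializes the query.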