Hi All,

A couple of days ago, I tried to integrate Spark SQL and Spark Streaming. My understanding is that I can transform each RDD of a DStream into a SchemaRDD and execute SQL on it, but I have had no luck so far. Would you help me take a look at my code? Thank you very much!
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Simple two-field record parsed from each "#"-delimited Kafka message.
// Defined at the top level so Spark SQL's case-class reflection can see it.
case class Record(field1: String, field2: String)

object KafkaSpark {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaSpark")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // Reuse the SparkContext owned by the StreamingContext; constructing a
    // second SparkContext from the same conf fails at runtime.
    val sqlContext = new SQLContext(ssc.sparkContext)
    // ssc.checkpoint("checkpoint")

    // Importing the SQL context gives access to the sql() function and the
    // implicit conversion from RDD[Record] to SchemaRDD.
    import sqlContext._

    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
      .map(t => getRecord(t._2.split("#")))

    // Run SQL over each batch: register the batch RDD as a table, query it,
    // and print the resulting rows. foreachRDD returns Unit, so there is no
    // result value to capture from it.
    recordsStream.foreachRDD { recRDD =>
      recRDD.registerAsTable("records")
      val result = sql("select * from records")
      result.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def getRecord(l: Array[String]): Record = {
    println("Getting the record")
    Record(l(0), l(1))
  }
}
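For context, the per-batch conversion I am after is the same thing as converting a static RDD of case classes to a table, which I understand should look like the sketch below (it assumes the Record case class and sqlContext from the program above are in scope; the table name and sample values are made up):

// Sketch: the same Record -> table -> SQL flow on a static RDD, no Kafka.
// Assumes the Record case class and sqlContext defined above.
import sqlContext._

val staticRecords = ssc.sparkContext.parallelize(Seq(Record("a", "1"), Record("b", "2")))
staticRecords.registerAsTable("static_records")  // implicit RDD[Record] -> SchemaRDD conversion
sql("select * from static_records").collect().foreach(println)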