Can you make sure you are running locally with more than one local core? You could set the master in the SparkConf as conf.setMaster("local[4]"). Then check in the Spark web UI (running on localhost:4040) whether jobs are running for every batch of data. If you still don't get any output, first try simply printing recRDD.count() in the foreachRDD (that is, test Spark Streaming by itself first). If you can get that to work, then I would test the Spark SQL stuff.
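For example, a minimal sketch of that first test (against the Spark 1.0-era API; zkQuorum, group, and topicpMap are the values already defined in your program):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Use more than one local core, so the Kafka receiver does not
// occupy the only core and starve the batch-processing tasks.
val sparkConf = new SparkConf().setAppName("KafkaSpark").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(10))

val recordsStream =
  KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)

// Sanity check: count() forces each batch's RDD to be computed, so every
// batch should print a line here and show up as a job in the web UI.
recordsStream.foreachRDD { recRDD =>
  println("records in this batch: " + recRDD.count())
}

ssc.start()
ssc.awaitTermination()

If each batch prints a count (even 0), streaming itself is working and the problem is in the SQL part; if nothing prints, the receiver is not getting data from Kafka.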
TD

On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> No errors but no output either... Thanks!
>
>
> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Could you elaborate on what is the problem you are facing? A compiler
>> error? A runtime error? A class-not-found error? Not receiving any data
>> from Kafka? Receiving data but the SQL command throwing an error? No
>> errors but no output either?
>>
>> TD
>>
>>
>> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> A couple of days ago, I tried to integrate SQL and streaming. My
>>> understanding is that I can transform each RDD from the DStream into a
>>> SchemaRDD and execute SQL on it, but I have had no luck.
>>> Would you guys help me take a look at my code? Thank you very much!
>>>
>>> object KafkaSpark {
>>>
>>>   def main(args: Array[String]): Unit = {
>>>     if (args.length < 4) {
>>>       System.err.println(
>>>         "Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>       System.exit(1)
>>>     }
>>>
>>>     val Array(zkQuorum, group, topics, numThreads) = args
>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>>>     val sc = new SparkContext(sparkConf)
>>>     val sqlContext = new SQLContext(sc)
>>>     // ssc.checkpoint("checkpoint")
>>>
>>>     // Importing the SQL context gives access to all the SQL functions
>>>     // and implicit conversions.
>>>     import sqlContext._
>>>
>>>     val tt = Time(10000)
>>>     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>       topicpMap).map(t => getRecord(t._2.split("#")))
>>>
>>>     val result = recordsStream.foreachRDD((recRDD, tt) => {
>>>       recRDD.registerAsTable("records")
>>>       val result = sql("select * from records")
>>>       println(result)
>>>       result.foreach(println)
>>>     })
>>>
>>>     ssc.start()
>>>     ssc.awaitTermination()
>>>   }
>>>
>>>   def getRecord(l: Array[String]): Record = {
>>>     println("Getting the record")
>>>     Record(l(0), l(1))
>>>   }
>>> }
>>>
>>
>
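For reference, once the count test passes, the SQL step in the quoted code needs the implicit RDD-to-SchemaRDD conversion in scope, and the SQLContext should wrap the StreamingContext's own SparkContext rather than a second SparkContext built from the same conf (two live SparkContexts in one JVM is not supported). A minimal sketch of that pattern against the Spark 1.0-era API, assuming Record is the two-field case class implied by getRecord and recordsStream is the DStream[Record] from the original program:

import org.apache.spark.sql.SQLContext

case class Record(key: String, value: String)

// Reuse the SparkContext that the StreamingContext already created,
// instead of instantiating a second one from the same SparkConf.
val sqlContext = new SQLContext(ssc.sparkContext)
// Brings the implicit RDD[Record] => SchemaRDD conversion into scope.
import sqlContext.createSchemaRDD

recordsStream.foreachRDD { recRDD =>
  recRDD.registerAsTable("records")
  val result = sqlContext.sql("select * from records")
  // collect() brings the rows back to the driver; a bare
  // result.foreach(println) would print on the executors,
  // not in the driver's console.
  result.collect().foreach(println)
}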