Actually, I deployed this on a YARN cluster (via spark-submit) and I couldn't find any output in the YARN stdout logs.
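(One thing worth checking, assuming yarn-cluster mode with log aggregation enabled: println calls made in the driver code end up in the ApplicationMaster container's stdout, while result.foreach(println) runs on the executors and prints into each executor container's stdout, so neither shows up on the spark-submit console. Something like yarn logs -applicationId <appId> after the application finishes should gather both.)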
On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Can you make sure you are running locally on more than 1 local core? You
> could set the master in the SparkConf as conf.setMaster("local[4]"). Then
> see if there are jobs running on every batch of data in the Spark web UI
> (running on localhost:4040). If you still don't get any output, try first
> simply printing recRDD.count() in the foreachRDD (that is, first test Spark
> Streaming). If you can get that to work, then I would test the Spark SQL
> stuff.
>
> TD
>
>
> On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> No errors but no output either... Thanks!
>>
>>
>> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> Could you elaborate on what is the problem you are facing? A compiler
>>> error? A runtime error? A class-not-found error? Not receiving any data
>>> from Kafka? Receiving data but the SQL command throwing an error? No
>>> errors but no output either?
>>>
>>> TD
>>>
>>>
>>> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> A couple of days ago, I tried to integrate SQL and streaming together. My
>>>> understanding is that I can transform each RDD from the DStream into a
>>>> SchemaRDD and execute SQL on it, but I have had no luck so far.
>>>> Would you guys help me take a look at my code? Thank you very much!
>>>>
>>>> object KafkaSpark {
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>     if (args.length < 4) {
>>>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>>       System.exit(1)
>>>>     }
>>>>
>>>>     val Array(zkQuorum, group, topics, numThreads) = args
>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>>>>     val sc = new SparkContext(sparkConf)
>>>>     val sqlContext = new SQLContext(sc)
>>>>     // ssc.checkpoint("checkpoint")
>>>>
>>>>     // Importing the SQL context gives access to all the SQL functions
>>>>     // and implicit conversions.
>>>>     import sqlContext._
>>>>
>>>>     val tt = Time(10000)
>>>>     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>>>>       .map(t => getRecord(t._2.split("#")))
>>>>
>>>>     val result = recordsStream.foreachRDD((recRDD, tt) => {
>>>>       recRDD.registerAsTable("records")
>>>>       val result = sql("select * from records")
>>>>       println(result)
>>>>       result.foreach(println)
>>>>     })
>>>>
>>>>     ssc.start()
>>>>     ssc.awaitTermination()
>>>>   }
>>>>
>>>>   def getRecord(l: Array[String]): Record = {
>>>>     println("Getting the record")
>>>>     Record(l(0), l(1))
>>>>   }
>>>> }
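For anyone hitting the same wall, here is a minimal sketch of the pattern TD suggests, written against the same Spark 1.0-era API used above (SchemaRDD, registerAsTable). It is an untested outline, not a drop-in fix: the Record case class with two string fields and the "#" separator are guesses at what getRecord produces, and KafkaSparkSketch is just a placeholder name. The structural changes from the original code are: reuse ssc.sparkContext for the SQLContext instead of constructing a second SparkContext from the same conf, print rdd.count() first to confirm data is actually arriving, and collect() the query result before printing so the rows land in the driver log rather than on the executors.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed shape of the record; the original post implies a two-field case class.
// It must be defined at the top level so Spark SQL can infer the schema by reflection.
case class Record(key: String, value: String)

object KafkaSparkSketch {
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args

    // When testing locally, use a master with more than one core (e.g. local[4]):
    // the Kafka receiver occupies one core, and with a single core no batches run.
    val sparkConf = new SparkConf().setAppName("KafkaSparkSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Reuse the SparkContext the StreamingContext already created,
    // instead of constructing a second one from the same conf.
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext._  // brings in createSchemaRDD and the sql(...) method

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val records = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map(_._2.split("#"))
      .filter(_.length >= 2)
      .map(a => Record(a(0), a(1)))

    records.foreachRDD { rdd =>
      // Step 1 of TD's suggestion: confirm data is actually arriving.
      println("batch size = " + rdd.count())

      // Step 2: register the batch as a table and query it. collect() brings the
      // rows back to the driver so the println output shows up in the driver log
      // (rdd.foreach(println) would print on the executors instead).
      rdd.registerAsTable("records")
      sql("select * from records").collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The count() line is the quickest way to separate "no data coming from Kafka" from "SQL part broken": if it stays at 0, the problem is upstream of Spark SQL (receiver, topic names, zkQuorum), and if it is non-zero then the table registration and query are worth a closer look.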