Actually, I deployed this on a YARN cluster (via spark-submit), and I couldn't
find any output in the YARN stdout logs.


On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Can you make sure you are running locally on more than one core? You
> could set the master in the SparkConf as conf.setMaster("local[4]"). Then
> see if there are jobs running on every batch of data in the Spark web UI
> (running on localhost:4040). If you still don't get any output, first try
> simply printing recRDD.count() in the foreachRDD (that is, first test Spark
> Streaming on its own). If you can get that to work, then I would test the
> Spark SQL stuff.
>
> TD
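
The check suggested above could be sketched roughly as follows. This is only a minimal sketch, not code from the thread: the object name KafkaCountCheck and the value names are made up for illustration, it assumes the spark-streaming-kafka artifact is on the classpath, and it takes the same four arguments as the original program.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical name; a stripped-down test of Spark Streaming alone, no SQL.
object KafkaCountCheck {
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args

    // local[4] gives the application four local threads, so the Kafka
    // receiver does not take the only available core.
    val conf = new SparkConf().setAppName("KafkaCountCheck").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // createStream yields (key, message) pairs; keep only the message.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // count() is an action whose result comes back to the driver, so this
    // println writes to the driver's stdout once per batch.
    lines.foreachRDD(rdd => println("Records in this batch: " + rdd.count()))

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If a count shows up for every 10-second batch, the streaming side is working and the SQL step can be added back on top of it.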
>
>
> On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
>> No errors but no output either... Thanks!
>>
>>
>> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Could you elaborate on the problem you are facing? Compiler
>>> error? Runtime error? Class-not-found error? Not receiving any data from
>>> Kafka? Receiving data but the SQL command throws an error? No errors but no
>>> output either?
>>>
>>> TD
>>>
>>>
>>> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> A couple of days ago, I tried to integrate SQL and streaming. My
>>>> understanding is that I can convert each RDD from a DStream to a SchemaRDD
>>>> and execute SQL on it, but I had no luck.
>>>> Could you help me take a look at my code? Thank you very much!
>>>>
>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>> import org.apache.spark.sql.SQLContext
>>>> import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>
>>>> // Record was not shown in the original post; the field names here are
>>>> // assumed for illustration only.
>>>> case class Record(key: String, value: String)
>>>>
>>>> object KafkaSpark {
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>     if (args.length < 4) {
>>>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>>       System.exit(1)
>>>>     }
>>>>
>>>>
>>>>     val Array(zkQuorum, group, topics, numThreads) = args
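>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>>>>     // Reuse the SparkContext that the StreamingContext already created
>>>>     // instead of constructing a second one from the same SparkConf.
>>>>     val sc = ssc.sparkContext
>>>>     val sqlContext = new SQLContext(sc)
>>>>     // ssc.checkpoint("checkpoint")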
>>>>
>>>>     // Importing the SQL context gives access to all the SQL functions
>>>>     // and implicit conversions.
>>>>     import sqlContext._
>>>>
>>>>
>>>>     val tt = Time(10000)
>>>>     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>> topicpMap).map(t => getRecord(t._2.split("#")))
>>>>
>>>>     val result = recordsStream.foreachRDD((recRDD, tt)=>{
>>>>       recRDD.registerAsTable("records")
>>>>       val result = sql("select * from records")
>>>>       println(result)
>>>>       result.foreach(println)
>>>>     })
>>>>
>>>>     ssc.start()
>>>>     ssc.awaitTermination()
>>>>
>>>>   }
>>>>
>>>>   def getRecord(l: Array[String]): Record = {
>>>>     println("Getting the record")
>>>>     Record(l(0), l(1))
>>>>   }
>>>> }
>>>>
>>>>
>>>
>>
>
