Hi Tathagata,

I can see the output of count, but no SQL results. Running in standalone
mode is not useful for me; I only run on my local single-node YARN cluster.
Thanks


On Tue, Jul 15, 2014 at 12:48 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Could you run it locally first to make sure it works, and you see output?
> Also, I recommend going through the previous step-by-step approach to
> narrow down where the problem is.
>
> TD
>
>
> On Mon, Jul 14, 2014 at 9:15 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
>> Actually, I deployed this on a YARN cluster (via spark-submit) and I
>> couldn't find any output in the YARN stdout logs.
>>
>>
>> On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Can you make sure you are running locally with more than one local core?
>>> You could set the master in the SparkConf as conf.setMaster("local[4]").
>>> Then see whether jobs are running for every batch of data in the Spark web
>>> UI (running on localhost:4040). If you still don't get any output, first
>>> try simply printing recRDD.count() in the foreachRDD (that is, first test
>>> Spark Streaming). If you can get that to work, then I would test the Spark
>>> SQL stuff.
>>>
>>> TD
>>>
>>>
>>> On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com <hsy...@gmail.com>
>>> wrote:
>>>
>>>> No errors but no output either... Thanks!
>>>>
>>>>
>>>> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Could you elaborate on what problem you are facing? Compiler
>>>>> error? Runtime error? Class-not-found error? Not receiving any data from
>>>>> Kafka? Receiving data but SQL command throwing error? No errors but no
>>>>> output either?
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> A couple of days ago, I tried to integrate SQL and streaming. My
>>>>>> understanding is that I can convert each RDD from a DStream to a
>>>>>> SchemaRDD and execute SQL on it, but I had no luck.
>>>>>> Would you guys help me take a look at my code? Thank you very much!
>>>>>>
>>>>>> object KafkaSpark {
>>>>>>
>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>     if (args.length < 4) {
>>>>>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>>>>       System.exit(1)
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     val Array(zkQuorum, group, topics, numThreads) = args
>>>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>>>     val ssc =  new StreamingContext(sparkConf, Seconds(10))
>>>>>>     val sc = new SparkContext(sparkConf)
>>>>>>     val sqlContext = new SQLContext(sc);
>>>>>> //    ssc.checkpoint("checkpoint")
>>>>>>
>>>>>>     // Importing the SQL context gives access to all the SQL
>>>>>>     // functions and implicit conversions.
>>>>>>     import sqlContext._
>>>>>>
>>>>>>
>>>>>>     val tt = Time(10000)
>>>>>>     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>>>>>>       .map(t => getRecord(t._2.split("#")))
>>>>>>
>>>>>>     val result = recordsStream.foreachRDD((recRDD, tt)=>{
>>>>>>       recRDD.registerAsTable("records")
>>>>>>       val result = sql("select * from records")
>>>>>>       println(result)
>>>>>>       result.foreach(println)
>>>>>>     })
>>>>>>
>>>>>>     ssc.start()
>>>>>>     ssc.awaitTermination()
>>>>>>
>>>>>>   }
>>>>>>
>>>>>>   def getRecord(l:Array[String]):Record = {
>>>>>>     println("Getting the record")
>>>>>>     Record(l(0), l(1))}
>>>>>> }
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
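
For readers following the thread, here is a minimal sketch of how the
posted program might be adjusted. It is not a fix confirmed by anyone in
the thread, and it has not been run against a cluster. It assumes the Spark
1.0-era API that the original code uses (SchemaRDD, registerAsTable, the
receiver-based KafkaUtils.createStream). The Record case class is an
assumption, since the thread never shows its definition, and the names
KafkaSparkSketch and toRecord are made up for illustration. Two things
differ from the posted code: the SQLContext reuses the SparkContext that
the StreamingContext already owns instead of constructing a second one, and
the query result is collected to the driver before printing. On YARN,
result.foreach(println) runs on the executors, so its output goes to the
executor container logs rather than the driver's stdout, which would be
consistent with seeing the count but no SQL rows.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed record shape; the thread does not show the real definition.
case class Record(key: String, value: String)

object KafkaSparkSketch {

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSparkSketch <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaSparkSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Reuse the SparkContext created by the StreamingContext
    // rather than constructing a second one from the same conf.
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext._

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val records = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map { case (_, line) => toRecord(line.split("#")) }

    records.foreachRDD { recRDD =>
      // Runs on the driver once per batch.
      println("batch count: " + recRDD.count())
      recRDD.registerAsTable("records")
      // collect() brings the rows to the driver, so println writes to the
      // driver's stdout instead of the executors' logs.
      sql("select * from records").collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Assumes every message has at least two '#'-separated fields;
  // a malformed message would throw here.
  def toRecord(fields: Array[String]): Record = Record(fields(0), fields(1))
}
```

collect() is only reasonable here because this is a debugging aid on small
batches; for large results, take(n) would be the safer choice.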
