Oh yes, we have run SQL, Streaming, and MLlib all together.
You can take a look at the demo <https://databricks.com/cloud> that
Databricks gave at the Spark Summit.
I think I see what the problem is. sql("...") returns an RDD, and println(rdd)
prints only the RDD's name, not its records. And rdd.foreach(println) prints the
records on the executors, so you won't find anything in the driver logs!
So try doing a collect or take on the RDD returned by the SQL query, and print
that.
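A minimal sketch of that fix follows. It assumes the Spark 1.0.x API used in this thread (SQLContext, registerAsTable, sql) and is not the code from the Databricks demo. To run without a Kafka broker it swaps in ssc.queueStream for KafkaUtils.createStream, and the Record field names (key, value) are hypothetical, since the thread never shows the Record class. The only change that matters is collect(): it brings the query result back to the driver, so the println output ends up in the driver's stdout.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical schema: the thread calls Record(l(0), l(1)) but never shows the
// class. Spark SQL 1.0 infers the schema by reflection, so keep it top-level.
case class Record(key: String, value: String)

object SqlOnStreamSketch {
  // Rows printed on the driver, kept so they can be inspected after the run.
  val driverRows = new ConcurrentLinkedQueue[String]()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SqlOnStreamSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)                 // one SparkContext, shared below
    val ssc = new StreamingContext(sc, Seconds(1))
    val sqlContext = new SQLContext(sc)
    import sqlContext._                             // implicit RDD -> SchemaRDD, sql(...)

    // Stand-in for the Kafka input (assumption): one batch with two records.
    val batches = mutable.Queue[RDD[Record]](
      sc.parallelize(Seq(Record("a", "1"), Record("b", "2"))))
    val records = ssc.queueStream(batches)

    records.foreachRDD { recRDD =>
      // This closure itself runs on the driver, once per batch.
      recRDD.registerAsTable("records")
      val result = sql("select * from records")
      // collect() ships the rows to the driver before printing. By contrast,
      // result.foreach(println) would print on the executors. For large
      // batches, result.take(10) prints a bounded sample instead.
      result.collect().foreach { row =>
        println(row)
        driverRows.add(row.toString)
      }
    }

    ssc.start()
    ssc.awaitTermination(10000)                     // run for up to 10 s (milliseconds)
    ssc.stop()                                      // also stops the SparkContext
  }
}
```

On YARN, the driver's stdout is in the driver's own container log, not in the executors' logs.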
TD
On Tue, Jul 15, 2014 at 4:28 PM, [email protected] <[email protected]> wrote:
> By the way, have you ever run SQL and Streaming together? Do you know of any
> example that works? Thanks!
>
>
> On Tue, Jul 15, 2014 at 4:28 PM, [email protected] <[email protected]>
> wrote:
>
>> Hi Tathagata,
>>
>> I could see the output of count, but no SQL results. Running in standalone
>> mode is meaningless for me, so I just run it on my local single-node YARN
>> cluster.
>> Thanks
>>
>>
>> On Tue, Jul 15, 2014 at 12:48 PM, Tathagata Das <
>> [email protected]> wrote:
>>
>>> Could you run it locally first to make sure it works and that you see
>>> output? Also, I recommend going through the step-by-step approach from my
>>> previous email to narrow down where the problem is.
>>>
>>> TD
>>>
>>>
>>> On Mon, Jul 14, 2014 at 9:15 PM, [email protected] <[email protected]>
>>> wrote:
>>>
>>>> Actually, I deployed this on a YARN cluster (spark-submit) and I couldn't
>>>> find any output in the YARN stdout logs.
>>>>
>>>>
>>>> On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das <
>>>> [email protected]> wrote:
>>>>
>>>>> Can you make sure you are running locally on more than one local core?
>>>>> You could set the master in the SparkConf with conf.setMaster("local[4]").
>>>>> Then check in the Spark web UI (running on localhost:4040) whether jobs
>>>>> are running on every batch of data. If you still don't get any output,
>>>>> first try simply printing recRDD.count() in the foreachRDD (that is, test
>>>>> Spark Streaming first). If you can get that to work, then I would test the
>>>>> Spark SQL stuff.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Mon, Jul 14, 2014 at 5:25 PM, [email protected] <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> No errors but no output either... Thanks!
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Could you elaborate on the problem you are facing? Compiler
>>>>>>> error? Runtime error? Class-not-found error? Not receiving any data from
>>>>>>> Kafka? Receiving data but SQL command throwing error? No errors but no
>>>>>>> output either?
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 14, 2014 at 4:06 PM, [email protected] <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> A couple of days ago, I tried to integrate SQL and Streaming.
>>>>>>>> My understanding is that I can transform each RDD from the DStream into
>>>>>>>> a SchemaRDD and execute SQL on it, but I had no luck.
>>>>>>>> Would you take a look at my code? Thank you very much!
>>>>>>>>
>>>>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>>>>> import org.apache.spark.sql.SQLContext
>>>>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>
>>>>>>>> object KafkaSpark {
>>>>>>>>
>>>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>>>     if (args.length < 4) {
>>>>>>>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>>>>>>       System.exit(1)
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     val Array(zkQuorum, group, topics, numThreads) = args
>>>>>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>>>>>     // Create a single SparkContext and share it; building the
>>>>>>>>     // StreamingContext from sparkConf would create a second one.
>>>>>>>>     val sc = new SparkContext(sparkConf)
>>>>>>>>     val ssc = new StreamingContext(sc, Seconds(10))
>>>>>>>>     val sqlContext = new SQLContext(sc)
>>>>>>>>     // ssc.checkpoint("checkpoint")
>>>>>>>>
>>>>>>>>     // Importing the SQL context gives access to all the SQL
>>>>>>>>     // functions and implicit conversions.
>>>>>>>>     import sqlContext._
>>>>>>>>
>>>>>>>>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>>>>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
>>>>>>>>       .map(t => getRecord(t._2.split("#")))
>>>>>>>>
>>>>>>>>     recordsStream.foreachRDD { recRDD =>
>>>>>>>>       recRDD.registerAsTable("records")
>>>>>>>>       val result = sql("select * from records")
>>>>>>>>       println(result)
>>>>>>>>       result.foreach(println)
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     ssc.start()
>>>>>>>>     ssc.awaitTermination()
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   def getRecord(l: Array[String]): Record = {
>>>>>>>>     println("Getting the record")
>>>>>>>>     Record(l(0), l(1))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
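The step-by-step check suggested in the thread can be sketched roughly as below. The idea is to run on more than one local core and print recRDD.count() inside foreachRDD before adding any Spark SQL. The sketch assumes Spark Streaming 1.0.x and replaces the Kafka input with ssc.queueStream so it runs without a broker. The "a#1"-style payloads are made up for illustration. While it runs, the per-batch jobs should be visible in the web UI at localhost:4040.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingCountCheck {
  // Per-batch counts recorded on the driver, kept for inspection after the run.
  val batchCounts = new ConcurrentLinkedQueue[java.lang.Long]()

  def main(args: Array[String]): Unit = {
    // local[4] rather than local / local[1]: a receiver-based input such as
    // Kafka keeps one core busy receiving, which leaves none to process batches.
    val conf = new SparkConf().setAppName("StreamingCountCheck").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Hypothetical stand-in for the Kafka messages ("field#field" payloads).
    val batches = mutable.Queue[RDD[String]](
      ssc.sparkContext.parallelize(Seq("a#1", "b#2", "c#3")))
    val lines = ssc.queueStream(batches)

    lines.foreachRDD { rdd =>
      val n = rdd.count()                // an action, so it launches a job per batch
      batchCounts.add(n)
      println(s"records in batch: $n")   // runs on the driver: shows in driver stdout
    }

    ssc.start()
    ssc.awaitTermination(10000)          // run for up to 10 s (milliseconds)
    ssc.stop()                           // also stops the underlying SparkContext
  }
}
```

If the counts appear but the SQL results do not, the streaming side is working and the problem is in how the query result is printed.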