Does anyone have any ideas on this?

On Tue, Jul 22, 2014 at 7:02 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> But how do they run interactive SQL in the demo?
> https://www.youtube.com/watch?v=dJQ5lV5Tldw
>
> And if it works in local mode, I think it should also be able to work in
> cluster mode, correct?
>
>
> On Tue, Jul 22, 2014 at 5:58 PM, Tobias Pfeiffer <t...@preferred.jp> wrote:
>
>> Hi,
>>
>> As far as I know, after the StreamingContext has started, the processing
>> pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL
>> statement is transformed into RDD operations when the StreamingContext
>> starts, I think there is no way to change the statement that is executed on
>> the current stream after the StreamingContext has started.
>>
>> Tobias
>>
>>
>> On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com <hsy...@gmail.com>
>> wrote:
>>
>>> For example, this is what I tested, and it works in local mode. It gets
>>> both the data and the SQL query from Kafka, runs the SQL on each RDD, and
>>> writes the result back to Kafka.
>>> I defined a var called *sqlS*. In the streaming part, as you can see, I
>>> change the SQL statement when a SQL message is consumed from Kafka, so the
>>> next time *sql(sqlS)* runs, it executes the updated SQL query.
>>>
>>> But this code doesn't work in cluster mode because, from what I
>>> understand, sqlS is not updated on all the workers.
>>>
>>> So my question is: how do I change the value of sqlS at runtime and make
>>> all the workers pick up the latest value?
>>>
>>>
>>>     var sqlS = "select count(*) from records"
>>>     val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>     val sc = new SparkContext(sparkConf)
>>>     val ssc = new StreamingContext(sc, Seconds(2))
>>>     val sqlContext = new SQLContext(sc)
>>>
>>>     // Importing the SQL context gives access to all the SQL functions
>>>     // and implicit conversions.
>>>     import sqlContext._
>>>     import sqlContext.createSchemaRDD
>>>
>>>     //    val tt = Time(5000)
>>>     val topicpMap = collection.immutable.HashMap(
>>>       topic -> numParts.toInt, sqltopic -> 2)
>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>>>       .window(Seconds(4))
>>>       // Messages keyed "sql" update sqlS and are dropped from the data stream.
>>>       .filter(t => { if (t._1 == "sql") { sqlS = t._2; false } else true })
>>>       .map(t => getRecord(t._2.split("#")))
>>>
>>>     val zkClient = new ZkClient(zkQuorum, 30000, 30000,
>>> ZKStringSerializer)
>>>
>>>     val brokerString =
>>> ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")
>>>
>>>     KafkaSpark.props.put("metadata.broker.list", brokerString)
>>>     val config = new ProducerConfig(KafkaSpark.props)
>>>     val producer = new Producer[String, String](config)
>>>
>>>     val result = recordsStream.foreachRDD((recRDD) => {
>>>       val schemaRDD = sqlContext.createSchemaRDD(recRDD)
>>>       schemaRDD.registerAsTable(tName)
>>>       val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) =>
>>>         s + r.mkString(",") + "\n")
>>>       producer.send(new KeyedMessage[String, String](outputTopic,
>>>         s"SQL: $sqlS \n $result"))
>>>     })
>>>     ssc.start()
>>>     ssc.awaitTermination()
>>>
>>>
>>> On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang <zonghen...@gmail.com>
>>> wrote:
>>>
>>>> Can you paste a small code example to illustrate your questions?
>>>>
>>>> On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>> wrote:
>>>> > Sorry, typo. What I mean is sharing. If the SQL changes at runtime, how
>>>> > do I broadcast it to all the workers that are doing the SQL analysis?
>>>> >
>>>> > Best,
>>>> > Siyuan
>>>> >
>>>> >
>>>> > On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang <zonghen...@gmail.com>
>>>> > wrote:
>>>> >>
>>>> >> Do you mean that the text of the SQL queries is hardcoded in the
>>>> >> code? What do you mean by "cannot shar the sql to all workers"?
>>>> >>
>>>> >> On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>> >> wrote:
>>>> >> > Hi guys,
>>>> >> >
>>>> >> > I'm able to run some Spark SQL examples, but the SQL is static in the
>>>> >> > code. I would like to know whether there is a way to read SQL from
>>>> >> > somewhere else (a shell, for example).
>>>> >> >
>>>> >> > I could read the SQL statement from Kafka/ZooKeeper, but I cannot
>>>> >> > share the SQL with all the workers. Broadcast does not seem to work
>>>> >> > for updating values.
>>>> >> >
>>>> >> > Moreover, if I use a non-serializable class (DataInputStream, etc.)
>>>> >> > to read SQL from another source, I always get "Task not serializable:
>>>> >> > java.io.NotSerializableException".
>>>> >> >
>>>> >> >
>>>> >> > Best,
>>>> >> > Siyuan
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>
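
On the local-versus-cluster difference raised in the thread: one likely explanation (a sketch under assumptions, not a confirmed diagnosis of the code above) is that closures such as the one passed to filter are serialized and run on the executors, so an assignment made there changes only the executor's copy. It never reaches the driver, which is where the function passed to foreachRDD runs and where sql(sqlS) is evaluated. The plain-Scala sketch below imitates that round trip with Java serialization. QueryHolder, ClosureCopyDemo and ship are hypothetical names used only for illustration, and no Spark API is involved.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the mutable query that a task closure captures.
final class QueryHolder(var sql: String) extends java.io.Serializable

object ClosureCopyDemo {
  // Round-trips a value through Java serialization. This is an assumption-level
  // imitation of a task being shipped from the driver to an executor.
  def ship[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new QueryHolder("select count(*) from records")

    // "Executor" receives a copy and overwrites it, like `sqlS = t._2` inside filter.
    val executorSide = ship(driverSide)
    executorSide.sql = "select * from records"
    // The driver's object is a different instance, so it keeps the old query.
    println(s"driver after executor-side update: ${driverSide.sql}")

    // Updating on the driver instead: the next shipped copy carries the new value.
    driverSide.sql = "select max(id) from records"
    val nextBatchCopy = ship(driverSide)
    println(s"copy shipped after driver-side update: ${nextBatchCopy.sql}")
  }
}
```

If this is indeed the cause, one option would be to consume the SQL messages on the driver (for example, by collecting them inside the function passed to foreachRDD) and to update sqlS there before calling sql(sqlS), instead of assigning it inside filter.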
