Does anyone have any idea on this?
On Tue, Jul 22, 2014 at 7:02 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> But how do they do the interactive SQL in the demo?
> https://www.youtube.com/watch?v=dJQ5lV5Tldw
>
> And if it works in local mode, I think it should also work in cluster
> mode, correct?
>
> On Tue, Jul 22, 2014 at 5:58 PM, Tobias Pfeiffer <t...@preferred.jp> wrote:
>
>> Hi,
>>
>> as far as I know, once the StreamingContext has started, the processing
>> pipeline (e.g., filter.map.join.filter) cannot be changed. Since your SQL
>> statement is transformed into RDD operations when the StreamingContext
>> starts, I think there is no way to change the statement that is executed
>> on the current stream after the StreamingContext has started.
>>
>> Tobias
>>
>> On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>
>>> For example, this is what I tested, and it works in local mode. It gets
>>> both the data and the SQL query from Kafka, runs the SQL on each RDD,
>>> and writes the result back to Kafka.
>>> I defined a var called *sqlS*. In the streaming part, as you can see, I
>>> update the SQL statement whenever a SQL message is consumed from Kafka,
>>> so the next call to *sql(sqlS)* executes the updated query.
>>>
>>> But this code doesn't work in cluster mode because, as I understand it,
>>> sqlS is not updated on all the workers.
>>>
>>> So my question is: how do I change the value of sqlS at runtime and make
>>> all the workers pick up the latest value?
>>>
>>>     var sqlS = "select count(*) from records"
>>>     val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>     val sc = new SparkContext(sparkConf)
>>>     val ssc = new StreamingContext(sc, Seconds(2))
>>>     val sqlContext = new SQLContext(sc)
>>>
>>>     // Importing the SQL context gives access to all the SQL functions
>>>     // and implicit conversions.
>>>     import sqlContext._
>>>     import sqlContext.createSchemaRDD
>>>
>>>     // val tt = Time(5000)
>>>     val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2)
>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>>>       .window(Seconds(4))
>>>       .filter(t => { if (t._1 == "sql") { sqlS = t._2; false } else true })
>>>       .map(t => getRecord(t._2.split("#")))
>>>
>>>     val zkClient = new ZkClient(zkQuorum, 30000, 30000, ZKStringSerializer)
>>>     val brokerString =
>>>       ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")
>>>
>>>     KafkaSpark.props.put("metadata.broker.list", brokerString)
>>>     val config = new ProducerConfig(KafkaSpark.props)
>>>     val producer = new Producer[String, String](config)
>>>
>>>     val result = recordsStream.foreachRDD((recRDD) => {
>>>       val schemaRDD = sqlContext.createSchemaRDD(recRDD)
>>>       schemaRDD.registerAsTable(tName)
>>>       val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) => { s + r.mkString(",") + "\n" })
>>>       producer.send(new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
>>>     })
>>>     ssc.start()
>>>     ssc.awaitTermination()
>>>
>>> On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang <zonghen...@gmail.com> wrote:
>>>
>>>> Can you paste a small code example to illustrate your questions?
>>>>
>>>> On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>> > Sorry, typo. What I meant is sharing. If the SQL changes at runtime,
>>>> > how do I broadcast it to all the workers doing the SQL analysis?
>>>> >
>>>> > Best,
>>>> > Siyuan
>>>> >
>>>> > On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang <zonghen...@gmail.com> wrote:
>>>> >>
>>>> >> Do you mean that the texts of the SQL queries are hardcoded in the
>>>> >> code? What do you mean by "cannot share the sql to all workers"?
>>>> >>
>>>> >> On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>> >> > Hi guys,
>>>> >> >
>>>> >> > I'm able to run some Spark SQL examples, but the SQL is static in
>>>> >> > the code. I would like to know whether there is a way to read the
>>>> >> > SQL from somewhere else (a shell, for example).
>>>> >> >
>>>> >> > I can read SQL statements from Kafka/ZooKeeper, but I cannot share
>>>> >> > the SQL with all the workers. Broadcast does not seem to work for
>>>> >> > updating values.
>>>> >> >
>>>> >> > Moreover, if I use some non-serializable class (DataInputStream,
>>>> >> > etc.) to read the SQL from another source, I always get "Task not
>>>> >> > serializable: java.io.NotSerializableException".
>>>> >> >
>>>> >> > Best,
>>>> >> > Siyuan
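A note for readers of this archived thread: the code above updates sqlS inside filter, which executes on the workers, so the driver-side var never propagates. The function passed to foreachRDD, by contrast, runs on the driver once per batch, so a query string read there is captured into each batch's closure and shipped to the workers fresh. Below is a minimal sketch of that pattern in plain Scala (no Spark dependency, so it is runnable as-is); the names currentSql and runBatch are illustrative only, with runBatch standing in for the body of foreachRDD.

```scala
import java.util.concurrent.atomic.AtomicReference

object DriverSideQuery {
  // Driver-side holder for the latest query; in the job above this would be
  // updated by consuming the "sql" Kafka topic on the driver, not in filter.
  val currentSql = new AtomicReference[String]("select count(*) from records")

  // Stands in for the function passed to foreachRDD: it runs on the driver
  // at batch-submission time, so reading the variable here takes a snapshot
  // that is then serialized into the batch's closure for the workers.
  def runBatch(records: Seq[String]): String = {
    val sql = currentSql.get() // snapshot taken on the driver, per batch
    s"SQL: $sql, rows: ${records.size}"
  }

  def main(args: Array[String]): Unit = {
    println(runBatch(Seq("a", "b")))
    currentSql.set("select * from records") // a control message arrives
    println(runBatch(Seq("c")))             // next batch sees the new query
  }
}
```

Because the snapshot happens once per batch on the driver, every worker in a given batch sees the same query string, which avoids the inconsistency the var-based approach has in cluster mode.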
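On the "Task not serializable" point at the end of the thread: the usual workaround is to mark the non-serializable resource @transient lazy, so it is excluded from the serialized closure and re-created on each worker on first use after deserialization. The sketch below demonstrates this in plain Scala with java.io only; Worker, readSql, and the BufferedReader (standing in for DataInputStream etc.) are illustrative, and roundTrip mimics what Spark does when it serializes a task closure.

```scala
import java.io._

class Worker extends Serializable {
  // Not shipped with the closure; rebuilt lazily on first use in each JVM.
  @transient lazy val reader: BufferedReader =
    new BufferedReader(new StringReader("select count(*) from records"))

  def readSql(): String = reader.readLine()
}

object TransientDemo {
  // Round-trip an object through Java serialization, as Spark does with
  // task closures when shipping them to executors.
  def roundTrip[T <: Serializable](t: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(t) // would throw NotSerializableException without @transient
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new Worker) // serializes fine: reader is excluded
    println(shipped.readSql())          // reader is re-created here, lazily
  }
}
```

One caveat with this pattern: the lazy val should not be forced before serialization, since the re-created copy starts from scratch on the receiving side; initializing it only inside the task body, as here, keeps the behavior predictable.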