Hi Jacek and Cody,

First of all, thanks for helping me out.
I started out using combineByKey while testing with just one field. It worked fine, of course, but I was worried that the code would become unreadable with many fields. That is why I switched to sqlContext: the code stays comprehensible. Let me work out the stream statistics and update you in a while.

Regards,
Siva

> On 16-Jun-2016, at 11:29 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> Rather
>
> val df = sqlContext.read.json(rdd)
>
> Regards,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>> Cody,
>>
>> Are you referring to the val lines = messages.map(_._2)?
>>
>> Regards,
>>
>> Siva
>>
>>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>> Doesn't that result in consuming each RDD twice, in order to infer the
>>> json schema?
>>>
>>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>>> Of course :)
>>>>
>>>> import kafka.serializer.StringDecoder
>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>
>>>> object sparkStreaming {
>>>>   def main(args: Array[String]) {
>>>>     // Set reasonable logging levels for streaming if the user has not configured log4j.
>>>>     StreamingExamples.setStreamingLogLevels()
>>>>     val topics = "test"
>>>>     val brokers = "localhost:9092"
>>>>     val topicsSet = topics.split(",").toSet
>>>>     val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local") //spark://localhost:7077
>>>>     val sc = new SparkContext(sparkConf)
>>>>     val ssc = new StreamingContext(sc, Seconds(30))
>>>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>>     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>>     val lines = messages.map(_._2)
>>>>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>     lines.foreachRDD( rdd => {
>>>>       val df = sqlContext.read.json(rdd)
>>>>       df.registerTempTable("drone")
>>>>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>>>     })
>>>>     ssc.start()
>>>>     ssc.awaitTermination()
>>>>   }
>>>> }
>>>>
>>>> I haven’t checked long-running performance though.
>>>>
>>>> Regards,
>>>>
>>>> Siva
>>>>
>>>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Good to hear so! Mind sharing a few snippets of your solution?
>>>>
>>>> Regards,
>>>> Jacek Laskowski
>>>>
>>>>
>>>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>>>
>>>> Thanks Jacek,
>>>>
>>>> Job completed!! :) Just used data frames and a SQL query. Very clean and
>>>> functional code.
>>>>
>>>> Siva
>>>>
>>>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>
>>>> mapWithState
>>>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
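
A note on Cody's point about consuming each RDD twice: read.json with no schema has to scan the data once to infer the schema and again to build the rows. One possible way around that is to hand the reader an explicit schema. The sketch below is only an illustration. The object name DroneSchema and the helper toDataFrame are made up for this note, and the field types (string id, double for everything else) are assumptions, since the thread only shows the column names used in the SQL query.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Hypothetical helper, not part of the original code in this thread.
object DroneSchema {
  // Assumed types: only the column names appear in the thread, so
  // StringType for id and DoubleType for the measurements are guesses.
  val droneSchema: StructType = StructType(Seq(
    StructField("id", StringType, nullable = true),
    StructField("temp", DoubleType, nullable = true),
    StructField("rotor_rpm", DoubleType, nullable = true),
    StructField("winddirection", DoubleType, nullable = true),
    StructField("windspeed", DoubleType, nullable = true)
  ))

  // With a user-supplied schema the reader skips the inference pass,
  // so each batch RDD is only read when the query itself runs.
  def toDataFrame(sqlContext: SQLContext, rdd: RDD[String]): DataFrame =
    sqlContext.read.schema(droneSchema).json(rdd)
}
```

Inside foreachRDD, the line val df = sqlContext.read.json(rdd) would then become val df = DroneSchema.toDataFrame(sqlContext, rdd). Fields that are missing from a record, or that do not match the declared type, should come back as null rather than changing the schema from batch to batch.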