Re: Getting figures from Spark Streaming
Just a guess, but doesn't the `.apply(0)` at the end of each of your print statements take just the first element of the returned list?

On Wed, Sep 7, 2016 at 12:36 AM Ashok Kumar wrote:
> Any help on this warmly appreciated.
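If that is the cause, mapping every record to its third field instead of collecting and taking element 0 should print the price from every record. A minimal, untested sketch, assuming the same comma-separated value format as in your sample:

dstream.foreachRDD { rdd =>
  if (rdd.count > 0) {
    // split each value once and keep only the third field (the price)
    val prices = rdd.map(_._2).map(_.split(',')(2).trim.toFloat)
    prices.collect.foreach(println)
  }
}

Note that collect only makes sense for small test batches like this; for real volumes you would keep working on the RDD itself rather than pulling everything back to the driver.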
Re: Getting figures from Spark Streaming
Any help on this warmly appreciated.
Getting figures from Spark Streaming
Hello Gurus,

I am creating some figures, feeding them into Kafka, and then into Spark Streaming. It works OK but I have the following issue.

For now, as a test, I send 5 prices in each batch interval. In the loop code this is what is happening:

dstream.foreachRDD { rdd =>
  val x = rdd.count
  i += 1
  println(s"> rdd loop i is ${i}, number of lines is ${x} <==")
  if (x > 0) {
    println(s"processing ${x} records=")
    var words1 = rdd.map(_._2).map(_.split(',').view(0)).map(_.toInt).collect.apply(0)
    println(words1)
    var words2 = rdd.map(_._2).map(_.split(',').view(1)).map(_.toString).collect.apply(0)
    println(words2)
    var price = rdd.map(_._2).map(_.split(',').view(2)).map(_.toFloat).collect.apply(0)
    println(price)
    rdd.collect.foreach(println)
  }
}

My tuple looks like this:

// (null, "ID TIMESTAMP PRICE")
// (null, "40,20160426-080924, 67.55738301621814598514")

And this is the sample output from the run:

processing 5 records=
3
20160906-212509
80.224686
(null,3,20160906-212509,80.22468448052631637099)
(null,1,20160906-212509,60.40695324215582386153)
(null,4,20160906-212509,61.95159400693415572125)
(null,2,20160906-212509,93.05912099305473237788)
(null,5,20160906-212509,81.08637370113427387121)

It does process the first record's values (3, 20160906-212509, 80.224686 from (null,3,20160906-212509,80.22468448052631637099)) but ignores the remaining 4 records. How can I make it go through all records here? I want the third column from all records!

Greetings