Hello Gurus,
I am generating some figures and feeding them into Kafka and then Spark Streaming.
It works OK, but I have the following issue.
For now, as a test, I send 5 prices in each batch interval. This is what
happens in the loop code:
dstream.foreachRDD { rdd =>
  val x = rdd.count
  i += 1
  println(s"====> rdd loop i is ${i}, number of lines is ${x} <======")
  if (x > 0) {
    println(s"================processing ${x} records=================")
    var words1 =
      rdd.map(_._2).map(_.split(',').view(0)).map(_.toInt).collect.apply(0)
    println(words1)
    var words2 =
      rdd.map(_._2).map(_.split(',').view(1)).map(_.toString).collect.apply(0)
    println(words2)
    var price =
      rdd.map(_._2).map(_.split(',').view(2)).map(_.toFloat).collect.apply(0)
    println(price)
    rdd.collect.foreach(println)
  }
}
My tuple looks like this:
// (null, "ID TIMESTAMP PRICE")
// (null, "40,20160426-080924, 67.55738301621814598514")
And this is the sample output from the run:
================processing 5 records=================
3
20160906-212509
80.224686
(null,3,20160906-212509,80.22468448052631637099)
(null,1,20160906-212509,60.40695324215582386153)
(null,4,20160906-212509,61.95159400693415572125)
(null,2,20160906-212509,93.05912099305473237788)
(null,5,20160906-212509,81.08637370113427387121)
So it does process the first values 3, 20160906-212509, 80.224686 for record
(null,3,20160906-212509,80.22468448052631637099), but it ignores the remaining
4 records. How can I make it go through all the records here? I want the third
column from all records!
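
To make clear what I am after, here is a minimal sketch using plain Scala collections in place of the RDD, so it runs without Spark. The object name PriceColumn and the method name prices are made up for illustration and are not part of my job. I suspect the .collect.apply(0) at the end of each line is what restricts the result to the first record, and I assume the same chain of maps, without apply(0), would work on rdd.map(_._2) followed by collect, but I have not confirmed that.

```scala
// Hypothetical helper (names invented for illustration): a Seq stands in
// for the RDD so the parsing logic can be run on its own.
object PriceColumn {
  // Each record mirrors the Kafka (key, value) tuple: (null, "ID,TIMESTAMP,PRICE").
  def prices(records: Seq[(String, String)]): Seq[Float] =
    records
      .map(_._2)                           // keep the message value, drop the null key
      .map(_.split(','))                   // split into ID, TIMESTAMP, PRICE
      .map(cols => cols(2).trim.toFloat)   // third column of every record, not just the first

  def main(args: Array[String]): Unit = {
    val batch: Seq[(String, String)] = Seq(
      (null, "3,20160906-212509,80.22468448052631637099"),
      (null, "1,20160906-212509,60.40695324215582386153"),
      (null, "4,20160906-212509,61.95159400693415572125")
    )
    // Prints one price per record in the batch.
    prices(batch).foreach(println)
  }
}
```

The trim is there because my sample tuple has a space before the price.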
Greetings