Hi,
I have a Kafka topic that transmits 100 security prices every 2 seconds.
In Spark Streaming I go through the RDD, walking through the rows one by
one and checking the prices. If a price is of interest, I store it in an
HBase table:
val dstream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
dstream.cache()
dstream.foreachRDD { pricesRDD =>
  // Work on individual messages
  for (line <- pricesRDD.collect.toArray) {
    // each message value is a CSV string: key,ticker,timeissued,price
    val fields = line._2.split(',')
    val key = fields(0)
    val ticker = fields(1)
    val timeissued = fields(2)
    val price = fields(3).toFloat
    val priceToString = fields(3)
    if (price > 90.0) {
      // save to HBase table
    }
  }
}
That works fine.
In Flink I define my source as below
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue,
    new SimpleStringSchema(), properties))
Is there any way I can perform a similar operation in Flink? I need to go
through every batch of messages sent on the topic and look at the
individual rows. For example, what is the equivalent of

for(line <- pricesRDD.collect.toArray)

in Flink?
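
To make the question concrete, here is my rough sketch of what I imagine
the Flink side might look like, using map to parse each message and filter
to keep the interesting prices. It builds on the stream defined above; the
Price case class and the HBase sink name are made up by me, not something
I have working:

import org.apache.flink.streaming.api.scala._

// hypothetical record type for one parsed CSV message
case class Price(key: String, ticker: String, timeissued: String, price: Float)

val prices: DataStream[Price] = stream
  .map { line =>
    val fields = line.split(',')
    Price(fields(0), fields(1), fields(2), fields(3).toFloat)
  }
  .filter(_.price > 90.0f)

// presumably the HBase write would then go into a custom sink, e.g.
// prices.addSink(new MyHBaseSink())  // MyHBaseSink would be my own SinkFunction

My understanding is that Flink operates on the DataStream directly with
map/filter rather than collecting records back to the driver, but I would
appreciate confirmation that this is the idiomatic approach.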
Thanks
Dr Mich Talebzadeh
LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.