Just to clarify, these are the individual prices, with the fields of each
line separated by ','. Below are three price lines from the topic:

UUID,Security,Time,Price
1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
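
To make the layout concrete, here is a minimal sketch in plain Scala (no Spark or Flink needed) that splits lines like the ones above into their four fields and keeps prices above 90.0. The names Price, PriceLines and parse are hypothetical and only for illustration. Trimming each field is an assumption, made so that padded lines such as the first one parse the same way as the others.

```scala
// Hypothetical record type for one price line: UUID,Security,Time,Price
final case class Price(key: String, ticker: String, timeIssued: String, price: Float)

object PriceLines {
  // Split one comma-separated line, trimming padding around each field.
  // Returns None when the line does not have exactly four fields
  // or when the last field is not a number (e.g. a header line).
  def parse(line: String): Option[Price] =
    line.split(',').map(_.trim) match {
      case Array(key, ticker, time, price) =>
        scala.util.Try(price.toFloat).toOption.map(p => Price(key, ticker, time, p))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88",
      "8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94",
      "81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33"
    )
    // Keep only prices above the 90.0 threshold used in the Spark loop
    val valuable = sample.flatMap(parse).filter(_.price > 90.0f)
    valuable.foreach(p => println(s"${p.ticker} ${p.price}"))
  }
}
```

A function of this shape works on one record at a time, so it could in principle be applied per message with map and filter on a stream rather than inside a loop over a collected batch.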


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Hi,
>
> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>
> In Spark Streaming I go through the RDD, walk through the rows one by one,
> and check the prices.
> If prices are valuable, I store them in an HBase table.
>
>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       streamingContext, kafkaParams, topics)
>     dstream.cache()
>     dstream.foreachRDD { pricesRDD =>
>       // Work on individual messages; collect brings each batch to the driver
>       for (line <- pricesRDD.collect.toArray) {
>         // line._2 is the message value: UUID,Security,Time,Price
>         val fields = line._2.split(',')
>         val key = fields(0)
>         val ticker = fields(1)
>         val timeissued = fields(2)
>         val priceToString = fields(3)
>         val price = priceToString.toFloat
>         if (price > 90.0) {
>           // save to Hbase table
>         }
>       }
>     }
>
> That works fine.
>
> In Flink I define my source as below
>
>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val stream = streamExecEnv
>        .addSource(new FlinkKafkaConsumer011[String](
>          topicsValue, new SimpleStringSchema(), properties))
>
> Is there any way I can perform a similar operation in Flink? I need to go
> through every topic load sent and look at the individual rows. For example,
> what is the equivalent of
>
> *for(line <- pricesRDD.collect.toArray)*
> in Flink?
>
> Thanks
>
> Dr Mich Talebzadeh
