Hi, Mich:
You can add write a sink function for that.
On Mon, Jul 30, 2018 at 2:58 PM Mich Talebzadeh <[email protected]>
wrote:
>
> Hi,
>
> I have a Kafka topic that transmits 100 security prices ever 2 seconds.
>
> In Spark streaming I go through the RDD and walk through rows one by one
> and check prices
> In prices are valuable I store them into an Hbase table
>
> val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
> dstream.cache()
> dstream.foreachRDD
> { pricesRDD =>
> // Work on individual messages
> * for(line <- pricesRDD.collect.toArray)*
> {
> var key = line._2.split(',').view(0).toString
> var ticker = line._2.split(',').view(1).toString
> var timeissued = line._2.split(',').view(2).toString
> var price = line._2.split(',').view(3).toFloat
> val priceToString = line._2.split(',').view(3)
> if (price > 90.0)
> {
> //save to Hbase table
> }
> }
> }
>
> That works fine.
>
> In Flink I define my source as below
>
> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val stream = streamExecEnv
> .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>
> Is there anyway I can perform similar operation in Flink? I need to go
> through every topic load sent and look at the individual rows/ For example
> what is the equivalent of
>
> *for(line <- pricesRDD.collect.toArray)*
> In flink?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
--
Liu, Renjie
Software Engineer, MVAD