Hi Mich,

Would it be possible to share the full source code?
I cannot see a call to streamExecEnv.execute() in your snippet. Without it, 
Flink only builds the job graph and never runs it.
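
For reference, here is a minimal sketch of the shape I would expect, assuming the Scala DataStream API of Flink 1.5 is on the classpath. The object name, the job name and the isValuable helper are made up for illustration, and fromElements stands in for your Kafka source. Nothing is processed until execute is called:

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; not taken from the original job.
object ExecuteSketch {

  // Hypothetical helper: true when the 4th comma-separated field (the price)
  // is greater than 90.
  def isValuable(line: String): Boolean =
    line.split(',')(3).trim.toFloat > 90.0f

  def main(args: Array[String]): Unit = {
    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the Kafka source: two price lines quoted later in this thread.
    val stream: DataStream[String] = streamExecEnv.fromElements(
      "1230a9a9-dc57-40e4-a000-6ea6989c754a,MRW,2018-07-28T20:38:43,241.88",
      "8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94"
    )

    // These calls only declare the pipeline; no record flows yet.
    stream.filter((line: String) => isValuable(line)).print()

    // The job is submitted and starts running here.
    streamExecEnv.execute("price-filter-sketch")
  }
}
```

If the last line is left out, the program exits without any record reaching the filter or the sink.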

Best regards 

> On 8. Aug 2018, at 00:02, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> Hi Fabian,
> 
> Following your notes above, I have converted the table back to a DataStream.
> 
>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>     tableEnv.registerDataStream("priceTable", splitStream,
>       'key, 'ticker, 'timeissued, 'price)
> 
>     val key =
>       tableEnv.scan("priceTable").select('key).toDataStream[Row]
>     val ticker =
>       tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>     val timeissued =
>       tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>     val price =
>       tableEnv.scan("priceTable").select('price).toDataStream[Row]
> 
> My intention is to create an HBase sink as follows:
> 
>     // Save prices to Hbase table
>     var p = new Put(new String(key).getBytes())
>     p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
>       new String(ticker).getBytes())
>     p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),
>       new String(timeissued).getBytes())
>     p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
>       new String(priceToString).getBytes())
>     p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
>       new String(CURRENCY).getBytes())
>     p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
>       new String(1.toString).getBytes())
>     p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
>       new String(System.currentTimeMillis.toString).getBytes())
>     HbaseTable.put(p)
>     HbaseTable.flushCommits()
> 
> However, I don't seem to be able to get the correct values for the columns!
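
One possible reason, as far as I can tell from the snippet: each select(...) above produces its own DataStream[Row], so key, ticker, timeissued and price are four unrelated streams and not the fields of one record. A rough sketch of an alternative is to keep the four fields in one Row and build one Put per Row inside a sink function. The class name HBasePriceSink, its toPut helper and the tableName parameter are hypothetical. I am assuming the Flink 1.5 RichSinkFunction and the HBase 1.x client API (addColumn in place of add), and I have left out the CURRENCY and OPERATION columns:

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.types.Row
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical sink: writes one Put per Row(key, ticker, timeissued, price).
class HBasePriceSink(tableName: String) extends RichSinkFunction[Row] {

  // Created in open() on the task manager, so not serialized with the function.
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    // Assumes hbase-site.xml is on the classpath.
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf(tableName))
  }

  // Called once per record; all four fields come from the same Row.
  override def invoke(row: Row): Unit =
    table.put(HBasePriceSink.toPut(row))

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}

object HBasePriceSink {
  private val PriceInfo = Bytes.toBytes("PRICE_INFO")

  // Field 0 becomes the row key, fields 1 to 3 become PRICE_INFO columns.
  def toPut(row: Row): Put = {
    def field(i: Int): Array[Byte] = Bytes.toBytes(String.valueOf(row.getField(i)))
    val put = new Put(field(0))
    put.addColumn(PriceInfo, Bytes.toBytes("TICKER"), field(1))
    put.addColumn(PriceInfo, Bytes.toBytes("ISSUED"), field(2))
    put.addColumn(PriceInfo, Bytes.toBytes("PRICE"), field(3))
    put
  }
}
```

The sink would then be attached to a single stream that carries all four columns, for example the result of tableEnv.toAppendStream[Row](...) on a table that selects 'key, 'ticker, 'timeissued, 'price, via addSink(new HBasePriceSink(...)).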
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <fhue...@gmail.com> wrote:
>> A *Table*Source [1] is a special input connector for Flink's relational 
>> APIs (Table API and SQL) [2].
>> You can transform and filter with these APIs as well (it's probably even 
>> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>> 
>> However, there is no *Table*Sink for HBase and you would need to convert the 
>> Table back to a DataStream [3].
>> That's not very difficult since the APIs are integrated with each other.
>> 
>> Best, Fabian
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>> 
>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>> Thanks Fabian. That was very useful.
>>> 
>>> How about an operation like below?
>>> 
>>>     // create builder
>>>     val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>       // set Kafka topic
>>>       .forTopic(topicsValue)
>>>       // set Kafka consumer properties
>>>       .withKafkaProperties(properties)
>>>       // set Table schema
>>>       .withSchema(TableSchema.builder()
>>>         .field("key", Types.STRING)
>>>         .field("ticker", Types.STRING)
>>>         .field("timeissued", Types.STRING)
>>>         .field("price", Types.FLOAT)
>>>         .build())
>>> 
>>> Will that be OK? 
>>> 
>>> 
>>> 
>>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fhue...@gmail.com> wrote:
>>>> Hi,
>>>> 
>>>> Flink processes streams record by record, instead of micro-batching 
>>>> records together. Since every record comes by itself, there is no for-each.
>>>> Simple record-by-record transformations can be done with a MapFunction, 
>>>> filtering out records with a FilterFunction. You can also implement a 
>>>> FlatMapFunction to do both in one step.
>>>> 
>>>> Once the stream is transformed and filtered, you can write it to HBase 
>>>> with a sink function.
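
To make the transform-and-filter step described above concrete, here is a small sketch in plain Scala with no Flink dependency. The Price case class, the PriceParser object and the threshold default are hypothetical names of mine. The 90.0 limit and the four-field layout are taken from the Spark snippet further down the thread:

```scala
import scala.util.Try

// Hypothetical record type for one price line: UUID, security, time, price.
final case class Price(key: String, ticker: String, timeIssued: String, price: Float)

object PriceParser {

  // Parses one comma-separated line and keeps it only if the price is above
  // the threshold. Malformed lines and non-numeric prices yield None.
  def parseValuable(line: String, threshold: Float = 90.0f): Option[Price] = {
    val fields = line.split(',').map(_.trim)
    if (fields.length != 4) None
    else Try(fields(3).toFloat).toOption
      .filter(_ > threshold)
      .map(p => Price(fields(0), fields(1), fields(2), p))
  }
}
```

Because the function returns zero or one result per input line, it does the map and the filter in one step. In a Flink job it could be used as stream.flatMap((line: String) => PriceParser.parseValuable(line).toList), with the resulting stream handed to the sink.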
>>>> 
>>>> 
>>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>>>> Just to clarify, these are individual price records with fields separated 
>>>>> by ','. Below are three price lines from the topic:
>>>>> 
>>>>> UUID,                            Security,         Time,        Price
>>>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
>>>>>> <mich.talebza...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I have a Kafka topic that transmits 100 security prices every 2 seconds. 
>>>>>> 
>>>>>> In Spark Streaming I go through the RDD, walk through the rows one by one 
>>>>>> and check the prices.
>>>>>> If prices are valuable, I store them in an HBase table:
>>>>>> 
>>>>>>     val dstream = KafkaUtils.createDirectStream[String, String,
>>>>>>       StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>>>     dstream.cache()
>>>>>>     dstream.foreachRDD { pricesRDD =>
>>>>>>       // Work on individual messages
>>>>>>       for (line <- pricesRDD.collect.toArray) {
>>>>>>         var key = line._2.split(',').view(0).toString
>>>>>>         var ticker = line._2.split(',').view(1).toString
>>>>>>         var timeissued = line._2.split(',').view(2).toString
>>>>>>         var price = line._2.split(',').view(3).toFloat
>>>>>>         val priceToString = line._2.split(',').view(3)
>>>>>>         if (price > 90.0) {
>>>>>>           // save to Hbase table
>>>>>>         }
>>>>>>       }
>>>>>>     }
>>>>>> 
>>>>>> That works fine. 
>>>>>> 
>>>>>> In Flink I define my source as below
>>>>>> 
>>>>>>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>     val stream = streamExecEnv
>>>>>>       .addSource(new FlinkKafkaConsumer011[String](
>>>>>>         topicsValue, new SimpleStringSchema(), properties))
>>>>>> 
>>>>>> Is there any way I can perform a similar operation in Flink? I need to go 
>>>>>> through every batch sent to the topic and look at the individual rows. 
>>>>>> For example, what is the equivalent of
>>>>>> 
>>>>>> for(line <- pricesRDD.collect.toArray)
>>>>>> 
>>>>>> in Flink?
>>>>>> 
>>>>>> Thanks
>>>> 
>> 
