Re: Working out through individual messages in Flink

2018-08-07 Thread Mich Talebzadeh
Hi Jorn,

Thanks. I uploaded the Scala code to my GitHub as md_streaming.scala:

https://github.com/michTalebzadeh/Flink

Regards,

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 7 Aug 2018 at 23:29, Jörn Franke  wrote:

> Hi Mich,
>
> Would it be possible to share the full source code ?
> I am missing a call to streamExecEnvironment.execute
>
> Best regards
>
> On 8. Aug 2018, at 00:02, Mich Talebzadeh 
> wrote:
>
> Hi Fabian,
>
> Reading your notes above I have converted the table back to DataStream.
>
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
> tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
> 'timeissued, 'price)
>
>val key =
> tableEnv.scan("priceTable").select('key).toDataStream[Row]
>val ticker =
> tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>val timeissued =
> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>val price =
> tableEnv.scan("priceTable").select('price).toDataStream[Row]
>
> My intention is to create an HBase sink as follows:
>
> // Save prices to Hbase table
>  var p = new Put(new String(key).getBytes())
>  p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
> new String(ticker).getBytes())
>  p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new
> String(timeissued).getBytes())
>  p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
> new String(priceToString).getBytes())
>  p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
> new String(CURRENCY).getBytes())
>  p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
> new String(1.toString).getBytes())
>  p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
> new String(System.currentTimeMillis.toString).getBytes())
>  HbaseTable.put(p)
>  HbaseTable.flushCommits()
>
> However, I don't seem to be able to get the correct values for the columns!
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske  wrote:
>
>> A *Table*Source [1], is a special input connector for Flink's relational
>> APIs (Table API and SQL) [2].
>> You can transform and filter with these APIs as well (it's probably even
>> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>>
>> However, there is no *Table*Sink for HBase and you would need to convert
>> the Table back to a DataStream [3].
>> That's not very difficult since the APIs are integrated with each other.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>
>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :
>>
>>> Thanks Fabian. That was very useful.
>>>
>>> How about an operation like below?
>>>
>>>  // create builder
>>>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>  // set Kafka topic
>>> .forTopic(topicsValue)
>>>  // set Kafka consumer properties
>>> .withKafkaProperties(properties)
>>>  // set Table schema
>>> .withSchema(TableSchema.builder()
>>> .field("key", Types.STRING)
>>> .field("ticker", Types.STRING)
>>> .field("timeissued", Types.STRING)
>>> .field("price", Types.FLOAT)
>>> .build())
>>>
>>> Will that be OK?
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>

Re: Working out through individual messages in Flink

2018-08-07 Thread Jörn Franke
(At the end of your code)

> On 8. Aug 2018, at 00:29, Jörn Franke  wrote:
> 
> Hi Mich,
> 
> Would it be possible to share the full source code ?
> I am missing a call to streamExecEnvironment.execute
> 
> Best regards 
> 
>> On 8. Aug 2018, at 00:02, Mich Talebzadeh  wrote:
>> 
>> Hi Fabian,
>> 
>> Reading your notes above I have converted the table back to DataStream.
>> 
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>> tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 
>> 'timeissued, 'price)
>> 
>>val key = 
>> tableEnv.scan("priceTable").select('key).toDataStream[Row]
>>val ticker = 
>> tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>>val timeissued = 
>> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>>val price = 
>> tableEnv.scan("priceTable").select('price).toDataStream[Row]
>> 
>> My intention is to create an HBase sink as follows:
>> 
>> // Save prices to Hbase table
>>  var p = new Put(new String(key).getBytes())
>>  p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),  
>> new String(ticker).getBytes())
>>  p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new 
>> String(timeissued).getBytes())
>>  p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),   
>> new String(priceToString).getBytes())
>>  p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), 
>> new String(CURRENCY).getBytes())
>>  p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new 
>> String(1.toString).getBytes())
>>  p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new 
>> String(System.currentTimeMillis.toString).getBytes())
>>  HbaseTable.put(p)
>>  HbaseTable.flushCommits()
>> 
>> However, I don't seem to be able to get the correct values for the columns!
>> 
>> Dr Mich Talebzadeh
>>  
>>  
>> 
>> 
>>> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske  wrote:
>>> A *Table*Source [1], is a special input connector for Flink's relational 
>>> APIs (Table API and SQL) [2].
>>> You can transform and filter with these APIs as well (it's probably even 
>>> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>>> 
>>> However, there is no *Table*Sink for HBase and you would need to convert 
>>> the Table back to a DataStream [3].
>>> That's not very difficult since the APIs are integrated with each other.
>>> 
>>> Best, Fabian
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>>> [2] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>>> [3] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>> 
>>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :
>>>> Thanks Fabian. That was very useful.
>>>>
>>>> How about an operation like below?
>>>>
>>>>  // create builder
>>>>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>>  // set Kafka topic
>>>> .forTopic(topicsValue)
>>>>  // set Kafka consumer properties
>>>> .withKafkaProperties(properties)
>>>>  // set Table schema
>>>> .withSchema(TableSchema.builder()
>>>> .field("key", Types.STRING)
>>>> .field("ticker", Types.STRING)
>>>> .field("timeissued", Types.STRING)
>>>> .field("price", Types.FLOAT)
>>>> .build())
>>>>
>>>> Will that be OK?
>>>>
>>>>
>>>> Dr Mich Talebzadeh
  
  
 
 
>>>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:
>>>>> Hi,
>>>>>
>>>>> Flink processes streams record by record, instead of micro-batching
>>>>> records together. Since every record comes by itself, there is no
>>>>> for-each.
>>>>> Simple record-by-record transformations can be done with a

Re: Working out through individual messages in Flink

2018-08-07 Thread Jörn Franke
Hi Mich,

Would it be possible to share the full source code?
I am missing a call to streamExecEnvironment.execute

Best regards 

> On 8. Aug 2018, at 00:02, Mich Talebzadeh  wrote:
> 
> Hi Fabian,
> 
> Reading your notes above I have converted the table back to DataStream.
> 
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
> tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 
> 'timeissued, 'price)
> 
>val key = 
> tableEnv.scan("priceTable").select('key).toDataStream[Row]
>val ticker = 
> tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>val timeissued = 
> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>val price = 
> tableEnv.scan("priceTable").select('price).toDataStream[Row]
> 
> My intention is to create an HBase sink as follows:
> 
> // Save prices to Hbase table
>  var p = new Put(new String(key).getBytes())
>  p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),  new 
> String(ticker).getBytes())
>  p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new 
> String(timeissued).getBytes())
>  p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),   new 
> String(priceToString).getBytes())
>  p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), 
> new String(CURRENCY).getBytes())
>  p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new 
> String(1.toString).getBytes())
>  p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new 
> String(System.currentTimeMillis.toString).getBytes())
>  HbaseTable.put(p)
>  HbaseTable.flushCommits()
> 
> However, I don't seem to be able to get the correct values for the columns!
> 
> Dr Mich Talebzadeh
>  
>  
> 
> 
>> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske  wrote:
>> A *Table*Source [1], is a special input connector for Flink's relational 
>> APIs (Table API and SQL) [2].
>> You can transform and filter with these APIs as well (it's probably even 
>> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>> 
>> However, there is no *Table*Sink for HBase and you would need to convert the 
>> Table back to a DataStream [3].
>> That's not very difficult since the APIs are integrated with each other.
>> 
>> Best, Fabian
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>> 
>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :
>>> Thanks Fabian. That was very useful.
>>> 
>>> How about an operation like below?
>>> 
>>>  // create builder
>>>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>  // set Kafka topic
>>> .forTopic(topicsValue)
>>>  // set Kafka consumer properties
>>> .withKafkaProperties(properties)
>>>  // set Table schema
>>> .withSchema(TableSchema.builder()
>>> .field("key", Types.STRING)
>>> .field("ticker", Types.STRING)
>>> .field("timeissued", Types.STRING)
>>> .field("price", Types.FLOAT)
>>> .build())
>>> 
>>> Will that be OK? 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>>  
>>> 
>>> 
>>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:
>>>> Hi,
>>>>
>>>> Flink processes streams record by record, instead of micro-batching
>>>> records together. Since every record comes by itself, there is no for-each.
>>>> Simple record-by-record transformations can be done with a MapFunction,
>>>> filtering out records with a FilterFunction. You can also implement a
>>>> FlatMapFunction to do both in one step.
>>>>
>>>> Once the stream is transformed and filtered, you can write it to HBase

Re: Working out through individual messages in Flink

2018-08-07 Thread Mich Talebzadeh
Hi Fabian,

Reading your notes above I have converted the table back to DataStream.

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)

val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]

My intention is to create an HBase sink as follows:

// Save prices to Hbase table
var p = new Put(new String(key).getBytes())
p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
HbaseTable.put(p)
HbaseTable.flushCommits()

However, I don't seem to be able to get the correct values for the columns!
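
One possible reading of the problem, offered as a guess rather than a diagnosis: key, ticker, timeissued and price above are each a DataStream[Row], that is, a handle to a whole stream of one-column rows, not the string value of a single record. The Put has to be built from the fields of one record at a time. The sketch below is plain Scala with no Flink or HBase dependency and only shows that per-record step in isolation. PriceRow, Cell, HBaseCells, toCells and the currency and nowMillis parameters are hypothetical names introduced for illustration; they are not in the original code.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical holder for the columns of ONE record (not part of the original code).
final case class PriceRow(key: String, ticker: String, timeIssued: String, price: Float)

// One cell to write: column family, qualifier and value, still as strings.
final case class Cell(family: String, qualifier: String, value: String)

object HBaseCells {
  // Encode a string the way the Put calls in this thread expect (bytes).
  def bytes(s: String): Array[Byte] = s.getBytes(UTF_8)

  // The row key comes from the record's own key field.
  def rowKey(r: PriceRow): Array[Byte] = bytes(r.key)

  // Build the six cells used in the thread from a single record.
  // currency and nowMillis are passed in so the function stays pure.
  def toCells(r: PriceRow, currency: String, nowMillis: Long): Seq[Cell] = Seq(
    Cell("PRICE_INFO", "TICKER", r.ticker),
    Cell("PRICE_INFO", "ISSUED", r.timeIssued),
    Cell("PRICE_INFO", "PRICE", r.price.toString),
    Cell("PRICE_INFO", "CURRENCY", currency),
    Cell("OPERATION", "OP_TYPE", "1"),
    Cell("OPERATION", "OP_TIME", nowMillis.toString)
  )
}
```

Inside whatever per-record function ends up writing to HBase, each Cell would then map to one p.add(bytes(family), bytes(qualifier), bytes(value)) call on a Put created from rowKey(r). How the record reaches that function (for example by selecting all four columns into one DataStream[Row] instead of four separate streams) is an assumption not confirmed anywhere in this thread.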

Dr Mich Talebzadeh







On Mon, 30 Jul 2018 at 09:58, Fabian Hueske  wrote:

> A *Table*Source [1], is a special input connector for Flink's relational
> APIs (Table API and SQL) [2].
> You can transform and filter with these APIs as well (it's probably even
> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>
> However, there is no *Table*Sink for HBase and you would need to convert
> the Table back to a DataStream [3].
> That's not very difficult since the APIs are integrated with each other.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :
>
>> Thanks Fabian. That was very useful.
>>
>> How about an operation like below?
>>
>>  // create builder
>>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>>  // set Kafka topic
>> .forTopic(topicsValue)
>>  // set Kafka consumer properties
>> .withKafkaProperties(properties)
>>  // set Table schema
>> .withSchema(TableSchema.builder()
>> .field("key", Types.STRING)
>> .field("ticker", Types.STRING)
>> .field("timeissued", Types.STRING)
>> .field("price", Types.FLOAT)
>> .build())
>>
>> Will that be OK?
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> Flink processes streams record by record, instead of micro-batching
>>> records together. Since every record comes by itself, there is no for-each.
>>> Simple record-by-record transformations can be done with a MapFunction,
>>> filtering out records with a FilterFunction. You can also implement a
>>> FlatMapFunction to do both in one step.
>>>
>>> Once the stream is transformed and filtered, you can write it to HBase
>>> with a sink function.
>>>
>>>
>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh :
>>>
>>>> Just to clarify these are the individual prices separated by ','. The
>>>> below shows three price lines in the topic
>>>>
>>>> UUID,

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks again.

The Hbase connector works fine in Flink

// Imports assumed for the older HBase client API used below
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Start Hbase table stuff
val tableName = "MARKETDATAHBASESPEEDFLINK"
val hbaseConf = HBaseConfiguration.create()
// Connecting to remote Hbase
hbaseConf.set("hbase.master", hbaseHost)
hbaseConf.set("hbase.zookeeper.quorum", zookeeperHost)
hbaseConf.set("hbase.zookeeper.property.clientPort", zooKeeperClientPort)
hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
// Create the table with its two column families if it does not exist yet
val admin = new HBaseAdmin(hbaseConf)
if (!admin.isTableAvailable(tableName)) {
  println("Creating table " + tableName)
  val tableDesc = new HTableDescriptor(tableName)
  tableDesc.addFamily(new HColumnDescriptor("PRICE_INFO".getBytes()))
  tableDesc.addFamily(new HColumnDescriptor("OPERATION".getBytes()))
  admin.createTable(tableDesc)
} else {
  println("Table " + tableName + " already exists!!")
}
val HbaseTable = new HTable(hbaseConf, tableName)
// End Hbase table stuff

So I just need to split every row into columns and put it into Hbase as
follows:

// Save prices to Hbase table
var p = new Put(new String(key).getBytes())
//p.add("PRICE_INFO".getBytes(), "key".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
HbaseTable.put(p)
HbaseTable.flushCommits()


Dr Mich Talebzadeh







On Mon, 30 Jul 2018 at 09:58, Fabian Hueske  wrote:

> A *Table*Source [1], is a special input connector for Flink's relational
> APIs (Table API and SQL) [2].
> You can transform and filter with these APIs as well (it's probably even
> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>
> However, there is no *Table*Sink for HBase and you would need to convert
> the Table back to a DataStream [3].
> That's not very difficult since the APIs are integrated with each other.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :
>
>> Thanks Fabian. That was very useful.
>>
>> How about an operation like below?
>>
>>  // create builder
>>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>>  // set Kafka topic
>> .forTopic(topicsValue)
>>  // set Kafka consumer properties
>> .withKafkaProperties(properties)
>>  // set Table schema
>> .withSchema(TableSchema.builder()
>> .field("key", Types.STRING)
>> .field("ticker", Types.STRING)
>> .field("timeissued", Types.STRING)
>> .field("price", Types.FLOAT)
>> .build())
>>
>> Will that be OK?
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> Flink processes streams record by record, instead of micro-batching
>>> records together. Since every record comes by itself, there is no for-each.
>>> Simple record-by-record transformations 

Re: Working out through individual messages in Flink

2018-07-30 Thread Fabian Hueske
A *Table*Source [1] is a special input connector for Flink's relational
APIs (Table API and SQL) [2].
You can transform and filter with these APIs as well (it's probably even
easier). In SQL this would be the SELECT and WHERE clauses of a query.

However, there is no *Table*Sink for HBase and you would need to convert
the Table back to a DataStream [3].
That's not very difficult since the APIs are integrated with each other.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset

2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :

> Thanks Fabian. That was very useful.
>
> How about an operation like below?
>
>  // create builder
>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>  // set Kafka topic
> .forTopic(topicsValue)
>  // set Kafka consumer properties
> .withKafkaProperties(properties)
>  // set Table schema
> .withSchema(TableSchema.builder()
> .field("key", Types.STRING)
> .field("ticker", Types.STRING)
> .field("timeissued", Types.STRING)
> .field("price", Types.FLOAT)
> .build())
>
> Will that be OK?
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:
>
>> Hi,
>>
>> Flink processes streams record by record, instead of micro-batching
>> records together. Since every record comes by itself, there is no for-each.
>> Simple record-by-record transformations can be done with a MapFunction,
>> filtering out records with a FilterFunction. You can also implement a
>> FlatMapFunction to do both in one step.
>>
>> Once the stream is transformed and filtered, you can write it to HBase
>> with a sink function.
>>
>>
>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh :
>>
>>> Just to clarify these are the individual prices separated by ','. The
>>> below shows three price lines in the topic
>>>
>>> UUID,Security, Time,Price
>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
>>> wrote:
>>>

>>>> Hi,
>>>>
>>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>>
>>>> In Spark streaming I go through the RDD and walk through rows one by
>>>> one and check prices.
>>>> If prices are valuable I store them into an HBase table.
>>>>
>>>> val dstream = KafkaUtils.createDirectStream[String, String,
>>>>   StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>> dstream.cache()
>>>> dstream.foreachRDD { pricesRDD =>
>>>>   // Work on individual messages
>>>>   for (line <- pricesRDD.collect.toArray)
>>>>   {
>>>>     var key = line._2.split(',').view(0).toString
>>>>     var ticker = line._2.split(',').view(1).toString
>>>>     var timeissued = line._2.split(',').view(2).toString
>>>>     var price = line._2.split(',').view(3).toFloat
>>>>     val priceToString = line._2.split(',').view(3)
>>>>     if (price > 90.0)
>>>>     {
>>>>       //save to Hbase table
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> That works fine.
>>>>
>>>> In Flink I define my source as below
>>>>
>>>> val streamExecEnv = StreamExecutionEnvironment.
>>>> getExecutionEnvironment
 

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks Fabian. That was very useful.

How about an operation like below?

// create builder
val KafkaTableSource = Kafka011JsonTableSource.builder()
  // set Kafka topic
  .forTopic(topicsValue)
  // set Kafka consumer properties
  .withKafkaProperties(properties)
  // set Table schema
  .withSchema(TableSchema.builder()
    .field("key", Types.STRING)
    .field("ticker", Types.STRING)
    .field("timeissued", Types.STRING)
    .field("price", Types.FLOAT)
    .build())

Will that be OK?


Dr Mich Talebzadeh







On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:

> Hi,
>
> Flink processes streams record by record, instead of micro-batching
> records together. Since every record comes by itself, there is no for-each.
> Simple record-by-record transformations can be done with a MapFunction,
> filtering out records with a FilterFunction. You can also implement a
> FlatMapFunction to do both in one step.
>
> Once the stream is transformed and filtered, you can write it to HBase
> with a sink function.
>
>
> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh :
>
>> Just to clarify these are the individual prices separated by ','. The
>> below shows three price lines in the topic
>>
>> UUID,Security, Time,Price
>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>
>>> In Spark streaming I go through the RDD and walk through rows one by one
>>> and check prices.
>>> If prices are valuable I store them into an HBase table.
>>>
>>> val dstream = KafkaUtils.createDirectStream[String, String,
>>>   StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>> dstream.cache()
>>> dstream.foreachRDD { pricesRDD =>
>>>   // Work on individual messages
>>>   for (line <- pricesRDD.collect.toArray)
>>>   {
>>>     var key = line._2.split(',').view(0).toString
>>>     var ticker = line._2.split(',').view(1).toString
>>>     var timeissued = line._2.split(',').view(2).toString
>>>     var price = line._2.split(',').view(3).toFloat
>>>     val priceToString = line._2.split(',').view(3)
>>>     if (price > 90.0)
>>>     {
>>>       //save to Hbase table
>>>     }
>>>   }
>>> }
>>>
>>> That works fine.
>>>
>>> In Flink I define my source as below
>>>
>>> val streamExecEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> val stream = streamExecEnv
>>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>>> SimpleStringSchema(), properties))
>>>
>>> Is there any way I can perform a similar operation in Flink? I need to go
>>> through every topic load sent and look at the individual rows. For example,
>>> what is the equivalent of
>>>
>>> for(line <- pricesRDD.collect.toArray)
>>>
>>> in Flink?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>

Re: Working out through individual messages in Flink

2018-07-30 Thread Renjie Liu
Hi, Mich:
You can write a sink function for that.
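
As a rough illustration of the per-record logic such a function would need, here is a minimal sketch in plain Scala, with no Flink dependency, that parses one comma-separated message and keeps it only if the price is above the threshold from the Spark example. Price, PriceParsing, parse, isValuable and parseValuable are hypothetical names introduced here; they do not come from the original code.

```scala
import scala.util.Try

// Hypothetical record type; the fields mirror the columns quoted in this thread.
final case class Price(key: String, ticker: String, timeIssued: String, price: Float)

object PriceParsing {
  // Parse one comma-separated message. Returns None when the line does not
  // have exactly four fields or the price is not a number (e.g. a header line).
  def parse(line: String): Option[Price] =
    line.split(',').map(_.trim) match {
      case Array(key, ticker, timeIssued, price) =>
        Try(price.toFloat).toOption.map(Price(key, ticker, timeIssued, _))
      case _ => None
    }

  // Keep only prices above the threshold (90.0 in the Spark example).
  def isValuable(p: Price, threshold: Float = 90.0f): Boolean = p.price > threshold

  // Parse and filter in one step: zero or one result per input record.
  def parseValuable(line: String): Option[Price] = parse(line).filter(isValuable(_))
}
```

Because Flink hands records to user functions one at a time, a function of this shape would take the place of the for loop over the collected RDD. With the Scala DataStream API that might look like stream.flatMap(line => PriceParsing.parseValuable(line).toList), assuming import org.apache.flink.streaming.api.scala._ is in scope; that line is untested against Flink 1.5 and the resulting stream would still need a sink function that writes to HBase.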

On Mon, Jul 30, 2018 at 2:58 PM Mich Talebzadeh 
wrote:

>
> Hi,
>
> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>
> In Spark streaming I go through the RDD and walk through rows one by one
> and check prices.
> If prices are valuable I store them into an HBase table.
>
> val dstream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
> dstream.cache()
> dstream.foreachRDD { pricesRDD =>
>   // Work on individual messages
>   for (line <- pricesRDD.collect.toArray)
>   {
>     var key = line._2.split(',').view(0).toString
>     var ticker = line._2.split(',').view(1).toString
>     var timeissued = line._2.split(',').view(2).toString
>     var price = line._2.split(',').view(3).toFloat
>     val priceToString = line._2.split(',').view(3)
>     if (price > 90.0)
>     {
>       //save to Hbase table
>     }
>   }
> }
>
> That works fine.
>
> In Flink I define my source as below
>
> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val stream = streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>
> Is there any way I can perform a similar operation in Flink? I need to go
> through every topic load sent and look at the individual rows. For example,
> what is the equivalent of
>
> for(line <- pricesRDD.collect.toArray)
>
> in Flink?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
-- 
Liu, Renjie
Software Engineer, MVAD