Hi Mich,

We can't convert a DataStream to a value. There are some options:
1. Use a TableSink to write data[1] into Hbase.
2. Use a UDF[2].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#user-defined-functions

On Wed, Aug 8, 2018 at 2:22 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> I need this operation to stored filtered rows in an Hbase table.
>
> I can access an existing Hbase table through flink API
>
> My challenge is to put rows into Hbase table. Something like below and I
> don't seem to be able to extract individual column values from priceTable
>
>
>
>
> *                    val key =
> tableEnv.scan("priceTable").select('key).toDataStream[Row].print()
> val ticker =
> tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print()
> val timeissued =
> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print()
> val price =
> tableEnv.scan("priceTable").select('price).toDataStream[Row].print()*
>            val CURRENCY = "GBP"
>            val op_type = "1"
>            val op_time = System.currentTimeMillis.toString
> /*
>            if (price > 99.0)
>            {
>             // Save prices to Hbase table
>              var p = new Put(new String(key).getBytes())
>              p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
> new String(ticker).getBytes())
>              p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new
> String(timeissued).getBytes())
>              p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
> new String(priceToString).getBytes())
>              p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
> new String(CURRENCY).getBytes())
>              p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
> new String(1.toString).getBytes())
>              p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
> new String(System.currentTimeMillis.toString).getBytes())
>              HbaseTable.put(p)
>              HbaseTable.flushCommits()
>              if(tableEnv.scan("priceTable").filter('ticker == "VOD" &&
> 'price > 99.0))
>              {
>                sqltext = Calendar.getInstance.getTime.toString + ", Price
> on "+ticker+" hit " +price.toString
>                //java.awt.Toolkit.getDefaultToolkit().beep()
>                println(sqltext)
>              }
>            }
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 7 Aug 2018 at 17:07, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> The following works fine
>>
>>    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
>> 'timeissued, 'price)
>>     val result = tableEnv.scan("priceTable").filter('ticker === "VOD" &&
>> 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>>     val r = result.toDataStream[Row]
>>     r.print()
>>
>> Now I would like to get the individual column values from priceTable into
>> local variables
>>
>> This does not seem to work
>>
>> val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>> val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[
>> Row]
>>  val timeissued = tableEnv.scan("priceTable").select('timeissued).
>> toDataStream[Row]
>>  val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>>
>> What alternatives are there?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>

Reply via email to