Hi Mich, We can't convert a DataStream to a value. There are some options: 1. Use a TableSink to write data[1] into Hbase. 2. Use a UDF[2].
Best, Hequn [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#user-defined-functions On Wed, Aug 8, 2018 at 2:22 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote: > I need this operation to stored filtered rows in an Hbase table. > > I can access an existing Hbase table through flink API > > My challenge is to put rows into Hbase table. Something like below and I > don't seem to be able to extract individual column values from priceTable > > > > > * val key = > tableEnv.scan("priceTable").select('key).toDataStream[Row].print() > val ticker = > tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print() > val timeissued = > tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print() > val price = > tableEnv.scan("priceTable").select('price).toDataStream[Row].print()* > val CURRENCY = "GBP" > val op_type = "1" > val op_time = System.currentTimeMillis.toString > /* > if (price > 99.0) > { > // Save prices to Hbase table > var p = new Put(new String(key).getBytes()) > p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), > new String(ticker).getBytes()) > p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new > String(timeissued).getBytes()) > p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), > new String(priceToString).getBytes()) > p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), > new String(CURRENCY).getBytes()) > p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), > new String(1.toString).getBytes()) > p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), > new String(System.currentTimeMillis.toString).getBytes()) > HbaseTable.put(p) > HbaseTable.flushCommits() > if(tableEnv.scan("priceTable").filter('ticker == "VOD" && > 'price > 99.0)) > { > sqltext = Calendar.getInstance.getTime.toString + ", Price > on "+ticker+" hit " +price.toString > //java.awt.Toolkit.getDefaultToolkit().beep() > println(sqltext) > } > } > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Tue, 7 Aug 2018 at 17:07, Mich Talebzadeh <mich.talebza...@gmail.com> > wrote: > >> Hi, >> >> The following works fine >> >> tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, >> 'timeissued, 'price) >> val result = tableEnv.scan("priceTable").filter('ticker === "VOD" && >> 'price > 99.0).select('key, 'ticker, 'timeissued, 'price) >> val r = result.toDataStream[Row] >> r.print() >> >> Now I would like to get the individual column values from priceTable into >> local variables >> >> This does not seem to work >> >> val key = tableEnv.scan("priceTable").select('key).toDataStream[Row] >> val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[ >> Row] >> val timeissued = tableEnv.scan("priceTable").select('timeissued). >> toDataStream[Row] >> val price = tableEnv.scan("priceTable").select('price).toDataStream[Row] >> >> What alternatives are there? >> >> Thanks >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >