你好,我想提两个问题,关于flink-sql整合hbase,问题列表如下:
问题一:flink-sql通过hive catalog整合hbase后的查询执行计划,可否使用rowkey做精确filter而不是整表扫描。

问题二:flink-sql
查询hive的hbase外部表,是否没有通过HBaseStorageHandler正确解析INPUTFORMAT和OUTPUTFORMAT



两个问题的具体情景如下:

我所使用的版本如下:



Hadoop  3.0.0+cdh6.3.2

HDFS    3.0.0+cdh6.3.2

HBase   2.1.0+cdh6.3.2

Hive    2.1.1+cdh6.3.2

Flink   1.11.1



我在HBase里有两个表,分别为't1'和'econ_run_data_minutes',其中,
表't1'的行数为2行,而'econ_run_data_minutes'的行数超过1亿行。



我在flink-sql里,例用hive catalog定义'myHbaseT1'映射't1',
定义'myHbaseMin'映射'econ_run_data_minutes',如下:



CREATE TABLE myHbaseT1 (

  rowkey string,

  f1 row (val VARCHAR, dqf VARCHAR),

  f2 row (val VARCHAR, dqf VARCHAR)

) WITH (

 'connector' = 'hbase-1.4',

 'table-name' = 't1',

 'zookeeper.quorum' =
'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'

);





CREATE TABLE myHbaseMin (

  rowkey string,

  m1 row (val VARCHAR, dqf VARCHAR, ts TIMESTAMP(3)),

 PRIMARY KEY (rowkey) NOT ENFORCED

) WITH (

 'connector' = 'hbase-1.4',

 'table-name' = 'econ_run_data_minutes',

 'zookeeper.quorum' =
'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'

);



然手,在flink-sql client里执行查询语句,"select * from myHbaseT1",因为数据量只有两行,所以很快返回结果,如下:

Flink SQL> select * from myHbaseT1 ;

+-----+----------------------+----------------------+----------------------+

| +/- |               rowkey |                   f1 |                   f2 |

+-----+----------------------+----------------------+----------------------+

|   + | IDIN200F.d1.P-159... |            null,null |                100,0 |

|   + |                 row1 |                200,0 |            null,null |

+-----+----------------------+----------------------+----------------------+

Received a total of 2 rows



Flink SQL> 

以上测试,证明我的flink-sql环境通过hive catalog整合hbase查询是没有问题的!



接着,我在flink-sql client里执行查询语句,

"select * from myhbasemin where rowkey >= 'IDIN200F.d1.P-1598493600000' and
rowkey <= 'IDIN200F.d1.P-1598497200000'",

这个查询结果其实只是返回不到100行的数据,但却要耗时半个小时左右,原因是因为flink做了整表扫描,执行计划如下以及核心日志如下:



Source: TableSourceScan(table=[[myhive, dw, myhbasemin]], fields=[rowkey,
m1]) -> Calc(select=[rowkey, m1], where=[((rowkey >=
_UTF-16LE'IDIN200F.d1.P-1598493600000') AND (rowkey <=
_UTF-16LE'IDIN200F.d1.P-1598497200000'))]) -> SinkConversionToTuple2 ->
Sink: SQL Client Stream Collect Sink





2020-08-28 11:20:24,501 INFO 
org.apache.flink.connector.hbase.source.AbstractTableInputFormat [] -
opening split
(this=org.apache.flink.connector.hbase.source.HBaseRowDataInputFormat@5c382732)[1|[dev-hadoop-node-c:16020]|BSBMS.d1.cu3.061c.U-15935022|BSBMS.d1.cu3.147c.U-159092]

2020-08-28 11:20:49,086 INFO 
org.apache.flink.connector.hbase.source.AbstractTableInputFormat [] -
Closing split (scanned 4410553 rows)



我设想中的情景,flink-sql应该抓住这个rowkey,在hbase里面使用StartRow,
EndRow做精确扫描,而不是整表扫描后,在flink的task manager里面做filter !

所以,问题一:flink-sql通过hive catalog整合hbase后的查询执行计划,可否使用rowkey做精确filter而不是整表扫描。



在问题一的基础思考之上,我想是否可以通过hive的external table整合hbase,然后在flink-sql里查询hive的externa
table,实现快速精确查询,实验过程如下:



首先,在hive里定义external table整合hbase:



CREATE EXTERNAL TABLE `hive_hbase_t1`(

  `m_key` string COMMENT '', 

  `f1_val` string COMMENT '', 

  `f1_dqf` int COMMENT '', 

  `f2_val` string COMMENT '', 

  `f2_dqf` int COMMENT '', 

  `ts` timestamp COMMENT '')

ROW FORMAT SERDE 

  'org.apache.hadoop.hive.hbase.HBaseSerDe' 

STORED BY 

  'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 

  WITH SERDEPROPERTIES ( 

   'hbase.columns.mapping'=':key,f1:val,f1:dqf,f2:val,f2:dqf, :timestamp', 

   'serialization.format'='1')



TBLPROPERTIES (

  'hbase.table.name'='t1'

);



然后,在hive里查询,"select * from hive_hbase_t1":



hive> select * from hive_hbase_t1 ;

IDIN200F.d1.P-1593484146001     NULL    NULL    100     0       2020-06-30 
10:56:07.983

row1    200     0       NULL    NULL    2020-06-30 10:54:56.558

hive>

以上测试,证明我的环境,hive是可以通过external table整合hbase进行查询。



接着,我尝试在flink-sql里,查询,"select * from hive_hbase_t1":

Flink SQL>  select * from hive_hbase_t1;

Caused by: java.lang.NullPointerException

        at java.lang.Class.forName0(Native Method)

        at java.lang.Class.forName(Class.java:348)

        at
org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:305)

        ... 24 more



End of exception on server side>]



Flink SQL> 



经过调试,我发现,当org.apache.flink.connectors.hive.read.HiveTableInputFormat进行序列化分区的时候,找不到StorageDescriptor的inputFormat,

报错的代码在方法的org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits:

format = (InputFormat)Class.forName(sd.getInputFormat(), true,
Thread.currentThread().getContextClassLoader()).newInstance();



而实际上,我是有引入"org.apache.hive:hive-hbase-handler:2.1.1",

但很明显,flink-sql并没有利用'hive_hbase_t1'里的定义"STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler' ",

从而找不到正确的 INPUTFORMAT
'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat' 和 OUTPUTFORMAT
'org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat'



所以,我的问题二:flink-sql
查询hive的hbase外部表,是否没有通过HBaseStorageHandler正确解析INPUTFORMAT和OUTPUTFORMAT






--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复