Hi, 邵红晓

I followed up on the relevant code, and the HBaseRowInputFormat implementation is indeed flawed [1]. Currently, HBaseTableSource only supports loading hbase-site.xml from the classpath as its configuration file; most of the options in the DDL cannot be passed to the InputFormat because of serialization. As a workaround, I suggest putting your settings into hbase-site.xml and adding it to the classpath:
if (this.conf == null) {
   this.conf = HBaseConfiguration.create();
}
The InputFormat has a fallback path that discovers and loads the configuration from the classpath, which bypasses the current problem. In a local IDE you can also add hbase-site.xml to the classpath to verify and debug.
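The root cause can be reproduced outside Flink. Below is a minimal sketch (the class and field names are illustrative, not Flink's actual code) of how a transient field is dropped during the client-to-executor serialization round trip, which is why `conf` arrives as null in the task:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative stand-in for an InputFormat with a `transient` config field.
// Java serialization skips transient fields, so any value set on the client
// side never survives the trip to the executor.
public class TransientConfDemo implements Serializable {
    String tableName = "test_shx";                // serialized normally
    transient String conf = "settings from DDL";  // skipped by serialization

    // Serialize to bytes and deserialize back, as the client/executor
    // hand-off effectively does.
    static TransientConfDemo roundTrip() throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new TransientConfDemo());
        oos.flush();
        ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (TransientConfDemo) ois.readObject();
    }

    public static void main(String[] args) throws Exception {
        TransientConfDemo copy = roundTrip();
        System.out.println(copy.tableName); // test_shx
        System.out.println(copy.conf);      // null
    }
}
```

This is exactly the pattern in your debugger dump: `tableName` survives, `conf` comes back null, and only the classpath fallback can repopulate it.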

We found this problem late for two reasons. First, few users use HBase as a source table; it is mostly used as a dimension table or a sink table. Dimension tables go through HBaseLookupFunction and sink tables go through HBaseUpsertSinkFunction, and both of those implementations are fine. Second, the HBase ITCase implementation is rather special and did not cover the DDL path. Production environments usually configure via environment variables and the classpath, and this code has been around for quite a while.

In short, good catch. I have filed an issue and will follow up with a fix.


Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-17932 


> On May 23, 2020, at 08:01, shao.hongxiao <[email protected]> wrote:
> 
> Thanks. After testing, I found that writes to HBase succeed but reads fail. Looking at the source,
> in HBaseRowInputFormat (roughly that class) there is this line:
> private transient Configuration conf;
> 
> Because of this, conf is never serialized, which is why it is null after deserialization. You can try to reproduce it yourself.
> 
> 
> | |
> 邵红晓
> |
> |
> 邮箱:[email protected]
> |
> 
> 签名由 网易邮箱大师 定制
> 
> On May 23, 2020, at 00:19, Leonard Xu wrote:
> Hi, hongxiao
> 
> I gave it a try; it works on my local HBase cluster [1], so I could not reproduce your problem. What does your HBase cluster environment look like?
> 
> Best,
> Leonard Xu
> [1] https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
> 
>> On May 22, 2020, at 11:48, shao.hongxiao <[email protected]> wrote:
>> 
>> Below is my program.
>> SQL:
>> val hbaseTable =
>>      """
>>        |CREATE TABLE pvuv_sink(
>>        |    user_id varchar,
>>        |    f ROW<item_id varchar,category_id varchar,cnt BIGINT>
>>        |) WITH (
>>        |    'connector.type' = 'hbase',
>>        |    'connector.version' = '1.4.3',
>>        |    'connector.table-name' = 'test_shx',
>>        |    'connector.zookeeper.quorum' = 'docker-hbase:2181',
>>        |    'connector.zookeeper.znode.parent' = '/hbase'
>>        |)
>>      """.stripMargin.toString
>> 
>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> bsEnv.setParallelism(1)
>> val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>> bsTableEnv.sqlUpdate(hbaseTable)
>> bsTableEnv.execute("SQL Job")
>> 
>> The error
>> During the job graph phase:
>> HBaseRowInputFormat.java
>> this.conf = {Configuration@4841} "Configuration: core-default.xml, core-site.xml, hbase-default.xml, hbase-site.xml"
>> quietmode = true
>> allowNullValueProperties = false
>> resources = {ArrayList@4859}  size = 2
>> finalParameters = {Collections$SetFromMap@4860}  size = 0
>> loadDefaults = true
>> updatingResource = {ConcurrentHashMap@4861}  size = 343
>> properties = {Properties@4862}  size = 343
>> overlay = {Properties@4863}  size = 2
>> classLoader = {Launcher$AppClassLoader@4864}
>> 
>> During the executor job phase, in InstantiationUtil.java, readObjectFromConfig:
>> userCodeObject = {HBaseRowInputFormat@13658}
>> tableName = "test_shx"
>> schema = {HBaseTableSchema@13660}
>> conf = null
>> readHelper = null
>> endReached = false
>> table = null
>> scan = null
>> resultScanner = null
>> currentRow = null
>> scannedRows = 0
>> runtimeContext = null
>> 
>> 
>> Any help would be greatly appreciated.
>> 
>> 
>> 邵红晓
>> Email: [email protected]
> 
