感谢,经过测试发现,hbase可以写成功,就是无法读取,查看源码发现
hbaseRowinputformatImpl大概是这个类下面,有一句话
private transient Configuration conf;

这样conf就不会被序列化,也就是反序列化的时候为null了的原因了,具体你可以再复现一下


| |
邵红晓
|
|
邮箱:[email protected]
|

签名由 网易邮箱大师 定制

在2020年05月23日 00:19,Leonard Xu 写道:
Hi, hongxiao

我试了下,我本地hbase集群测试了下ok的[1],没能复现你的问题,你hbase集群的环境是怎么样的呀?

Best,
Leonard Xu
[1] 
https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
 
<https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33>

> 在 2020年5月22日,11:48,shao.hongxiao <[email protected]> 写道:
>
> 一下是我的程序
> sql
> val hbaseTable =
>       """
>         |CREATE TABLE pvuv_sink(
>         |    user_id varchar,
>         |    f ROW<item_id varchar,category_id varchar,cnt BIGINT>
>         |) WITH (
>         |    'connector.type' = 'hbase',
>         |    'connector.version' = '1.4.3',
>         |    'connector.table-name' = 'test_shx',
>         |    'connector.zookeeper.quorum' = 'docker-hbase:2181',
>         |    'connector.zookeeper.znode.parent' = '/hbase'
>         |)
>       """.stripMargin.toString
>
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment    
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>  bsEnv.setParallelism(1)
>     val bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> bsTableEnv.sqlUpdate(hbaseTable)
> bsTableEnv.execute("SQL Job")
>
> 报错
> job graph 阶段
> HBaseRowInputFormat.java
> this.conf = {Configuration@4841} "Configuration: core-default.xml, 
> core-site.xml, hbase-default.xml, hbase-site.xml"
> quietmode = true
> allowNullValueProperties = false
> resources = {ArrayList@4859}  size = 2
> finalParameters = {Collections$SetFromMap@4860}  size = 0
> loadDefaults = true
> updatingResource = {ConcurrentHashMap@4861}  size = 343
> properties = {Properties@4862}  size = 343
> overlay = {Properties@4863}  size = 2
> classLoader = {Launcher$AppClassLoader@4864}
>
> Executor job 阶段  InstantiationUtil.java    readObjectFromConfig
> userCodeObject = {HBaseRowInputFormat@13658}
> tableName = "test_shx"
> schema = {HBaseTableSchema@13660}
> conf = null
> readHelper = null
> endReached = false
> table = null
> scan = null
> resultScanner = null
> currentRow = null
> scannedRows = 0
> runtimeContext = null
>
>
> 恳请各位大神相帮
>
>    
> 邵红晓
> 邮箱:[email protected]
>  
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=%E9%82%B5%E7%BA%A2%E6%99%93&uid=17611022895%40163.com&iconUrl=http%3A%2F%2Fmail-online.nosdn.127.net%2Fde509f973023059c028c85e411fc61eb.jpg&items=%5B%22%22%2C%22%E9%82%AE%E7%AE%B1%EF%BC%9A17611022895%40163.com%22%2C%22%22%2C%22%22%2C%22%22%5D>
> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制

回复