The screenshots of the failure didn't come through... What is the exact exception?

On Mon, Sep 7, 2020 at 10:23 AM MuChen <9329...@qq.com> wrote:

> hi, all:
> Could someone please help me take a look at a problem? Thanks a lot!
>
> The processing logic is as follows (a rough code sketch of the whole pipeline is included after step 5):
> 1. Read the data from Kafka with the DataStream API into a DataStream ds1
> 2. Create a tableEnv and register the Hive catalog:
>     tableEnv.registerCatalog(catalogName, catalog);
>     tableEnv.useCatalog(catalogName);
> 3. Declare a table that uses ds1 as its source
>     Table sourcetable = tableEnv.fromDataStream(ds1);
>     String souceTableName = "music_source";
>     tableEnv.createTemporaryView(souceTableName, sourcetable);
> 4. Create a Hive table:
>
> CREATE TABLE `dwd_music_copyright_test`(
>   `url` string COMMENT 'url',
>   `md5` string COMMENT 'md5',
>   `utime` bigint COMMENT 'time',
>   `title` string COMMENT 'song title',
>   `singer` string COMMENT 'singer',
>   `company` string COMMENT 'company',
>   `level` int COMMENT 'confidence: 0 = from title tokenization, 1 = result returned by acrcloud, 3 = manually labeled')
> PARTITIONED BY (
>   `dt` string,
>   `hour` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION
>   'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
> TBLPROPERTIES (
>   'connector'='HiveCatalog',
>   'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
>   'sink.partition-commit.delay'='1 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.rolling-policy.check-interval'='30s',
>   'sink.rolling-policy.rollover-interval'='1min',
>   'sink.rolling-policy.file-size'='1MB');
>
>
> 5. Insert the data from the table in step 3 into dwd_music_copyright_test
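>
> For reference, here is a rough, self-contained sketch of what steps 1-3 and 5 look like end to end. The Kafka address, topic, group id, CSV-style message parsing, Hive conf dir and checkpoint interval are placeholders I made up for the sketch, not the actual job:
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple9;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
>
> import java.util.Properties;
>
> import static org.apache.flink.table.api.Expressions.$;
>
> public class KafkaToHiveSketch {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Checkpointing must be enabled for the streaming Hive sink to commit partitions.
>         streamEnv.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
>                 streamEnv,
>                 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
>
>         // 1. Read from Kafka and parse each message into the columns of the Hive table
>         //    (the parsing below is a placeholder for whatever the real format is).
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "kafka:9092");
>         props.setProperty("group.id", "music-group");
>         DataStream<Tuple9<String, String, Long, String, String, String, Integer, String, String>> ds1 =
>                 streamEnv.addSource(new FlinkKafkaConsumer<>("music_topic", new SimpleStringSchema(), props))
>                         .map(line -> {
>                             String[] f = line.split(",");
>                             return Tuple9.of(f[0], f[1], Long.parseLong(f[2]), f[3], f[4], f[5],
>                                     Integer.parseInt(f[6]), f[7], f[8]);
>                         })
>                         .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG, Types.STRING,
>                                 Types.STRING, Types.STRING, Types.INT, Types.STRING, Types.STRING));
>
>         // 2. Register and use the Hive catalog (conf dir is a placeholder, version from the environment below).
>         String catalogName = "hive";
>         HiveCatalog catalog = new HiveCatalog(catalogName, "rt_dwd", "/etc/hive/conf", "1.2.0");
>         tableEnv.registerCatalog(catalogName, catalog);
>         tableEnv.useCatalog(catalogName);
>
>         // 3. Declare a table backed by ds1 and register it as a temporary view.
>         Table sourcetable = tableEnv.fromDataStream(ds1,
>                 $("url"), $("md5"), $("utime"), $("title"), $("singer"),
>                 $("company"), $("level"), $("dt"), $("hour"));
>         tableEnv.createTemporaryView("music_source", sourcetable);
>
>         // 5. Insert the view into the partitioned Hive table; executeSql submits the job.
>         tableEnv.executeSql(
>                 "INSERT INTO rt_dwd.dwd_music_copyright_test "
>                         + "SELECT url, md5, utime, title, singer, company, `level`, dt, `hour` "
>                         + "FROM music_source");
>     }
> }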
>
> Environment
>
> flink:1.11
> kafka:1.1.1
> hadoop:2.6.0
> hive:1.2.0
>
> Problem
>     After the job has been running, I found that some partitions were not created in the Hive catalog; for example, the hour=02 and hour=03 partitions below were not created:
>
> show partitions rt_dwd.dwd_music_copyright_test;
>
> | dt=2020-08-29/hour=00  |
> | dt=2020-08-29/hour=01  |
> | dt=2020-08-29/hour=04  |
> | dt=2020-08-29/hour=05  |
>
> However, files were generated under the HDFS directory:
>
> $ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
> 4.5 K   13.4 K  /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
> 2.0 K   6.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
> 1.7 K   5.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
> 1.3 K   3.8 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
> 3.1 K   9.2 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04
>
>
> And after manually adding the partitions (ADD PARTITION, roughly as sketched below), the data can be read normally.
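>
> (The manual fix was roughly one statement of this form per missing partition, run in the Hive CLI; the partition values and location are taken from the listing above:)
>
> ALTER TABLE rt_dwd.dwd_music_copyright_test ADD IF NOT EXISTS
>   PARTITION (dt='2020-08-29', `hour`='02')
>   LOCATION '/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02';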
>
> From the Flink Web UI I can see that some checkpoints failed at the StreamingFileCommitter during the run:
> [screenshots of the failed checkpoints did not come through]
>
> My questions:
>
> 1. Does exactly-once only guarantee writing the sink files, but not updating the catalog?
> 2. If so, is there a way to work around this?
> 3. For EXACTLY_ONCE, is it necessary to set the Kafka consumer parameters isolation.level=read_committed and enable.auto.commit=false, or is the following configuration already enough to guarantee EXACTLY_ONCE? (See the consumer-properties sketch after the snippet below.)
>
> streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
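>
> (For context on question 3: if those two consumer settings were wanted, they would go into the Properties passed to the Kafka consumer, e.g. as in this sketch; it only shows where they would be set, not whether they are required. Addresses, topic and group id are placeholders:)
>
> import java.util.Properties;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>
> public class ConsumerPropsSketch {
>     // Builds a consumer like the one in the sketch after step 5, adding the two
>     // properties asked about in question 3.
>     static FlinkKafkaConsumer<String> buildConsumer() {
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "kafka:9092");
>         props.setProperty("group.id", "music-group");
>         // Only has an effect if the upstream producer writes to Kafka transactionally:
>         props.setProperty("isolation.level", "read_committed");
>         // Flink tracks offsets in its checkpoints; Kafka's own auto-commit is typically left disabled:
>         props.setProperty("enable.auto.commit", "false");
>         return new FlinkKafkaConsumer<>("music_topic", new SimpleStringSchema(), props);
>     }
> }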
>
>

-- 
Best, Jingsong Lee
