完了,现在的问题是发现好像所有的分区都没有提交,一直不提交,这是为什么呢?



 
发件人: [email protected]
发送时间: 2020-12-24 17:04
收件人: user-zh
主题: Flink-1.11.1流写filesystem分区提交问题
hi everyone,
 
最近在验证需求,kafka数据流写hdfs文件系统,使用官网文档Flink-1.11版本的示例demo成功提交到yarn之后,发现如期生成分区目录及文件,但是分区提交有些疑惑想请教下大家。
 
问题描述:
在15点37分时,查看hdfs如期生成[/user/flink/order/dt=2020-03-13/hour=14][/user/flink/order/dt=2020-03-14/hour=21]等相同格式的诸多分区目录,然后具体进入hour=14目录下发现partfile处于inprogress,官网描述说当前系统时间大于分区创建时间+延迟时间,即提交分区;当我在16点37分、38分再去查看时,hour=14目录下的partfile仍处于inprogress状态,查明原因是因为我在16点07分时又向kafka写入了数据,此时发现所有的分区目录下的partfile创建时间都变成了16点07分,因此之前15点37分就已经创建partfile的分区都还要等到17点07分才能进行提交。(理论上是这个意思吧)
 
那么问题来了,看如下ddl可知我的分区是基于day+hour,那么我的理解是分区提交时间计算是基于hour分区目录的创建时间来进行的,对吗?如果是这样的话,那为何我16点07分写数据时会影响到之前那些15点37分创建的分区提交呢?而导致全部都需要等到17点07分才能进行提交..
 
另外,查看了一下我16点07分写数据时,除了这时本身应写入的分区目录下的partfile是16点07分之外,其他所有分区目录下的partfile文件创建时间都被修改成了16点07分,而hour目录却没变化。
 
描述的有点长可能也有点乱,可能是我对流写文件还不够熟悉还没有理解其中真正的意思,所以希望有大佬能帮忙解答,谢谢!
 
source ddl:
CREATE TABLE kafka_source (
    order_id STRING,
    order_sales DOUBLE,
    update_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink-kafka',
    'properties.bootstrap.servers' = '10.3.15.128:9092',
    'properties.group.id' = 'kafka_hdfs',
    'format' = 'json',
    'scan.startup.mode' = 'group-offsets'
)
sink ddl:
CREATE TABLE hdfs_sink (
    order_id STRING,
    order_sales DOUBLE,
    dt STRING,
    `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs:///user/flink/order',
    'format' = 'json',
    'sink.partition-commit.delay' = '1h',
    'sink.partition-commit.policy.kind' = 'success-file'
)
transform dml:
INSERT INTO hdfs_sink 
SELECT 
    order_id,
    order_sales,
    DATE_FORMAT(update_time, 'yyyy-MM-dd'),
    DATE_FORMAT(update_time, 'HH')
FROM kafka_source
 
best,
amenhub
 
 

回复