Hi forideal,

I also encountered this problem and opened an issue [1]; you can have a look.
Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-22472

> On May 7, 2021, at 20:31, forideal <fszw...@163.com> wrote:
>
> I found the reason:
>
> Late data processing: when a record is supposed to be written into a
> partition that has already been committed, the record is still written into
> that partition, and the commit of this partition is triggered again.
> That is why I see the success file being updated late.
>
> Best,
> Forideal
>
> At 2021-05-07 19:41:45, "forideal" <fszw...@163.com> wrote:
>
> Hi my friends,
>
> I use the FileSystem connector in Flink SQL, and I found that my success
> file is committed late, sometimes dozens of minutes late.
> Here is some information:
> 1. The Flink version is 1.11.1.
> 2. Source DDL:
> create table test (
>   `timestamp` bigint,
>   event_time as _timestamp(`timestamp`),
>   WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
> )...
> 3. Sink DDL:
> create table sinkTest (
>   xxx
>   dtm VARCHAR,
>   hh VARCHAR
> ) PARTITIONED BY (dtm, hh)
> with (
>   'connector' = 'filesystem',
>   'format' = 'parquet',
>   'parquet.compression' = 'SNAPPY',
>   'sink.rolling-policy.file-size' = '512MB',
>   'sink.rolling-policy.check-interval' = '10 min',
>   'sink.partition-commit.trigger' = 'partition-time',
>   'sink.partition-commit.delay' = '1 h',
>   'sink.partition-commit.policy.kind' = 'success-file',
>   'sink.file-suffix' = '.parquet',
>   'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
> )
> 4. The checkpoint interval is 5 minutes, and all checkpoints succeed.
>
> If my job is not lagging, I expect the success file to be committed at
> about 10 minutes past every hour, but in fact it is committed much later.
>
> Here is the source code that decides when to commit the success file: a
> partition can be committed once the watermark is greater than the
> partition time plus the commit delay.
>
> public List<String> committablePartitions(long checkpointId) {
>     // The watermark must have been recorded when this checkpoint was snapshot.
>     if (!watermarks.containsKey(checkpointId)) {
>         throw new IllegalArgumentException(String.format(
>                 "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
>                 checkpointId, watermarks));
>     }
>
>     // Use the watermark recorded for this checkpoint, then drop it and older entries.
>     long watermark = watermarks.get(checkpointId);
>     watermarks.headMap(checkpointId, true).clear();
>
>     List<String> needCommit = new ArrayList<>();
>     Iterator<String> iter = pendingPartitions.iterator();
>     while (iter.hasNext()) {
>         String partition = iter.next();
>         // Partition time is built from the partition values
>         // (here via 'partition.time-extractor.timestamp-pattern').
>         LocalDateTime partTime = extractor.extract(
>                 partitionKeys, extractPartitionValues(new Path(partition)));
>         // Commit only once the watermark has passed partition time + commit delay.
>         if (watermark > toMills(partTime) + commitDelay) {
>             needCommit.add(partition);
>             iter.remove();
>         }
>     }
>     return needCommit;
> }
>
> Best,
> Forideal
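The commit condition `watermark > partTime + commitDelay` can be worked through for this job's settings. The Java sketch below is a hypothetical helper, not Flink code: it assumes `dtm` values look like `2021-05-07`, `hh` values are two digits, and the watermark trails the maximum event time by exactly the 10 minutes declared in the source DDL.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class CommitTimeSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Hypothetical helper: fills the pattern '$dtm $hh:00:00' and parses it,
    // assuming dtm looks like "2021-05-07" and hh is two digits.
    static LocalDateTime partitionTime(String dtm, String hh) {
        return LocalDateTime.parse(dtm + " " + hh + ":00:00", FORMAT);
    }

    // The trigger commits once watermark > partTime + commitDelay.
    // Assuming watermark = maxEventTime - watermarkLag, the max event time seen
    // must pass partTime + commitDelay + watermarkLag (strict inequality).
    static LocalDateTime commitThresholdEventTime(
            LocalDateTime partTime, Duration commitDelay, Duration watermarkLag) {
        return partTime.plus(commitDelay).plus(watermarkLag);
    }

    public static void main(String[] args) {
        LocalDateTime partTime = partitionTime("2021-05-07", "10");
        LocalDateTime threshold = commitThresholdEventTime(
                partTime, Duration.ofHours(1), Duration.ofMinutes(10));
        System.out.println(threshold); // prints 2021-05-07T11:10
    }
}
```

So partition `hh=10` should become committable once event time passes roughly 11:10. Since `committablePartitions` is keyed by checkpoint id, the commit presumably happens at the first completed checkpoint after that, i.e. up to one 5-minute checkpoint interval later.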
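The late-data behavior forideal found can be reproduced with a toy model. The Java class below is a hypothetical, simplified stand-in for the partition-time trigger, not Flink's API: partitions are identified by an epoch hour number, the watermark is passed in directly instead of being looked up by checkpoint id, and "committing" just means returning the partition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class LateDataRecommitSketch {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;

    private final Set<Integer> pendingHours = new LinkedHashSet<>();
    private final long commitDelayMillis;

    LateDataRecommitSketch(long commitDelayMillis) {
        this.commitDelayMillis = commitDelayMillis;
    }

    // Every written record marks its hourly partition as pending, even if that
    // partition was already committed (the late-data case).
    void onRecord(long eventTimeMillis) {
        pendingHours.add((int) (eventTimeMillis / HOUR));
    }

    // Same condition as committablePartitions():
    // commit when watermark > partitionTime + commitDelay.
    List<Integer> committablePartitions(long watermarkMillis) {
        List<Integer> needCommit = new ArrayList<>();
        Iterator<Integer> iter = pendingHours.iterator();
        while (iter.hasNext()) {
            int hour = iter.next();
            if (watermarkMillis > hour * HOUR + commitDelayMillis) {
                needCommit.add(hour);
                iter.remove();
            }
        }
        return needCommit;
    }

    public static void main(String[] args) {
        LateDataRecommitSketch trigger = new LateDataRecommitSketch(HOUR); // delay = 1 h
        trigger.onRecord(10 * HOUR + 30 * MINUTE); // on-time record for hour 10
        System.out.println(trigger.committablePartitions(10 * HOUR + 55 * MINUTE)); // prints []
        System.out.println(trigger.committablePartitions(11 * HOUR + 5 * MINUTE));  // prints [10]
        trigger.onRecord(10 * HOUR + 50 * MINUTE); // late record for hour 10
        System.out.println(trigger.committablePartitions(11 * HOUR + 10 * MINUTE)); // prints [10]
    }
}
```

The third call shows the effect described in the thread: because the watermark has already passed 11:00, a single late record for hour 10 makes that partition committable again at the very next check, so the success-file policy presumably writes its marker again and the file's timestamp moves later.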