Hi, forideal

I also ran into this problem and opened an issue [1]; you can take a look.

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-22472



> On May 7, 2021, at 20:31, forideal <fszw...@163.com> wrote:
> 
> I found the reason:
> 
>    Late data processing: The record will be written into its partition when a 
> record is supposed to be written into a partition that has already been 
> committed, and then the committing of this partition will be triggered again.
> That is why the success file appeared to be updated late: each late record
> re-triggers the commit of its partition, and the success file is written again.
> 
> Best,
> Forideal
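The late-data rule quoted above can be illustrated with a minimal, hypothetical Java model. It is not Flink code: partition times and watermarks are plain epoch-millisecond longs, `LateDataRecommit`, `write` and `checkpointComplete` are illustrative names, and a "commit" only appends to a log instead of writing a success file. It shows one partition being committed once and then committed again after a late record arrives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of the late-data rule (not Flink code).
 * A "commit" only appends a log entry; it does not write a _SUCCESS file.
 */
class LateDataRecommit {
    static final long HOUR = 60 * 60 * 1000L;

    private final long commitDelay;
    private final Set<Long> pending = new LinkedHashSet<>();  // partition start times awaiting commit
    private final List<String> commitLog = new ArrayList<>();

    LateDataRecommit(long commitDelay) { this.commitDelay = commitDelay; }

    /** Any record (on time or late) makes its partition pending again. */
    void write(long partitionTime) { pending.add(partitionTime); }

    /** On checkpoint completion, commit every pending partition the watermark has passed. */
    void checkpointComplete(long checkpointId, long watermark) {
        Iterator<Long> it = pending.iterator();
        while (it.hasNext()) {
            long partitionTime = it.next();
            if (watermark > partitionTime + commitDelay) {
                commitLog.add("cp" + checkpointId + " commits partition " + partitionTime / HOUR + ":00");
                it.remove();
            }
        }
    }

    List<String> commitLog() { return commitLog; }

    public static void main(String[] args) {
        LateDataRecommit sink = new LateDataRecommit(HOUR);  // assumed 'sink.partition-commit.delay' = '1 h'
        long hh10 = 10 * HOUR;                                // partition hh=10 starts at 10:00
        sink.write(hh10);
        sink.checkpointComplete(1, 11 * HOUR);                // watermark == 11:00, not yet > 11:00
        sink.checkpointComplete(2, 11 * HOUR + 5 * 60_000L);  // watermark 11:05 -> first commit
        sink.write(hh10);                                     // late record for the committed partition
        sink.checkpointComplete(3, 12 * HOUR);                // committed again
        sink.commitLog().forEach(System.out::println);
    }
}
```

Running `main` prints two entries for the same partition, `cp2 commits partition 10:00` and `cp3 commits partition 10:00`; in the real sink, the second commit is the repeated success-file write that can make the file look late.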
> 
> At 2021-05-07 19:41:45, "forideal" <fszw...@163.com> wrote:
> 
> Hi my friends,
>     I use the filesystem connector in Flink SQL, and I found that my success
> file was committed late, perhaps by dozens of minutes.
>     Here is some information:
>    1. The Flink version is 1.11.1.
>    2. Source DDL:
>       create table test (
>         `timestamp` bigint,
>         event_time as _timestamp(`timestamp`),
>         WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
>       )...
>    3. Sink DDL:
>       create table sinkTest (
>         xxx
>         dtm VARCHAR,
>         hh VARCHAR
>       ) PARTITIONED BY (dtm, hh)
>       with (
>         'connector' = 'filesystem',
>         'format' = 'parquet',
>         'parquet.compression' = 'SNAPPY',
>         'sink.rolling-policy.file-size' = '512MB',
>         'sink.rolling-policy.check-interval' = '10 min',
>         'sink.partition-commit.trigger' = 'partition-time',
>         'sink.partition-commit.delay' = '1 h',
>         'sink.partition-commit.policy.kind' = 'success-file',
>         'sink.file-suffix' = '.parquet',
>         'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
>       )
> 
>    4. The checkpoint interval is 5 minutes, and all checkpoints complete
> successfully.
> 
>    I expected that, as long as my job is not lagging, a success file would
> be written at about ten minutes past every hour (partition time + 1 h commit
> delay + 10 min watermark lag), but in practice it is written much later.
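The expected timing follows directly from the settings above. Below is a small Java sketch of that arithmetic, assuming the job is not lagging (event time close to wall-clock time) and using the partition dtm=2021-05-07/hh=10 purely as a hypothetical example; `ExpectedCommitTime` and `commitWindow` are illustrative names, not Flink API.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/**
 * Hypothetical worked arithmetic for when a partition's success file should appear,
 * assuming event time ~ wall-clock time (the job is not lagging).
 */
class ExpectedCommitTime {
    /** Returns {earliest, latest} event time at which the partition can be committed. */
    static LocalDateTime[] commitWindow(LocalDateTime partitionTime, Duration commitDelay,
                                        Duration watermarkLag, Duration checkpointInterval) {
        // watermark = max(event_time) - watermarkLag must exceed partitionTime + commitDelay
        LocalDateTime earliest = partitionTime.plus(commitDelay).plus(watermarkLag);
        // the commit only happens when the next checkpoint completes (checkpoint duration ignored)
        LocalDateTime latest = earliest.plus(checkpointInterval);
        return new LocalDateTime[] {earliest, latest};
    }

    public static void main(String[] args) {
        LocalDateTime[] window = commitWindow(
                LocalDateTime.of(2021, 5, 7, 10, 0),  // '$dtm $hh:00:00' for dtm=2021-05-07, hh=10
                Duration.ofHours(1),                   // 'sink.partition-commit.delay' = '1 h'
                Duration.ofMinutes(10),                // event_time - INTERVAL '10' MINUTE
                Duration.ofMinutes(5));                // checkpoint interval
        System.out.println(window[0] + " .. " + window[1]);
    }
}
```

This prints `2021-05-07T11:10 .. 2021-05-07T11:15`, i.e. the hh=10 partition should be committed roughly 10 to 15 minutes after its hour ends. Because the trigger compares with a strict `>`, the commit lands just after the start of that window.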
>    Here is the source code that decides when a partition is committed (and
> its success file written): a partition becomes committable once the watermark
> is greater than the partition time plus the commit delay.
> public List<String> committablePartitions(long checkpointId) {
>    // The watermark is recorded per checkpoint when the state is snapshotted.
>    if (!watermarks.containsKey(checkpointId)) {
>       throw new IllegalArgumentException(String.format(
>             "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
>             checkpointId, watermarks));
>    }
> 
>    long watermark = watermarks.get(checkpointId);
>    // Drop the watermarks of this checkpoint and all earlier ones.
>    watermarks.headMap(checkpointId, true).clear();
> 
>    List<String> needCommit = new ArrayList<>();
>    Iterator<String> iter = pendingPartitions.iterator();
>    while (iter.hasNext()) {
>       String partition = iter.next();
>       // Partition time extracted from the partition path (e.g. '$dtm $hh:00:00').
>       LocalDateTime partTime = extractor.extract(
>             partitionKeys, extractPartitionValues(new Path(partition)));
>       // Commit only once the watermark has passed partition time + commit delay.
>       if (watermark > toMills(partTime) + commitDelay) {
>          needCommit.add(partition);
>          iter.remove();
>       }
>    }
>    return needCommit;
> }
> Best,
> Forideal
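To experiment with the quoted method outside Flink, here is a hypothetical stand-alone model of it. Assumptions, none of which are Flink API: `PartitionTimeCommitModel`, `snapshot`, `addPartition`, `partitionMillis` and `millis` are illustrative names; partitions are `dtm=.../hh=...` path strings; the partition time is built by substituting values into the pattern `$dtm $hh:00:00`; and all times are treated as UTC.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical stand-alone model of committablePartitions() (times assumed UTC). */
class PartitionTimeCommitModel {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final long commitDelayMillis;
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();  // checkpointId -> watermark
    private final Set<String> pendingPartitions = new LinkedHashSet<>();

    PartitionTimeCommitModel(long commitDelayMillis) { this.commitDelayMillis = commitDelayMillis; }

    void addPartition(String partition) { pendingPartitions.add(partition); }

    /** Remember the current watermark when a checkpoint is snapshotted. */
    void snapshot(long checkpointId, long watermark) { watermarks.put(checkpointId, watermark); }

    /** Same rule as the quoted method: commit when watermark > partition time + delay. */
    List<String> committablePartitions(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
        }
        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();  // drop this and older checkpoints

        List<String> needCommit = new ArrayList<>();
        Iterator<String> iter = pendingPartitions.iterator();
        while (iter.hasNext()) {
            String partition = iter.next();
            if (watermark > partitionMillis(partition) + commitDelayMillis) {
                needCommit.add(partition);
                iter.remove();
            }
        }
        return needCommit;
    }

    /** Builds "$dtm $hh:00:00" from a path such as "dtm=2021-05-07/hh=10". */
    static long partitionMillis(String partition) {
        String pattern = "$dtm $hh:00:00";
        for (String kv : partition.split("/")) {
            String[] parts = kv.split("=", 2);
            pattern = pattern.replace("$" + parts[0], parts[1]);  // literal, not regex, replace
        }
        return millis(pattern);
    }

    static long millis(String dateTime) {
        return LocalDateTime.parse(dateTime, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        PartitionTimeCommitModel model = new PartitionTimeCommitModel(60 * 60 * 1000L);  // '1 h'
        model.addPartition("dtm=2021-05-07/hh=10");
        model.snapshot(1, millis("2021-05-07 10:55:00"));
        System.out.println(model.committablePartitions(1));  // watermark 10:55 is not past 11:00
        model.snapshot(2, millis("2021-05-07 11:05:00"));
        System.out.println(model.committablePartitions(2));  // watermark 11:05 is past 11:00
    }
}
```

Running `main` prints `[]` and then `[dtm=2021-05-07/hh=10]`. Keying the watermark by checkpoint id mirrors the quoted code: the commit decision uses the watermark captured when that checkpoint was snapshotted, not the latest one.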