hi:
按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
delay 时机触发分区提交,得看你的sink.partition-commit.delay
设置的多久,如果超过之后,应当默认是会丢弃的吧。


https://cloud.tencent.com/developer/article/1707182

这个连接可以看一下 







在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>Hi,all
>Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>现在有这样的场景:
>消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>有大佬知道吗,有实际验证过吗
>感谢
>
>附上简单sql:
>CREATE TABLE kafka (
>    a STRING,
>    b STRING,
>    c BIGINT,
>    process_time BIGINT,
>    e STRING,
>    f STRING,
>    g STRING,
>    h INT,
>    i STRING
>) WITH (
>    'connector' = 'kafka',
>    'topic' = 'topic',
>    'properties.bootstrap.servers' = 'x',
>    'properties.group.id' = 'test-1',
>    'scan.startup.mode' = 'latest-offset',
>    'format' = 'json',
>    'properties.flink.partition-discovery.interval-millis' = '300000'
>);
>
>CREATE TABLE filesystem (
>    `day` STRING,
>    `hour` STRING,
>    a STRING,
>    b STRING,
>    c BIGINT,
>    d BIGINT,
>    e STRING,
>    f STRING,
>    g STRING,
>    h INT,
>    i STRING
>) PARTITIONED BY (`day`, `hour`) WITH (
>    'connector' = 'filesystem',
>    'format' = 'parquet',
>    'path' = 'hdfs://xx',
>    'parquet.compression'='SNAPPY',
>    'sink.partition-commit.policy.kind' = 'success-file'
>);
>
>insert into filesystem
>select
>    from_unixtime(process_time,'yyyy-MM-dd') as `day`,
>    from_unixtime(process_time,'HH') as `hour`,
>    a,
>    b,
>    c,
>    d,
>    e,
>    f,
>    g,
>    h,
>    i
>from kafka;
>
>
>
>[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger

回复