hi: As I understand it, with partition-time commits, a partition commit is triggered once current watermark > partition time + commit delay, so it depends on how long you set sink.partition-commit.delay. If data arrives after that point, I believe it is dropped by default.
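For reference, a minimal sketch of the sink options the trigger condition above involves (the table name, columns, timestamp pattern, and 1 h delay are illustrative assumptions, not taken from the original job):

```sql
-- Sketch only: commit a partition once watermark > partition time + delay.
CREATE TABLE fs_sink (
  a STRING,
  `day` STRING,
  `hour` STRING
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://xx',
  'format' = 'parquet',
  -- trigger commits on the event-time watermark, not processing time
  'sink.partition-commit.trigger' = 'partition-time',
  -- how to turn the (day, hour) partition values into a timestamp
  'partition.time-extractor.timestamp-pattern' = '$day $hour:00:00',
  -- commit once the watermark passes partition time + 1 h
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
);
```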
https://cloud.tencent.com/developer/article/1707182 — this link is worth a look.

At 2020-11-12 11:58:22, "admin" <17626017...@163.com> wrote:
>Hi, all
>With the Flink 1.11 filesystem connector, the partition trigger options [1] are all left at their defaults, so a partition can be committed multiple times.
>The scenario is this:
>We consume Kafka data and write it to HDFS. The partition fields are day + hour, extracted from the event time. If data arrives late — say it is 19:00 now and a record for 17:00 comes in —
>will that record still be written correctly into the 17:00 partition? Will it go into the 19:00 partition instead? Or will it be dropped?
>Does anyone know? Has anyone actually verified this?
>Thanks
>
>The simplified SQL is attached:
>CREATE TABLE kafka (
> a STRING,
> b STRING,
> c BIGINT,
> process_time BIGINT,
> e STRING,
> f STRING,
> g STRING,
> h INT,
> i STRING
>) WITH (
> 'connector' = 'kafka',
> 'topic' = 'topic',
> 'properties.bootstrap.servers' = 'x',
> 'properties.group.id' = 'test-1',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'json',
> 'properties.flink.partition-discovery.interval-millis' = '300000'
>);
>
>CREATE TABLE filesystem (
> `day` STRING,
> `hour` STRING,
> a STRING,
> b STRING,
> c BIGINT,
> d BIGINT,
> e STRING,
> f STRING,
> g STRING,
> h INT,
> i STRING
>) PARTITIONED BY (`day`, `hour`) WITH (
> 'connector' = 'filesystem',
> 'format' = 'parquet',
> 'path' = 'hdfs://xx',
> 'parquet.compression'='SNAPPY',
> 'sink.partition-commit.policy.kind' = 'success-file'
>);
>
>insert into filesystem
>select
> from_unixtime(process_time,'yyyy-MM-dd') as `day`,
> from_unixtime(process_time,'HH') as `hour`,
> a,
> b,
> c,
> process_time as d,
> e,
> f,
> g,
> h,
> i
>from kafka;
>
>
>
>[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
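One detail worth noting: a partition-time commit trigger only works if the source table declares an event-time watermark. A hedged sketch of what that could look like for the quoted job (the `ts` computed column and the 5-second bound are assumptions, not part of the original SQL):

```sql
-- Sketch only: derive an event-time attribute from process_time
-- (epoch seconds) and declare a watermark on it.
CREATE TABLE kafka (
  a STRING,
  process_time BIGINT,
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time)),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = 'x',
  'format' = 'json'
);
```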