Hi ,kandy
我没有基于partition time 提交分区,我是基于默认的process 
time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区

> 2020年11月12日 下午12:46,kandy.wang <kandy1...@163.com> 写道:
> 
> hi:
> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
> delay 时机触发分区提交,得看你的sink.partition-commit.delay
> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
> 
> 
> https://cloud.tencent.com/developer/article/1707182
> 
> 这个连接可以看一下 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>> Hi,all
>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>> 现在有这样的场景:
>> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>> 有大佬知道吗,有实际验证过吗
>> 感谢
>> 
>> 附上简单sql:
>> CREATE TABLE kafka (
>>   a STRING,
>>   b STRING,
>>   c BIGINT,
>>   process_time BIGINT,
>>   e STRING,
>>   f STRING,
>>   g STRING,
>>   h INT,
>>   i STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic',
>>   'properties.bootstrap.servers' = 'x',
>>   'properties.group.id' = 'test-1',
>>   'scan.startup.mode' = 'latest-offset',
>>   'format' = 'json',
>>   'properties.flink.partition-discovery.interval-millis' = '300000'
>> );
>> 
>> CREATE TABLE filesystem (
>>   `day` STRING,
>>   `hour` STRING,
>>   a STRING,
>>   b STRING,
>>   c BIGINT,
>>   d BIGINT,
>>   e STRING,
>>   f STRING,
>>   g STRING,
>>   h INT,
>>   i STRING
>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>   'connector' = 'filesystem',
>>   'format' = 'parquet',
>>   'path' = 'hdfs://xx',
>>   'parquet.compression'='SNAPPY',
>>   'sink.partition-commit.policy.kind' = 'success-file'
>> );
>> 
>> insert into filesystem
>> select
>>   from_unixtime(process_time,'yyyy-MM-dd') as `day`,
>>   from_unixtime(process_time,'HH') as `hour`,
>>   a,
>>   b,
>>   c,
>>   d,
>>   e,
>>   f,
>>   g,
>>   h,
>>   i
>> from kafka;
>> 
>> 
>> 
>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger

回复