Hi,jingsong
所以用partition-time,即使延迟很多也是可以重复提交分区,不会丢数据的是吧。
所以对于按小时分区的场景,想要尽早的使分区可查的最佳配置是什么样的,
比如sink.partition-commit.trigger = partition-time
sink.partition-commit.delay = 10 min

> 2020年11月12日 下午3:22,Jingsong Li <jingsongl...@gmail.com> 写道:
> 
> Hi admin,
> 
> 不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作)
> 
> On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:
> 
>> 补充一下不用partition time trigger的原因,partition
>> time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的
>> 
>>> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
>>> 
>>> Hi ,kandy
>>> 我没有基于partition time 提交分区,我是基于默认的process
>> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
>>> 
>>>> 2020年11月12日 下午12:46,kandy.wang <kandy1...@163.com> 写道:
>>>> 
>>>> hi:
>>>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  +
>> commit delay 时机触发分区提交,得看你的sink.partition-commit.delay
>>>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
>>>> 
>>>> 
>>>> https://cloud.tencent.com/developer/article/1707182
>>>> 
>>>> 这个连接可以看一下
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>>>>> Hi,all
>>>>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>>>>> 现在有这样的场景:
>>>>> 消费kafka数据写入hdfs中,分区字段是 day + hour
>> ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>>>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>>>>> 有大佬知道吗,有实际验证过吗
>>>>> 感谢
>>>>> 
>>>>> 附上简单sql:
>>>>> CREATE TABLE kafka (
>>>>> a STRING,
>>>>> b STRING,
>>>>> c BIGINT,
>>>>> process_time BIGINT,
>>>>> e STRING,
>>>>> f STRING,
>>>>> g STRING,
>>>>> h INT,
>>>>> i STRING
>>>>> ) WITH (
>>>>> 'connector' = 'kafka',
>>>>> 'topic' = 'topic',
>>>>> 'properties.bootstrap.servers' = 'x',
>>>>> 'properties.group.id' = 'test-1',
>>>>> 'scan.startup.mode' = 'latest-offset',
>>>>> 'format' = 'json',
>>>>> 'properties.flink.partition-discovery.interval-millis' = '300000'
>>>>> );
>>>>> 
>>>>> CREATE TABLE filesystem (
>>>>> `day` STRING,
>>>>> `hour` STRING,
>>>>> a STRING,
>>>>> b STRING,
>>>>> c BIGINT,
>>>>> d BIGINT,
>>>>> e STRING,
>>>>> f STRING,
>>>>> g STRING,
>>>>> h INT,
>>>>> i STRING
>>>>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>>>> 'connector' = 'filesystem',
>>>>> 'format' = 'parquet',
>>>>> 'path' = 'hdfs://xx',
>>>>> 'parquet.compression'='SNAPPY',
>>>>> 'sink.partition-commit.policy.kind' = 'success-file'
>>>>> );
>>>>> 
>>>>> insert into filesystem
>>>>> select
>>>>> from_unixtime(process_time,'yyyy-MM-dd') as `day`,
>>>>> from_unixtime(process_time,'HH') as `hour`,
>>>>> a,
>>>>> b,
>>>>> c,
>>>>> d,
>>>>> e,
>>>>> f,
>>>>> g,
>>>>> h,
>>>>> i
>>>>> from kafka;
>>>>> 
>>>>> 
>>>>> 
>>>>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
>>> 
>> 
>> 
> 
> -- 
> Best, Jingsong Lee

回复