尽早的可查,直接把delay设为0即可 (其它默认值) On Thu, Nov 12, 2020 at 5:17 PM admin <[email protected]> wrote:
> Hi,jingsong > 所以用partition-time,即使延迟很多也是可以重复提交分区,不会丢数据的是吧。 > 所以对于按小时分区的场景,想要尽早的使分区可查的最佳配置是什么样的, > 比如sink.partition-commit.trigger = partition-time > sink.partition-commit.delay = 10 min > > > 2020年11月12日 下午3:22,Jingsong Li <[email protected]> 写道: > > > > Hi admin, > > > > 不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作) > > > > On Thu, Nov 12, 2020 at 3:11 PM admin <[email protected]> wrote: > > > >> 补充一下不用partition time trigger的原因,partition > >> time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的 > >> > >>> 2020年11月12日 下午2:15,admin <[email protected]> 写道: > >>> > >>> Hi ,kandy > >>> 我没有基于partition time 提交分区,我是基于默认的process > >> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区 > >>> > >>>> 2020年11月12日 下午12:46,kandy.wang <[email protected]> 写道: > >>>> > >>>> hi: > >>>> 按照我的理解,partition time提交分区,是会在current watermark > partition time + > >> commit delay 时机触发分区提交,得看你的sink.partition-commit.delay > >>>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。 > >>>> > >>>> > >>>> https://cloud.tencent.com/developer/article/1707182 > >>>> > >>>> 这个连接可以看一下 > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> 在 2020-11-12 11:58:22,"admin" <[email protected]> 写道: > >>>>> Hi,all > >>>>> Flink 1.11的filesystem connector,partition > trigger[1]都是使用的默认值,所以分区可以多次提交 > >>>>> 现在有这样的场景: > >>>>> 消费kafka数据写入hdfs中,分区字段是 day + hour > >> ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据, > >>>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃? > >>>>> 有大佬知道吗,有实际验证过吗 > >>>>> 感谢 > >>>>> > >>>>> 附上简单sql: > >>>>> CREATE TABLE kafka ( > >>>>> a STRING, > >>>>> b STRING, > >>>>> c BIGINT, > >>>>> process_time BIGINT, > >>>>> e STRING, > >>>>> f STRING, > >>>>> g STRING, > >>>>> h INT, > >>>>> i STRING > >>>>> ) WITH ( > >>>>> 'connector' = 'kafka', > >>>>> 'topic' = 'topic', > >>>>> 'properties.bootstrap.servers' = 'x', > >>>>> 'properties.group.id' = 'test-1', > >>>>> 'scan.startup.mode' = 'latest-offset', > >>>>> 'format' = 'json', > >>>>> 'properties.flink.partition-discovery.interval-millis' = '300000' > >>>>> ); > >>>>> > >>>>> CREATE TABLE filesystem ( > >>>>> `day` STRING, > >>>>> `hour` STRING, > >>>>> a STRING, > >>>>> b STRING, > >>>>> c BIGINT, > >>>>> d BIGINT, > >>>>> e STRING, > >>>>> f STRING, > >>>>> g STRING, > >>>>> h INT, > >>>>> i STRING > >>>>> ) PARTITIONED BY (`day`, `hour`) WITH ( > >>>>> 'connector' = 'filesystem', > >>>>> 'format' = 'parquet', > >>>>> 'path' = 'hdfs://xx', > >>>>> 'parquet.compression'='SNAPPY', > >>>>> 'sink.partition-commit.policy.kind' = 'success-file' > >>>>> ); > >>>>> > >>>>> insert into filesystem > >>>>> select > >>>>> from_unixtime(process_time,'yyyy-MM-dd') as `day`, > >>>>> from_unixtime(process_time,'HH') as `hour`, > >>>>> a, > >>>>> b, > >>>>> c, > >>>>> d, > >>>>> e, > >>>>> f, > >>>>> g, > >>>>> h, > >>>>> i > >>>>> from kafka; > >>>>> > >>>>> > >>>>> > >>>>> [1] > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger > >>> > >> > >> > > > > -- > > Best, Jingsong Lee > > -- Best, Jingsong Lee
