Re:FLINK 1.9.1 TriggerResult.FIRE_AND_PURGE 与 ctx.registerEventTimeTimer()

2020-01-18 文章 USERNAME
看到源码了,这里如果contents == null,就不触发计算了 evictingWindowState.clear();会导致contents 变成null,然后如果窗口没有数据就不会触发窗口计算 if (triggerResult.isFire()) { Iterable> contents = evictingWindowState.get(); if (contents == null) { // if we have no state, there is nothing to do continue; } emitWindowContents(ac

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-18 文章 LakeShen
是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。 LakeShen 于2020年1月19日周日 下午3:30写道: > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和 > Savepoint 类似,如果不清理,就永久保留。 > 非常感谢 > > > Yun Tang 于2020年1月19日周日 下午2:06写道: > >> Hi >> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-18 文章 LakeShen
Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和 Savepoint 类似,如果不清理,就永久保留。 非常感谢 Yun Tang 于2020年1月19日周日 下午2:06写道: > Hi > > 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by > design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 > [1],Savepoint的生命

Re: 回复: 关于Flink SQL DISTINCT问题

2020-01-18 文章 LakeShen
是否可以使用 空闲状态 Retention Time 来设置 JingsongLee 于2019年9月4日周三 下午6:12写道: > 一般是按时间(比如天)来group by,state配置了超时过期的时间。 > 基本的去重方式就是靠state(比如RocksDbState)。 > 有mini-batch来减少 对state的访问。 > > 如果有倾斜,那是解倾斜问题的话题了。 > > Best, > Jingsong Lee > > > -- > Fr

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-18 文章 Yun Tang
Hi 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 因此,加载的checkpoint被赋予了savepoint的property [2]。 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed 属性是false,也就是当新的checkpo

FLINK 1.9.1 TriggerResult.FIRE_AND_PURGE 与 ctx.registerEventTimeTimer()

2020-01-18 文章 USERNAME
大家新年快乐! 版本:FLINK 1.9.1 部分代码 .keyBy("key") .window(EventTimeSessionWindows.withGap(Time.hours(1))) .trigger(new NewTrigger()) .process(new NewProcess()) --NewTrigger() @Override public TriggerResult onElement(Bean bean, long l, TimeWindow timeWindow, TriggerContext ctx) throws Exception {