看到源码了,这里如果contents == null,就不触发计算了
evictingWindowState.clear();会导致contents 变成null,然后如果窗口没有数据就不会触发窗口计算
if (triggerResult.isFire()) {
Iterable> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
emitWindowContents(ac
是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
LakeShen 于2020年1月19日周日 下午3:30写道:
> Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
> Savepoint 类似,如果不清理,就永久保留。
> 非常感谢
>
>
> Yun Tang 于2020年1月19日周日 下午2:06写道:
>
>> Hi
>>
>> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除
Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
Savepoint 类似,如果不清理,就永久保留。
非常感谢
Yun Tang 于2020年1月19日周日 下午2:06写道:
> Hi
>
> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
> [1],Savepoint的生命
是否可以使用 空闲状态 Retention Time 来设置
JingsongLee 于2019年9月4日周三 下午6:12写道:
> 一般是按时间(比如天)来group by,state配置了超时过期的时间。
> 基本的去重方式就是靠state(比如RocksDbState)。
> 有mini-batch来减少 对state的访问。
>
> 如果有倾斜,那是解倾斜问题的话题了。
>
> Best,
> Jingsong Lee
>
>
> --
> Fr
Hi
如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
[1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
因此,加载的checkpoint被赋予了savepoint的property [2]。 这个CheckpointProperties#SAVEPOINT
里面的 discardSubsumed
属性是false,也就是当新的checkpo
大家新年快乐!
版本:FLINK 1.9.1
部分代码
.keyBy("key")
.window(EventTimeSessionWindows.withGap(Time.hours(1)))
.trigger(new NewTrigger())
.process(new NewProcess())
--NewTrigger()
@Override
public TriggerResult onElement(Bean bean, long l, TimeWindow timeWindow,
TriggerContext ctx) throws Exception {