刚刚贴完代码,就分析除问题来了,如下。
我看了下,变化主要是 ContinuousProcessingTimeTrigger 中的注册 trigger 时的时间逻辑,加了个
long nextFireTimestamp = Math.min(time + interval,
window.maxTimestamp()); 取min的逻辑。
我这个任务的watermark是latestTs-10小时,因为这个任务特别,压力不大,但对数据完整性要求高,综合考虑这么做的。
同时,为了避免10小时后才输出结果,而且需求上,窗口闭合前就需要输出结果,每10s一次,因此采用 continuousProcessTrigger 这种方式。

我分析,对于同一个key下的窗口1,窗口闭合时trigger触发,然后触发下一次,然后会在同一个time继续registerTime,然后立即触发,死循环,这个过程对于每个key的每个窗口都需要持续10小时,因为窗口需要10小时后才会闭合。



不清楚加这么个逻辑的目的是什么呢? 对于ContinuousProcessingTimeTrigger来说,是基于pt进行触发的,而
window.maxTimetamp() 在我的场景下是 et。
找了下https://issues.apache.org/jira/browse/FLINK-20443对到这个jira,没看懂最终讨论了个啥结论,这貌似也不像是啥bug,为啥需要这么改呢。

yidan zhao <[email protected]> 于2022年6月28日周二 11:48写道:
>
> 对比实验了下,就是自定义的 trigger
> 问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger:
>
> public class ContinuousProcessTimeTriggerForEventTimeWindow
>         extends Trigger<Object, TimeWindow> {
>
>     private final EventTimeTrigger eventTimeTrigger;
>
>     private final ContinuousProcessingTimeTrigger<TimeWindow>
> continuousProcessTimeTrigger;
>
>     public static ContinuousProcessTimeTriggerForEventTimeWindow
> of(long windowUpdateTimeInSeconds) {
>         return new
> ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
>     }
>
>     private ContinuousProcessTimeTriggerForEventTimeWindow(long
> windowUpdateTimeInSeconds) {
>         eventTimeTrigger = EventTimeTrigger.create();
>         continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
>                 Time.seconds(windowUpdateTimeInSeconds)
>         );
>     }
>
>     @Override
>     public TriggerResult onElement(
>             Object element, long timestamp, TimeWindow window,
> TriggerContext ctx
>     ) throws Exception {
>         continuousProcessTimeTrigger.onElement(element, timestamp, window, 
> ctx);
>         return eventTimeTrigger.onElement(element, timestamp, window, ctx);
>     }
>
>     @Override
>     public TriggerResult onEventTime(
>             long time, TimeWindow window, TriggerContext ctx
>     ) throws Exception {
>         return eventTimeTrigger.onEventTime(time, window, ctx);
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(
>             long time, TimeWindow window, TriggerContext ctx
>     ) throws Exception {
>         return continuousProcessTimeTrigger.onProcessingTime(time, window, 
> ctx);
>     }
>
>     @Override
>     public void clear(
>             TimeWindow window, TriggerContext ctx
>     ) throws Exception {
>         eventTimeTrigger.clear(window, ctx);
>         continuousProcessTimeTrigger.clear(window, ctx);
>     }
>
>     @Override
>     public boolean canMerge() {
>         return true;
>     }
>
>     @Override
>     public void onMerge(
>             TimeWindow window, OnMergeContext ctx
>     ) throws Exception {
>         eventTimeTrigger.onMerge(window, ctx);
>         continuousProcessTimeTrigger.onMerge(window, ctx);
>     }
> }
>
> Shengkai Fang <[email protected]> 于2022年6月28日周二 10:51写道:
> >
> > Hi.
> >
> > 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。
> >
> > Best,
> > Shengkai
> >
> > yidan zhao <[email protected]> 于2022年6月28日周二 10:44写道:
> >
> > > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
> > >
> > > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> > > window是event time window,配合自定义的
> > > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
> > > trigger,但是统计窗口是et window)。
> > >
> > > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。
> > >

回复