对比实验了下,就是自定义的 trigger
问题,不清楚从1.13.2到1.13.6有啥不同。我的自定义trigger如下,内部就是组装了EventTimeTrigger和ContinuousProcessingTimeTrigger:
public class ContinuousProcessTimeTriggerForEventTimeWindow
extends Trigger<Object, TimeWindow> {
private final EventTimeTrigger eventTimeTrigger;
private final ContinuousProcessingTimeTrigger<TimeWindow>
continuousProcessTimeTrigger;
public static ContinuousProcessTimeTriggerForEventTimeWindow
of(long windowUpdateTimeInSeconds) {
return new
ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
}
private ContinuousProcessTimeTriggerForEventTimeWindow(long
windowUpdateTimeInSeconds) {
eventTimeTrigger = EventTimeTrigger.create();
continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
Time.seconds(windowUpdateTimeInSeconds)
);
}
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window,
TriggerContext ctx
) throws Exception {
continuousProcessTimeTrigger.onElement(element, timestamp, window, ctx);
return eventTimeTrigger.onElement(element, timestamp, window, ctx);
}
@Override
public TriggerResult onEventTime(
long time, TimeWindow window, TriggerContext ctx
) throws Exception {
return eventTimeTrigger.onEventTime(time, window, ctx);
}
@Override
public TriggerResult onProcessingTime(
long time, TimeWindow window, TriggerContext ctx
) throws Exception {
return continuousProcessTimeTrigger.onProcessingTime(time, window, ctx);
}
@Override
public void clear(
TimeWindow window, TriggerContext ctx
) throws Exception {
eventTimeTrigger.clear(window, ctx);
continuousProcessTimeTrigger.clear(window, ctx);
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(
TimeWindow window, OnMergeContext ctx
) throws Exception {
eventTimeTrigger.onMerge(window, ctx);
continuousProcessTimeTrigger.onMerge(window, ctx);
}
}
Shengkai Fang <[email protected]> 于2022年6月28日周二 10:51写道:
>
> Hi.
>
> 这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。
>
> Best,
> Shengkai
>
> yidan zhao <[email protected]> 于2022年6月28日周二 10:44写道:
>
> > 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
> >
> > 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> > window是event time window,配合自定义的
> > continuousProcessTriggerForEventTimeWindow(基于pt进行continuous
> > trigger,但是统计窗口是et window)。
> >
> > 请问这种怎么排查呢?目前来看应该是卡在某个地方了,cancel任务后,直接等到tm失败。window算子百分百不会cancel成功。
> >