Hey Flink Users,

I am having trouble getting sideOutputLateData to work with late event-time reports. I have the following code that, per my understanding, should route reports that arrive after the window has fired and beyond the allowed lateness to the side output:

    val lateTag = new OutputTag[…]("tag"){}

    val windowedStream = stream
      .keyBy(…)
      .window(TumblingEventTimeWindows.of(…))
      .allowedLateness(…)
      .sideOutputLateData(lateTag)
      .trigger(new myTrigger)
      .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

    val lateStream = windowedStream.getSideOutput(lateTag);

trigger:

    public class myTrigger extends Trigger<…>, Window> {

        @Override
        public TriggerResult onElement(…) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(…) throws Exception {
            throw new Exception("processing time not supported");
        }

        @Override
        public TriggerResult onEventTime(…) throws Exception {
            return TriggerResult.FIRE_AND_PURGE;
        }
    }

The main flow works when all reports are on time, but when I start introducing late reports, they are rejected by the window yet never show up in the late stream. Is there something off with my understanding?

Chris
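
For reference, the lateness rule described above can be sketched in plain Java. This is a minimal sketch, not Flink's actual code: the class LatenessSketch and its method names are hypothetical, and the window size, lateness, and watermark values are made up. It assumes a tumbling window [start, end) with zero offset whose max timestamp is end - 1, and treats a window as late once max timestamp + allowed lateness <= current watermark.

```java
public class LatenessSketch {

    // Start of the tumbling window that contains `timestamp`
    // (assumes zero offset and non-negative timestamps).
    public static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Largest timestamp that still belongs to the window [start, start + size).
    public static long maxTimestamp(long windowStart, long size) {
        return windowStart + size - 1;
    }

    // True when the window for `timestamp` is already past its allowed lateness,
    // i.e. an element with this timestamp would no longer be added to the window.
    public static boolean isWindowLate(long timestamp, long size,
                                       long allowedLateness, long watermark) {
        long cleanupTime = maxTimestamp(windowStart(timestamp, size), size) + allowedLateness;
        return cleanupTime <= watermark;
    }

    public static void main(String[] args) {
        long size = 10_000L;     // hypothetical 10 s window
        long lateness = 2_000L;  // hypothetical 2 s allowed lateness

        // Watermark inside the allowed-lateness period: still accepted by the window.
        System.out.println(isWindowLate(9_500L, size, lateness, 11_000L)); // false

        // Watermark past window end + allowed lateness: treated as late.
        System.out.println(isWindowLate(9_500L, size, lateness, 12_000L)); // true
    }
}
```

Under this rule, an element can only reach the late stream once the watermark has passed the end of its window plus the allowed lateness; before that point it is still assigned to the window.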