Hey Flink Users,

I am having some issues with getting sideOutputLateData to properly function 
with late event time reports. I have the following code that, per my 
understanding, should be allowing reports that fall after the window has 
triggered and beyond allowed lateness to pass through to the side output:


val lateTag = new OutputTag[…]("tag"){}

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
  .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))


val lateStream =
  windowedStream.getSideOutput(lateTag);


trigger:

public class myTrigger extends Trigger<…>, Window> {
    @Override
    public TriggerResult onElement(…) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(…) throws Exception {
        throw new Exception("processing time not supported");
    }

    @Override
    public TriggerResult onEventTime(…) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

The main flow is functional when all reports are ontime, but when I start 
introducing late reports, they get rejected by the window but are failing to 
end up in the late stream. Is there something off with my understanding?

Chris

Reply via email to