Hey JING, thanks for getting back to me. I tried to produce the smallest, self-contained example that produces the phenomenon: https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f
If you run MainRepl you should see an infinite loop of re-processing the 5 integers. The offending process is BufferedLatestSelector, specifically the event timer that is registered in it. Without the timer the process will not emit an output. The timer is set whenever the state is null.

Is there a problem with how I implemented that buffering process?

Thank you,
Matthias

On Sun, Aug 15, 2021 at 8:59 PM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Matthias,
> How often do you register the event-time timer?
> Is it registered per input record, or do you register a new timer after an
> event-time timer is triggered?
> Would you please provide your test case code? It would be very helpful for
> troubleshooting.
>
> Best wishes,
> JING ZHANG
>
> Matthias Broecheler <matth...@dataeng.ai> wrote on Sat, Aug 14, 2021 at 3:44 AM:
>
>> Hey guys,
>>
>> I have a KeyedProcessFunction that gathers statistics on the events that
>> flow through and emits them periodically (every few seconds) to a SideOutput.
>> However, at the end of stream the last set of statistics doesn't get
>> emitted. I read on the mailing list that processing time timers that are
>> pending don't get triggered when Flink cleans up a stream, but that event
>> timers do get triggered because a watermark with Long.MAX_VALUE is sent
>> through the stream.
>> Hence, I thought that I could register a "backup" event timer for
>> Long.MAX_VALUE-1 to make sure that my process function gets notified when
>> the stream ends to emit the in-flight statistics.
>>
>> However, now my simple test case (with a data source fromCollection of 4
>> elements) keeps iterating over the same 4 elements in an infinite loop.
>>
>> I don't know how to make sense of this and would appreciate your help.
>> Is there a better way to set a timer that gets triggered at the end of
>> stream?
>> And for my education: Why does registering an event timer cause an
>> infinite loop over the source elements?
>>
>> Thanks a lot and have a wonderful weekend,
>> Matthias
>>
>
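
For reference, the end-of-stream behavior described in the quoted message (a pending event timer at Long.MAX_VALUE-1 fires once the final Long.MAX_VALUE watermark arrives) can be sketched in plain Java. This is only a toy model under stated assumptions: ToyEventTimerService and its methods are hypothetical names, not Flink's API, and it models a single key with no state, sources, or side outputs. It does not reproduce or explain the infinite loop from the gist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model only (hypothetical class, NOT Flink's API). It mimics one rule:
// an event-time timer fires once the watermark reaches or passes its timestamp.
class ToyEventTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long watermark = Long.MIN_VALUE;

    // Register a timer; registering the same timestamp again is a no-op.
    void registerEventTimeTimer(long timestamp) {
        if (!timers.contains(timestamp)) {
            timers.add(timestamp);
        }
    }

    // Advance the watermark (it never moves backwards) and return the
    // timestamps of all timers that fire, in ascending order.
    List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        ToyEventTimerService service = new ToyEventTimerService();

        // "Backup" timer, registered on every record while state is empty.
        service.registerEventTimeTimer(Long.MAX_VALUE - 1);
        service.registerEventTimeTimer(Long.MAX_VALUE - 1);

        // An ordinary watermark during the stream does not reach the timer.
        System.out.println("fired mid-stream: " + service.advanceWatermark(5_000L));

        // The final watermark at end of stream passes the timer's timestamp.
        System.out.println("fired at end: " + service.advanceWatermark(Long.MAX_VALUE));
    }
}
```

In this model the backup timer fires exactly once, at the final watermark, no matter how many records tried to register it.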