Hey JING,

Thanks for getting back to me. I put together the smallest self-contained
example I could that reproduces the problem:
https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f

If you run MainRepl, you should see the 5 integers being re-processed in an
infinite loop. The offending process function is BufferedLatestSelector,
specifically the event-time timer that is registered in it. Without the
timer, the function will not emit any output.

The timer is set whenever the state is null. Is there a problem with how I
implemented that buffering process?
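
For illustration, the intended behavior can be modeled without Flink. This is
a minimal sketch under assumptions, not the code from the gist: the class
BufferedLatestSketch and its methods are hypothetical names, a plain field
stands in for keyed state, and a sorted set stands in for the timer service.
It only shows the expected outcome (one timer, one flush when the final
watermark arrives) and does not reproduce the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free model: buffer the latest value and flush it
// when an event-time timer fires.
class BufferedLatestSketch {
    // Stands in for keyed state; null means "nothing buffered".
    private Integer latest;
    // Pending event-time timers. A set, so registering the same
    // timestamp twice collapses into a single timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<Integer> output = new ArrayList<>();

    // Called once per input record.
    void processElement(int value) {
        if (latest == null) {
            // Register the "backup" timer only while the state is empty.
            timers.add(Long.MAX_VALUE - 1);
        }
        latest = value;
    }

    // Called when the watermark advances; fires every timer at or below it.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            onTimer();
        }
    }

    // Emits the buffered value, if any, and clears the state.
    private void onTimer() {
        if (latest != null) {
            output.add(latest);
            latest = null;
        }
    }

    List<Integer> output() { return output; }

    int pendingTimers() { return timers.size(); }

    public static void main(String[] args) {
        BufferedLatestSketch fn = new BufferedLatestSketch();
        for (int i = 1; i <= 5; i++) {
            fn.processElement(i);
        }
        // End of stream: a final watermark of Long.MAX_VALUE.
        fn.advanceWatermark(Long.MAX_VALUE);
        System.out.println(fn.output()); // prints [5]
    }
}
```

In this model the five inputs register a single timer, and the final
watermark flushes the last buffered value exactly once.
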
Thank you,
Matthias

On Sun, Aug 15, 2021 at 8:59 PM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Matthias,
> How often do you register the event-time timer?
> Is it registered once per input record, or do you register a new timer
> after an event-time timer fires?
> Could you please share your test case code? It would be very helpful for
> troubleshooting.
>
> Best wishes,
> JING ZHANG
>
> Matthias Broecheler <matth...@dataeng.ai> wrote on Sat, Aug 14, 2021 at 3:44 AM:
>
>> Hey guys,
>>
>> I have a KeyedProcessFunction that gathers statistics on the events that
>> flow through and emits them periodically (every few seconds) to a SideOutput.
>> However, at the end of the stream the last set of statistics doesn't get
>> emitted. I read on the mailing list that pending processing-time timers
>> don't get triggered when Flink cleans up a stream, but that event-time
>> timers do get triggered because a watermark with Long.MAX_VALUE is sent
>> through the stream.
>> Hence, I thought that I could register a "backup" event timer for
>> Long.MAX_VALUE-1 to make sure that my process function gets notified when
>> the stream ends to emit the in-flight statistics.
>>
>> However, now my simple test case (with a data source fromCollection of 4
>> elements) keeps iterating over the same 4 elements in an infinite loop.
>>
>> I don't know how to make sense of this and would appreciate your help.
>> Is there a better way to set a timer that gets triggered at the end of
>> stream?
>> And for my education: Why does registering an event timer cause an
>> infinite loop over the source elements?
>>
>> Thanks a lot and have a wonderful weekend,
>> Matthias
>>
>
