Hi Binil,

I think the code itself also looks good to me. Could you confirm a few details about the issue?

1. What is the parallelism of this operator, and does the issue occur for all of the subtasks?
2. Have you already added logs in processElement and onTimer that print the timestamp of each registered processing-time timer and the time of each callback? If so, could you also share that part of the output?
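
For question 2, the logging could look roughly like the sketch below. It is only an illustration under stated assumptions: TimerAudit and its methods are hypothetical names, not part of Flink, and the Flink calls mentioned in the comments are suggested call sites rather than code taken from your job.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical debugging helper (not part of Flink). Keep one instance per
// function instance, i.e. per subtask. It is plain in-memory state and is
// not checkpointed, so it is meant for debugging only.
final class TimerAudit {
    // "key@timerTimestamp" -> timer timestamp, for timers that have not fired yet
    private final Map<String, Long> pending = new LinkedHashMap<>();

    // Call in processElement() right after registerProcessingTimeTimer(timerTs).
    //   subtask: e.g. getRuntimeContext().getIndexOfThisSubtask()
    //   key:     e.g. context.getCurrentKey()
    //   nowMs:   e.g. context.timerService().currentProcessingTime()
    String onRegister(int subtask, Object key, long timerTs, long nowMs) {
        pending.put(key + "@" + timerTs, timerTs);
        return "subtask=" + subtask + " key=" + key
                + " REGISTER timer=" + timerTs + " now=" + nowMs;
    }

    // Call at the top of onTimer(timestamp, ctx, out) with timerTs = timestamp.
    String onFire(int subtask, Object key, long timerTs, long nowMs) {
        pending.remove(key + "@" + timerTs);
        return "subtask=" + subtask + " key=" + key
                + " FIRE timer=" + timerTs + " now=" + nowMs
                + " delayMs=" + (nowMs - timerTs);
    }

    // Timers recorded on this subtask that are more than graceMs past due
    // and for which no callback has been seen.
    List<String> overdue(long nowMs, long graceMs) {
        List<String> late = new ArrayList<>();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            if (e.getValue() + graceMs < nowMs) {
                late.add(e.getKey());
            }
        }
        return late;
    }
}
```

The returned strings can be passed to whatever logger the job already uses. Comparing the REGISTER and FIRE lines per subtask should show whether the callbacks stop for every subtask or only for some of them.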

Best,
Yun Gao

------------------------------------------------------------------
From: Binil Benjamin <bbenja...@splunk.com>
Send Time: 2022 Mar. 18 (Fri.) 16:07
To: "yu'an huang" <h.yuan...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

Hi,

Unfortunately, I cannot share the entire code, but the class roughly looks like this:

public class WfProcessFunction extends KeyedProcessFunction<Tuple2<String, String>, Map<String, Object>, Map<String, Object>> {

    @Override
    public void processElement(Map<String, Object> inputRecord, Context context, Collector<Map<String, Object>> collector) throws Exception {
        ...
        context.timerService().registerProcessingTimeTimer(
            context.timerService().currentProcessingTime() + 5 * TimeUnit.SECONDS.toMillis(1L));
        ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Map<String, Object>> out) throws Exception {
        ...
    }
}

Thanks!

On Thu, Mar 17, 2022 at 9:24 PM yu'an huang <h.yuan...@gmail.com> wrote:

Hi, can you share your code so we can check whether it is written correctly.

> On 18 Mar 2022, at 7:54 AM, Binil Benjamin <bbenja...@splunk.com> wrote:
>
> Hi,
>
> We have a class that extends KeyedProcessFunction and overrides onTimer()
> method. During processElement(), we register a timer callback using
> context.timerService().registerProcessingTimeTimer(<some-future-time>). For
> a while, we see that the onTimer() method is getting called back and
> everything works as expected; however, after a while, the onTimer() stops
> getting any callbacks from Flink (the registration of the timer via.
> registerProcessingTimeTimer() is working just fine). Does anyone know what
> could be wrong here and how we can debug this?
>
> Flink version is 1.13.2 (running on AWS KDA)
>
> Thanks!