Hi Binil,

I think the code itself also looks good to me. Could you confirm a few
details of the issue?
1. What is the parallelism of this operator, and does the issue occur for all
the subtasks?
2. Have you already added logs in processElement and onTimer to print the
timestamp of each registered processing-time timer and the time of each
callback? If so, could you also share that part of the output?
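
For item 2, here is a rough sketch of the kind of bookkeeping that could help,
written in plain Java so it does not depend on any Flink API. TimerAudit and
its methods are hypothetical names, not something Flink provides. The idea is
to call registered() right after registerProcessingTimeTimer(...) in
processElement, call fired() at the top of onTimer, and periodically log
overdue() to see which keys have timers that never came back. It assumes the
key can be rendered as a String and keeps everything in memory, so it is only
meant for temporary debugging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical debugging helper (not part of Flink): tracks timers that were registered but not yet fired. */
class TimerAudit {
    // key -> timestamps that were registered and have not been seen in onTimer yet
    private final Map<String, TreeSet<Long>> pending = new TreeMap<>();

    /** Record a registration; call right after registering the timer for this key. */
    void registered(String key, long timestamp) {
        pending.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    /** Record a callback; returns false if no matching registration was recorded. */
    boolean fired(String key, long timestamp) {
        TreeSet<Long> timestamps = pending.get(key);
        if (timestamps == null || !timestamps.remove(timestamp)) {
            return false;
        }
        if (timestamps.isEmpty()) {
            pending.remove(key);
        }
        return true;
    }

    /** List "key@timestamp" for every pending timer due more than graceMillis before now. */
    List<String> overdue(long now, long graceMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, TreeSet<Long>> entry : pending.entrySet()) {
            // headSet is exclusive, so only timestamps strictly older than the cutoff are reported
            for (long ts : entry.getValue().headSet(now - graceMillis)) {
                result.add(entry.getKey() + "@" + ts);
            }
        }
        return result;
    }
}
```

If overdue() keeps growing for only some keys or subtasks, that would also
help answer item 1.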

Best,
Yun Gao


------------------------------------------------------------------
From:Binil Benjamin <bbenja...@splunk.com>
Send Time:2022 Mar. 18 (Fri.) 16:07
To:"yu'an huang" <h.yuan...@gmail.com>
Cc:user <user@flink.apache.org>
Subject:Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

Hi,

Unfortunately, I cannot share the entire code, but the class roughly looks like 
this:

public class WfProcessFunction
        extends KeyedProcessFunction<Tuple2<String, String>, Map<String, Object>, Map<String, Object>> {

    @Override
    public void processElement(Map<String, Object> inputRecord,
            Context context, Collector<Map<String, Object>> collector) throws Exception {
        ...
        context.timerService().registerProcessingTimeTimer(
                context.timerService().currentProcessingTime() + 5 * TimeUnit.SECONDS.toMillis(1L));
        ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Map<String, Object>> out) throws Exception {
        ...
    }
}

Thanks!
On Thu, Mar 17, 2022 at 9:24 PM yu'an huang <h.yuan...@gmail.com> wrote:

 Hi, can you share your code so we can check whether it is written correctly?



 > On 18 Mar 2022, at 7:54 AM, Binil Benjamin <bbenja...@splunk.com> wrote:
 > 
 > Hi,
 > 
 > We have a class that extends KeyedProcessFunction and overrides the
 > onTimer() method. During processElement(), we register a timer callback
 > using context.timerService().registerProcessingTimeTimer(<some-future-time>).
 > For a while, onTimer() is called back and everything works as expected;
 > however, after some time, onTimer() stops receiving any callbacks from
 > Flink (registering the timer via registerProcessingTimeTimer() still works
 > fine). Does anyone know what could be wrong here and how we can debug this?
 > 
 > Flink version is 1.13.2 (running on AWS KDA)
 > 
 > Thanks!


