Hi Alex,

I think it would be better to first understand what problem you ran into when 
restoring the timers.

Flink currently does not support removing state (including timer state).

Best
Yun Tang
________________________________
From: Alex Drobinsky <alex.drobin...@gmail.com>
Sent: Monday, February 7, 2022 21:09
To: Caizhi Weng <tsreape...@gmail.com>
Cc: User-Flink <user@flink.apache.org>
Subject: Re: How to prevent check pointing of timers ?

By timer I mean a regular timer from keyed state, which is used via the 
onTimer function, for example:



public class StateWithTimer {
    public long timerValue = 0;
    public volatile boolean shouldResetTimer = true;

    public boolean resetIfMust(long timeoutInMilliseconds, TimerService timerService) {
        if (shouldResetTimer) {
            setupTimer(timeoutInMilliseconds, timerService);
            shouldResetTimer = false;
            return true;
        }
        return false;
    }

    public void setupTimer(long timeoutInMilliseconds, TimerService timerService) {
        // Cancel previous timer
        timerService.deleteProcessingTimeTimer(timerValue);
        // Register new timer
        // Should it be configurable ?
        timerValue = (timerService.currentProcessingTime() + timeoutInMilliseconds)*1000/1000;
        timerService.registerProcessingTimeTimer(timerValue);
    }

}

State classes that use timers extend StateWithTimer above. The resetIfMust 
function is the current workaround: it resets the timer the first time it is 
called after a start or a restart from a checkpoint.


@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
   MultiStorePacketState so = state.value();
   if (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, ctx.timerService())) {
      return;
   }
   closeAndReportFile(collector, so);

   ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
   state.update(so);
}
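
A minimal sketch of the workaround above that can be run without Flink. SimpleTimerService and FakeTimerService are hypothetical stand-ins for the three TimerService methods used above (they are not Flink classes), and TimerResetSketch mirrors StateWithTimer in simplified form. It only illustrates the intended behavior: the first call after a (re)start replaces the previously registered timer and reports that it did so, and later calls do nothing.

```java
import java.util.TreeSet;

// Hypothetical stand-in for the TimerService methods used above; not the Flink interface.
interface SimpleTimerService {
    long currentProcessingTime();
    void registerProcessingTimeTimer(long time);
    void deleteProcessingTimeTimer(long time);
}

// In-memory fake so the logic can be exercised without a Flink job.
class FakeTimerService implements SimpleTimerService {
    long now = 0;                                   // simulated processing time
    final TreeSet<Long> timers = new TreeSet<>();   // currently registered timers

    public long currentProcessingTime() { return now; }
    public void registerProcessingTimeTimer(long time) { timers.add(time); }
    public void deleteProcessingTimeTimer(long time) { timers.remove(time); }
}

// Simplified mirror of StateWithTimer.
class TimerResetSketch {
    long timerValue = 0;
    boolean shouldResetTimer = true;

    // Returns true if the timer was re-armed, so the caller should skip its normal work.
    boolean resetIfMust(long timeoutInMilliseconds, SimpleTimerService timerService) {
        if (!shouldResetTimer) {
            return false;
        }
        // Cancel the previously registered timer, then register a fresh one.
        timerService.deleteProcessingTimeTimer(timerValue);
        timerValue = timerService.currentProcessingTime() + timeoutInMilliseconds;
        timerService.registerProcessingTimeTimer(timerValue);
        shouldResetTimer = false;
        return true;
    }
}
```

In this sketch the flag starts as true on a freshly constructed object; whether it is true again after a restore in a real job depends on how the state object is serialized, which the sketch does not model.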




On Mon, Feb 7, 2022 at 05:06, Caizhi Weng <tsreape...@gmail.com> wrote:
Hi!

Could you elaborate more on your code or share it if possible? Which timer are 
you talking about? Are you using the data stream API or SQL API? Do you mean 
the timer registered per record for a window aggregation? Does mini batch 
aggregation [1] solve your problem?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation

On Thu, Feb 3, 2022 at 20:41, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
Dear flink user,

In our project, restoring the timers' state creates numerous issues, so I would 
like to know whether it is possible to avoid saving and restoring timers 
altogether. If it isn't possible, how could I delete all registered timers in 
the open function?

Best regards,
Alexander
