By timer I mean regular timer from KeyedState which utilized via function
onTimer, for example:


public class StateWithTimer {
    public long timerValue = 0;
    public volatile boolean shouldResetTimer = true;

    public boolean resetIfMust(long timeoutInMilliseconds,
TimerService timerService) {
        if (shouldResetTimer) {
            setupTimer(timeoutInMilliseconds, timerService);
            shouldResetTimer = false;
            return true;
        }
        return false;
    }

    public void setupTimer(long timeoutInMilliseconds, TimerService
timerService) {
        // Cancel previous timer
        timerService.deleteProcessingTimeTimer(timerValue);
        // Register new timer
        // Should it be configurable ?
        timerValue = (timerService.currentProcessingTime() +
timeoutInMilliseconds)*1000/1000;
        timerService.registerProcessingTimeTimer(timerValue);
    }

}


State which utilizes timers extends StateWithTimer above, the function
resetIfMust is current workaround - it resets timers first time after
restart from checkpoint or start.

@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector<ClassifierOutput> collector) throws Exception {
   MultiStorePacketState so = state.value();
   if (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout,
ctx.timerService())) {
      return;
   }
   closeAndReportFile(collector, so);

   ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
   state.update(so);
}





пн, 7 февр. 2022 г. в 05:06, Caizhi Weng <tsreape...@gmail.com>:

> Hi!
>
> Could you elaborate more on your code or share it if possible? Which timer
> are you talking about? Are you using the data stream API or SQL API? Do you
> mean the timer registered per record for a window aggregation? Does mini
> batch aggregation [1] solve your problem?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation
>
> Alex Drobinsky <alex.drobin...@gmail.com> 于2022年2月3日周四 20:41写道:
>
>> Dear flink user,
>>
>> In our project, restoring the timer's state creates numerous issues, so I
>> would like to know
>> if it is possible to avoid save/restore of timers altogether.
>> If it isn't possible, how could I delete all registered timers during the
>> open function ?
>>
>> Best regards,
>> Alexander
>>
>

Reply via email to