By timer I mean a regular timer from keyed state, which is used via the onTimer function, for example:

public class StateWithTimer {
    public long timerValue = 0;
    public volatile boolean shouldResetTimer = true;

    public boolean resetIfMust(long timeoutInMilliseconds, TimerService timerService) {
        if (shouldResetTimer) {
            setupTimer(timeoutInMilliseconds, timerService);
            shouldResetTimer = false;
            return true;
        }
        return false;
    }

    public void setupTimer(long timeoutInMilliseconds, TimerService timerService) {
        // Cancel previous timer
        timerService.deleteProcessingTimeTimer(timerValue);
        // Register new timer
        // Should it be configurable ?
        timerValue = (timerService.currentProcessingTime() + timeoutInMilliseconds)*1000/1000;
        timerService.registerProcessingTimeTimer(timerValue);
    }
}

The state that uses timers extends StateWithTimer above. The function resetIfMust is the current workaround: it resets the timer the first time onTimer fires after a start or a restart from a checkpoint.

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
    MultiStorePacketState so = state.value();
    if (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, ctx.timerService())) {
        return;
    }
    closeAndReportFile(collector, so);

    ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
    state.update(so);
}

On Mon, Feb 7, 2022 at 05:06, Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> Could you elaborate more on your code or share it if possible? Which timer
> are you talking about? Are you using the data stream API or SQL API? Do you
> mean the timer registered per record for a window aggregation? Does mini
> batch aggregation [1] solve your problem?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation
>
> On Thu, Feb 3, 2022 at 20:41, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>
>> Dear flink user,
>>
>> In our project, restoring the timer's state creates numerous issues, so I
>> would like to know
>> if it is possible to avoid save/restore of timers altogether.
>> If it isn't possible, how could I delete all registered timers during the
>> open function ?
>>
>> Best regards,
>> Alexander
>>
>
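
For reference, here is a minimal sketch of how the resetIfMust workaround from the top of this message behaves on the first and second timer firing after a restart. It runs without Flink: SimpleTimerService and FakeTimerService are hypothetical stand-ins for the three TimerService calls used above, the onTimer helper only returns whether the file would be closed, and the timestamps (500, 1000, 200) are made-up values.

```java
import java.util.TreeSet;

public class ResetOnceSketch {

    /** Hypothetical stand-in for the three TimerService calls the workaround uses. */
    public interface SimpleTimerService {
        long currentProcessingTime();
        void registerProcessingTimeTimer(long time);
        void deleteProcessingTimeTimer(long time);
    }

    /** In-memory fake, so the sketch runs without Flink. */
    public static class FakeTimerService implements SimpleTimerService {
        public long now = 0;
        public final TreeSet<Long> timers = new TreeSet<>();
        @Override public long currentProcessingTime() { return now; }
        @Override public void registerProcessingTimeTimer(long time) { timers.add(time); }
        @Override public void deleteProcessingTimeTimer(long time) { timers.remove(time); }
    }

    /** Same reset-once logic as StateWithTimer above, against the stand-in interface. */
    public static class StateWithTimer {
        public long timerValue = 0;
        public boolean shouldResetTimer = true;

        public boolean resetIfMust(long timeoutMs, SimpleTimerService ts) {
            if (!shouldResetTimer) {
                return false;
            }
            setupTimer(timeoutMs, ts);
            shouldResetTimer = false;
            return true;
        }

        public void setupTimer(long timeoutMs, SimpleTimerService ts) {
            ts.deleteProcessingTimeTimer(timerValue);            // cancel previous timer
            timerValue = ts.currentProcessingTime() + timeoutMs; // register new timer
            ts.registerProcessingTimeTimer(timerValue);
        }
    }

    /** Returns true when this firing would close and report the file. */
    public static boolean onTimer(StateWithTimer so, SimpleTimerService ts, long timeoutMs) {
        if (so.resetIfMust(timeoutMs, ts)) {
            return false; // first firing after (re)start: only re-arm the timer
        }
        ts.deleteProcessingTimeTimer(so.timerValue);
        return true;
    }

    public static void main(String[] args) {
        FakeTimerService ts = new FakeTimerService();
        StateWithTimer so = new StateWithTimer();
        so.timerValue = 500;   // pretend this timer came back from a checkpoint
        ts.timers.add(500L);
        ts.now = 1000;         // the job restarts after that timestamp

        System.out.println(onTimer(so, ts, 200)); // false: timer was only re-armed
        System.out.println(ts.timers);            // [1200]
        ts.now = 1200;
        System.out.println(onTimer(so, ts, 200)); // true: file would be closed
        System.out.println(ts.timers);            // []
    }
}
```

The sketch assumes shouldResetTimer is true again after a restore; whether that holds in the real job depends on how the state class is serialized.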