Hi,

As far as I can tell, timers should be checkpointed and recovered. What may be happening is that the last watermarks seen by an operator on its different inputs, and on the different channels within an input, are not persisted. Flink assumes that the watermark assigners will emit newer watermarks after recovery. However, an operator's event time only advances to the minimum of the watermarks across its inputs. So if one of your inputs is dormant and emitted some very high watermark long before the failure, and it emits no new watermark after recovery, that input or input channel might be preventing timers from firing. Can you check whether that is what's happening in your case?
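To make the suspected mechanism concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: `WatermarkSketch`, `TwoInputOperator`, `recover()` and the timestamps are all made up for illustration, and the recovery behaviour (timers restored, last seen watermarks reset) is the assumption described above rather than something verified against your job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model only: none of these classes are Flink classes.
class WatermarkSketch {

    static class TwoInputOperator {
        // After a (re)start nothing has been seen yet on either input.
        long watermark1 = Long.MIN_VALUE;
        long watermark2 = Long.MIN_VALUE;
        final TreeSet<Long> timers = new TreeSet<>();
        final List<Long> fired = new ArrayList<>();

        // Event time advances only to the minimum across both inputs.
        long combinedWatermark() {
            return Math.min(watermark1, watermark2);
        }

        void registerEventTimeTimer(long timestamp) {
            timers.add(timestamp);
        }

        void onWatermark(int input, long watermark) {
            if (input == 1) {
                watermark1 = Math.max(watermark1, watermark);
            } else {
                watermark2 = Math.max(watermark2, watermark);
            }
            // Fire every timer at or below the combined watermark.
            while (!timers.isEmpty() && timers.first() <= combinedWatermark()) {
                fired.add(timers.pollFirst());
            }
        }

        // Assumed recovery behaviour: timers come back, last seen watermarks do not.
        TwoInputOperator recover() {
            TwoInputOperator restored = new TwoInputOperator();
            restored.timers.addAll(timers);
            return restored;
        }
    }

    public static void main(String[] args) {
        TwoInputOperator op = new TwoInputOperator();
        op.onWatermark(2, 10_000L);                 // input 2 is far ahead, then goes dormant

        TwoInputOperator restored = op.recover();   // crash and restore
        restored.registerEventTimeTimer(5_000L);    // timer from an element on input 1
        restored.onWatermark(1, 6_000L);            // only input 1 makes progress
        System.out.println("fired while input 2 is silent: " + restored.fired);

        restored.onWatermark(2, 10_000L);           // input 2 emits a watermark again
        System.out.println("fired after input 2 emits: " + restored.fired);
    }
}
```

In this model the timer at 5000 stays pending while only input 1 advances, because the combined watermark is still `Long.MIN_VALUE`. It fires as soon as input 2 emits any watermark at or above 5000.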
If that is the case, you would have to make sure, one way or another, that some watermarks are emitted after recovery. As a last resort, you could manually store the watermarks in the operator's or source's state and re-emit the last seen watermark during recovery.

Best,
Piotrek

On Thu, 17 Jun 2021 at 13:46, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I have implemented a join function using CoProcessFunction with
> CheckpointedFunction to recover from failures. I added some debug lines to
> check whether it is restoring, and it is. Before the crash, I process events
> that arrive at processElement2(). I create snapshots in snapshotState(), the
> application comes back and restores the events. That is fine.
>
> After the restore, I process events that arrive at processElement1(). I
> register event timers for them as I did in processElement2() before the
> crash. But onTimer() is never called. The point is that I don't have
> any events to send to processElement2() to make the CoProcessFunction
> register a timer for them. They were sent before the crash.
>
> I suppose that onTimer() is called only when there are
> "timerService.registerEventTimeTimer(endOfWindow);" calls for both
> processElement1() and processElement2(), because when I test the same
> application without a crash, the CoProcessFunction does trigger the
> onTimer() method.
>
> But if I have a crash in the middle, the CoProcessFunction does not call
> onTimer(). Why is that? Is that normal? What do I have to do to make the
> CoProcessFunction trigger the onTimer() method even if only one stream is
> processed, let's say in the processElement2() method, and the other stream is
> restored from a snapshot? I imagine that I have to register a timer during
> the recovery (initializeState()). But how?
>
> thanks,
> Felipe