So, I was trying to improve it by using CheckpointedFunction, as shown here [1]. But the documentation of the method isRestored() says [2]:
"Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks."

This is weird, because I am extending a ProcessFunction, which is a RichFunction:

public class AuctionExceptionSimulatorProcessFunction
        extends ProcessFunction<KeyedReportingData, KeyedReportingData>
        implements CheckpointedFunction {
    ...
}

In the end, I cannot rely on isRestored(). Do you know what could be wrong? I used the same implementation as in [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> wrote:

> You can also use accumulators [1] to collect the number of restarts
> (and then access it via the client); but side outputs should work as well.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>
> Regards,
> Roman
>
> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > I just realised that the ProcessFunction alone is enough. I don't need
> > the CheckpointedFunction.
> >
> >
> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
> > felipe.o.gutier...@gmail.com> wrote:
> >>
> >> Cool!
> >>
> >> I did it using this example
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
> >> because I don't have a keyed stream on the specific operator where I want to
> >> count the number of restarts (yes, I am using version 1.4, unfortunately).
> >>
> >> Because I need to test it in an integration test, I am using a side
> >> output (
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
> >> to attach a sink.
> >> I am not sure if you have a better idea for testing the
> >> restarts in an integration test. If you have a simple idea, please tell me
> >> :). This is how I solved it.
> >>
> >> Thanks
> >> Felipe
> >>
> >>
> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org>
> >> wrote:
> >>>
> >>> Hi Felipe,
> >>>
> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
> >>> that depending on the configuration only a pipeline region can be
> >>> restarted, not the whole job).
> >>>
> >>> But if all you want is to check whether it's a first attempt or not,
> >>> you can also call context.isRestored() from initializeState() [2].
> >>>
> >>> [1]
> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
> >>>
> >>> [2]
> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>>
> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
> >>> <felipe.o.gutier...@gmail.com> wrote:
> >>> >
> >>> > Hello community,
> >>> >
> >>> > Is it possible to know programmatically how many times my Flink
> >>> > stream job has restarted since it started running?
> >>> >
> >>> > My use case is this: I have a unit test that uses checkpointing,
> >>> > and I throw an exception in a MapFunction for a given time, i.e., for the
> >>> > first 2 seconds. Because Flink restarts the job and checkpointing is enabled, I can
> >>> > recover the state, and after 2 seconds I don't throw any exception anymore.
> >>> > Then I would like to know how many times the job was restarted.
> >>> >
> >>> > Thanks,
> >>> > Felipe
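For anyone finding this thread later, here is a minimal sketch of the approach discussed above, assuming the Flink 1.4-era DataStream API: a ProcessFunction on a non-keyed stream that also implements CheckpointedFunction, keeps a restore counter in operator ListState (as in the linked managed operator state example), and reports it through a side output for an integration test. The class name RestartCountingProcessFunction, the state name "restart-count", the side output tag "restarts", and the String element type are hypothetical. It records both signals Roman mentioned: context.isRestored(), which can only be true when a completed checkpoint exists to restore from (a failure before the first completed checkpoint restarts the task with nothing to restore), and getRuntimeContext().getAttemptNumber(), documented in the 1.13 Javadoc linked above; please check that your exact version exposes it. As an assumption prompted by the "always false for stateless tasks" note in the Javadoc, snapshotState() always writes one element so the operator never snapshots an empty state.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/** Hypothetical sketch: counts restores of one non-keyed operator and reports them via a side output. */
public class RestartCountingProcessFunction extends ProcessFunction<String, String>
        implements CheckpointedFunction {

    /** Hypothetical side output tag; an integration test can attach a sink to it. */
    public static final OutputTag<String> RESTARTS = new OutputTag<String>("restarts") {};

    private transient ListState<Long> checkpointedRestarts; // operator (non-keyed) state
    private long restarts;     // restores seen so far, including the current one
    private boolean restored;  // value of context.isRestored() for this attempt
    private boolean reported;  // report only once per task attempt

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedRestarts = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("restart-count", Long.class));
        restored = context.isRestored();
        restarts = 0L;
        if (restored) {
            // Assumes unchanged parallelism; after rescaling, entries from several
            // subtasks may be redistributed, and this sum would mix their counts.
            Iterable<Long> stored = checkpointedRestarts.get();
            if (stored != null) {
                for (Long value : stored) {
                    restarts += value;
                }
            }
            restarts++; // this initialization is itself a restore from a checkpoint
        }
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (!reported) {
            // getAttemptNumber() is 0 for the first attempt of this subtask.
            ctx.output(RESTARTS, "restored=" + restored
                    + " restarts=" + restarts
                    + " attempt=" + getRuntimeContext().getAttemptNumber());
            reported = true;
        }
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedRestarts.clear();
        // Always write one element so this operator never snapshots an empty state.
        checkpointedRestarts.add(restarts);
    }
}
```

In a test, calling `getSideOutput(RestartCountingProcessFunction.RESTARTS)` on the SingleOutputStreamOperator returned by `process(...)` gives a stream that can be attached to a collecting sink. Note that the stored counter only counts restores from a checkpoint: a restart that happens before the next checkpoint completes is not reflected in it. The attempt number covers that case, but with region failover it is per subtask rather than per job.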