I just realised that the ProcessFunction alone is enough. I don't need the CheckpointedFunction.

On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <felipe.o.gutier...@gmail.com> wrote:

> Cool!
>
> I did it using this example
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
> because I don't have a keyed stream on the specific operator where I want to
> count the number of restarts (yes, I am using version 1.4, unfortunately).
>
> Because I need to test it in an integration test, I am using a side output
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
> to attach a sink. I am not sure if you have a better idea for testing the
> restarts in an integration test. If you have a simple idea, please tell me
> :). This is the way I solved it.
>
> Thanks
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
>
> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> wrote:
>
>> Hi Felipe,
>>
>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> that, depending on the configuration, only a pipeline region may be
>> restarted, not the whole job).
>>
>> But if all you want is to check whether it's the first attempt or not,
>> you can also call context.isRestored() from initializeState() [2]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > Hello community,
>> >
>> > Is it possible to know programmatically how many times my Flink stream
>> > job has restarted since it started running?
>> >
>> > My use case is like this. I have a unit test that uses checkpointing, and
>> > I throw an exception in a MapFunction for a given period, i.e. for the
>> > next 2 seconds. Because Flink restarts the job and I have checkpointing
>> > enabled, I can recover the state, and after 2 seconds I stop throwing
>> > exceptions. Then I would like to know how many times the job was
>> > restarted.
>> >
>> > Thanks,
>> > Felipe
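
For anyone finding this thread later, here is a rough plain-Java model of the test scenario discussed above: a map step that fails on the first few attempts, a runner that restarts from the last checkpoint, and an attempt counter read at the end. It is only a sketch and does not use any Flink classes. The names RestartCountSketch, Result, map and run are all hypothetical, the failure is injected per attempt rather than for a 2-second window (to keep it deterministic), and a "checkpoint" is taken after every record, which a real job would not do. The attempt variable only mimics the idea behind getRuntimeContext().getAttemptNumber().

```java
// Plain-Java model of "restart from checkpoint and count the attempts".
// No Flink classes are used; every name below is hypothetical.
final class RestartCountSketch {

    /** Outcome of one simulated job run. */
    static final class Result {
        final int restarts; // number of attempts that ended in a failure
        final long sum;     // sum of mapped values, each record counted once

        Result(int restarts, long sum) {
            this.restarts = restarts;
            this.sum = sum;
        }
    }

    /** Doubles the value, but throws at failAtIndex while attempt < failingAttempts. */
    static int map(int value, int index, int attempt, int failAtIndex, int failingAttempts) {
        if (index == failAtIndex && attempt < failingAttempts) {
            throw new IllegalStateException("injected failure, attempt " + attempt);
        }
        return value * 2;
    }

    /** Runs the simulated job, restarting from the last checkpoint after each failure. */
    static Result run(int[] input, int failAtIndex, int failingAttempts, int maxRestarts) {
        int checkpointedOffset = 0; // position stored in the last "checkpoint"
        long checkpointedSum = 0L;  // state stored in the last "checkpoint"
        int attempt = 0;            // starts at 0 and grows by one per restart

        while (true) {
            // Restore: every attempt starts from the last checkpoint.
            int offset = checkpointedOffset;
            long sum = checkpointedSum;
            try {
                while (offset < input.length) {
                    sum += map(input[offset], offset, attempt, failAtIndex, failingAttempts);
                    offset++;
                    // Simplification: checkpoint after every successfully mapped record.
                    checkpointedOffset = offset;
                    checkpointedSum = sum;
                }
                return new Result(attempt, sum);
            } catch (IllegalStateException e) {
                if (attempt >= maxRestarts) {
                    throw e; // give up, like an exhausted restart strategy
                }
                attempt++;
            }
        }
    }
}
```

With the input {1, 2, 3, 4}, a failure injected at index 2 and two failing attempts, run(new int[] {1, 2, 3, 4}, 2, 2, 5) reports 2 restarts and a sum of 20, so no record is counted twice after the restores. In a real integration test the counter would have to come from the job itself, for example through the side output mentioned above.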