You can also use accumulators [1] to collect the number of restarts (and then read it via the client), but side outputs should work as well.
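
To make the idea concrete, here is a minimal plain-Java sketch with no Flink dependency. It only models the logic: a task that fails for its first few attempts, a runner that restarts it, and a counter standing in for an accumulator that the caller reads once the run has finished. All names (`RestartModel`, `Task`, `runWithRestarts`, `countRestarts`) are hypothetical, and the `AtomicInteger` is just a stand-in for a real accumulator. In an actual job the recorded value would come from getRuntimeContext().getAttemptNumber(), as mentioned in the quoted message below.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model of "count restarts via the attempt number".
// Nothing here is Flink API; every name is hypothetical.
class RestartModel {

    /** A task body that receives its 0-based attempt number and may throw to simulate a failure. */
    interface Task {
        void run(int attemptNumber) throws Exception;
    }

    /**
     * Runs the task and restarts it after each failure, up to maxRestarts times.
     * Returns the attempt number of the successful run, which equals the number of restarts.
     */
    static int runWithRestarts(Task task, int maxRestarts) {
        int attempt = 0;
        while (true) {
            try {
                task.run(attempt);
                return attempt;
            } catch (Exception e) {
                if (attempt >= maxRestarts) {
                    throw new IllegalStateException("restart limit reached", e);
                }
                attempt++; // the next run is a restart
            }
        }
    }

    /**
     * Simulates a task that fails during its first failingAttempts attempts.
     * The task records its attempt number in a counter (the accumulator stand-in),
     * and the caller reads that counter after the run has finished.
     */
    static int countRestarts(int failingAttempts, int maxRestarts) {
        AtomicInteger accumulator = new AtomicInteger();
        runWithRestarts(attempt -> {
            accumulator.set(attempt); // what a real task would record on start-up
            if (attempt < failingAttempts) {
                throw new RuntimeException("injected failure");
            }
        }, maxRestarts);
        return accumulator.get();
    }

    public static void main(String[] args) {
        // Two injected failures lead to two restarts.
        System.out.println(countRestarts(2, 5)); // prints 2
    }
}
```

Note that this model has a single task. With several parallel subtasks, each one reports its own attempt number, so the values need to be combined deliberately (for example by taking the maximum rather than a sum).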

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters

Regards,
Roman

On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
> I just realised that the ProcessFunction alone is enough. I don't need the
> CheckpointedFunction.
>
>
> On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <felipe.o.gutier...@gmail.com>
> wrote:
>>
>> Cool!
>>
>> I did it using this example
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> because I don't have a keyed stream on the specific operator for which I
>> want to count the number of restarts (yes, I am using version 1.4,
>> unfortunately).
>>
>> Because I need to test it in an integration test, I am using a side output
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> to attach a sink. I am not sure if you have a better idea for testing the
>> restarts in an integration test. If you have a simple idea, please tell me
>> :). This is how I solved it.
>>
>> Thanks
>> Felipe
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>>
>>
>> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>
>>> Hi Felipe,
>>>
>>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>>> that, depending on the configuration, only a pipeline region may be
>>> restarted, not the whole job).
>>>
>>> But if all you want is to check whether it's the first attempt or not,
>>> you can also call context.isRestored() from initializeState() [2]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>> <felipe.o.gutier...@gmail.com> wrote:
>>> >
>>> > Hello community,
>>> >
>>> > Is it possible to know programmatically how many times my Flink stream
>>> > job has restarted since it started running?
>>> >
>>> > My use case is like this. I have a unit test that uses checkpointing, and
>>> > I throw an exception in a MapFunction for a given time, i.e., for the
>>> > next 2 seconds. Because Flink restarts the job and I have checkpointing
>>> > enabled, I can recover the state, and after 2 seconds I no longer throw
>>> > any exception. Then I would like to know how many times the job was
>>> > restarted.
>>> >
>>> > Thanks,
>>> > Felipe
>>> >