Does your ProcessFunction have state? If not, this behaviour would be in line with the documentation.
Also, which Flink version are you using? Before Flink 1.13, empty state was omitted, so I could imagine that `isRestored()` would return false; it should now also return true for empty state.

On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> So, I was trying to improve this by using the CheckpointedFunction as shown
> here [1]. But the documentation of the method isRestored() says [2]:
>
> "Returns true, if state was restored from the snapshot of a previous
> execution. This returns always false for stateless tasks."
>
> It is weird because I am extending a ProcessFunction, which is a
> RichFunction.
>
> public class AuctionExceptionSimulatorProcessFunction extends
>         ProcessFunction<KeyedReportingData, KeyedReportingData>
>         implements CheckpointedFunction {
> ...
>
> In the end, I cannot rely on isRestored(). Do you know what could be
> wrong? I used the same implementation approach as in [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> wrote:
>
>> You can also use accumulators [1] to collect the number of restarts
>> (and then access it via client); but side outputs should work as well.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>
>> Regards,
>> Roman
>>
>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > I just realised that the ProcessFunction alone is enough. I don't need
>> > the CheckpointedFunction.
>> >
>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <felipe.o.gutier...@gmail.com> wrote:
>> >>
>> >> Cool!
>> >>
>> >> I did it using this example
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> >> because I don't have a keyed stream on the specific operator where I want
>> >> to count the number of restarts (yes, I am using version 1.4, unfortunately).
>> >>
>> >> Because I need to test it in an integration test, I am using a side output
>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> >> to attach a sink. I am not sure if you have a better idea for testing the
>> >> restarts in an integration test. If you have a simple idea, please tell me
>> >> :). This is how I solved it.
>> >>
>> >> Thanks
>> >> Felipe
>> >>
>> >> --
>> >> -- Felipe Gutierrez
>> >> -- skype: felipe.o.gutierrez
>> >>
>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> wrote:
>> >>>
>> >>> Hi Felipe,
>> >>>
>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> >>> that depending on the configuration only a pipeline region can be
>> >>> restarted, not the whole job).
>> >>>
>> >>> But if all you want is to check whether it's a first attempt or not,
>> >>> you can also call context.isRestored() from initializeState() [2]
>> >>>
>> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >>>
>> >>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>
>> >>> Regards,
>> >>> Roman
>> >>>
>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >>> >
>> >>> > Hello community,
>> >>> >
>> >>> > Is it possible to know programmatically how many times my Flink
>> >>> > stream job has restarted since it started running?
>> >>> >
>> >>> > My use case is like this. I have a unit test that uses checkpointing,
>> >>> > and I throw an exception in a MapFunction for a given time, i.e. for
>> >>> > the next 2 seconds. Because Flink restarts the job and I have
>> >>> > checkpoints, I can recover the state, and after 2 seconds I don't
>> >>> > throw any exception anymore. Then I would like to know how many times
>> >>> > the job was restarted.
>> >>> >
>> >>> > Thanks,
>> >>> > Felipe
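
For reference, the reasoning behind the getAttemptNumber() suggestion quoted above can be pictured with a small plain-Java model. This is only a sketch and not Flink code: RestartModel, FlakyTask and runUntilSuccess are hypothetical names made up for illustration, and the retry loop merely stands in for a restart strategy. The one Flink fact it relies on is that the first attempt is numbered 0, so the attempt number of the run that finally succeeds equals the number of restarts.

```java
import java.util.function.IntConsumer;

/** Plain-Java model (NOT Flink code) of a task that is restarted after failures. */
final class RestartModel {

    /** Hypothetical stand-in for a function that throws during its first attempts. */
    static final class FlakyTask implements IntConsumer {
        private final int failingAttempts;

        FlakyTask(int failingAttempts) {
            this.failingAttempts = failingAttempts;
        }

        @Override
        public void accept(int attemptNumber) {
            // In a Flink rich function the attempt number would come from
            // getRuntimeContext().getAttemptNumber(); here it is passed in.
            if (attemptNumber < failingAttempts) {
                throw new IllegalStateException("simulated failure in attempt " + attemptNumber);
            }
        }
    }

    /**
     * Re-runs the task with an increasing attempt number until it succeeds.
     * Returns the attempt number of the successful run; because the first
     * attempt is 0, this is also the number of restarts that were needed.
     */
    static int runUntilSuccess(IntConsumer task, int maxRestarts) {
        for (int attempt = 0; attempt <= maxRestarts; attempt++) {
            try {
                task.accept(attempt);
                return attempt;
            } catch (IllegalStateException e) {
                // Simulated failure: loop around, which models a restart.
            }
        }
        throw new IllegalStateException("gave up after " + maxRestarts + " restarts");
    }

    public static void main(String[] args) {
        // The task fails in attempts 0 and 1 and succeeds in attempt 2.
        int restarts = runUntilSuccess(new FlakyTask(2), 5);
        System.out.println("restarts = " + restarts); // prints "restarts = 2"
    }
}
```

In a real job the same idea would mean reading the attempt number inside the operator and emitting it, for example through the side output already mentioned in the thread. As noted above, with region-based failover the attempt number of a single subtask may not reflect restarts of the whole job.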