So, I was trying to improve by using the CheckpointedFunction as it shows
here [1]. But the method isRestored() says in its documentation [2]:

"Returns true, if state was restored from the snapshot of a previous
execution. This returns always false for stateless tasks."

It is weird because I am extending a ProcessFunction which is a
RichFunction.

public class AuctionExceptionSimulatorProcessFunction extends
ProcessFunction<KeyedReportingData, KeyedReportingData>
        implements CheckpointedFunction {
...

In the end, I cannot rely on the "isRestored()". Do you know what could be
wrong? I used the same implementation method of [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--


*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> wrote:

> You can also use accumulators [1] to collect the number of restarts
> (and then access it via client); but side outputs should work as well.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>
> Regards,
> Roman
>
> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > I just realised that only the ProcessFunction is enough. I don't need
> the CheckpointFunction.
> >
> >
> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
> felipe.o.gutier...@gmail.com> wrote:
> >>
> >> Cool!
> >>
> >> I did using this example
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
> because I don't have a keyed stream on the specific operator that I want to
> count the number of restarts. (yes I am using version 1.4 unfortunately).
> >>
> >> Because I need to test it in an integration test I am using a side
> output (
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
> to attach a sink. I am not sure if you have a better idea to test the
> restarts on an integration test. If you have a simple idea please tell me
> :). This was the way that I solved....
> >>
> >> Thanks
> >> Felipe
> >>
> >> --
> >> -- Felipe Gutierrez
> >> -- skype: felipe.o.gutierrez
> >>
> >>
> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org>
> wrote:
> >>>
> >>> Hi Felipe,
> >>>
> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
> >>> that depending on the configuration only a pipeline region can be
> >>> restarted, not the whole job).
> >>>
> >>> But if all you want is to check whether it's a first attempt or not,
> >>> you can also call context.isRestored() from initializeState() [2]
> >>>
> >>> [1]
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
> >>>
> >>> [2]
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>>
> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
> >>> <felipe.o.gutier...@gmail.com> wrote:
> >>> >
> >>> > Hello community,
> >>> >
> >>> > Is it possible to know programmatically how many times my Flink
> stream job restarted since it was running?
> >>> >
> >>> > My use case is like this. I have an Unit test that uses checkpoint
> and I throw one exception in a MapFunction for a given time, i.e.: for the
> 2 seconds ahead. Because Flink restarts the job and I have checkpoint I can
> recover the state and after 2 seconds I don't throw any exception anymore.
> Then I would like to know how many times the job was restarted.
> >>> >
> >>> > Thanks,
> >>> > Felipe
> >>> >
>

Reply via email to