Does your ProcessFunction have state? If not, that behavior would be in line
with the documentation.

Also, which Flink version are you using? Before Flink 1.13, empty state was
omitted, so I could imagine that `isRestored()` would return false there. From
1.13 on, it should return true for empty state as well.
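
For illustration, here is a minimal sketch of a function that registers
operator state, so the task is not stateless and `isRestored()` has something
to restore. The class name, the state name "restore-count", the String element
type, and the helper `nextRestoreCount` are all hypothetical and not taken from
your job. It only counts restarts that restore from a completed checkpoint, and
it assumes the operator's parallelism does not change between attempts.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example class; String stands in for the real element type.
public class RestartCountingFunction extends ProcessFunction<String, String>
        implements CheckpointedFunction {

    // Operator (non-keyed) list state that holds the count across restarts.
    private transient ListState<Long> restoreCountState;

    // In-memory copy of the count for the current attempt.
    private long restoreCount;

    // Pure helper: sums the previously stored counts and adds one
    // if this attempt was restored from a snapshot.
    static long nextRestoreCount(boolean restored, Iterable<Long> previous) {
        long count = 0L;
        if (previous != null) {
            for (Long value : previous) {
                count += value;
            }
        }
        return restored ? count + 1 : count;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        restoreCountState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("restore-count", Long.class));
        restoreCount = nextRestoreCount(context.isRestored(), restoreCountState.get());
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Always write a value, so the snapshot is never empty.
        restoreCountState.clear();
        restoreCountState.add(restoreCount);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Pass-through; the count could be emitted via a side output here.
        out.collect(value);
    }
}
```

Because `snapshotState()` always writes one value, the snapshot is non-empty
even on versions that omit empty state.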

On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> So, I was trying to improve it by using the CheckpointedFunction as shown
> here [1]. But the documentation of the method isRestored() says [2]:
>
> "Returns true, if state was restored from the snapshot of a previous
> execution. This returns always false for stateless tasks."
>
> It is weird because I am extending a ProcessFunction which is a
> RichFunction.
>
> public class AuctionExceptionSimulatorProcessFunction extends
> ProcessFunction<KeyedReportingData, KeyedReportingData>
>         implements CheckpointedFunction {
> ...
>
> In the end, I cannot rely on "isRestored()". Do you know what could be
> wrong? I followed the same implementation approach as in [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
>
> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org>
> wrote:
>
>> You can also use accumulators [1] to collect the number of restarts
>> (and then access it via client); but side outputs should work as well.
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>
>> Regards,
>> Roman
>>
>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > I just realised that the ProcessFunction alone is enough. I don't need
>> the CheckpointedFunction.
>> >
>> >
>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
>> felipe.o.gutier...@gmail.com> wrote:
>> >>
>> >> Cool!
>> >>
>> >> I did it using this example
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> because I don't have a keyed stream on the specific operator where I want to
>> count the number of restarts. (Yes, I am using version 1.4, unfortunately.)
>> >>
>> >> Because I need to verify it in an integration test, I am using a side
>> output (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> to attach a sink. I am not sure whether you have a better idea for testing
>> restarts in an integration test. If you have a simple idea, please tell me
>> :). This is how I solved it.
>> >>
>> >> Thanks
>> >> Felipe
>> >>
>> >> --
>> >> -- Felipe Gutierrez
>> >> -- skype: felipe.o.gutierrez
>> >>
>> >>
>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org>
>> wrote:
>> >>>
>> >>> Hi Felipe,
>> >>>
>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> >>> that depending on the configuration only a pipeline region can be
>> >>> restarted, not the whole job).
>> >>>
>> >>> But if all you want is to check whether it's a first attempt or not,
>> >>> you can also call context.isRestored() from initializeState() [2]
>> >>>
>> >>> [1]
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >>>
>> >>> [2]
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>
>> >>> Regards,
>> >>> Roman
>> >>>
>> >>>
>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >>> >
>> >>> > Hello community,
>> >>> >
>> >>> > Is it possible to know programmatically how many times my Flink
>> streaming job has restarted since it started running?
>> >>> >
>> >>> > My use case is like this. I have a unit test that uses checkpointing
>> and I throw an exception in a MapFunction for a given time, i.e., for the
>> next 2 seconds. Because Flink restarts the job and I have checkpoints, I can
>> recover the state, and after 2 seconds I no longer throw any exception.
>> Then I would like to know how many times the job was restarted.
>> >>> >
>> >>> > Thanks,
>> >>> > Felipe
>> >>> >
>>
>
