Hi,

Could you please share the test code?

I think the returned value might depend on the level on which the
tests are executed. If it's a regular job then it should return the
correct value (as with cluster). If the environment in which the code
is executed is mocked then it can be false.

Regards,
Roman

On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> Yes, I have state on the ProcessFunction. I tested it on a stand-alone 
> cluster and it returns true when the application recovers. However, in 
> integration tests it does not returns true. I am using Flink 1.4. Do you know 
> where it is saying at Flink release 1.13 
> (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I cannot 
> see `isRestored()` equals true on integration tests?
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
>
>
> On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
>>
>> Does your ProcessFunction has state? If not it would be in line with the 
>> documentation.
>>
>> Also which Flink version are you using? Before Flink 1.13 empty state was 
>> omitted so I could imagine that `isRestored()` would return false but it 
>> should actually now also return true for empty state.
>>
>> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez 
>> <felipe.o.gutier...@gmail.com> wrote:
>>>
>>> So, I was trying to improve by using the CheckpointedFunction as it shows 
>>> here [1]. But the method isRestored() says in its documentation [2]:
>>>
>>> "Returns true, if state was restored from the snapshot of a previous 
>>> execution. This returns always false for stateless tasks."
>>>
>>> It is weird because I am extending a ProcessFunction which is a 
>>> RichFunction.
>>>
>>> public class AuctionExceptionSimulatorProcessFunction extends 
>>> ProcessFunction<KeyedReportingData, KeyedReportingData>
>>>         implements CheckpointedFunction {
>>> ...
>>>
>>> In the end, I cannot rely on the "isRestored()". Do you know what could be 
>>> wrong? I used the same implementation method of [1].
>>>
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>>> [2] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>>
>>>
>>> --
>>> -- Felipe Gutierrez
>>> -- skype: felipe.o.gutierrez
>>>
>>>
>>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>>
>>>> You can also use accumulators [1] to collect the number of restarts
>>>> (and then access it via client); but side outputs should work as well.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>>> <felipe.o.gutier...@gmail.com> wrote:
>>>> >
>>>> > I just realised that only the ProcessFunction is enough. I don't need 
>>>> > the CheckpointFunction.
>>>> >
>>>> >
>>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, 
>>>> > <felipe.o.gutier...@gmail.com> wrote:
>>>> >>
>>>> >> Cool!
>>>> >>
>>>> >> I did using this example 
>>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>>>> >>  because I don't have a keyed stream on the specific operator that I 
>>>> >> want to count the number of restarts. (yes I am using version 1.4 
>>>> >> unfortunately).
>>>> >>
>>>> >> Because I need to test it in an integration test I am using a side 
>>>> >> output 
>>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>>>> >>  to attach a sink. I am not sure if you have a better idea to test the 
>>>> >> restarts on an integration test. If you have a simple idea please tell 
>>>> >> me :). This was the way that I solved....
>>>> >>
>>>> >> Thanks
>>>> >> Felipe
>>>> >>
>>>> >> --
>>>> >> -- Felipe Gutierrez
>>>> >> -- skype: felipe.o.gutierrez
>>>> >>
>>>> >>
>>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> 
>>>> >> wrote:
>>>> >>>
>>>> >>> Hi Felipe,
>>>> >>>
>>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>>>> >>> that depending on the configuration only a pipeline region can be
>>>> >>> restarted, not the whole job).
>>>> >>>
>>>> >>> But if all you want is to check whether it's a first attempt or not,
>>>> >>> you can also call context.isRestored() from initializeState() [2]
>>>> >>>
>>>> >>> [1]
>>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>>> >>>
>>>> >>> [2]
>>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>>> >>>
>>>> >>> Regards,
>>>> >>> Roman
>>>> >>>
>>>> >>>
>>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>>>> >>> >
>>>> >>> > Hello community,
>>>> >>> >
>>>> >>> > Is it possible to know programmatically how many times my Flink 
>>>> >>> > stream job restarted since it was running?
>>>> >>> >
>>>> >>> > My use case is like this. I have an Unit test that uses checkpoint 
>>>> >>> > and I throw one exception in a MapFunction for a given time, i.e.: 
>>>> >>> > for the 2 seconds ahead. Because Flink restarts the job and I have 
>>>> >>> > checkpoint I can recover the state and after 2 seconds I don't throw 
>>>> >>> > any exception anymore. Then I would like to know how many times the 
>>>> >>> > job was restarted.
>>>> >>> >
>>>> >>> > Thanks,
>>>> >>> > Felipe
>>>> >>> >

Reply via email to