You can also use accumulators [1] to collect the number of restarts
(and then access it via client); but side outputs should work as well.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters

Regards,
Roman

On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> I just realised that only the ProcessFunction is enough. I don't need the 
> CheckpointFunction.
>
>
> On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <felipe.o.gutier...@gmail.com> 
> wrote:
>>
>> Cool!
>>
>> I did using this example 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>>  because I don't have a keyed stream on the specific operator that I want to 
>> count the number of restarts. (yes I am using version 1.4 unfortunately).
>>
>> Because I need to test it in an integration test I am using a side output 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>>  to attach a sink. I am not sure if you have a better idea to test the 
>> restarts on an integration test. If you have a simple idea please tell me 
>> :). This was the way that I solved....
>>
>> Thanks
>> Felipe
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>>
>>
>> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>
>>> Hi Felipe,
>>>
>>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>>> that depending on the configuration only a pipeline region can be
>>> restarted, not the whole job).
>>>
>>> But if all you want is to check whether it's a first attempt or not,
>>> you can also call context.isRestored() from initializeState() [2]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>> <felipe.o.gutier...@gmail.com> wrote:
>>> >
>>> > Hello community,
>>> >
>>> > Is it possible to know programmatically how many times my Flink stream 
>>> > job restarted since it was running?
>>> >
>>> > My use case is like this. I have an Unit test that uses checkpoint and I 
>>> > throw one exception in a MapFunction for a given time, i.e.: for the 2 
>>> > seconds ahead. Because Flink restarts the job and I have checkpoint I can 
>>> > recover the state and after 2 seconds I don't throw any exception 
>>> > anymore. Then I would like to know how many times the job was restarted.
>>> >
>>> > Thanks,
>>> > Felipe
>>> >

Reply via email to