Thanks for sharing,

I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never
restored" is logged)
- on subsequent attempts, it returns false again, because the state
was never written, so there was nothing to restore

Adding the following to initializeState()
if (!context.isRestored()) { restartsState.add(0L); }
should solve the problem, because the first checkpoint then contains state.
(It's also better to use state.update instead of state.add if only the max
is needed.)
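
To illustrate, here is a minimal sketch of that logic without any Flink dependency. Everything in it is a stand-in I made up for the example: RestartCounterModel, startAttempt() and the seedOnFirstAttempt flag are hypothetical, the plain list plays the role of restartsState, and the "restored" flag models context.isRestored() under the assumption that an attempt only counts as restored if the previous one checkpointed non-empty state (and that a checkpoint completes between attempts).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Flink-free model of the restart counter. Nothing here is a Flink API:
 * the list stands in for the operator ListState, and "restored" is assumed
 * to be true only when the previous attempt checkpointed non-empty state.
 */
class RestartCounterModel {
    // Hypothetical stand-in for the checkpointed contents of restartsState.
    private List<Long> checkpointed = new ArrayList<>();
    private final boolean seedOnFirstAttempt;

    RestartCounterModel(boolean seedOnFirstAttempt) {
        this.seedOnFirstAttempt = seedOnFirstAttempt;
    }

    /** Models one initializeState() call plus the next checkpoint; returns the restart count. */
    long startAttempt() {
        List<Long> state = new ArrayList<>(checkpointed); // contents handed back on restore
        boolean restored = !checkpointed.isEmpty();       // models context.isRestored()
        long restarts = 0L;
        if (restored) {
            restarts = Collections.max(state) + 1;
            // "update" semantics: keep only the latest value instead of appending
            state = new ArrayList<>(Collections.singletonList(restarts));
        } else if (seedOnFirstAttempt) {
            state.add(0L); // the suggested fix: make the first checkpoint non-empty
        }
        checkpointed = state; // models the next successful checkpoint
        return restarts;
    }

    public static void main(String[] args) {
        RestartCounterModel original = new RestartCounterModel(false);
        RestartCounterModel fixed = new RestartCounterModel(true);
        for (int attempt = 0; attempt < 3; attempt++) {
            System.out.println("attempt " + attempt
                    + " original=" + original.startAttempt()
                    + " fixed=" + fixed.startAttempt());
        }
    }
}
```

In this model the unseeded variant stays at 0 on every attempt, while the seeded one counts 0, 1, 2. Replacing the list contents in the restored branch keeps the state at a single element, which is the reason for preferring update over add when only the max matters.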

Regards,
Roman

On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> Sure, here it is. Nothing is mocked. I double-checked.
>
> UnitTestClass {.....
> protected static LocalFlinkMiniCluster flink;
>
> @BeforeClass
> public static void prepare() {
>     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>     flink.start();
>
>     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
> }
>
> private static Configuration getFlinkConfiguration() {
>     Configuration flinkConfig = new Configuration();
>     flinkConfig.setInteger("local.number-taskmanager", 1);
>     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>     try {
>         flinkConfig.setString("state.checkpoints.dir",
>                 "file://" + tempFolder.newFolder().getAbsolutePath());
>     } catch (IOException e) {
>         throw new RuntimeException("error in flink cluster config", e);
>     }
>     return flinkConfig;
> }
>
>
> The class that I check if the job was restarted:
>
> public class ExceptionSimulatorProcessFunction
>         extends ProcessFunction<Object..., Object...>
>         implements CheckpointedFunction {
>
>     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>     };
>     private transient ListState<Long> restartsState;
>     private Long restartsLocal;
>     ...
>     @Override
>     public void processElement(Object value, Context ctx, Collector<Object> out)
>             throws Exception {
>         this.currentTimeMillis =
>                 System.currentTimeMillis() - currentTimeMillisBehind;
>
>         // While the current time is before the reference time ahead AND the
>         // value is the poison transaction ID, throw an exception.
>         if (this.currentTimeMillis < this.referenceTimeMillisAhead
>                 && POISON__TRANSACTION_ID.equals(value.toString())) {
>
>             LOG.error("This exception will trigger until the reference time [{}] "
>                             + "reaches the trigger time [{}]",
>                     sdfMillis.format(new Date(this.currentTimeMillis)),
>                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>
>             throw new SimulatedException("Transaction ID: " + value.toString()
>                     + " not allowed. This is a simple exception for testing purposes.");
>         }
>         out.collect(value);
>
>
>         // counts the restarts
>         if (restartsState != null) {
>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>             Long attemptsRestart = 0L;
>             if (restoreList != null && !restoreList.isEmpty()) {
>                 attemptsRestart = Collections.max(restoreList);
>                 if (restartsLocal < attemptsRestart) {
>                     restartsLocal = attemptsRestart;
>                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>                 }
>             }
>             LOG.info("Attempts restart: " + attemptsRestart);
>         }
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
>
>     @Override
>     public void initializeState(FunctionInitializationContext context)
>             throws Exception {
>         restartsState = context.getOperatorStateStore()
>                 .getListState(new ListStateDescriptor<Long>("restarts", Long.class));
>
>         if (context.isRestored()) {
>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>             if (restoreList == null || restoreList.isEmpty()) {
>                 restartsState.add(1L);
>                 LOG.info("restarts: 1");
>             } else {
>                 Long max = Collections.max(restoreList);
>                 LOG.info("restarts: " + max);
>                 restartsState.add(max + 1);
>             }
>         } else {
>             LOG.info("restarts: never restored");
>         }
>     }
> }
>
> On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> Could you please share the test code?
>>
>> I think the returned value might depend on the level on which the
>> tests are executed. If it's a regular job then it should return the
>> correct value (as with cluster). If the environment in which the code
>> is executed is mocked then it can be false.
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone
>> > cluster and it returns true when the application recovers. However, in
>> > integration tests it does not return true. I am using Flink 1.4. Do you
>> > know where the Flink 1.13 release notes
>> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that
>> > `isRestored()` does not return true in integration tests?
>> >
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> >
>> >
>> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
>> >>
>> >> Does your ProcessFunction have state? If not, that would be in line with
>> >> the documentation.
>> >>
>> >> Also, which Flink version are you using? Before Flink 1.13, empty state was
>> >> omitted, so I could imagine that `isRestored()` would return false; from
>> >> 1.13 on it should return true for empty state as well.
>> >>
>> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez 
>> >> <felipe.o.gutier...@gmail.com> wrote:
>> >>>
>> >>> So, I was trying to improve by using the CheckpointedFunction as it 
>> >>> shows here [1]. But the method isRestored() says in its documentation 
>> >>> [2]:
>> >>>
>> >>> "Returns true, if state was restored from the snapshot of a previous 
>> >>> execution. This returns always false for stateless tasks."
>> >>>
>> >>> It is weird because I am extending a ProcessFunction which is a 
>> >>> RichFunction.
>> >>>
>> >>> public class AuctionExceptionSimulatorProcessFunction extends 
>> >>> ProcessFunction<KeyedReportingData, KeyedReportingData>
>> >>>         implements CheckpointedFunction {
>> >>> ...
>> >>>
>> >>> In the end, I cannot rely on `isRestored()`. Do you know what could
>> >>> be wrong? I followed the same implementation approach as [1].
>> >>>
>> >>> [1] 
>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> >>> [2] 
>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>
>> >>>
>> >>> --
>> >>> -- Felipe Gutierrez
>> >>> -- skype: felipe.o.gutierrez
>> >>>
>> >>>
>> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> 
>> >>> wrote:
>> >>>>
>> >>>> You can also use accumulators [1] to collect the number of restarts
>> >>>> (and then access it via client); but side outputs should work as well.
>> >>>>
>> >>>> [1]
>> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>> >>>>
>> >>>> Regards,
>> >>>> Roman
>> >>>>
>> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> >>>> <felipe.o.gutier...@gmail.com> wrote:
>> >>>> >
>> >>>> > I just realised that the ProcessFunction alone is enough. I don't need
>> >>>> > the CheckpointedFunction.
>> >>>> >
>> >>>> >
>> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, 
>> >>>> > <felipe.o.gutier...@gmail.com> wrote:
>> >>>> >>
>> >>>> >> Cool!
>> >>>> >>
>> >>>> >> I did it using this example
>> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> >>>> >>  because I don't have a keyed stream on the specific operator whose
>> >>>> >> restarts I want to count. (Yes, I am using version 1.4,
>> >>>> >> unfortunately.)
>> >>>> >>
>> >>>> >> Because I need to test it in an integration test, I am using a side
>> >>>> >> output
>> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> >>>> >>  to attach a sink. If you have a simpler idea for testing the
>> >>>> >> restarts in an integration test, please tell me :). This is how I
>> >>>> >> solved it.
>> >>>> >>
>> >>>> >> Thanks
>> >>>> >> Felipe
>> >>>> >>
>> >>>> >> --
>> >>>> >> -- Felipe Gutierrez
>> >>>> >> -- skype: felipe.o.gutierrez
>> >>>> >>
>> >>>> >>
>> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> 
>> >>>> >> wrote:
>> >>>> >>>
>> >>>> >>> Hi Felipe,
>> >>>> >>>
>> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> >>>> >>> that depending on the configuration only a pipeline region can be
>> >>>> >>> restarted, not the whole job).
>> >>>> >>>
>> >>>> >>> But if all you want is to check whether it's a first attempt or not,
>> >>>> >>> you can also call context.isRestored() from initializeState() [2]
>> >>>> >>>
>> >>>> >>> [1]
>> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >>>> >>>
>> >>>> >>> [2]
>> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>> >>>
>> >>>> >>> Regards,
>> >>>> >>> Roman
>> >>>> >>>
>> >>>> >>>
>> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >>>> >>> >
>> >>>> >>> > Hello community,
>> >>>> >>> >
>> >>>> >>> > Is it possible to know programmatically how many times my Flink
>> >>>> >>> > stream job has restarted since it started running?
>> >>>> >>> >
>> >>>> >>> > My use case is like this. I have a unit test that uses
>> >>>> >>> > checkpointing, and I throw an exception in a MapFunction for a given
>> >>>> >>> > time, e.g. for the next 2 seconds. Because Flink restarts the
>> >>>> >>> > job and checkpointing is enabled, I can recover the state, and after
>> >>>> >>> > 2 seconds I no longer throw the exception. Then I would like to
>> >>>> >>> > know how many times the job was restarted.
>> >>>> >>> >
>> >>>> >>> > Thanks,
>> >>>> >>> > Felipe
>> >>>> >>> >
