Re: How to know (in code) how many times the job restarted?

2021-06-21 Thread Roman Khachatryan
Just to clarify, I was using isRestored() but I think
getAttemptNumber() should be simpler.

Regards,
Roman


On Mon, Jun 21, 2021 at 10:30 AM Felipe Gutierrez
 wrote:
>
> ummm, ok. you are using the "getRuntimeContext().getAttemptNumber()". I was 
> using the "isRestored()". Now it is counting.
> thanks!
> Felipe
>
>
> On Fri, Jun 18, 2021 at 10:17 PM Roman Khachatryan  wrote:
>>
>> > do you mean inside the processElement() method?
>> I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.
>>
>> > what is 0ms pause? do you mean 
>> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
>> Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
>>
>> > How do you create a random exception? do you mean not mine 
>> > SimulatedException?
>> I mean it should be thrown at random because the checkpoint must
>> reliably precede it. So on recovery that there is some state. Checking
>> against ... only once assumes that the checkpoint was triggered
>> before. Besides, checkpoint is not guaranteed to be triggered before
>> the end of input.
>> I tried to run it in with maven and it worked after making the source 
>> infinite.
>>
>> From the code you provided the parallelism level doesn't seem
>> important and can be set 1 (or restart strategy to full). Then using
>> getRuntimeContext().getAttemptNumber() would be simpler and more
>> reliable.
>>
>> Regards,
>> Roman
>>
>> On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez
>>  wrote:
>> >
>> >
>> >
>> > On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan  wrote:
>> >>
>> >> I tried to run the test that you mentioned
>> >> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
>> >> 6f08d0a.
>> >>
>> >> In IDE, I see that:
>> >> - checkpoint is never triggered (sentence is too short, checkpoint
>> >> pause and interval are too large)
>> >> - exception is never thrown, so the job never restarted
>> >> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>> >>
>> >> When I add sleep between each element, set 10ms interval, 0ms pause
>> >> and introduce some random exception, I do see
>> >
>> >
>> > do you mean inside the processElement() method?
>> >
>> > what is 0ms pause? do you mean 
>> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
>> >
>> > How do you create a random exception? do you mean not mine 
>> > SimulatedException?
>> >
>> > Using these configurations that I just said it is not working for me. I am 
>> > testing on the terminal "mvn 
>> > -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test". 
>> > On IntelliJ I have the error "Caused by: java.lang.NoSuchFieldError: 
>> > latencyTrackingConfigBuilder" when I call "env.execute();"
>> >
>> > org.apache.flink.runtime.client.JobExecutionException: Job execution 
>> > failed.
>> > at 
>> > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>> > at 
>> > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
>> > at 
>> > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>> > at 
>> > java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
>> > at 
>> > java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
>> > at 
>> > org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
>> > at 
>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
>> > at 
>> > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
>> > at 
>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
>> > at 
>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
>> > at 
>> > org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at 
>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > at 
>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:498)
>> > at 
>> > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>> > at 
>> > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> > at 
>> > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>> > at 
>> > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>> > at 
>> > 

Re: How to know (in code) how many times the job restarted?

2021-06-21 Thread Felipe Gutierrez
ummm, ok. you are using the "getRuntimeContext().getAttemptNumber()". I was
using the "isRestored()". Now it is counting.
thanks!
Felipe


On Fri, Jun 18, 2021 at 10:17 PM Roman Khachatryan  wrote:

> > do you mean inside the processElement() method?
> I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.
>
> > what is 0ms pause? do you mean
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
> Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
>
> > How do you create a random exception? do you mean not mine
> SimulatedException?
> I mean it should be thrown at random because the checkpoint must
> reliably precede it. So on recovery that there is some state. Checking
> against ... only once assumes that the checkpoint was triggered
> before. Besides, checkpoint is not guaranteed to be triggered before
> the end of input.
> I tried to run it in with maven and it worked after making the source
> infinite.
>
> From the code you provided the parallelism level doesn't seem
> important and can be set 1 (or restart strategy to full). Then using
> getRuntimeContext().getAttemptNumber() would be simpler and more
> reliable.
>
> Regards,
> Roman
>
> On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez
>  wrote:
> >
> >
> >
> > On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan 
> wrote:
> >>
> >> I tried to run the test that you mentioned
> >> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
> >> 6f08d0a.
> >>
> >> In IDE, I see that:
> >> - checkpoint is never triggered (sentence is too short, checkpoint
> >> pause and interval are too large)
> >> - exception is never thrown, so the job never restarted
> >> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
> >>
> >> When I add sleep between each element, set 10ms interval, 0ms pause
> >> and introduce some random exception, I do see
> >
> >
> > do you mean inside the processElement() method?
> >
> > what is 0ms pause? do you mean
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
> >
> > How do you create a random exception? do you mean not mine
> SimulatedException?
> >
> > Using these configurations that I just said it is not working for me. I
> am testing on the terminal "mvn
> -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test".
> On IntelliJ I have the error "Caused by: java.lang.NoSuchFieldError:
> latencyTrackingConfigBuilder" when I call "env.execute();"
> >
> > org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
> > at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> > at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
> > at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> > at
> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
> > at
> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
> > at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
> > at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
> > at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
> > at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
> > at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
> > at
> org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> > at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> > at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> > at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> > at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> > at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> > at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> > at 

Re: How to know (in code) how many times the job restarted?

2021-06-18 Thread Roman Khachatryan
> do you mean inside the processElement() method?
I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.

> what is 0ms pause? do you mean 
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);

> How do you create a random exception? do you mean not mine SimulatedException?
I mean it should be thrown at random, because a checkpoint must
reliably precede it so that there is some state on recovery. Checking
against ... only once assumes that a checkpoint was triggered
before. Besides, a checkpoint is not guaranteed to be triggered before
the end of input.
I tried to run it with Maven and it worked after making the source infinite.

From the code you provided, the parallelism level doesn't seem
important and can be set to 1 (or the restart strategy to full). Then using
getRuntimeContext().getAttemptNumber() would be simpler and more
reliable.
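
To illustrate why a counter kept in state depends on checkpoints while the attempt number does not, here is a toy model in plain Java. It is only a sketch and not Flink code: the class RestartCountModel and its methods are made up for illustration, and it assumes parallelism 1 (or a full restart strategy) so that every failure restarts the task being counted.

```java
class RestartCountModel {

    /**
     * Restart count as a state-based counter would report it in the final attempt.
     * checkpointedBeforeFailure[i] says whether a checkpoint completed during failed
     * attempt i, after the counter was written and before the exception was thrown.
     */
    static long stateBasedCount(boolean[] checkpointedBeforeFailure) {
        Long lastCheckpointed = null; // counter value held by the latest completed checkpoint
        for (boolean checkpointed : checkpointedBeforeFailure) {
            long counter = restore(lastCheckpointed);
            if (checkpointed) {
                lastCheckpointed = counter;
            }
            // the attempt fails here; anything that was not checkpointed is lost
        }
        return restore(lastCheckpointed); // the final attempt
    }

    /** Mimics initializeState(): start at 0 if nothing was restored, else bump the restored value. */
    private static long restore(Long lastCheckpointed) {
        return lastCheckpointed == null ? 0L : lastCheckpointed + 1;
    }

    /** Attempt numbers start at 0 and grow by one per restart, with or without checkpoints. */
    static int attemptBasedCount(boolean[] checkpointedBeforeFailure) {
        return checkpointedBeforeFailure.length;
    }

    public static void main(String[] args) {
        boolean[] noCheckpoints = {false, false};
        boolean[] allCheckpointed = {true, true};
        System.out.println(stateBasedCount(noCheckpoints));   // 0
        System.out.println(stateBasedCount(allCheckpointed)); // 2
        System.out.println(attemptBasedCount(noCheckpoints)); // 2
    }
}
```

With two failures and no completed checkpoint, the state-based counter still reports 0, which matches seeing "restarts: 0" both before and after the failure, while the attempt-based count is 2.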

Regards,
Roman

On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez
 wrote:
>
>
>
> On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan  wrote:
>>
>> I tried to run the test that you mentioned
>> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
>> 6f08d0a.
>>
>> In IDE, I see that:
>> - checkpoint is never triggered (sentence is too short, checkpoint
>> pause and interval are too large)
>> - exception is never thrown, so the job never restarted
>> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>>
>> When I add sleep between each element, set 10ms interval, 0ms pause
>> and introduce some random exception, I do see
>
>
> do you mean inside the processElement() method?
>
> what is 0ms pause? do you mean 
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
>
> How do you create a random exception? do you mean not mine SimulatedException?
>
> Using these configurations that I just said it is not working for me. I am 
> testing on the terminal "mvn 
> -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test". On 
> IntelliJ I have the error "Caused by: java.lang.NoSuchFieldError: 
> latencyTrackingConfigBuilder" when I call "env.execute();"
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at 
> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
> at 
> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
> at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
> at 
> org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at 

Re: How to know (in code) how many times the job restarted?

2021-06-18 Thread Felipe Gutierrez
On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan  wrote:

> I tried to run the test that you mentioned
> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
> 6f08d0a.
>
> In IDE, I see that:
> - checkpoint is never triggered (sentence is too short, checkpoint
> pause and interval are too large)
> - exception is never thrown, so the job never restarted
> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>
> When I add sleep between each element, set 10ms interval, 0ms pause
> and introduce some random exception, I do see
>

do you mean inside the processElement() method?

what is 0ms pause? do you
mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

How do you create a random exception? do you mean not
mine SimulatedException?

With the configurations I just mentioned, it is not working for me. I am
testing on the terminal "mvn
-Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test".
On IntelliJ I have the error "Caused by: java.lang.NoSuchFieldError:
latencyTrackingConfigBuilder" when I call "env.execute();"

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
    at org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder




> 2021-06-18 17:37:53,241 INFO
> org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess  -
> Attempts restart: 1
> in the logs.
>
> These settings probably differ on the cluster and there is some
> unrelated exception which causes a restart.
>
> Regards,
> Roman
>
> On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez
>  wrote:
> >
> > I investigated a little bit more. I created the same POC on a Flink
> version 1.13. I have this ProcessFunction where I want to count the times
> it recovers. I tested with ListState and ValueState and it seems that
> during the integration test (only for integration test) the process is
> hanging on the open() or on the initializeState() methods.
> >

Re: How to know (in code) how many times the job restarted?

2021-06-18 Thread Felipe Gutierrez
I investigated a little bit more. I created the same POC on Flink version
1.13. I have this ProcessFunction where I want to count the times it
recovers. I tested with ListState and ValueState, and it seems that during
the integration test (and only there) the process hangs in
the open() or the initializeState() method.

https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java

Here is my integration test:
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154

If I comment out the state ListState or the ValueState, the integration
test works. It first throws an exception and I see it on the output. Then
after 2 seconds it does not throw exceptions. Then I compare the output and
they match. But I don't have a way to count how many restarts happened
during the integration test. I can only count the restarts when I run the
application on a stand-alone flink cluster.

I don't know. Maybe, do I have to configure some parameters to work with
the state on integration tests?

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> No, it didn't work.
>
> The "context.isRestored()" returns true when I run the application on the
> Flink standalone-cluster and it is recovering after a failure. When I do
> the same on a integration test it does not returns true after a failure. I
> mean, I can log the exception that is causing the failure, the
> initializeState() is called after a failure, but the context.isRestored()
> is false again. I also tried to update the state on the first time to 0 "if
> (!context.isRestored()) { restartsState.add(0L); }" and it does not work.
> I think the problem is not on the ListState that I am using and not on the
> context.isRestore() as well. It is on the "context.getOperatorStateStore()"
> that is always null only on integration tests. Using the below code I can
> see on the logs "restarts: 0" twice, before and after failure.
>
> @Override
> public void initializeState(FunctionInitializationContext context)
> throws Exception {
> // unit tests does not open OperatorStateStore
> if (context.getOperatorStateStore() != null) {
> restartsState =
> context.getOperatorStateStore().getListState(new
> ListStateDescriptor("restarts", Long.class));
>
> List restoreList =
> Lists.newArrayList(restartsState.get());
> if (restoreList == null || restoreList.isEmpty()) {
> restartsState.add(0L);
> LOG.info("restarts: 0");
> } else {
> Long max = Collections.max(restoreList);
> LOG.info("restarts: " + max);
> restartsState.add(max + 1);
> }
> }
> }
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
>
> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan 
> wrote:
>
>> Thanks for sharing,
>>
>> I think the problem is that restartsState is never updated:
>> - on the first attempt, context.isRestored() returns false (and "never
>> restored" is logged)
>> - on subsequent attempts, it again returns false, because the state
>> was never updated before
>>
>> Adding
>> if (!context.isRestored()) { restartsState.add(0L); }
>> should solve the problem
>> (it's also better to use state.update instead of state.add if only max
>> is needed).
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>>  wrote:
>> >
>> > Sure, here it is. Nothing is mocked. I double-checked.
>> >
>> > UnitTestClass {.
>> > protected static LocalFlinkMiniCluster flink;
>> >
>> > @BeforeClass
>> > public static void prepare() {
>> > flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>> > flink.start();
>> >
>> > TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>> > }
>> >
>> > private static Configuration getFlinkConfiguration() {
>> > Configuration flinkConfig = new Configuration();
>> > flinkConfig.setInteger("local.number-taskmanager", 1);
>> > flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>> > flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>> > flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>> > try {
>> > flinkConfig.setString("state.checkpoints.dir", "file://" +
>> tempFolder.newFolder().getAbsolutePath());
>> > } catch (IOException e) {
>> > throw new RuntimeException("error in flink cluster config", e);
>> > }
>> > return flinkConfig;
>> > }
>> >
>> >
>> > The class that I check if the job was restarted:
>> >
>> > public class ExceptionSimulatorProcessFunction extends
>> ProcessFunction
>> > 

Re: How to know (in code) how many times the job restarted?

2021-06-18 Thread Felipe Gutierrez
No, it didn't work.

The "context.isRestored()" returns true when I run the application on the
Flink standalone cluster and it is recovering after a failure. When I do
the same in an integration test, it does not return true after a failure. I
mean, I can log the exception that is causing the failure, and
initializeState() is called after the failure, but context.isRestored()
is false again. I also tried to set the state to 0 on the first attempt with "if
(!context.isRestored()) { restartsState.add(0L); }" and it does not work.
I think the problem is neither the ListState that I am using nor
context.isRestored(). It is "context.getOperatorStateStore()",
which is always null, but only in integration tests. Using the code below, I
see "restarts: 0" in the logs twice, before and after the failure.

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // unit tests do not open the OperatorStateStore
    if (context.getOperatorStateStore() != null) {
        restartsState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<Long>("restarts", Long.class));

        List<Long> restoreList = Lists.newArrayList(restartsState.get());
        if (restoreList == null || restoreList.isEmpty()) {
            restartsState.add(0L);
            LOG.info("restarts: 0");
        } else {
            Long max = Collections.max(restoreList);
            LOG.info("restarts: " + max);
            restartsState.add(max + 1);
        }
    }
}

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan  wrote:

> Thanks for sharing,
>
> I think the problem is that restartsState is never updated:
> - on the first attempt, context.isRestored() returns false (and "never
> restored" is logged)
> - on subsequent attempts, it again returns false, because the state
> was never updated before
>
> Adding
> if (!context.isRestored()) { restartsState.add(0L); }
> should solve the problem
> (it's also better to use state.update instead of state.add if only max
> is needed).
>
> Regards,
> Roman
>
> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>  wrote:
> >
> > Sure, here it is. Nothing is mocked. I double-checked.
> >
> > UnitTestClass {.
> > protected static LocalFlinkMiniCluster flink;
> >
> > @BeforeClass
> > public static void prepare() {
> > flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
> > flink.start();
> >
> > TestStreamEnvironment.setAsContext(flink, PARALLELISM);
> > }
> >
> > private static Configuration getFlinkConfiguration() {
> > Configuration flinkConfig = new Configuration();
> > flinkConfig.setInteger("local.number-taskmanager", 1);
> > flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
> > flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
> > flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
> > try {
> > flinkConfig.setString("state.checkpoints.dir", "file://" +
> tempFolder.newFolder().getAbsolutePath());
> > } catch (IOException e) {
> > throw new RuntimeException("error in flink cluster config", e);
> > }
> > return flinkConfig;
> > }
> >
> >
> > The class that I check if the job was restarted:
> >
> > public class ExceptionSimulatorProcessFunction extends
> ProcessFunction
> > implements CheckpointedFunction {
> >
> > final OutputTag outputTag = new OutputTag("side-output")
> {
> > };
> > private transient ListState restartsState;
> > private Long restartsLocal;
> > ...
> > @Override
> > public void processElement(Object value, Context ctx,
> Collector out) throws Exception {
> > this.currentTimeMillis = System.currentTimeMillis() -
> currentTimeMillisBehind;
> >
> > // If current time is less than the reference time ahead AND we
> have the poison auction an exception will throw
> > if (this.currentTimeMillis < this.referenceTimeMillisAhead &&
> POISON__TRANSACTION_ID.equals(value.toString())) {
> >
> > LOG.error("This exception will trigger until the reference
> time [{}] reaches the trigger time [{}]",
> > sdfMillis.format(new Date(this.currentTimeMillis)),
> > sdfMillis.format(new
> Date(this.referenceTimeMillisAhead)));
> >
> > throw new SimulatedException("Transaction ID: " +
> value.toString() +
> > " not allowed. This is a simple exception for
> testing purposes.");
> > }
> > out.collect(value);
> >
> >
> > // counts the restarts
> > if (restartsState != null) {
> > List restoreList =
> Lists.newArrayList(restartsState.get());
> > Long attemptsRestart = 0L;
> > if (restoreList != null && !restoreList.isEmpty()) {
> > attemptsRestart = Collections.max(restoreList);
> >   

Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Roman Khachatryan
Thanks for sharing,

I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never
restored" is logged)
- on subsequent attempts, it again returns false, because the state
was never updated before

Adding
if (!context.isRestored()) { restartsState.add(0L); }
should solve the problem
(it's also better to use state.update instead of state.add if only max
is needed).
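
As a rough illustration of the add-versus-update remark, the following plain-Java sketch models the list state as an ordinary java.util.List. It is not Flink code: RestartListModel and its methods are hypothetical names, and it assumes that every attempt is checkpointed before it fails, so each restore sees the previous list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RestartListModel {

    /** add-style: append a new value on every restore; readers must take the max. */
    static List<Long> restoreWithAdd(List<Long> restored) {
        List<Long> state = new ArrayList<>(restored);
        state.add(state.isEmpty() ? 0L : Collections.max(state) + 1);
        return state;
    }

    /** update-style: replace the whole list with the single current value. */
    static List<Long> restoreWithUpdate(List<Long> restored) {
        long next = restored.isEmpty() ? 0L : Collections.max(restored) + 1;
        return new ArrayList<>(Collections.singletonList(next));
    }

    public static void main(String[] args) {
        List<Long> added = new ArrayList<>();
        List<Long> updated = new ArrayList<>();
        for (int attempt = 0; attempt < 4; attempt++) { // first run plus three restarts
            added = restoreWithAdd(added);
            updated = restoreWithUpdate(updated);
        }
        System.out.println(added);   // [0, 1, 2, 3]
        System.out.println(updated); // [3]
    }
}
```

Both variants carry the same maximum, but the add-style list grows by one entry per restart, while the update-style list stays at a single element.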

Regards,
Roman


Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Felipe Gutierrez
Sure, here it is. Nothing is mocked. I double-checked.

UnitTestClass {.
    protected static LocalFlinkMiniCluster flink;

    @BeforeClass
    public static void prepare() {
        flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
        flink.start();

        TestStreamEnvironment.setAsContext(flink, PARALLELISM);
    }

    private static Configuration getFlinkConfiguration() {
        Configuration flinkConfig = new Configuration();
        flinkConfig.setInteger("local.number-taskmanager", 1);
        flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
        flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
        flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
        try {
            flinkConfig.setString("state.checkpoints.dir",
                    "file://" + tempFolder.newFolder().getAbsolutePath());
        } catch (IOException e) {
            throw new RuntimeException("error in flink cluster config", e);
        }
        return flinkConfig;
    }


The class in which I check whether the job was restarted:

public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object, Object>
        implements CheckpointedFunction {

    final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
    };
    private transient ListState<Long> restartsState;
    private Long restartsLocal;
    ...
    @Override
    public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
        this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;

        // If the current time is less than the reference time ahead AND we have
        // the poison auction, an exception is thrown
        if (this.currentTimeMillis < this.referenceTimeMillisAhead
                && POISON__TRANSACTION_ID.equals(value.toString())) {

            LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
                    sdfMillis.format(new Date(this.currentTimeMillis)),
                    sdfMillis.format(new Date(this.referenceTimeMillisAhead)));

            throw new SimulatedException("Transaction ID: " + value.toString()
                    + " not allowed. This is a simple exception for testing purposes.");
        }
        out.collect(value);

        // counts the restarts
        if (restartsState != null) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            Long attemptsRestart = 0L;
            if (restoreList != null && !restoreList.isEmpty()) {
                attemptsRestart = Collections.max(restoreList);
                if (restartsLocal < attemptsRestart) {
                    restartsLocal = attemptsRestart;
                    ctx.output(outputTag, Long.valueOf(attemptsRestart));
                }
            }
            LOG.info("Attempts restart: " + attemptsRestart);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {}

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        restartsState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<Long>("restarts", Long.class));

        if (context.isRestored()) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                restartsState.add(1L);
                LOG.info("restarts: 1");
            } else {
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        } else {
            LOG.info("restarts: never restored");
        }
    }
}


Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Roman Khachatryan
Hi,

Could you please share the test code?

I think the returned value might depend on the level at which the
tests are executed. If it's a regular job, then it should return the
correct value (as on a cluster). If the environment in which the code
is executed is mocked, then it can be false.

Regards,
Roman


Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Felipe Gutierrez
Yes, I have state on the ProcessFunction. I tested it on a stand-alone
cluster and it returns true when the application recovers. However, in
integration tests it does not return true. I am using Flink 1.4. Do you
know where the Flink 1.13 release notes (
https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that
`isRestored()` cannot be true in integration tests?

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*




Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Arvid Heise
Does your ProcessFunction have state? If not, that would be in line with the
documentation.

Also, which Flink version are you using? Before Flink 1.13, empty state was
omitted, so I could imagine that `isRestored()` would return false there; it
should now also return true for empty state.



Re: How to know (in code) how many times the job restarted?

2021-06-15 Thread Felipe Gutierrez
So, I was trying to improve this by using the CheckpointedFunction as shown
in [1]. But the documentation of the isRestored() method says [2]:

"Returns true, if state was restored from the snapshot of a previous
execution. This returns always false for stateless tasks."

That is weird, because I am extending a ProcessFunction, which is a
RichFunction.

public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction
        implements CheckpointedFunction {
    ...

In the end, I cannot rely on isRestored(). Do you know what could be
wrong? I followed the same implementation approach as in [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--


*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*




Re: How to know (in code) how many times the job restarted?

2021-06-14 Thread Roman Khachatryan
You can also use accumulators [1] to collect the number of restarts
(and then access it via client); but side outputs should work as well.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters

Regards,
Roman



Re: How to know (in code) how many times the job restarted?

2021-06-13 Thread Felipe Gutierrez
I just realised that the ProcessFunction alone is enough. I don't need the
CheckpointedFunction.




Re: How to know (in code) how many times the job restarted?

2021-06-11 Thread Felipe Gutierrez
Cool!

I did it following this example
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
because I don't have a keyed stream on the specific operator whose
restarts I want to count (yes, I am using version 1.4, unfortunately).

Because I need to verify it in an integration test, I am using a side output (
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
to attach a sink. If you have a better or simpler idea for testing the
restarts in an integration test, please tell me :). This is how I solved it.

Thanks
Felipe

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*




Re: How to know (in code) how many times the job restarted?

2021-06-10 Thread Roman Khachatryan
Hi Felipe,

You can use getRuntimeContext().getAttemptNumber() [1] (but beware
that, depending on the configuration, only a pipeline region may be
restarted rather than the whole job).

But if all you want is to check whether it's the first attempt or not,
you can also call context.isRestored() from initializeState() [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
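
For a test, one possible way to read the attempt number back is a small
static holder that the operator reports into. This is only a sketch:
RestartTracker is a hypothetical helper, not a Flink class, and it assumes
the job runs in the same JVM as the test (e.g. a local environment), so that
static fields are shared. The Flink wiring is shown as a comment to keep the
helper free of Flink dependencies.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical test helper (not part of Flink): remembers the highest
 * attempt number reported by any subtask running in this JVM.
 */
final class RestartTracker {
    private static final AtomicInteger HIGHEST_ATTEMPT = new AtomicInteger(0);

    private RestartTracker() {}

    /** Record an attempt number; only the maximum seen so far is kept. */
    static void recordAttempt(int attemptNumber) {
        HIGHEST_ATTEMPT.accumulateAndGet(attemptNumber, Math::max);
    }

    /** Attempt numbers start at 0, so the highest attempt equals the restart count. */
    static int restarts() {
        return HIGHEST_ATTEMPT.get();
    }

    /** Clear the tracker between tests. */
    static void reset() {
        HIGHEST_ATTEMPT.set(0);
    }
}

// Assumed wiring inside a RichMapFunction (Flink API, left as a comment):
//
//   @Override
//   public void open(Configuration parameters) {
//       RestartTracker.recordAttempt(getRuntimeContext().getAttemptNumber());
//   }
//
// After the job finishes, the test can check RestartTracker.restarts().
```

Keeping the maximum rather than the last value is a deliberate choice: if
only a pipeline region is restarted, subtasks can report different attempt
numbers, and the highest one is the closest match to "how many restarts".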

Regards,
Roman


On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
 wrote:
>
> Hello community,
>
> Is it possible to know programmatically how many times my Flink streaming
> job has restarted since it started running?
>
> My use case is this: I have a unit test that uses checkpointing, and I
> throw an exception in a MapFunction for a given period, e.g. for the
> next 2 seconds. Because Flink restarts the job and I have checkpoints, I
> can recover the state, and after 2 seconds I no longer throw any exception.
> Then I would like to know how many times the job was restarted.
>
> Thanks,
> Felipe
>


How to know (in code) how many times the job restarted?

2021-06-10 Thread Felipe Gutierrez
Hello community,

Is it possible to know programmatically how many times my Flink streaming
job has restarted since it started running?

My use case is this: I have a unit test that uses checkpointing, and I
throw an exception in a MapFunction for a given period, e.g. for the
next 2 seconds. Because Flink restarts the job and I have checkpoints, I
can recover the state, and after 2 seconds I no longer throw any exception.
Then I would like to know how many times the job was restarted.

Thanks,
Felipe