Re: How to know (in code) how many times the job restarted?
Just to clarify, I was using isRestored() but I think getAttemptNumber() should be simpler.

Regards,
Roman

On Mon, Jun 21, 2021 at 10:30 AM Felipe Gutierrez wrote:
> ummm, ok. you are using the "getRuntimeContext().getAttemptNumber()". I was
> using the "isRestored()". Now it is counting.
> thanks!
> Felipe
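The getAttemptNumber() approach mentioned in this message could look roughly like the fragment below. It is a minimal sketch, not code from the thread: the class name AttemptAwareMapper, the "poison" marker value and the decision to fail only on the first attempt are assumptions made for illustration. It needs the Flink streaming dependency on the classpath (the thread uses Flink 1.13).

```java
import org.apache.flink.api.common.functions.RichMapFunction;

// Hypothetical mapper (the name is an assumption): it reads the attempt number
// of the current parallel subtask instead of deriving a count from state.
public class AttemptAwareMapper extends RichMapFunction<String, String> {

    @Override
    public String map(String value) throws Exception {
        // The first attempt of a subtask is numbered 0.
        int attempt = getRuntimeContext().getAttemptNumber();
        if (attempt == 0 && "poison".equals(value)) {
            // Illustrative failure, thrown only on the first attempt so the job can recover.
            throw new RuntimeException("simulated failure on attempt " + attempt);
        }
        return value + " (attempt " + attempt + ")";
    }
}
```

Because the attempt number is not stored in checkpointed state, this variant does not depend on a checkpoint having completed before the failure.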
Re: How to know (in code) how many times the job restarted?
ummm, ok. you are using the "getRuntimeContext().getAttemptNumber()". I was using the "isRestored()". Now it is counting.
thanks!
Felipe

On Fri, Jun 18, 2021 at 10:17 PM Roman Khachatryan wrote:
> [...]
> From the code you provided the parallelism level doesn't seem
> important and can be set 1 (or restart strategy to full). Then using
> getRuntimeContext().getAttemptNumber() would be simpler and more
> reliable.
>
> Regards,
> Roman
Re: How to know (in code) how many times the job restarted?
On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez wrote:
> do you mean inside the processElement() method?

I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.

> what is 0ms pause? do you mean
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);

> How do you create a random exception? do you mean not mine SimulatedException?

I mean it should be thrown at random because the checkpoint must reliably precede it, so that on recovery there is some state. Checking against ... only once assumes that the checkpoint was triggered before. Besides, a checkpoint is not guaranteed to be triggered before the end of input.
I tried to run it with Maven and it worked after making the source infinite.

From the code you provided, the parallelism level doesn't seem important and can be set to 1 (or the restart strategy to full). Then using getRuntimeContext().getAttemptNumber() would be simpler and more reliable.

Regards,
Roman
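The point in this message that a checkpoint must complete before the failure can be illustrated without Flink. The plain-Java model below is only a sketch: RestartCountModel, countFromState and attemptNumber are hypothetical names, the list stands in for checkpointed operator state, and it assumes that every failure restarts the counting subtask (parallelism 1 or a full restart, as suggested in the message).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model (not Flink API): compares a restart count derived from
// checkpointed state with an attempt number tracked outside of state.
final class RestartCountModel {

    // Returns the count seen on the last attempt after the given number of failures.
    static long countFromState(int failures, boolean checkpointBeforeFailure) {
        List<Long> lastCheckpoint = new ArrayList<>(); // empty means no completed checkpoint
        long observed = 0L;
        for (int attempt = 0; attempt <= failures; attempt++) {
            // Models initializeState(): start from the last completed checkpoint.
            List<Long> state = new ArrayList<>(lastCheckpoint);
            observed = state.isEmpty() ? 0L : Collections.max(state) + 1;
            state.add(observed);
            // State survives a failure only if a checkpoint completed before it.
            if (checkpointBeforeFailure) {
                lastCheckpoint = new ArrayList<>(state);
            }
        }
        return observed;
    }

    // Models the attempt number: it grows with every restart, independent of checkpoints.
    static int attemptNumber(int failures) {
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("state, checkpoint before failure: " + countFromState(3, true));
        System.out.println("state, no checkpoint in time:     " + countFromState(3, false));
        System.out.println("attempt number:                   " + attemptNumber(3));
    }
}
```

In this model the state-based count only advances when a checkpoint lands between two failures, which matches the observation that short inputs and long checkpoint intervals leave nothing to restore.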
Re: How to know (in code) how many times the job restarted?
On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan wrote:

> I tried to run the test that you mentioned
> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
> 6f08d0a.
>
> In IDE, I see that:
> - checkpoint is never triggered (sentence is too short, checkpoint
> pause and interval are too large)
> - exception is never thrown, so the job never restarted
> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>
> When I add sleep between each element, set 10ms interval, 0ms pause
> and introduce some random exception, I do see

do you mean inside the processElement() method?

what is 0ms pause? do you mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

How do you create a random exception? do you mean not mine SimulatedException?

Using these configurations that I just said it is not working for me. I am testing on the terminal "mvn -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test". On IntelliJ I have the error "Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder" when I call "env.execute();"

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
    at org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder

> 2021-06-18 17:37:53,241 INFO
> org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess -
> Attempts restart: 1
> in the logs.
>
> These settings probably differ on the cluster and there is some
> unrelated exception which causes a restart.
>
> Regards,
> Roman
>
> On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez wrote:
> >
> > I investigated a little bit more. I created the same POC on a Flink
> > version 1.13. I have this ProcessFunction where I want to count the times
> > it recovers. I tested with ListState and ValueState and it seems that
> > during the integration test (only for integration test) the process is
> > hanging on the open() or on the initializeState() methods.
Re: How to know (in code) how many times the job restarted?
I investigated a little bit more. I created the same POC on Flink version 1.13. I have this ProcessFunction where I want to count the times it recovers. I tested with ListState and ValueState, and it seems that during the integration test (only for the integration test) the process is hanging on the open() or on the initializeState() methods.
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java

Here is my integration test:
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154

If I comment out the ListState or the ValueState, the integration test works. It first throws an exception and I see it on the output. Then after 2 seconds it does not throw exceptions. Then I compare the output and they match. But I don't have a way to count how many restarts happened during the integration test. I can only count the restarts when I run the application on a stand-alone Flink cluster.

I don't know. Maybe I have to configure some parameters to work with the state in integration tests?

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
> No, it didn't work.
> [...]
Re: How to know (in code) how many times the job restarted?
No, it didn't work.

The "context.isRestored()" returns true when I run the application on the Flink standalone cluster and it is recovering after a failure. When I do the same in an integration test, it does not return true after a failure. I mean, I can log the exception that is causing the failure, and initializeState() is called after a failure, but context.isRestored() is false again. I also tried to set the state to 0 the first time, "if (!context.isRestored()) { restartsState.add(0L); }", and it does not work.

I think the problem is not in the ListState that I am using, and not in context.isRestored() either. It is in "context.getOperatorStateStore()", which is always null only in integration tests. Using the code below I can see "restarts: 0" in the logs twice, before and after the failure.

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // unit tests do not open the OperatorStateStore
    if (context.getOperatorStateStore() != null) {
        restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("restarts", Long.class));

        List<Long> restoreList = Lists.newArrayList(restartsState.get());
        if (restoreList == null || restoreList.isEmpty()) {
            restartsState.add(0L);
            LOG.info("restarts: 0");
        } else {
            Long max = Collections.max(restoreList);
            LOG.info("restarts: " + max);
            restartsState.add(max + 1);
        }
    }
}

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan wrote:
> Thanks for sharing,
>
> I think the problem is that restartsState is never updated:
> - on the first attempt, context.isRestored() returns false (and "never
> restored" is logged)
> - on subsequent attempts, it again returns false, because the state
> was never updated before
>
> Adding
> if (!context.isRestored()) { restartsState.add(0L); }
> should solve the problem
> (it's also better to use state.update instead of state.add if only max
> is needed).
>
> Regards,
> Roman
Re: How to know (in code) how many times the job restarted?
Thanks for sharing,

I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never restored" is logged)
- on subsequent attempts, it again returns false, because the state was never updated before

Adding
    if (!context.isRestored()) { restartsState.add(0L); }
should solve the problem
(it's also better to use state.update instead of state.add if only max is needed).

Regards,
Roman

On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez wrote:
>
> Sure, here it is. Nothing is mocked. I double-checked.
>
> UnitTestClass {.
>     protected static LocalFlinkMiniCluster flink;
>
>     @BeforeClass
>     public static void prepare() {
>         flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>         flink.start();
>
>         TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>     }
>
>     private static Configuration getFlinkConfiguration() {
>         Configuration flinkConfig = new Configuration();
>         flinkConfig.setInteger("local.number-taskmanager", 1);
>         flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>         flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>         flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>         try {
>             flinkConfig.setString("state.checkpoints.dir", "file://" + tempFolder.newFolder().getAbsolutePath());
>         } catch (IOException e) {
>             throw new RuntimeException("error in flink cluster config", e);
>         }
>         return flinkConfig;
>     }
>
>
> The class that I check if the job was restarted:
>
> public class ExceptionSimulatorProcessFunction extends ProcessFunction
>         implements CheckpointedFunction {
>
>     final OutputTag outputTag = new OutputTag("side-output") {
>     };
>     private transient ListState restartsState;
>     private Long restartsLocal;
>     ...
>     @Override
>     public void processElement(Object value, Context ctx, Collector out) throws Exception {
>         this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
>
>         // If current time is less than the reference time ahead AND we have the poison auction an exception will throw
>         if (this.currentTimeMillis < this.referenceTimeMillisAhead && POISON__TRANSACTION_ID.equals(value.toString())) {
>
>             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>                     sdfMillis.format(new Date(this.currentTimeMillis)),
>                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>
>             throw new SimulatedException("Transaction ID: " + value.toString() +
>                     " not allowed. This is a simple exception for testing purposes.");
>         }
>         out.collect(value);
>
>
>         // counts the restarts
>         if (restartsState != null) {
>             List restoreList = Lists.newArrayList(restartsState.get());
>             Long attemptsRestart = 0L;
>             if (restoreList != null && !restoreList.isEmpty()) {
>                 attemptsRestart = Collections.max(restoreList);
>                 if (restartsLocal < attemptsRestart) {
>                     restartsLocal = attemptsRestart;
>                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>                 }
>             }
>             LOG.info("Attempts restart: " + attemptsRestart);
>         }
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
>
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws Exception {
>         restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor("restarts", Long.class));
>
>         if (context.isRestored()) {
>             List restoreList = Lists.newArrayList(restartsState.get());
>             if (restoreList == null || restoreList.isEmpty()) {
>                 restartsState.add(1L);
>                 LOG.info("restarts: 1");
>             } else {
>                 Long max = Collections.max(restoreList);
>                 LOG.info("restarts: " + max);
>                 restartsState.add(max + 1);
>             }
>         } else {
>             LOG.info("restarts: never restored");
>         }
>     }
> }
>
> On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan wrote:
>>
>> Hi,
>>
>> Could you please share the test code?
>>
>> I think the returned value might depend on the level on which the
>> tests are executed. If it's a regular job then it should return the
>> correct value (as with cluster). If the environment in which the code
>> is executed is mocked then it can be false.
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez wrote:
>> >
>> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone
>> > cluster and it returns true when the application recovers. However, in
Re: How to know (in code) how many times the job restarted?
Sure, here it is. Nothing is mocked. I double-checked.

    class UnitTestClass {

        protected static LocalFlinkMiniCluster flink;

        @BeforeClass
        public static void prepare() {
            flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
            flink.start();
            TestStreamEnvironment.setAsContext(flink, PARALLELISM);
        }

        private static Configuration getFlinkConfiguration() {
            Configuration flinkConfig = new Configuration();
            flinkConfig.setInteger("local.number-taskmanager", 1);
            flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
            flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
            flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
            try {
                flinkConfig.setString("state.checkpoints.dir",
                        "file://" + tempFolder.newFolder().getAbsolutePath());
            } catch (IOException e) {
                throw new RuntimeException("error in flink cluster config", e);
            }
            return flinkConfig;
        }
    }

The class in which I check whether the job was restarted:

    public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object, Object>
            implements CheckpointedFunction {

        final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") { };
        private transient ListState<Long> restartsState;
        private Long restartsLocal;
        ...

        @Override
        public void processElement(Object value, Context ctx, Collector<Object> out)
                throws Exception {
            this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;

            // If the current time is less than the reference time ahead AND we have
            // the poison auction, an exception is thrown
            if (this.currentTimeMillis < this.referenceTimeMillisAhead
                    && POISON__TRANSACTION_ID.equals(value.toString())) {

                LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
                        sdfMillis.format(new Date(this.currentTimeMillis)),
                        sdfMillis.format(new Date(this.referenceTimeMillisAhead)));

                throw new SimulatedException("Transaction ID: " + value.toString()
                        + " not allowed. This is a simple exception for testing purposes.");
            }
            out.collect(value);

            // counts the restarts
            if (restartsState != null) {
                List<Long> restoreList = Lists.newArrayList(restartsState.get());
                Long attemptsRestart = 0L;
                if (restoreList != null && !restoreList.isEmpty()) {
                    attemptsRestart = Collections.max(restoreList);
                    if (restartsLocal < attemptsRestart) {
                        restartsLocal = attemptsRestart;
                        ctx.output(outputTag, Long.valueOf(attemptsRestart));
                    }
                }
                LOG.info("Attempts restart: " + attemptsRestart);
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {}

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            restartsState = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<Long>("restarts", Long.class));

            if (context.isRestored()) {
                List<Long> restoreList = Lists.newArrayList(restartsState.get());
                if (restoreList == null || restoreList.isEmpty()) {
                    restartsState.add(1L);
                    LOG.info("restarts: 1");
                } else {
                    Long max = Collections.max(restoreList);
                    LOG.info("restarts: " + max);
                    restartsState.add(max + 1);
                }
            } else {
                LOG.info("restarts: never restored");
            }
        }
    }
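To reason about the counting scheme in the code above without a cluster, here is a small plain-Java model. It is only a sketch, not Flink code: `RestoreCountingModel`, `Event` and `replay` are hypothetical names, and it assumes that a restart restores state only if a checkpoint completed beforehand, and that an empty snapshot still counts as restored (which, per the discussion in this thread, may not hold before Flink 1.13).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of restart counting via restored list state; not Flink code. */
final class RestoreCountingModel {

    enum Event { CHECKPOINT, FAILURE }

    /**
     * Replays the events and returns {actual restarts, restarts seen by the
     * state-based counter}.
     */
    static long[] replay(List<Event> events) {
        long restarts = 0;
        List<Long> live = new ArrayList<>(); // "restarts" list state of the running task
        List<Long> snapshot = null;          // last completed checkpoint, null if none yet

        for (Event event : events) {
            if (event == Event.CHECKPOINT) {
                snapshot = new ArrayList<>(live);
                continue;
            }
            // FAILURE: the job restarts, restoring the snapshot if there is one.
            restarts++;
            boolean restored = snapshot != null; // assumption: stands in for isRestored()
            live = restored ? new ArrayList<>(snapshot) : new ArrayList<>();
            if (restored) {
                // Same rule as initializeState() above: add 1, or max + 1.
                live.add(live.isEmpty() ? 1L : Collections.max(live) + 1);
            }
        }
        long counted = live.isEmpty() ? 0L : Collections.max(live);
        return new long[] {restarts, counted};
    }

    public static void main(String[] args) {
        long[] r = replay(Arrays.asList(Event.CHECKPOINT, Event.FAILURE, Event.FAILURE));
        System.out.println("actual=" + r[0] + " counted=" + r[1]);
    }
}
```

Under these assumptions the state-based count matches the real number of restarts only when a checkpoint completes between consecutive failures; with no checkpoint at all, the counter stays at zero.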
Re: How to know (in code) how many times the job restarted?
Hi,

Could you please share the test code?

I think the returned value might depend on the level at which the tests are executed. If it's a regular job, it should return the correct value (as on a cluster). If the environment in which the code runs is mocked, it can be false.

Regards,
Roman
Re: How to know (in code) how many times the job restarted?
Yes, I have state on the ProcessFunction. I tested it on a stand-alone cluster and `isRestored()` returns true when the application recovers. However, in integration tests it does not return true. I am using Flink 1.4. Do you know where the Flink 1.13 release notes (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that `isRestored()` cannot be true in integration tests?

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
Re: How to know (in code) how many times the job restarted?
Does your ProcessFunction have state? If not, this would be in line with the documentation.

Also, which Flink version are you using? Before Flink 1.13, empty state was omitted, so I could imagine that `isRestored()` would return false; since 1.13 it should also return true for empty state.
Re: How to know (in code) how many times the job restarted?
So, I was trying to improve this by using the CheckpointedFunction as shown here [1]. But the documentation of the isRestored() method says [2]:

"Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks."

This is odd, because I am extending a ProcessFunction, which is a RichFunction.

    public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction
            implements CheckpointedFunction {
        ...

In the end, I cannot rely on isRestored(). Do you know what could be wrong? I used the same implementation approach as [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
Re: How to know (in code) how many times the job restarted?
You can also use accumulators [1] to collect the number of restarts (and then access it via the client); but side outputs should work as well.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters

Regards,
Roman
Re: How to know (in code) how many times the job restarted?
I just realised that the ProcessFunction alone is enough. I don't need the CheckpointedFunction.
Re: How to know (in code) how many times the job restarted?
Cool!

I did it using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because I don't have a keyed stream on the specific operator whose restarts I want to count (yes, I am using version 1.4, unfortunately).

Because I need to test it in an integration test, I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. If you have a better or simpler idea for testing restarts in an integration test, please tell me :). This is how I solved it.

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
Re: How to know (in code) how many times the job restarted?
Hi Felipe,

You can use getRuntimeContext().getAttemptNumber() [1] (but beware that, depending on the configuration, only a pipeline region may be restarted, not the whole job).

But if all you want is to check whether it's the first attempt or not, you can also call context.isRestored() from initializeState() [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--

Regards,
Roman
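A plain-Java sketch of the attempt-number idea, for illustration only. It is not Flink code: `AttemptNumberModel`, `runUntilSuccess` and `failingFor` are hypothetical names, and the retry loop merely stands in for a restart strategy. It assumes, as the RuntimeContext Javadoc states, that the first attempt is numbered 0, so the attempt number of the run that finally succeeds equals the number of restarts.

```java
import java.util.function.IntConsumer;

/** Plain-Java model of "attempt number == number of restarts"; not Flink code. */
final class AttemptNumberModel {

    /**
     * Runs the task, re-running it after every failure, and returns the attempt
     * number of the run that finally succeeded (the first attempt is number 0).
     */
    static int runUntilSuccess(IntConsumer task, int maxRestarts) {
        for (int attempt = 0; attempt <= maxRestarts; attempt++) {
            try {
                task.accept(attempt);
                return attempt;
            } catch (RuntimeException e) {
                // A failure triggers the next attempt, like a restart would.
            }
        }
        throw new IllegalStateException("restart limit exceeded");
    }

    /** Hypothetical task that throws during its first `failures` attempts. */
    static IntConsumer failingFor(int failures) {
        return attempt -> {
            if (attempt < failures) {
                throw new RuntimeException("simulated failure");
            }
        };
    }

    public static void main(String[] args) {
        int restarts = runUntilSuccess(failingFor(3), 10);
        System.out.println("restarts=" + restarts);
    }
}
```

In a real job the value would come from getRuntimeContext().getAttemptNumber() inside a rich function. As noted above, when only a pipeline region is restarted, the number describes that subtask rather than the whole job.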
How to know (in code) how many times the job restarted?
Hello community,

Is it possible to know programmatically how many times my Flink streaming job has restarted since it started running?

My use case is this: I have a unit test that uses checkpointing, and I throw an exception in a MapFunction for a fixed period, e.g. the next 2 seconds. Because Flink restarts the job and I have checkpoints, I can recover the state, and after 2 seconds I no longer throw the exception. I would then like to know how many times the job was restarted.

Thanks,
Felipe