Thanks for sharing,

I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never restored" is logged)
- on subsequent attempts, it again returns false, because the state was never updated before
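The two cases above can be modelled in plain Java, without any Flink dependency. This is only a sketch: FakeStateBackend, initializeState(...) and simulate(...) are hypothetical names, and the rule that an empty state leaves nothing to restore is an assumption taken from the remark further down the thread that empty state was omitted before Flink 1.13. It is not Flink's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of the restart counter; all names here are hypothetical. */
public class RestartCounterSketch {

    /** Stand-in for the operator state store and its last checkpoint. */
    static final class FakeStateBackend {
        // Contents of the last checkpoint; null means nothing was stored.
        private List<Long> snapshot = null;

        /** Assumption: an empty state is omitted from the checkpoint. */
        void checkpoint(List<Long> state) {
            snapshot = state.isEmpty() ? null : new ArrayList<>(state);
        }

        /** Models context.isRestored(): true only if a snapshot exists. */
        boolean isRestored() {
            return snapshot != null;
        }

        List<Long> restore() {
            return snapshot == null ? new ArrayList<>() : new ArrayList<>(snapshot);
        }
    }

    /** Mirrors the shape of initializeState(): returns the state after initialization. */
    static List<Long> initializeState(FakeStateBackend backend, boolean seedOnFirstAttempt) {
        List<Long> state = backend.restore();
        if (backend.isRestored()) {
            long max = Collections.max(state);
            state.clear();
            state.add(max + 1); // keep only the latest count, like an update
        } else if (seedOnFirstAttempt) {
            state.add(0L); // the suggested fix: seed the state on the first attempt
        }
        return state;
    }

    /** Runs the first attempt plus the given number of restarts; returns the count found in state. */
    public static long simulate(boolean seedOnFirstAttempt, int restarts) {
        FakeStateBackend backend = new FakeStateBackend();
        List<Long> state = new ArrayList<>();
        for (int attempt = 0; attempt <= restarts; attempt++) {
            state = initializeState(backend, seedOnFirstAttempt);
            backend.checkpoint(state); // assume a checkpoint completes before the next failure
        }
        return state.isEmpty() ? 0L : Collections.max(state);
    }

    public static void main(String[] args) {
        System.out.println("without seeding: " + simulate(false, 3));
        System.out.println("with seeding:    " + simulate(true, 3));
    }
}
```

Under these assumptions the unseeded variant reports 0 restarts after three restarts, while the seeded variant reports 3.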
Adding if (!context.isRestored()) { restartsState.add(0L); } to initializeState() should solve the problem
(it's also better to use state.update instead of state.add if only max is needed).

Regards,
Roman

On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
> Sure, here it is. Nothing is mocked. I double-checked.
>
> UnitTestClass {.....
>   protected static LocalFlinkMiniCluster flink;
>
>   @BeforeClass
>   public static void prepare() {
>     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>     flink.start();
>
>     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>   }
>
>   private static Configuration getFlinkConfiguration() {
>     Configuration flinkConfig = new Configuration();
>     flinkConfig.setInteger("local.number-taskmanager", 1);
>     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>     try {
>       flinkConfig.setString("state.checkpoints.dir", "file://" + tempFolder.newFolder().getAbsolutePath());
>     } catch (IOException e) {
>       throw new RuntimeException("error in flink cluster config", e);
>     }
>     return flinkConfig;
>   }
>
>
> The class in which I check whether the job was restarted:
>
> public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
>     implements CheckpointedFunction {
>
>   final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>   };
>   private transient ListState<Long> restartsState;
>   private Long restartsLocal;
>   ...
>   @Override
>   public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
>     this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
>
>     // If current time is less than the reference time ahead AND we have the poison auction an exception will throw
>     if (this.currentTimeMillis < this.referenceTimeMillisAhead && POISON__TRANSACTION_ID.equals(value.toString())) {
>
>       LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>           sdfMillis.format(new Date(this.currentTimeMillis)),
>           sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>
>       throw new SimulatedException("Transaction ID: " + value.toString() +
>           " not allowed. This is a simple exception for testing purposes.");
>     }
>     out.collect(value);
>
>
>     // counts the restarts
>     if (restartsState != null) {
>       List<Long> restoreList = Lists.newArrayList(restartsState.get());
>       Long attemptsRestart = 0L;
>       if (restoreList != null && !restoreList.isEmpty()) {
>         attemptsRestart = Collections.max(restoreList);
>         if (restartsLocal < attemptsRestart) {
>           restartsLocal = attemptsRestart;
>           ctx.output(outputTag, Long.valueOf(attemptsRestart));
>         }
>       }
>       LOG.info("Attempts restart: " + attemptsRestart);
>     }
>   }
>
>   @Override
>   public void snapshotState(FunctionSnapshotContext context) throws Exception {}
>
>   @Override
>   public void initializeState(FunctionInitializationContext context) throws Exception {
>     restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));
>
>     if (context.isRestored()) {
>       List<Long> restoreList = Lists.newArrayList(restartsState.get());
>       if (restoreList == null || restoreList.isEmpty()) {
>         restartsState.add(1L);
>         LOG.info("restarts: 1");
>       } else {
>         Long max = Collections.max(restoreList);
>         LOG.info("restarts: " + max);
>         restartsState.add(max + 1);
>       }
>     } else {
>       LOG.info("restarts: never restored");
>     }
>   }
> }
>
>
> On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> Could you please share the test code?
>>
>> I think the returned value might depend on the level on which the
>> tests are executed. If it's a regular job then it should return the
>> correct value (as with cluster). If the environment in which the code
>> is executed is mocked then it can be false.
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone
>> > cluster and it returns true when the application recovers. However, in
>> > integration tests it does not return true. I am using Flink 1.4. Do you
>> > know where the Flink 1.13 release notes
>> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that I
>> > cannot see `isRestored()` equal to true in integration tests?
>> >
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> >
>> >
>> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
>> >>
>> >> Does your ProcessFunction have state? If not it would be in line with the
>> >> documentation.
>> >>
>> >> Also which Flink version are you using? Before Flink 1.13 empty state was
>> >> omitted so I could imagine that `isRestored()` would return false but it
>> >> should actually now also return true for empty state.
>> >>
>> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez
>> >> <felipe.o.gutier...@gmail.com> wrote:
>> >>>
>> >>> So, I was trying to improve by using the CheckpointedFunction as it
>> >>> shows here [1]. But the method isRestored() says in its documentation
>> >>> [2]:
>> >>>
>> >>> "Returns true, if state was restored from the snapshot of a previous
>> >>> execution. This returns always false for stateless tasks."
>> >>>
>> >>> It is weird because I am extending a ProcessFunction which is a
>> >>> RichFunction.
>> >>>
>> >>> public class AuctionExceptionSimulatorProcessFunction extends
>> >>> ProcessFunction<KeyedReportingData, KeyedReportingData>
>> >>> implements CheckpointedFunction {
>> >>> ...
>> >>>
>> >>> In the end, I cannot rely on the "isRestored()". Do you know what could
>> >>> be wrong? I used the same implementation method as [1].
>> >>>
>> >>> [1]
>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> >>> [2]
>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>
>> >>>
>> >>> --
>> >>> -- Felipe Gutierrez
>> >>> -- skype: felipe.o.gutierrez
>> >>>
>> >>>
>> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org>
>> >>> wrote:
>> >>>>
>> >>>> You can also use accumulators [1] to collect the number of restarts
>> >>>> (and then access it via client); but side outputs should work as well.
>> >>>>
>> >>>> [1]
>> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>> >>>>
>> >>>> Regards,
>> >>>> Roman
>> >>>>
>> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> >>>> <felipe.o.gutier...@gmail.com> wrote:
>> >>>> >
>> >>>> > I just realised that only the ProcessFunction is enough. I don't need
>> >>>> > the CheckpointedFunction.
>> >>>> >
>> >>>> >
>> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez,
>> >>>> > <felipe.o.gutier...@gmail.com> wrote:
>> >>>> >>
>> >>>> >> Cool!
>> >>>> >>
>> >>>> >> I did it using this example
>> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> >>>> >> because I don't have a keyed stream on the specific operator for
>> >>>> >> which I want to count the number of restarts. (yes I am using
>> >>>> >> version 1.4 unfortunately).
>> >>>> >>
>> >>>> >> Because I need to test it in an integration test I am using a side
>> >>>> >> output
>> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> >>>> >> to attach a sink. I am not sure if you have a better idea to test
>> >>>> >> the restarts in an integration test. If you have a simple idea
>> >>>> >> please tell me :). This was the way that I solved it....
>> >>>> >>
>> >>>> >> Thanks
>> >>>> >> Felipe
>> >>>> >>
>> >>>> >> --
>> >>>> >> -- Felipe Gutierrez
>> >>>> >> -- skype: felipe.o.gutierrez
>> >>>> >>
>> >>>> >>
>> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org>
>> >>>> >> wrote:
>> >>>> >>>
>> >>>> >>> Hi Felipe,
>> >>>> >>>
>> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> >>>> >>> that depending on the configuration only a pipeline region can be
>> >>>> >>> restarted, not the whole job).
>> >>>> >>>
>> >>>> >>> But if all you want is to check whether it's a first attempt or not,
>> >>>> >>> you can also call context.isRestored() from initializeState() [2]
>> >>>> >>>
>> >>>> >>> [1]
>> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >>>> >>>
>> >>>> >>> [2]
>> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>> >>>
>> >>>> >>> Regards,
>> >>>> >>> Roman
>> >>>> >>>
>> >>>> >>>
>> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >>>> >>> >
>> >>>> >>> > Hello community,
>> >>>> >>> >
>> >>>> >>> > Is it possible to know programmatically how many times my Flink
>> >>>> >>> > stream job has restarted since it started running?
>> >>>> >>> >
>> >>>> >>> > My use case is like this. I have a unit test that uses
>> >>>> >>> > checkpointing and I throw one exception in a MapFunction for a given
>> >>>> >>> > time, i.e.: for the 2 seconds ahead. Because Flink restarts the
>> >>>> >>> > job and I have checkpoints I can recover the state and after 2
>> >>>> >>> > seconds I don't throw any exception anymore. Then I would like to
>> >>>> >>> > know how many times the job was restarted.
>> >>>> >>> >
>> >>>> >>> > Thanks,
>> >>>> >>> > Felipe
>> >>>> >>> >