Sure, here it is. Nothing is mocked. I double-checked.

UnitTestClass {
    .....
    protected static LocalFlinkMiniCluster flink;

    @BeforeClass
    public static void prepare() {
        flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
        flink.start();
        TestStreamEnvironment.setAsContext(flink, PARALLELISM);
    }

    private static Configuration getFlinkConfiguration() {
        Configuration flinkConfig = new Configuration();
        flinkConfig.setInteger("local.number-taskmanager", 1);
        flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
        flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
        flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
        try {
            flinkConfig.setString("state.checkpoints.dir",
                    "file://" + tempFolder.newFolder().getAbsolutePath());
        } catch (IOException e) {
            throw new RuntimeException("error in flink cluster config", e);
        }
        return flinkConfig;
    }

The class I use to check whether the job was restarted:

public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
        implements CheckpointedFunction {

    final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") { };
    private transient ListState<Long> restartsState;
    private Long restartsLocal;
    ...

    @Override
    public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
        this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;

        // Throw an exception while the current time is before the reference
        // time ahead AND the element is the poison transaction.
        if (this.currentTimeMillis < this.referenceTimeMillisAhead
                && POISON__TRANSACTION_ID.equals(value.toString())) {
            LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
                    sdfMillis.format(new Date(this.currentTimeMillis)),
                    sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
            throw new SimulatedException("Transaction ID: " + value.toString()
                    + " not allowed. This is a simple exception for testing purposes.");
        }
        out.collect(value);

        // counts the restarts
        if (restartsState != null) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            Long attemptsRestart = 0L;
            if (restoreList != null && !restoreList.isEmpty()) {
                attemptsRestart = Collections.max(restoreList);
                // Emit the counter to the side output only when it has grown.
                if (restartsLocal < attemptsRestart) {
                    restartsLocal = attemptsRestart;
                    ctx.output(outputTag, Long.valueOf(attemptsRestart));
                }
            }
            LOG.info("Attempts restart: " + attemptsRestart);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {}

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        restartsState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<Long>("restarts", Long.class));

        if (context.isRestored()) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                restartsState.add(1L);
                LOG.info("restarts: 1");
            } else {
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        } else {
            LOG.info("restarts: never restored");
        }
    }
}

On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> Could you please share the test code?
>
> I think the returned value might depend on the level on which the
> tests are executed. If it's a regular job then it should return the
> correct value (as with cluster). If the environment in which the code
> is executed is mocked then it can be false.
>
> Regards,
> Roman
>
> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone
> > cluster and it returns true when the application recovers. However, in
> > integration tests it does not return true. I am using Flink 1.4.
> > Do you know where the Flink 1.13 release notes
> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that
> > `isRestored()` cannot return true in integration tests?
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> >
> >
> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
> >>
> >> Does your ProcessFunction have state? If not, it would be in line with
> >> the documentation.
> >>
> >> Also, which Flink version are you using? Before Flink 1.13 empty state
> >> was omitted, so I could imagine that `isRestored()` would return false,
> >> but it should actually now also return true for empty state.
> >>
> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez
> >> <felipe.o.gutier...@gmail.com> wrote:
> >>>
> >>> So, I was trying to improve by using the CheckpointedFunction as it
> >>> shows here [1]. But the method isRestored() says in its documentation [2]:
> >>>
> >>> "Returns true, if state was restored from the snapshot of a previous
> >>> execution. This returns always false for stateless tasks."
> >>>
> >>> It is weird because I am extending a ProcessFunction, which is a
> >>> RichFunction.
> >>>
> >>> public class AuctionExceptionSimulatorProcessFunction extends
> >>>         ProcessFunction<KeyedReportingData, KeyedReportingData>
> >>>         implements CheckpointedFunction {
> >>> ...
> >>>
> >>> In the end, I cannot rely on "isRestored()". Do you know what
> >>> could be wrong? I used the same implementation method as [1].
> >>>
> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
> >>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >>>
> >>>
> >>> --
> >>> -- Felipe Gutierrez
> >>> -- skype: felipe.o.gutierrez
> >>>
> >>>
> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> wrote:
> >>>>
> >>>> You can also use accumulators [1] to collect the number of restarts
> >>>> (and then access it via client); but side outputs should work as well.
> >>>>
> >>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
> >>>>
> >>>> Regards,
> >>>> Roman
> >>>>
> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
> >>>> <felipe.o.gutier...@gmail.com> wrote:
> >>>> >
> >>>> > I just realised that only the ProcessFunction is enough. I don't
> >>>> > need the CheckpointedFunction.
> >>>> >
> >>>> >
> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez,
> >>>> > <felipe.o.gutier...@gmail.com> wrote:
> >>>> >>
> >>>> >> Cool!
> >>>> >>
> >>>> >> I did it using this example
> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
> >>>> >> because I don't have a keyed stream on the specific operator for which
> >>>> >> I want to count the number of restarts (yes, I am using version 1.4,
> >>>> >> unfortunately).
> >>>> >>
> >>>> >> Because I need to test it in an integration test, I am using a side
> >>>> >> output
> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
> >>>> >> to attach a sink. I am not sure if you have a better idea to test the
> >>>> >> restarts in an integration test. If you have a simple idea please tell me
> >>>> >> :). This was the way that I solved it....
> >>>> >>
> >>>> >> Thanks
> >>>> >> Felipe
> >>>> >>
> >>>> >> --
> >>>> >> -- Felipe Gutierrez
> >>>> >> -- skype: felipe.o.gutierrez
> >>>> >>
> >>>> >>
> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> wrote:
> >>>> >>>
> >>>> >>> Hi Felipe,
> >>>> >>>
> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
> >>>> >>> that depending on the configuration only a pipeline region can be
> >>>> >>> restarted, not the whole job).
> >>>> >>>
> >>>> >>> But if all you want is to check whether it's a first attempt or not,
> >>>> >>> you can also call context.isRestored() from initializeState() [2]
> >>>> >>>
> >>>> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
> >>>> >>>
> >>>> >>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >>>> >>>
> >>>> >>> Regards,
> >>>> >>> Roman
> >>>> >>>
> >>>> >>>
> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
> >>>> >>> >
> >>>> >>> > Hello community,
> >>>> >>> >
> >>>> >>> > Is it possible to know programmatically how many times my Flink
> >>>> >>> > stream job has restarted since it started running?
> >>>> >>> >
> >>>> >>> > My use case is like this. I have a unit test that uses
> >>>> >>> > checkpoints, and I throw an exception in a MapFunction for a given
> >>>> >>> > time, i.e. for the next 2 seconds. Because Flink restarts the job and
> >>>> >>> > I have checkpoints, I can recover the state, and after 2 seconds I
> >>>> >>> > don't throw any exception anymore. Then I would like to know how many
> >>>> >>> > times the job was restarted.
> >>>> >>> >
> >>>> >>> > Thanks,
> >>>> >>> > Felipe
> >>>> >>> >
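
P.S. The counting rule used in initializeState() at the top of this mail (take the max of the restored list and add one, only when the state was restored) can be checked in isolation, without a cluster. The sketch below is a plain-Java model of that rule and not Flink code: RestartCounter, initialize() and restarts() are hypothetical names, and the java.util list only stands in for the ListState<Long>. Unlike the original, it keeps a single entry instead of appending one per restore, which is an assumption made here to keep the list from growing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model of the restart counter kept in operator list state.
 * Hypothetical helper for illustration; nothing here calls Flink.
 */
final class RestartCounter {
    // Stand-in for the checkpointed ListState<Long> named "restarts".
    private final List<Long> state = new ArrayList<>();

    /** Mirrors initializeState(): bump the counter only when state was restored. */
    void initialize(boolean restored) {
        if (!restored) {
            return; // first attempt, nothing to count
        }
        long previous = state.isEmpty() ? 0L : Collections.max(state);
        // Assumption: replace the old entry rather than append a new one.
        state.clear();
        state.add(previous + 1);
    }

    /** Number of restores seen so far, or 0 if the job never restarted. */
    long restarts() {
        return state.isEmpty() ? 0L : Collections.max(state);
    }
}
```

Calling initialize(false) once and then initialize(true) for every simulated recovery gives the value that the side output is expected to carry in the integration test.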