Sure, here it is. Nothing is mocked. I double-checked.

UnitTestClass {.....
protected static LocalFlinkMiniCluster flink;

@BeforeClass
public static void prepare() {
    flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
    flink.start();

    TestStreamEnvironment.setAsContext(flink, PARALLELISM);
}

private static Configuration getFlinkConfiguration() {
    Configuration flinkConfig = new Configuration();
    flinkConfig.setInteger("local.number-taskmanager", 1);
    flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
    flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
    flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
    try {
        flinkConfig.setString("state.checkpoints.dir",
                "file://" + tempFolder.newFolder().getAbsolutePath());
    } catch (IOException e) {
        throw new RuntimeException("error in flink cluster config", e);
    }
    return flinkConfig;
}


The class I use to check whether the job was restarted:

public class ExceptionSimulatorProcessFunction
        extends ProcessFunction<Object..., Object...>
        implements CheckpointedFunction {

    final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {};
    private transient ListState<Long> restartsState;
    // Start at 0 so the comparison in processElement never unboxes null.
    private Long restartsLocal = 0L;
    ...
    @Override
    public void processElement(Object value, Context ctx, Collector<Object> out)
            throws Exception {
        this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;

        // If the current time is still before the reference time AND the value
        // is the poison transaction ID, throw an exception.
        if (this.currentTimeMillis < this.referenceTimeMillisAhead
                && POISON__TRANSACTION_ID.equals(value.toString())) {

            LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
                    sdfMillis.format(new Date(this.currentTimeMillis)),
                    sdfMillis.format(new Date(this.referenceTimeMillisAhead)));

            throw new SimulatedException("Transaction ID: " + value.toString()
                    + " not allowed. This is a simple exception for testing purposes.");
        }
        out.collect(value);


        // counts the restarts
        if (restartsState != null) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            Long attemptsRestart = 0L;
            if (restoreList != null && !restoreList.isEmpty()) {
                attemptsRestart = Collections.max(restoreList);
                if (restartsLocal < attemptsRestart) {
                    restartsLocal = attemptsRestart;
                    ctx.output(outputTag, Long.valueOf(attemptsRestart));
                }
            }
            LOG.info("Attempts restart: " + attemptsRestart);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {}

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        restartsState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<Long>("restarts", Long.class));

        if (context.isRestored()) {
            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                restartsState.add(1L);
                LOG.info("restarts: 1");
            } else {
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        } else {
            LOG.info("restarts: never restored");
        }
    }
}
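
The counting rule used in initializeState() and processElement() above can
also be isolated in a plain helper, which makes it checkable without starting
a cluster. This is only a minimal sketch: RestartCounter and its two method
names are hypothetical (they are not part of Flink or of the test above), and
it assumes the restored list state has already been copied into a
java.util.List.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper mirroring the restart-counting rule; not a Flink class. */
final class RestartCounter {

    private RestartCounter() {}

    /**
     * Value to append to the list state after a restore:
     * 1 if nothing was restored, otherwise the largest stored count plus one.
     */
    static long nextRestartCount(List<Long> restored) {
        if (restored == null || restored.isEmpty()) {
            return 1L;
        }
        return Collections.max(restored) + 1L;
    }

    /** Highest restart count recorded so far, or 0 if none was recorded. */
    static long currentRestartCount(List<Long> restored) {
        if (restored == null || restored.isEmpty()) {
            return 0L;
        }
        return Collections.max(restored);
    }
}
```

With such a helper, initializeState() would add nextRestartCount(restoreList)
to the state when isRestored() is true, and processElement() would emit
currentRestartCount(restoreList) to the side output whenever it grows.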


On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> Could you please share the test code?
>
> I think the returned value might depend on the level on which the
> tests are executed. If it's a regular job then it should return the
> correct value (as with cluster). If the environment in which the code
> is executed is mocked then it can be false.
>
> Regards,
> Roman
>
> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone
> cluster and it returns true when the application recovers. However, in
> integration tests it does not return true. I am using Flink 1.4. Do you
> know where the Flink 1.13 release notes (
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that
> `isRestored()` cannot return true in integration tests?
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> >
> >
> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
> >>
> >> Does your ProcessFunction have state? If not, it would be in line with
> the documentation.
> >>
> >> Also which Flink version are you using? Before Flink 1.13 empty state
> was omitted so I could imagine that `isRestored()` would return false but
> it should actually now also return true for empty state.
> >>
> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
> >>>
> >>> So, I was trying to improve by using the CheckpointedFunction as it
> shows here [1]. But the method isRestored() says in its documentation [2]:
> >>>
> >>> "Returns true, if state was restored from the snapshot of a previous
> execution. This returns always false for stateless tasks."
> >>>
> >>> It is weird because I am extending a ProcessFunction which is a
> RichFunction.
> >>>
> >>> public class AuctionExceptionSimulatorProcessFunction extends
> ProcessFunction<KeyedReportingData, KeyedReportingData>
> >>>         implements CheckpointedFunction {
> >>> ...
> >>>
> >>> In the end, I cannot rely on the "isRestored()". Do you know what
> could be wrong? I used the same implementation method of [1].
> >>>
> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
> >>> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >>>
> >>>
> >>> --
> >>> -- Felipe Gutierrez
> >>> -- skype: felipe.o.gutierrez
> >>>
> >>>
> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org>
> wrote:
> >>>>
> >>>> You can also use accumulators [1] to collect the number of restarts
> >>>> (and then access it via client); but side outputs should work as well.
> >>>>
> >>>> [1]
> >>>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
> >>>>
> >>>> Regards,
> >>>> Roman
> >>>>
> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
> >>>> <felipe.o.gutier...@gmail.com> wrote:
> >>>> >
> >>>> > I just realised that the ProcessFunction alone is enough. I don't
> need the CheckpointedFunction.
> >>>> >
> >>>> >
> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
> felipe.o.gutier...@gmail.com> wrote:
> >>>> >>
> >>>> >> Cool!
> >>>> >>
> >>>> >> I did it using this example
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
> because I don't have a keyed stream on the specific operator whose
> restarts I want to count (yes, I am using version 1.4, unfortunately).
> >>>> >>
> >>>> >> Because I need to test it in an integration test I am using a side
> output (
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
> to attach a sink. I am not sure if you have a better idea to test the
> restarts on an integration test. If you have a simple idea please tell me
> :). This was the way that I solved....
> >>>> >>
> >>>> >> Thanks
> >>>> >> Felipe
> >>>> >>
> >>>> >> --
> >>>> >> -- Felipe Gutierrez
> >>>> >> -- skype: felipe.o.gutierrez
> >>>> >>
> >>>> >>
> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <
> ro...@apache.org> wrote:
> >>>> >>>
> >>>> >>> Hi Felipe,
> >>>> >>>
> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
> >>>> >>> that depending on the configuration only a pipeline region can be
> >>>> >>> restarted, not the whole job).
> >>>> >>>
> >>>> >>> But if all you want is to check whether it's a first attempt or
> not,
> >>>> >>> you can also call context.isRestored() from initializeState() [2]
> >>>> >>>
> >>>> >>> [1]
> >>>> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
> >>>> >>>
> >>>> >>> [2]
> >>>> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >>>> >>>
> >>>> >>> Regards,
> >>>> >>> Roman
> >>>> >>>
> >>>> >>>
> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
> >>>> >>> >
> >>>> >>> > Hello community,
> >>>> >>> >
> >>>> >>> > Is it possible to know programmatically how many times my Flink
> stream job has restarted since it started running?
> >>>> >>> >
> >>>> >>> > My use case is like this. I have a unit test that uses
> checkpointing and I throw an exception in a MapFunction for a given time,
> i.e. for the next 2 seconds. Because Flink restarts the job and I have
> checkpoints, I can recover the state, and after 2 seconds I no longer throw
> any exception. Then I would like to know how many times the job was
> restarted.
> >>>> >>> >
> >>>> >>> > Thanks,
> >>>> >>> > Felipe
> >>>> >>> >
>
