I was able to reproduce this in a unit test:
@Test
>
> *public* *void* test() *throws* InterruptedException,
> ExecutionException {
>
> FlinkPipelineOptions options = PipelineOptionsFactory.*as*
> (FlinkPipelineOptions.*class*);
>
> options.setCheckpointingInterval(10L);
>
> options.setParallelism(1);
>
> options.setStreaming(*true*);
>
> options.setRunner(FlinkRunner.*class*);
>
> options.setFlinkMaster("[local]");
>
> options.setStateBackend(*new* MemoryStateBackend(Integer.*MAX_VALUE*
> ));
>
> Pipeline pipeline = Pipeline.*create*(options);
>
> pipeline
>
> .apply(Create.*of*((Void) *null*))
>
> .apply(
>
> ParDo.*of*(
>
> *new* DoFn<Void, Void>() {
>
>
> *private* *static* *final* *long* *serialVersionUID* =
> 1L;
>
>
> @RequiresStableInput
>
> @ProcessElement
>
> *public* *void* processElement() {}
>
> }));
>
> pipeline.run();
>
> }
>
It took a while to get to checkpoint 32,767, but eventually it did, and it
failed with the same error I listed above.
On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <[email protected]>
wrote:
> I have a Beam Pipeline (2.14) running on Flink (1.8.0, emr-5.26.0) that
> uses the RequiresStableInput feature.
>
> Currently it's configured to checkpoint once a minute, and after around
> 32000-33000 checkpoints, it fails with:
>
>> 2020-04-15 13:15:02,920 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>> checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>> 2020-04-15 13:15:05,762 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>> checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes
>> in 2667 ms).
>> 2020-04-15 13:16:02,919 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>> checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>> 2020-04-15 13:16:03,147 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph -
>> <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from
>> RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize
>> checkpoint 32702 for operator <operator_name> (1/2).}
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702
>> for operator <operator_name> (1/2).
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>> ... 6 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.IllegalArgumentException
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>> at
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>> ... 5 more
>> Caused by: java.lang.IllegalArgumentException
>> at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>> at
>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>> at
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>> ... 7 more
>
>
> The exception comes from here:
> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>
> In the Flink Runner code, I can see that each checkpoint will result in a
> new OperatorState (or KeyedState if the stream is keyed):
>
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>
> This seems to be the reason the pipeline will eventually die.
>
> While a workaround might be to increase the time between checkpoints, it
> seems like any pipeline running on flink, using the RequiresStableInput is
> limited in the amount of time that it can run without being started from
> scratch.
>
>