Hi Eleanore,

Exactly-once is not affected, but the pipeline can fail to checkpoint once the maximum number of state cells has been reached. We are working on a fix [1].
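The limit referred to above can be sketched with a small, self-contained model. It makes two assumptions. The first, based on the `checkArgument` in OperatorBackendSerializationProxy.java#L68 linked further down this thread and on the unit test failing around checkpoint 32,767, is that a Flink 1.8 snapshot is rejected once more than Short.MAX_VALUE state cells are registered. The second is that a cleared cell still counts toward that limit. The class `StateCellLimitSketch` and its methods are hypothetical; this is not Beam or Flink code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of an operator's registered state cells; not Flink or Beam code.
public class StateCellLimitSketch {

    // Assumed limit, mirroring the Short.MAX_VALUE precondition seen in Flink 1.8.
    static final int MAX_STATE_CELLS = Short.MAX_VALUE; // 32767

    // Registered cells by name. Clearing a cell empties it but keeps it registered.
    private final Map<String, StringBuilder> cells = new LinkedHashMap<>();

    private StringBuilder getOrCreate(String name) {
        return cells.computeIfAbsent(name, n -> new StringBuilder());
    }

    // Models the snapshot-time check: every registered cell is counted, empty or not.
    private void snapshot(long checkpointId) {
        if (cells.size() > MAX_STATE_CELLS) {
            throw new IllegalArgumentException(
                "checkpoint " + checkpointId + ": " + cells.size() + " state cells registered");
        }
    }

    // Runs checkpoints 1..checkpoints and returns the first failing checkpoint id, or -1.
    public static long run(boolean newCellPerCheckpoint, long checkpoints) {
        StateCellLimitSketch registry = new StateCellLimitSketch();
        for (long id = 1; id <= checkpoints; id++) {
            // Either a fresh cell name per checkpoint or one reused name.
            String name = newCellPerCheckpoint ? "buffer-" + id : "buffer";
            StringBuilder cell = registry.getOrCreate(name);
            cell.append("element"); // buffer input until the checkpoint completes
            try {
                registry.snapshot(id);
            } catch (IllegalArgumentException e) {
                return id;
            }
            cell.setLength(0); // "clear" the cell once the checkpoint has completed
        }
        return -1;
    }

    public static void main(String[] args) {
        long perCheckpoint = run(true, 40_000);
        long singleCell = run(false, 40_000);
        System.out.println("new cell per checkpoint: first failing checkpoint = " + perCheckpoint);
        System.out.println("single reused cell: first failing checkpoint = "
            + (singleCell < 0 ? "none" : String.valueOf(singleCell)));
    }
}
```

In this model, creating a new cell per checkpoint fails at checkpoint 32,768, while reusing a single cell completes all 40,000 checkpoints. A real job may fail at a different checkpoint, for example if the operator registers other state. Keeping the registered-cell count constant corresponds to the single-state-cell approach proposed further down this thread.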
Cheers,
Max

[1] https://github.com/apache/beam/pull/11478

On 22.04.20 07:19, Eleanore Jin wrote:
> Hi Maxi,
>
> I assume this will impact the exactly-once semantics that Beam provides,
> since in KafkaExactlyOnceSink the processElement method is also
> annotated with @RequiresStableInput?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <m...@apache.org> wrote:
>
> > Hi Stephen,
> >
> > Thanks for reporting the issue! David, good catch!
> >
> > I think we have to resort to using only a single state cell for
> > buffering on checkpoints, instead of using a new one for every
> > checkpoint. I was under the assumption that a cleared state cell
> > would not be checkpointed, but that does not seem to be the case.
> >
> > Thanks,
> > Max
> >
> > On 21.04.20 09:29, David Morávek wrote:
> > > Hi Stephen,
> > >
> > > nice catch and awesome report! ;) This definitely needs a proper fix.
> > > I've created a new JIRA to track the issue and will try to resolve it
> > > soon, as this seems critical to me.
> > >
> > > https://issues.apache.org/jira/browse/BEAM-9794
> > >
> > > Thanks,
> > > D.
> > >
> > > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpate...@gmail.com> wrote:
> > >
> > > > I was able to reproduce this in a unit test:
> > > >
> > > > @Test
> > > > public void test() throws InterruptedException, ExecutionException {
> > > >   FlinkPipelineOptions options =
> > > >       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> > > >   options.setCheckpointingInterval(10L);
> > > >   options.setParallelism(1);
> > > >   options.setStreaming(true);
> > > >   options.setRunner(FlinkRunner.class);
> > > >   options.setFlinkMaster("[local]");
> > > >   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> > > >   Pipeline pipeline = Pipeline.create(options);
> > > >   pipeline
> > > >       .apply(Create.of((Void) null))
> > > >       .apply(
> > > >           ParDo.of(
> > > >               new DoFn<Void, Void>() {
> > > >                 private static final long serialVersionUID = 1L;
> > > >
> > > >                 @RequiresStableInput
> > > >                 @ProcessElement
> > > >                 public void processElement() {}
> > > >               }));
> > > >   pipeline.run();
> > > > }
> > > >
> > > > It took a while to get to checkpoint 32,767, but eventually it did,
> > > > and it failed with the same error I listed above.
> > > >
> > > > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <stephenpate...@gmail.com> wrote:
> > > >
> > > > > I have a Beam pipeline (2.14) running on Flink (1.8.0,
> > > > > emr-5.26.0) that uses the RequiresStableInput feature.
> > > > >
> > > > > Currently it's configured to checkpoint once a minute, and after
> > > > > around 32,000-33,000 checkpoints, it fails with:
> > > > >
> > > > > 2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> > > > > 2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> > > > > 2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> > > > > 2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> > > > > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> > > > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > > > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > > > >     at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> > > > >     ... 6 more
> > > > > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> > > > >     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> > > > >     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> > > > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> > > > >     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> > > > >     ... 5 more
> > > > > Caused by: java.lang.IllegalArgumentException
> > > > >     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> > > > >     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> > > > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> > > > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> > > > >     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> > > > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> > > > >     ... 7 more
> > > > >
> > > > > The exception comes from here:
> > > > > https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> > > > >
> > > > > In the Flink Runner code, I can see that each checkpoint will
> > > > > result in a new OperatorState (or KeyedState if the stream is
> > > > > keyed):
> > > > >
> > > > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> > > > >
> > > > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> > > > >
> > > > > This seems to be the reason the pipeline eventually dies.
> > > > >
> > > > > While a workaround might be to increase the time between
> > > > > checkpoints, it seems that any pipeline running on Flink and
> > > > > using RequiresStableInput is limited in how long it can run
> > > > > without being started from scratch.