Hi,

I have an Apache Beam application deployed on Amazon KDA (Kinesis Data Analytics, a managed Flink cluster). The application reads from Kinesis, groups the data into fixed 30-second windows, and then publishes it to Pub/Sub.

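    // KinesisIORead, GroupedMessage and PubSubWrite are the application's own transforms.
    pipeline
        .apply("Read from Kinesis", new KinesisIORead())
        .apply("Windowing", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
        // Every element gets the same constant key, so batching happens per window on one key.
        .apply(WithKeys.of(DUMMY_KEY))
        .apply(GroupIntoBatches.ofSize(5))
        .apply(Values.create())
        .apply("Map values to single object", ParDo.of(new GroupedMessage()))
        .apply("Write to Pub/Sub", new PubSubWrite());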

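For readers less familiar with these transforms, here is a rough plain-Java sketch (no Beam dependency) of what the windowing and batching steps do. It assumes epoch-millisecond timestamps and a zero window offset, and it simplifies GroupIntoBatches: the real transform buffers elements in per-key, per-window state and emits any partial batch when the window expires. `WindowBatchSketch`, `windowStart` and `batch` are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch of two pipeline steps; not Beam's implementation.
final class WindowBatchSketch {
    static final long WINDOW_MILLIS = 30_000L; // mirrors FixedWindows.of(Duration.standardSeconds(30))
    static final int BATCH_SIZE = 5;           // mirrors GroupIntoBatches.ofSize(5)

    // Start of the fixed window [start, start + WINDOW_MILLIS) containing the timestamp,
    // assuming epoch-millisecond timestamps and a zero window offset.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Splits one key's elements (within one window) into batches of at most BATCH_SIZE.
    // The last, possibly smaller batch stands in for the flush when the window expires.
    static <T> List<List<T>> batch(List<T> elements) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < elements.size(); i += BATCH_SIZE) {
            int end = Math.min(i + BATCH_SIZE, elements.size());
            batches.add(new ArrayList<>(elements.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(windowStart(65_000L));                // 60000
        System.out.println(batch(List.of(1, 2, 3, 4, 5, 6, 7))); // [[1, 2, 3, 4, 5], [6, 7]]
    }
}
```

Since every element is keyed with DUMMY_KEY, all elements of a given window share a single key and therefore a single batching buffer.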
I'm using:

beam-sdks-java-core:2.31.0, beam-runners-flink-1.11:2.31.0, beam-sdks-java-io-kafka:2.31.0


Checkpointing configs are as follows:

CheckpointingEnabled: true
CheckpointInterval: 60000 (ms)
MinPauseBetweenCheckpoints: 5000 (ms)

FasterCopy is also enabled.

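As a rough illustration of how the two timing settings above interact: the next checkpoint should start no earlier than one interval after the previous start, and no earlier than the minimum pause after the previous completion. This is a simplified model that assumes at most one checkpoint in flight at a time; it is not Flink's actual scheduler code, and `CheckpointScheduleSketch` / `nextTrigger` are hypothetical names.

```java
// Simplified, hypothetical model of CheckpointInterval + MinPauseBetweenCheckpoints.
// Assumes at most one checkpoint in flight; all values are in milliseconds.
final class CheckpointScheduleSketch {
    static final long INTERVAL_MS = 60_000L; // CheckpointInterval
    static final long MIN_PAUSE_MS = 5_000L; // MinPauseBetweenCheckpoints

    // Earliest time the next checkpoint may start, given when the previous one
    // started and when it completed.
    static long nextTrigger(long previousStart, long previousCompletion) {
        return Math.max(previousStart + INTERVAL_MS, previousCompletion + MIN_PAUSE_MS);
    }

    public static void main(String[] args) {
        // A checkpoint that takes 2 s: the regular 60 s interval decides.
        System.out.println(nextTrigger(0L, 2_000L));  // 60000
        // A checkpoint that takes 58 s: the 5 s minimum pause pushes the next one to 63 s.
        System.out.println(nextTrigger(0L, 58_000L)); // 63000
    }
}
```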
16 task slots are allocated for the application.


The pipeline usually runs fine for about 15-20 minutes, then starts failing checkpoints intermittently.

The exception is:

Error while processing checkpoint acknowledgement message

org.apache.flink.util.SerializedThrowable: s3://3db4bd0e0169500d35dc925c1fa9414b79d097b8/a15fb789b6dab24701c19f42a61b1cf7-939927294066-1628593971563/checkpoints/a15fa789b6dab24701c19f41a61b1cf7/chk-20/_metadata already exists
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:758) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141) ~[?:?]
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:169) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:306) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    ... 8 more
Wrapped by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 20. Failure reason: Failure to finalize checkpoint.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:819) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]

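The trace shows the failure happening while the CheckpointCoordinator finalizes checkpoint 20 and creates its `_metadata` file: S3A refuses the create because an object already exists at that path. The same create-only-if-absent behaviour can be reproduced locally with java.nio's `StandardOpenOption.CREATE_NEW`. This is only an analogy on the local filesystem, not the Flink or S3A API, and `NoOverwriteSketch` / `tryWriteOnce` are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Local-filesystem analogy of a create-without-overwrite; not Flink or S3A code.
final class NoOverwriteSketch {
    // Writes the file only if it does not exist yet; returns false if it already exists.
    static boolean tryWriteOnce(Path metadataFile, byte[] content) throws IOException {
        try (OutputStream out = Files.newOutputStream(
                metadataFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            out.write(content);
            return true;
        } catch (FileAlreadyExistsException e) {
            // Same situation as the "... /_metadata already exists" message in the trace.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path metadata = Files.createTempDirectory("chk-20").resolve("_metadata");
        System.out.println(tryWriteOnce(metadata, new byte[] {1})); // true
        System.out.println(tryWriteOnce(metadata, new byte[] {2})); // false
    }
}
```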
But I can't seem to find any Flink metric that correlates with these intermittent checkpoint failures.


I would really appreciate any insights on troubleshooting this.


--
Thanks & regards,

Gayan Weerakutti

linkedin.com/in/gayanweerakutti <https://www.linkedin.com/in/gayanweerakutti/>
