[jira] [Comment Edited] (FLINK-33932) Support retry mechanism for rocksdb uploader
[ https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800258#comment-17800258 ]

xiangyu feng edited comment on FLINK-33932 at 12/29/23 12:24 PM:

Hi [~dianer17], thanks for creating this. IMHO, this retry is very useful for improving the checkpoint success rate. We can introduce a default fixed-delay retry mechanism here. I have implemented a PoC in this PR: [https://github.com/apache/flink/pull/23986], WDYT?

was (Author: JIRAUSER301129):
Hi [~dianer17], thanks for creating this. IMHO, this retry is very useful for improving the checkpoint success rate. We can introduce a default fixed-delay retry mechanism without adding any configuration, so we don't need a FLIP for this Jira. I have implemented a PoC in this PR: [https://github.com/apache/flink/pull/23986], WDYT?

> Support retry mechanism for rocksdb uploader
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: Guojun Li
> Priority: Major
> Labels: pull-request-available
>
> The RocksDB uploader throws an exception and declines the current checkpoint if errors occur while uploading to a remote file system such as HDFS.
> The exception is as below:
> 2023-12-19 08:46:00,197 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 2 by task 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job a025f19e at application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> Calc[133] (184/500)#0.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at
[jira] [Comment Edited] (FLINK-33932) Support retry mechanism for rocksdb uploader
[ https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800258#comment-17800258 ]

xiangyu feng edited comment on FLINK-33932 at 12/25/23 8:55 AM:

Hi [~dianer17], thanks for creating this. IMHO, this retry is very useful for improving the checkpoint success rate. We can introduce a default fixed-delay retry mechanism without adding any configuration, so we don't need a FLIP for this Jira. I have implemented a PoC in this PR: [https://github.com/apache/flink/pull/23986], WDYT?

was (Author: JIRAUSER301129):
Hi [~dianer17], thanks for creating this. IMHO, this retry is very useful for improving the checkpoint success rate. We can introduce a default fixed-delay retry mechanism without adding any configuration, so we don't need a FLIP for this Jira. I have implemented a PoC in this PR: [https://github.com/apache/flink/pull/23986,] WDYT?

> Support retry mechanism for rocksdb uploader
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: Guojun Li
> Priority: Major
> Labels: pull-request-available
>
> The RocksDB uploader throws an exception and declines the current checkpoint if errors occur while uploading to a remote file system such as HDFS.
> The exception is as below:
> 2023-12-19 08:46:00,197 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 2 by task 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job a025f19e at application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
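
For readers following the thread, the fixed-delay retry proposed in the comment could be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the linked PR: the class FixedDelayRetry, the IoAction interface, and the maxAttempts and delayMillis parameters are hypothetical names, and the lambda in main merely simulates an upload that fails twice with an IOException before succeeding.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper for illustration; not part of Flink or the linked PR. */
public final class FixedDelayRetry {

    /** An upload-like action that may fail with an IOException (assumed shape). */
    @FunctionalInterface
    public interface IoAction<T> {
        T run() throws IOException;
    }

    private FixedDelayRetry() {}

    /**
     * Runs the action up to maxAttempts times, sleeping delayMillis between
     * failed attempts. Only IOException triggers a retry; the last failure is
     * rethrown once all attempts are used up.
     */
    public static <T> T retry(IoAction<T> action, int maxAttempts, long delayMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.run();
            } catch (IOException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    // Fixed delay: the same pause before every retry.
                    Thread.sleep(delayMillis);
                }
            }
        }
        // Reached only after every attempt failed, so lastFailure is set.
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated upload: fails on the first two calls, then succeeds.
        String handle = retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IOException("simulated transient upload failure");
            }
            return "state-handle";
        }, 5, 10L);
        System.out.println(handle + " after " + calls.get() + " attempts");
    }
}
```

With a wrapper like this around the upload call, a transient failure when closing the output stream would cost a few delayed attempts rather than an immediately declined checkpoint. Whether the attempt count and delay should be hard-coded defaults or configurable is the open question raised in the comment.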