[jira] [Updated] (FLINK-33932) Support retry mechanism for rocksdb uploader

2024-01-04 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-33932:
---
Description: 
Rocksdb uploader will throw exception and decline the current checkpoint if 
there are errors when uploading to remote file system like hdfs.

The exception is as below:
{noformat}
 
2023-12-19 08:46:00,197 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
checkpoint 2 by task 
5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job 
a025f19e at 
application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
org.apache.flink.util.SerializedThrowable: 
org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
checkpoint failed.
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
~[?:?]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
Calc[133] (184/500)#0.
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: 
java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
to file and close the file system output stream to 
hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at 
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
Could not flush to file and close the file system output stream to 
hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
stream state handle
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
 ~[?:?]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: 
java.net.ConnectException: Connection timed out
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) 
~[?:?]
    at 
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) 
~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) 
~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
    at 
org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835)
 ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
    at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268)
 ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
    at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1257)
 ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
    

[jira] [Updated] (FLINK-33932) Support retry mechanism for rocksdb uploader

2023-12-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33932:
---
Labels: pull-request-available  (was: )

> Support retry mechanism for rocksdb uploader
> 
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Guojun Li
>Priority: Major
>  Labels: pull-request-available
>
> Rocksdb uploader will throw exception and decline the current checkpoint if 
> there are errors when uploading to remote file system like hdfs.
> The exception is as below:
> 2023-12-19 08:46:00,197 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
> checkpoint 2 by task 
> 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of 
> job a025f19e at 
> application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
> fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
> checkpoint failed.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
> Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
> Calc[133] (184/500)#0.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
> to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at 
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
> Could not flush to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.net.ConnectException: Connection timed out
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) 
> ~[?:?]
>     at 
>