Youjun Yuan created FLINK-24187:
-----------------------------------

             Summary: Could not commit S3 file after JM restart during state initialization
                 Key: FLINK-24187
                 URL: https://issues.apache.org/jira/browse/FLINK-24187
             Project: Flink
          Issue Type: Bug
          Components: FileSystems
    Affects Versions: 1.12.1
            Reporter: Youjun Yuan


We have a SQL job that consumes from Kafka and writes to a Hive table whose data
is stored in S3.

One day the ZooKeeper leader failed over, which caused the Flink job to restart.
However, the job got stuck during state restore with the following error:
{code:java}
java.io.IOException: Could not commit file from 
s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/.part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371.inprogress.400506e4-23ea-428c-b8eb-9ff196eeca64
 to 
s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371
 at 
org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:104)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.commitAfterRecovery(HadoopRenameFileCommitter.java:83)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedPendingFile.commitAfterRecovery(HadoopPathBasedPartFileWriter.java:101)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:120)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.table.filesystem.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:55)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) 
~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
[flink-dist_2.11-1.12.1.jar:1.12.1] at java.lang.Thread.run(Thread.java:748) 
[?:1.8.0_242] Caused by: java.io.IOException: 
java.util.concurrent.CancellationException at 
com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:171)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) 
~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0] ... 22 more Caused 
by: java.util.concurrent.CancellationException at 
java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_242] at 
java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_242] at 
com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:262)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:249)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:169)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) 
~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0] ... 22 more{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to