[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327249#comment-17327249 ] Paul Lin commented on FLINK-13874:
--
Hi [~kkl0u], I have some opinions on your questions; please see if they make sense.

> I may be missing something but we are in the process of recovering from a
> failure, so I would expect the DFSClient that is trying to acquire the lease
> to be different from the pre-failure holder, right?

I think the cause may be that Flink caches FileSystem instances (see [FileSystem](https://github.com/apache/flink/blob/fa0be65cf81cf4cb53ddbfb8c5cedb271e264f19/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L235)), so if a task is recovered on the same taskmanager, it may get the same DFSClient as the previous execution.

> Also, I would expect that trying to reacquire a lease that we already have
> should be allowed by HDFS. Right?

Yes, but under the hood truncate needs a lease recovery, which may release the lease forcefully. So if the client is already the current lease holder, it should close the file before calling truncate. I think we can close the file in the normal way if the exception occurs. WDYT?

> StreamingFileSink fails to recover (truncate) properly
> --
>
> Key: FLINK-13874
> URL: https://issues.apache.org/jira/browse/FLINK-13874
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.9.0
> Reporter: Gyula Fora
> Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>
> {noformat}
> java.io.IOException: Problem while truncating file: hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:89)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
> 	at org.apache.hadoop.hdfs.server.namenode.F
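The close-before-truncate idea Paul Lin describes could be sketched roughly as follows. On a real cluster this would sit around `DistributedFileSystem.recoverLease(Path)`, `isFileClosed(Path)`, and `truncate(Path, long)`; since those calls need live HDFS, this sketch only factors out the polling loop into a self-contained helper, and the surrounding usage shown in the Javadoc is an assumption, not Flink's actual recovery code.

```java
import java.util.function.BooleanSupplier;

/**
 * Sketch of the recovery pattern discussed above: before truncating a file
 * whose lease this client may still hold, trigger lease recovery and poll
 * until the file is closed. Hypothetical usage (assumes a
 * DistributedFileSystem {@code fs}, a Path {@code p}, and a recovered
 * length {@code validLength}; not Flink's actual code):
 *
 *   if (!fs.recoverLease(p)) {
 *       waitUntil(() -> isFileClosedUnchecked(fs, p), 30, 1000L);
 *   }
 *   fs.truncate(p, validLength);
 */
public final class LeaseRecoverySketch {

    /**
     * Polls {@code condition} up to {@code maxAttempts} times, sleeping
     * {@code sleepMillis} between failed attempts. Returns whether the
     * condition eventually became true.
     */
    public static boolean waitUntil(BooleanSupplier condition,
                                    int maxAttempts,
                                    long sleepMillis) {
        for (int i = 0; i < maxAttempts; i++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt and give up early.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

The bounded polling matters because, as noted later in the thread, lease recovery sometimes finishes in 1-2 seconds and sometimes not at all; a caller has to decide when to stop waiting and fail the restore.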
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146770#comment-17146770 ] JIAN WANG commented on FLINK-13874:
---
Hi [~gyfora], I still hit the same issue on flink-1.10.1. I run Flink on YARN (3.0.0-cdh6.3.2) with a StreamingFileSink. The relevant code looks like this:

```java
public static StreamingFileSink<String> build(String dir, BucketAssigner<String, String> assigner, String prefix) {
    return StreamingFileSink
            .forRowFormat(new Path(dir), new SimpleStringEncoder<String>())
            .withRollingPolicy(DefaultRollingPolicy.builder()
                    .withRolloverInterval(TimeUnit.HOURS.toMillis(2))
                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(10))
                    .withMaxPartSize(1024L * 1024L * 1024L * 50) // Max 50GB
                    .build())
            .withBucketAssigner(assigner)
            .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build())
            .build();
}
```

The error is

```
java.io.IOException: Problem while truncating file: hdfs:///business_log/hashtag/2020-06-25/.hashtag-122-37.inprogress.8e65f69c-b5ba-4466-a844-ccc0a5a93de2
```

Because of this issue, the job cannot restart from the latest checkpoint or savepoint.
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918530#comment-16918530 ] Kostas Kloudas commented on FLINK-13874:
---
Thanks and keep us posted on this because it seems like many users may bump into it.
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918528#comment-16918528 ] Gyula Fora commented on FLINK-13874:
---
We will investigate this further in the future to see what in HDFS might cause it. I will close this issue for now.
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918526#comment-16918526 ] Gyula Fora commented on FLINK-13874:
---
This seems like an issue with HDFS, as the lease cannot be recovered for certain files for a long period of time. Something with the block recovery process might be off. These files cannot even be closed manually using the hdfs client. In many cases only a restart of HDFS solves the problem.
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918512#comment-16918512 ] Kostas Kloudas commented on FLINK-13874:
---
Hi [~gyfora], I still do not get how the same DFSClient can already hold the lease. I may be missing something, but we are in the process of recovering from a failure, so I would expect the DFSClient that is trying to acquire the lease to be different from the pre-failure holder, right? Also, I would expect that trying to reacquire a lease that we already have should be allowed by HDFS. Right?
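One way the pre-failure holder and the recovering client can turn out to be the same: Flink caches FileSystem objects per scheme and authority (see `FileSystem.java` in flink-core), so a task restarted on the same TaskManager JVM gets the cached instance back, and with it the same underlying DFSClient. A minimal sketch of that caching behavior (an illustration of the mechanism only, not Flink's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustration of per-(scheme, authority) FileSystem caching: the first
 * lookup creates a client object, and every later lookup with the same key
 * returns the identical instance -- which is how a task recovered on the
 * same JVM can find that it is itself the current lease holder.
 */
public final class FsCacheSketch {

    private static final Map<String, Object> CACHE = new HashMap<>();

    /**
     * Returns the cached client for the given scheme/authority pair,
     * creating it on first use. A stand-in Object represents the client.
     */
    public static synchronized Object get(String scheme, String authority) {
        return CACHE.computeIfAbsent(scheme + "://" + authority, k -> new Object());
    }
}
```

Under this model, two recoveries of the same bucket on the same TaskManager talk to the NameNode with one and the same client identity, so the "already the current lease holder" rejection becomes possible.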
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918043#comment-16918043 ] Gyula Fora commented on FLINK-13874:
---
I increased the lease timeout to 5 minutes; it still failed sometimes, but it could recover afterwards. The namenode logs don't seem to contain any additional information on what happened or why it was slow. It all just looks normal...
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917833#comment-16917833 ] Gyula Fora commented on FLINK-13874: And while the lease revoking process seems to time out for certain files, for others it completes in 1-2 seconds, within the same taskmanager.
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917819#comment-16917819 ] Gyula Fora commented on FLINK-13874: I added some extra logging to figure out what the problem is. What seems to happen is that the lease revoke timeout is reached before we move to the truncate, so the file is never closed before truncating: [https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L341] I wonder what might cause it to take so long, and what we can do in these cases. Probably throw an error or at least log a warning?
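The lease-revocation wait that Gyula links to can be sketched as a simple polling loop. This is a loose sketch, not the actual Flink implementation: `recoverLease` here is a stand-in for `DistributedFileSystem#recoverLease(Path)` (which returns true once the file is closed and the lease released), and the timeout and poll interval are illustrative parameters.

```java
import java.util.function.BooleanSupplier;

class LeaseRecoveryWait {
    // Repeatedly ask the namenode to recover the lease until it reports the
    // file closed, or give up when the deadline passes.
    static boolean waitUntilLeaseIsRevoked(BooleanSupplier recoverLease,
                                           long timeoutMs,
                                           long pollIntervalMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        boolean isClosed = recoverLease.getAsBoolean();
        while (!isClosed && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return isClosed;
            }
            isClosed = recoverLease.getAsBoolean();
        }
        // If this returns false, the caller proceeds to truncate() anyway,
        // which itself triggers a forced lease recovery. That is where the
        // AlreadyBeingCreatedException in the trace above can surface.
        return isClosed;
    }
}
```

Gyula's point is that when this loop hits its deadline and returns false, the file is still open at truncate time, which matches the failure mode in the stack trace.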
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16916795#comment-16916795 ] Gyula Fora commented on FLINK-13874: Seems like it: [https://github.com/apache/hadoop/blob/branch-2.7.7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java#L2908]
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16916787#comment-16916787 ] Kostas Kloudas commented on FLINK-13874: Thanks for looking into it [~gyfora]. I did not dig into it yet, but the trace you posted contains the line: Failed to TRUNCATE_FILE ... for *DFSClient_NONMAPREDUCE_-1189574442_56* on 172.31.114.177 because *DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder*. The client seems to be the same, so could it be that we are trying to acquire the lease twice from the same task and the lease is not "reentrant"?