[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2021-04-22 Thread Paul Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327249#comment-17327249
 ] 

Paul Lin commented on FLINK-13874:
--

Hi [~kkl0u], I have some opinions on your questions; please see if they make 
sense.

> I may be missing something but we are in the process of recovering from a 
> failure, so I would expect the DFSClient that is trying to acquire the lease 
> to be different from the pre-failure holder, right?

I think the cause may be that Flink caches FileSystem instances (see 
[FileSystem](https://github.com/apache/flink/blob/fa0be65cf81cf4cb53ddbfb8c5cedb271e264f19/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L235)),
 so if a task recovers on the same TaskManager, it may get the same 
DFSClient as the previous execution.
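
To illustrate the assumption (a sketch only, not Flink's recovery code): two lookups of the same HDFS URI through the cached FileSystem.get() are expected to return the same instance on one TaskManager, and therefore to share the DFSClient that already holds the lease.

```
// Illustrative sketch only; the URI is hypothetical and the identity check
// assumes the (scheme, authority) cache in org.apache.flink.core.fs.FileSystem
// is in effect.
import java.net.URI;
import org.apache.flink.core.fs.FileSystem;

public class FsCacheExample {
    public static void main(String[] args) throws Exception {
        URI uri = URI.create("hdfs://namenode:8020/user/flink"); // hypothetical path
        FileSystem before = FileSystem.get(uri); // e.g. the pre-failure task
        FileSystem after = FileSystem.get(uri);  // e.g. the recovered task on the same TM
        // Both handles are expected to come from the same cache entry, so a
        // truncate issued via 'after' runs against the lease acquired via 'before'.
        System.out.println(before == after);
    }
}
```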

> Also, I would expect that trying to reacquire a lease that we already have 
> should be allowed by HDFS. Right?

Yes, but under the hood truncate requires a lease recovery, which may release the 
lease forcefully. So if the client is already the lease holder, it should close 
the file before calling truncate. I think we can close the file in the normal way 
if the exception occurs. WDYT?
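
A minimal sketch of that idea using plain Hadoop APIs (the class and method names here are assumptions for illustration, not Flink's actual HadoopRecoverableFsDataOutputStream code): if truncate is rejected because this client is already the lease holder, first let the NameNode close the file via recoverLease(), then retry the truncate once.

```
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;

// Hypothetical helper, not part of Flink or Hadoop.
final class TruncateWithLeaseRelease {

    static void truncateSafely(DistributedFileSystem dfs, Path file, long length)
            throws IOException, InterruptedException {
        try {
            dfs.truncate(file, length);
        } catch (RemoteException e) {
            // Only handle the "already the current lease holder" case.
            if (!AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
                throw e;
            }
            // We hold the lease ourselves: ask the NameNode to close the file
            // (which releases our lease), then truncate again.
            boolean closed = dfs.recoverLease(file);
            while (!closed) {
                Thread.sleep(500L); // polling interval, illustrative only
                closed = dfs.isFileClosed(file);
            }
            dfs.truncate(file, length);
        }
    }
}
```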

> StreamingFileSink fails to recover (truncate) properly
> --
>
> Key: FLINK-13874
> URL: https://issues.apache.org/jira/browse/FLINK-13874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Gyula Fora
>Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic 
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>  
> {noformat}
> java.io.IOException: Problem while truncating file: 
> hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:89)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to TRUNCATE_FILE 
> /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>  for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because 
> DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)

[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2020-06-26 Thread JIAN WANG (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146770#comment-17146770
 ] 

JIAN WANG commented on FLINK-13874:
---

Hi [~gyfora], I still hit the same issue on Flink 1.10.1. I use Flink on 
YARN (3.0.0-cdh6.3.2) with StreamingFileSink.

The code looks like this:
```
public static StreamingFileSink build(String dir, BucketAssigner assigner, String prefix) {

    return StreamingFileSink
            .forRowFormat(new Path(dir), new SimpleStringEncoder())
            .withRollingPolicy(DefaultRollingPolicy.builder()
                    .withRolloverInterval(TimeUnit.HOURS.toMillis(2))
                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(10))
                    .withMaxPartSize(1024L * 1024L * 1024L * 50) // Max 50GB
                    .build())
            .withBucketAssigner(assigner)
            .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build())
            .build();
}
```

The error is:
```
java.io.IOException: Problem while truncating file: 
hdfs:///business_log/hashtag/2020-06-25/.hashtag-122-37.inprogress.8e65f69c-b5ba-4466-a844-ccc0a5a93de2
```

Because of this issue, the job cannot restart from the latest checkpoint or savepoint.


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-29 Thread Kostas Kloudas (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918530#comment-16918530
 ] 

Kostas Kloudas commented on FLINK-13874:


Thanks and keep us posted on this because it seems like many users may bump 
into it.


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918528#comment-16918528
 ] 

Gyula Fora commented on FLINK-13874:


We will investigate this further to see what in HDFS might cause it; I will 
close this issue for now.


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918526#comment-16918526
 ] 

Gyula Fora commented on FLINK-13874:


This seems like an issue with HDFS, as the lease cannot be recovered for certain 
files for a long period of time. Something in the block recovery process 
might be off. These files cannot even be closed manually using the HDFS client; 
in many cases only a restart of HDFS solves the problem.


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-29 Thread Kostas Kloudas (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918512#comment-16918512
 ] 

Kostas Kloudas commented on FLINK-13874:


Hi [~gyfora], I still do not get how it can be that the same DFSClient already 
holds the lease.

I may be missing something but we are in the process of recovering from a 
failure, so I would expect the DFSClient that is trying to acquire the lease to 
be different from the pre-failure holder, right?

Also, I would expect that trying to reacquire a lease that we already have 
should be allowed by HDFS. Right?

 


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-28 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918043#comment-16918043
 ] 

Gyula Fora commented on FLINK-13874:


I increased the lease timeout to 5 minutes; it still failed sometimes, but it 
could recover afterwards. The NameNode logs don't seem to contain any additional 
information on what happened or why it was slow. It just looks normal...


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-28 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917833#comment-16917833
 ] 

Gyula Fora commented on FLINK-13874:


And while the lease revoking process seems to time out for certain files, for 
others it completes in 1-2 seconds, within the same TaskManager.


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-28 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917819#comment-16917819
 ] 

Gyula Fora commented on FLINK-13874:


I added some extra logging to figure out what the problem is.
What seems to happen is that the lease revoke timeout is reached before we move 
on to the truncate, so the file is never closed before truncating:

[https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L341]

I wonder what might cause it to take so long, and what we can do in these 
cases. Probably throw an error or at least log a warning?
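
For readers following along, here is a rough sketch of the wait-for-lease pattern that the linked line sits in (the method name, polling interval, and timeout handling are assumptions, not the literal Flink code); the point is that once the deadline passes the method simply returns false and the caller continues to the truncate on a file whose lease was never released.

```
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Hypothetical sketch of the lease-revocation wait described above.
final class LeaseWaitSketch {

    static boolean waitUntilLeaseIsRevoked(DistributedFileSystem dfs, Path path, long timeoutMs)
            throws IOException, InterruptedException {
        boolean closed = dfs.recoverLease(path); // ask the NameNode to start lease recovery
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!closed && System.currentTimeMillis() < deadline) {
            Thread.sleep(500L); // polling interval (illustrative)
            closed = dfs.isFileClosed(path);
        }
        // false means the timeout was reached and the file is still open,
        // yet the caller proceeds to the truncate anyway.
        return closed;
    }
}
```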


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-27 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16916795#comment-16916795
 ] 

Gyula Fora commented on FLINK-13874:


Seems like it: 
[https://github.com/apache/hadoop/blob/branch-2.7.7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java#L2908]
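
For context, here is a hedged paraphrase of the check that link points to (this is not the Hadoop source, only an illustration of its semantics): when lease recovery is not forced and the requester is already the lease holder, the NameNode rejects the operation instead of treating it as a no-op, which is exactly the message in the stack trace above.

```
// Self-contained illustration; the exception type and method name are simplified.
final class LeaseCheckSketch {

    static void checkLeaseForTruncate(String currentHolder, String clientName, boolean force, String src) {
        if (!force && currentHolder.equals(clientName)) {
            throw new IllegalStateException(
                    "Failed to TRUNCATE_FILE " + src + " for " + clientName
                            + " because " + clientName + " is already the current lease holder.");
        }
    }
}
```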


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-27 Thread Kostas Kloudas (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16916787#comment-16916787
 ] 

Kostas Kloudas commented on FLINK-13874:


Thanks for looking into it [~gyfora]. I did not dig into it yet, but in the 
trace you posted there is the line:

Failed to TRUNCATE_FILE ... for DFSClient_NONMAPREDUCE_-1189574442_56 on 
172.31.114.177 because DFSClient_NONMAPREDUCE_-1189574442_56 is already the 
current lease holder.

The client seems to be the same, so could it be that we are trying to get the 
lease twice from the same task and the lease is not "reentrant"?
