[ https://issues.apache.org/jira/browse/MAPREDUCE-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18015628#comment-18015628 ]

ASF GitHub Bot commented on MAPREDUCE-7508:
-------------------------------------------

liangyu-1 commented on code in PR #7859:
URL: https://github.com/apache/hadoop/pull/7859#discussion_r2293267533


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java:
##########
@@ -369,6 +369,9 @@ public InputSplit[] getSplits(JobConf job, int numSplits)
         } else {
           blkLocations = fs.getFileBlockLocations(file, 0, length);
         }
+        if (blkLocations.length == 0){

Review Comment:
   > Thanks. Got it. Makes sense to me. +1 to check in. BTW, the root cause here 
is that we invoke `listStatus` to get the file status and then invoke another 
interface, `getFileBlockLocations`, to get the block locations, but the file has 
changed between these steps, right? If that is true, would it be better to use 
`blkLocations` alone as the condition at L364 rather than `file.length`, which 
may not be correct here? Thanks again.
   
   Thanks for your review. Using `blkLocations` alone as the condition at L364 
is a good solution: if the `blkLocations` array is empty, `file.length` is also 
empty, so this will not affect the later operations.
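   
   For reference, here is a minimal, self-contained sketch of the defensive 
pattern being discussed, i.e. trusting the block-locations array itself rather 
than the possibly stale `file.length` captured by `listStatus`. This is an 
illustration only: the class `BlockLocationGuard` and its method name are 
invented for the example and are not part of the patch.
   
   ```java
   import java.io.IOException;
   import org.apache.hadoop.fs.BlockLocation;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.LocatedFileStatus;
   
   // Illustration only: names are invented for this sketch, not taken from the PR.
   public final class BlockLocationGuard {
   
     /**
      * Fetches block locations the same way getSplits() does, but normalizes
      * the result so the caller can branch on blkLocations.length == 0 instead
      * of relying on the file length captured earlier by listStatus().
      */
     static BlockLocation[] blockLocationsOrEmpty(FileSystem fs, FileStatus file)
         throws IOException {
       BlockLocation[] blkLocations;
       if (file instanceof LocatedFileStatus) {
         blkLocations = ((LocatedFileStatus) file).getBlockLocations();
       } else {
         blkLocations = fs.getFileBlockLocations(file, 0, file.getLen());
       }
       // An empty (or null) array means the file changed between listStatus()
       // and this call; the caller should then treat it like an empty file.
       return blkLocations == null ? new BlockLocation[0] : blkLocations;
     }
   
     private BlockLocationGuard() {
     }
   }
   ```
   
   With such a guard, the split loop can check the array length and fall back to 
the empty-file path instead of indexing the last block, which is the spirit of 
the change in this PR.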





> FileInputFormat can throw ArrayIndexOutOfBoundsException because of some 
> concurrent execution.
> ----------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-7508
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7508
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mapreduce-client
>            Reporter: liang yu
>            Priority: Major
>              Labels: pull-request-available
>
> Using Spark Streaming 2.4.0 and the Hadoop MapReduce client 2.6.5.
> *Scenario:*
> I am using Spark Streaming to process files stored in HDFS. In my setup, the 
> upstream system sometimes starts two identical tasks that attempt to create 
> and write to the same HDFS file simultaneously. This can lead to conflicts 
> where a file is created and written to twice in quick succession.
> *Problem:*
> When Spark scans for files, it uses the FileInputFormat.getSplits() method to 
> split the file. The first step in getSplits is to retrieve the file's length. 
> If the length is not zero, the next step is to get the block locations array 
> for that file. However, if two upstream programs rapidly create and write to 
> the same file (i.e., the file is overwritten or appended to almost 
> simultaneously), a race condition may occur:
> the file's length is already non-zero,
> but calling getFileBlockLocations() returns an empty array because the file 
> is being overwritten or is not yet fully written.
> When this happens, subsequent logic in getSplits (such as accessing the last 
> element of the block locations array) throws an 
> ArrayIndexOutOfBoundsException because the block locations array is 
> unexpectedly empty.
> *Summary:*
> This issue can occur when multiple upstream writers operate on the same HDFS 
> file nearly simultaneously. As a result, Spark jobs may intermittently fail 
> due to an unhandled empty block locations array in 
> FileInputFormat.getSplits() when processing files that are in the process of 
> being overwritten or not yet fully written.
>  
> *Exception Stacktrace:*
> {code:java}
> [ERROR] 2025-08-06 15:22:02 org.apache.spark.deploy.yarn.ApplicationMaster:91 
> - User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 0 
> java.lang.ArrayIndexOutOfBoundsException: 0 at 
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:449)
>  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130) 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at 
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at 
> scala.Option.getOrElse(Option.scala:121) at 
> org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at 
> org.apache.spark.streaming.dstream.MyFileInputDStream.org$apache$spark$streaming$dstream$MyFileInputDStream$$filesToRDD(MyFileInputDStream.scala:350)
>  at 
> org.apache.spark.streaming.dstream.MyFileInputDStream.compute(MyFileInputDStream.scala:155)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>  at scala.Option.orElse(Option.scala:289) at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at 
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>  at scala.Option.orElse(Option.scala:289) at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at 
> org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>  at scala.Option.orElse(Option.scala:289) at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>  at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>  at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) 
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>  at scala.util.Try$.apply(Try.scala:192) at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) [WARN] 
> 2025-08-06 15:22:32 org.apache.hadoop.util.ShutdownHookManager:128 - 
> ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException 
> java.util.concurrent.TimeoutException at 
> java.util.concurrent.FutureTask.get(FutureTask.java:205) at 
> org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
>  at 
> org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95) 
> [ERROR] 2025-08-06 15:22:32 org.apache.spark.util.Utils:91 - Uncaught 
> exception in thread shutdown-hook-0 java.lang.InterruptedException at 
> java.lang.Object.wait(Native Method) at 
> java.lang.Thread.join(Thread.java:1252) at 
> java.lang.Thread.join(Thread.java:1326) at 
> org.apache.spark.streaming.util.RecurringTimer.stop(RecurringTimer.scala:86) 
> at 
> org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:137)
>  at 
> org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:123)
>  at 
> org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:681)
>  at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340) at 
> org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:680) 
> at 
> org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:714)
>  at 
> org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:599)
>  at 
> org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216) at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>  at scala.util.Try$.apply(Try.scala:192) at 
> org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748){code}
>  
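
To make the race described above concrete, here is a rough, hypothetical driver 
(not taken from the report; the class name and the path argument are made up) 
that performs the same two calls getSplits() makes, showing the window in which 
a concurrent writer can re-create the file:
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class SplitRaceDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // Step 1: like getSplits(), list the input directory; each FileStatus
    // carries a length captured at this moment.
    for (FileStatus file : fs.listStatus(new Path(args[0]))) {
      long length = file.getLen();
      if (length == 0) {
        continue; // empty files take a separate path in getSplits()
      }

      // Step 2: by the time block locations are requested, a concurrent
      // writer may have re-created or truncated the same file, so the
      // array can come back empty even though 'length' above is non-zero.
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

      // Step 3: indexing the last element without a length check is what
      // throws the ArrayIndexOutOfBoundsException in the reported stack trace.
      if (blkLocations.length > 0) {
        BlockLocation last = blkLocations[blkLocations.length - 1];
        System.out.println(file.getPath() + ": last block has "
            + last.getHosts().length + " hosts");
      } else {
        System.out.println(file.getPath() + " changed concurrently; skipping");
      }
    }
  }
}
{code}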



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
