[ https://issues.apache.org/jira/browse/MAPREDUCE-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18015628#comment-18015628 ]
ASF GitHub Bot commented on MAPREDUCE-7508:
-------------------------------------------

liangyu-1 commented on code in PR #7859:
URL: https://github.com/apache/hadoop/pull/7859#discussion_r2293267533


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java:
##########

@@ -369,6 +369,9 @@ public InputSplit[] getSplits(JobConf job, int numSplits)
         } else {
           blkLocations = fs.getFileBlockLocations(file, 0, length);
         }
+        if (blkLocations.length == 0){

Review Comment:
   > Thanks. Got it. Makes sense to me. +1 to check in. BTW, the root cause here is that we invoke `listStatus` to get the file status and then invoke another interface, `getFileBlockLocations`, to get the block locations, but the file has changed between these two steps, right? If that is true, would it be more proper to use `blkLocations` alone as the condition at L364, rather than `file.length`, which may not be correct here? Thanks again.

   Thanks for your review. Using `blkLocations` alone as the condition at L364 is a good solution: if the `blkLocations` array is empty, `file.length` is effectively empty as well, so this will not affect the later operations.


> FileInputFormat can throw ArrayIndexOutOfBoundsException because of some concurrent execution.
> -----------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-7508
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7508
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mapreduce-client
>            Reporter: liang yu
>            Priority: Major
>              Labels: pull-request-available
>
> Using Spark Streaming (version 2.4.0) and Hadoop mapreduce-client (version 2.6.5).
>
> {*}Scenario{*}:
> I am using Spark Streaming to process files stored in HDFS. In my setup, the upstream system sometimes starts two identical tasks that attempt to create and write to the same HDFS file simultaneously. This can lead to conflicts where a file is created and written to twice in quick succession.
>
> {*}Problem{*}:
> When Spark scans for files, it uses the FileInputFormat.getSplits() method to split each file. The first step in getSplits() is to retrieve the file's length. If the length is not zero, the next step is to fetch the block locations array for that file. However, if the two upstream programs rapidly create and write to the same file (i.e., the file is overwritten or appended to almost simultaneously), a race condition may occur: the file's length is already non-zero, but getFileBlockLocations() returns an empty array because the file is being overwritten or is not yet fully written. When this happens, subsequent logic in getSplits() (such as accessing the last element of the block locations array) throws an ArrayIndexOutOfBoundsException because the block locations array is unexpectedly empty.
>
> {*}Summary{*}:
> This issue can occur when multiple upstream writers operate on the same HDFS file nearly simultaneously. As a result, Spark jobs may intermittently fail due to an unhandled empty block locations array in FileInputFormat.getSplits() when processing files that are in the process of being overwritten or not yet fully written.
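For illustration only (this is not the actual change in PR #7859), here is a minimal Java sketch of the guard discussed in the review above: treat a file whose block-locations lookup comes back empty the same way as an empty file, instead of indexing into the array. The class and method names (`SafeBlockLocationLookup`, `blockLocationsOrEmpty`) are hypothetical; the FileSystem calls mirror the listStatus()/getFileBlockLocations() sequence that getSplits() relies on.

{code:java}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Hypothetical illustration (not the Hadoop patch itself) of guarding against
 * an empty block-locations array when a file is rewritten between listStatus()
 * and getFileBlockLocations().
 */
public class SafeBlockLocationLookup {

  /**
   * Returns the block locations for {@code file}, or an empty array if the
   * file changed underneath us and no locations are available. Callers check
   * the array length before indexing, so "no block locations" is handled the
   * same way as "empty file".
   */
  static BlockLocation[] blockLocationsOrEmpty(FileSystem fs, FileStatus file)
      throws IOException {
    long length = file.getLen();
    if (length == 0) {
      return new BlockLocation[0];  // empty file, nothing to split
    }
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    // Race: the file may have been truncated or recreated after listStatus(),
    // so the reported length can be stale while the locations are empty.
    if (blkLocations == null || blkLocations.length == 0) {
      return new BlockLocation[0];
    }
    return blkLocations;
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path dir = new Path(args[0]);  // e.g. an HDFS input directory
    FileSystem fs = dir.getFileSystem(conf);
    List<String> splittable = new ArrayList<>();
    for (FileStatus file : fs.listStatus(dir)) {
      BlockLocation[] locations = blockLocationsOrEmpty(fs, file);
      if (locations.length == 0) {
        // Skip files caught mid-rewrite instead of throwing
        // ArrayIndexOutOfBoundsException when indexing into the array.
        continue;
      }
      splittable.add(file.getPath() + " blocks=" + locations.length);
    }
    splittable.forEach(System.out::println);
  }
}
{code}

Keying the decision off the returned BlockLocation[] rather than the possibly stale file length matches the reviewer's suggestion to use `blkLocations` as the condition at L364.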
>
> *Exception Stacktrace:*
> {code:java}
> [ERROR] 2025-08-06 15:22:02 org.apache.spark.deploy.yarn.ApplicationMaster:91 - User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 0
> java.lang.ArrayIndexOutOfBoundsException: 0
>     at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:449)
>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
>     at org.apache.spark.streaming.dstream.MyFileInputDStream.org$apache$spark$streaming$dstream$MyFileInputDStream$$filesToRDD(MyFileInputDStream.scala:350)
>     at org.apache.spark.streaming.dstream.MyFileInputDStream.compute(MyFileInputDStream.scala:155)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>     at scala.util.Try$.apply(Try.scala:192)
>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> [WARN] 2025-08-06 15:22:32 org.apache.hadoop.util.ShutdownHookManager:128 - ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
> java.util.concurrent.TimeoutException
>     at java.util.concurrent.FutureTask.get(FutureTask.java:205)
>     at org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
>     at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95)
> [ERROR] 2025-08-06 15:22:32 org.apache.spark.util.Utils:91 - Uncaught exception in thread shutdown-hook-0
> java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Thread.join(Thread.java:1252)
>     at java.lang.Thread.join(Thread.java:1326)
>     at org.apache.spark.streaming.util.RecurringTimer.stop(RecurringTimer.scala:86)
>     at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:137)
>     at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:123)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:681)
>     at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
>     at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:680)
>     at org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:714)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:599)
>     at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>     at scala.util.Try$.apply(Try.scala:192)
>     at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org