[ https://issues.apache.org/jira/browse/MAPREDUCE-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013137#comment-18013137 ]
ASF GitHub Bot commented on MAPREDUCE-7508: ------------------------------------------- liangyu-1 commented on code in PR #7859: URL: https://github.com/apache/hadoop/pull/7859#discussion_r2265543550 ########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java: ########## @@ -369,6 +369,9 @@ public InputSplit[] getSplits(JobConf job, int numSplits) } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } + if (blkLocations.length == 0){ Review Comment: @Hexiaoqiao thanks for review. It is a rare corner case, only happens when using spark streaming monitors a path where the upstream system sometimes starts two identical tasks that attempt to create and write to the same HDFS file simultaneously. This can lead to conflicts where a file is created and written to twice in quick succession. When Spark scans for files, it uses the FileInputFormat.getSplits() method to split the file. The first step in getSplits is to retrieve the file's length. If the file length is not zero, the next step is to get the block locations array for that file. However, if the two upstream programs rapidly create and write to the same file (i.e., the file is overwritten or appended to almost simultaneously), a race condition may occur: The file's length is already non-zero, but calling getFileBlockLocations() returns an empty array because the file is being overwritten or is not yet fully written. > FileInputFormat can throw ArrayIndexOutofBoundsException because of some > concurrent execution. > ---------------------------------------------------------------------------------------------- > > Key: MAPREDUCE-7508 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-7508 > Project: Hadoop Map/Reduce > Issue Type: Bug > Components: mapreduce-client > Reporter: liang yu > Priority: Major > Labels: pull-request-available > > Using Spark Streaming (version 2.4.0), Hadoop Mapreduce-client (version 2.6.5) > {*}Scenario{*}: > I am using Spark Streaming to process files stored in HDFS. In my setup, the > upstream system sometimes starts two identical tasks that attempt to create > and write to the same HDFS file simultaneously. This can lead to conflicts > where a file is created and written to twice in quick succession. > {*}Problem{*}: > When Spark scans for files, it uses the FileInputFormat.getSplits() method to > split the file. The first step in getSplits is to retrieve the file's length. > If the file length is not zero, the next step is to get the block locations > array for that file. However, if the two upstream programs rapidly create and > write to the same file (i.e., the file is overwritten or appended to almost > simultaneously), a race condition may occur: > The file's length is already non-zero, > but calling getFileBlockLocations() returns an empty array because the file > is being overwritten or is not yet fully written. > When this happens, subsequent logic in getSplits (such as accessing the last > element of the block locations array) will throw an > ArrayIndexOutOfBoundsException because the block locations array is > unexpectedly empty. > {*}Summary{*}: > This issue can occur when multiple upstream writers operate on the same HDFS > file nearly simultaneously. As a result, Spark jobs may intermittently fail > due to an unhandled empty block locations array in > FileInputFormat.getSplits() when processing files that are in the process of > being overwritten or not yet fully written. > > *Exception Stacktrace:* > {code:java} > [ERROR] 2025-08-06 15:22:02 org.apache.spark.deploy.yarn.ApplicationMaster:91 > - User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 0 > java.lang.ArrayIndexOutOfBoundsException: 0 at > org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:449) > at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at > scala.Option.getOrElse(Option.scala:121) at > org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at > org.apache.spark.streaming.dstream.MyFileInputDStream.org$apache$spark$streaming$dstream$MyFileInputDStream$$filesToRDD(MyFileInputDStream.scala:350) > at > org.apache.spark.streaming.dstream.MyFileInputDStream.compute(MyFileInputDStream.scala:155) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) > at scala.Option.orElse(Option.scala:289) at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) > at scala.Option.orElse(Option.scala:289) at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at > org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) > at scala.Option.orElse(Option.scala:289) at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at > scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) > at scala.util.Try$.apply(Try.scala:192) at > org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) > at > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) [WARN] > 2025-08-06 15:22:32 org.apache.hadoop.util.ShutdownHookManager:128 - > ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException > java.util.concurrent.TimeoutException at > java.util.concurrent.FutureTask.get(FutureTask.java:205) at > org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124) > at > org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95) > [ERROR] 2025-08-06 15:22:32 org.apache.spark.util.Utils:91 - Uncaught > exception in thread shutdown-hook-0 java.lang.InterruptedException at > java.lang.Object.wait(Native Method) at > java.lang.Thread.join(Thread.java:1252) at > java.lang.Thread.join(Thread.java:1326) at > org.apache.spark.streaming.util.RecurringTimer.stop(RecurringTimer.scala:86) > at > org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:137) > at > org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:123) > at > org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:681) > at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340) at > org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:680) > at > org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:714) > at > org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:599) > at > org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216) at > org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188) > at > org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188) > at > org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) at > org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188) > at > org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188) > at > org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188) > at scala.util.Try$.apply(Try.scala:192) at > org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188) > at > org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748){code} > -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org