Hi Ryan,

This is an issue with sort-based shuffle, not consolidated hash-based shuffle. I would guess this mostly occurs when the Spark cluster is in an abnormal state, perhaps a long GC pause or something similar; you could check the system status and look for any other exceptions besides this one.
Thanks
Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:

14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 (TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: /data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index (No such file or directory)
        java.io.FileOutputStream.open(Native Method)
        java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
        org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
        org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:56)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 task-1543 failures are a few instances of this failure on another task. Here is the entire App Master stdout dump[1] (~2MB; stack traces towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}.

Here's a summary of the RDD manipulations I've done up to the point of failure:

* val A = [read a file in 1419 shards]
  * the file is 177GB compressed but ends up being ~5TB uncompressed / hydrated into Scala objects (I think; see below for more discussion on this point).
  * some relevant Spark options:
    * spark.default.parallelism=2000
    * --master yarn-client
    * --executor-memory 50g
    * --driver-memory 10g
    * --num-executors 100
    * --executor-cores 4
* A.repartition(3000)
  * 3000 was chosen in an attempt to mitigate the shuffle-disk spillage that previous job attempts with 1000 or 1419 shards were mired in
* A.persist()
* A.count() // succeeds
  * screenshot of web UI with stats: http://cl.ly/image/3e130w3J1B2v
  * I don't know why each task reports "8 TB" of "Input"; that metric always seems ludicrously high and I typically don't pay attention to it.
  * Each task shuffle-writes 3.5GB, for a total of 4.9TB
    * Does that mean that 4.9TB is the uncompressed size of the file that A was read from?
    * 4.9TB is pretty close to the total amount of memory I've configured the job to use: (50GB/executor) * (100 executors) ~= 5TB.
    * Is that a coincidence, or are my executors shuffle-writing an amount equal to all of their memory for some reason?
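For reference, the options listed above correspond to a spark-submit invocation roughly like the following (a sketch only; the main class and application jar names are placeholders, not taken from the original job):

```
spark-submit \
  --master yarn-client \
  --num-executors 100 \
  --executor-cores 4 \
  --executor-memory 50g \
  --driver-memory 10g \
  --conf spark.default.parallelism=2000 \
  --class com.example.MyApp \
  my-app.jar
```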
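The 4.9TB figure is at least self-consistent with the map side of the repartition. A back-of-envelope check, assuming the shuffle write is spread across the 1419 input tasks (an assumption, not something confirmed in the thread):

```scala
// Rough arithmetic for the numbers quoted above (values assumed from
// the web UI: 1419 map tasks, ~3.5 GB shuffle write per task).
val shuffleTB = 1419 * 3.5 / 1024        // total shuffle write, in TB
val clusterMemTB = 100 * 50 / 1024.0     // 100 executors * 50 GB each
println(f"shuffle write ~= $shuffleTB%.2f TB, configured memory ~= $clusterMemTB%.2f TB")
// both land near 4.9 TB, so the closeness could well be coincidence
```

Since the two totals come from unrelated quantities (input shards times per-task write vs. executor count times heap size), their near-equality does not by itself imply the executors are shuffle-writing their whole memory.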
* val B = A.groupBy(...).filter(_._2.size == 2).map(_._2).flatMap(x => x).persist()
  * my expectation is that ~all elements pass the filter step, so B should be ~equal to A; this is just to give a sense of the expected memory blowup.
* B.count()
  * this fails while executing the .groupBy(...) above

I've found a few discussions of issues whose manifestations look *like* this, but nothing that is obviously the same issue. The closest hit I've seen is "Stage failure in BlockManager..."[2] on this list on 8/20; some key excerpts:

* "likely due to a bug in shuffle file consolidation"
* "hopefully fixed in 1.1 with this patch: https://github.com/apache/spark/commit/78f2af582286b81e6dc9fa9d455ed2b369d933bd"
* 78f2af5[3] implements pieces of #1609[4], on which mridulm has a comment[5] saying: "it got split into four issues, two of which got committed, not sure of the other two .... And the first one was regressed upon in 1.1 already."
* "Until 1.0.3 or 1.1 are released, the simplest solution is to disable spark.shuffle.consolidateFiles."
  * I've not tried this yet as I'm waiting on a re-run with some other parameters tweaked first.
  * Also, I can't tell if it's expected that this was fixed, known that it subsequently regressed, etc., so I'm hoping for some guidance there.

So! Anyone else seen this? Is this related to the "bug in shuffle file consolidation"? Was it fixed? Did it regress? Are my confs or other steps unreasonable in some way?

Any assistance would be appreciated, thanks.
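For anyone skimming, the shape of the B pipeline is easy to see on plain Scala collections (a toy sketch; the identity key function and the sample data here are illustrative, not from the actual job):

```scala
// Group by a key, keep only keys that occur exactly twice, then
// flatten the surviving pairs back into a flat collection.
val a = Seq("x", "x", "y", "z", "z", "z")
val b = a.groupBy(identity)       // Map[String, Seq[String]]
  .filter(_._2.size == 2)         // keep groups of exactly 2 elements
  .map(_._2)                      // drop the keys
  .flatMap(x => x)                // flatten back out
// b keeps both "x"s; "y" (1 copy) and "z" (3 copies) are filtered away
```

On the real RDD the same chain triggers a shuffle at the groupBy step, which is the stage that is failing here.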
-Ryan

[1] https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0
[2] http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3ccangvg8qtk57frws+kaqtiuz9jsls5qjkxxjxttq9eh2-gsr...@mail.gmail.com%3E
[3] https://github.com/apache/spark/commit/78f2af5
[4] https://github.com/apache/spark/pull/1609
[5] https://github.com/apache/spark/pull/1609#issuecomment-54393908