Too many failed collects when trying to cache a table in SparkSQL
We are running Spark on YARN with 1 TB of combined memory. When trying to cache a table partition (about 100 GB), we see a lot of failed collect stages in the UI, and the cache never succeeds. Because of the failed collects, the mapPartitions stage keeps getting resubmitted. We have more than enough memory, so it's surprising we are seeing this issue. Can someone please help? Thanks!

The stack trace of the failed collect, from the UI:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
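For context, a minimal sketch of the workflow being run here, assuming a HiveContext over an existing Hive table xyz; the SQL statement is taken verbatim from the logs in the reply below, but the surrounding Scala is illustrative, not the actual client code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("cache-xyz"))
    val hc = new HiveContext(sc)

    // CACHE TABLE ... AS SELECT runs the query and materializes the result
    // as an in-memory columnar table; the mapPartitions (shuffle write) and
    // collect stages in the UI come from executing this statement.
    hc.sql("CACHE TABLE xyz_cached AS SELECT * FROM xyz where date_prefix = 20141112")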
Re: Too many failed collects when trying to cache a table in SparkSQL
This is the log output:

2014-11-12 19:07:16,561 INFO thriftserver.SparkExecuteStatementOperation (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS SELECT * FROM xyz where date_prefix = 20141112'
2014-11-12 19:07:17,455 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
2014-11-12 19:07:17,756 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at TableReader.scala:68
2014-11-12 19:07:18,292 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
2014-11-12 19:07:22,801 INFO mapred.FileInputFormat (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
2014-11-12 19:07:22,835 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at Exchange.scala:86)
2014-11-12 19:07:22,837 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at SparkPlan.scala:84)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
2014-11-12 19:07:22,842 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
2014-11-12 19:07:22,871 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86), which has no missing parents
2014-11-12 19:07:22,916 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at DAGScheduler.scala:838
2014-11-12 19:07:22,963 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:10:04,088 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 161.113 s
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:10:04,094 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:10:04,097 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
2014-11-12 19:10:04,112 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at DAGScheduler.scala:838
2014-11-12 19:10:04,115 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 52 on ip-10-61-175-167.ec2.internal: remote Akka client disassociated
2014-11-12 19:10:08,543 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 52
2014-11-12 19:10:08,550 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
2014-11-12 19:10:08,555 INFO scheduler.Stage (Logging.scala:logInfo(59)) - Stage 0 is now unavailable on executor 52 (460/461, false)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at SparkPlan.scala:84) as failed due to a fetch failure from Stage 0 (mapPartitions at Exchange.scala:86)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84) failed in 4.571 s
2014-11-12 19:10:08,687 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch failure
2014-11-12 19:10:08,908 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Resubmitting failed stages
2014-11-12 19:10:08,974 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86), which has no missing parents
2014-11-12 19:10:08,989 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at
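The "Lost executor 52 ... remote Akka client disassociated" line right before the fetch failure is worth noting: on YARN this is often, though not always, a container killed for exceeding its memory limit, which invalidates that executor's map outputs and produces exactly this fetch-failure/resubmit loop. A hedged sketch of one common mitigation, leaving more headroom between the executor JVM heap and the YARN container limit; the values are placeholders, not a confirmed fix for this cluster:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("cache-xyz")
      // Executor JVM heap size; placeholder value.
      .set("spark.executor.memory", "16g")
      // Extra off-heap headroom (in MB) that YARN reserves on top of the
      // heap; off-heap allocations are not counted against the JVM heap, so
      // too small an overhead gets the container killed. Placeholder value.
      .set("spark.yarn.executor.memoryOverhead", "2048")

The same settings should also be settable with --conf pairs when launching the Thrift server, since its start script forwards options to spark-submit.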
Re: Too many failed collects when trying to cache a table in SparkSQL
On re-running the cache statement, I see from the logs that whenever the collect (Stage 1) fails, the mapPartitions (Stage 0) is re-run for one partition. The same failure shows up both in the collect stage in the UI and in the container logs:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

The data is an LZO-compressed sequence file, about 26 GB compressed. Is there a way to understand why the shuffle keeps failing for one partition? I believe we have enough memory to store the uncompressed data in memory.

On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood sadhan.s...@gmail.com wrote:
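One hedged avenue to investigate, given that a single large shuffle partition appears to be the one failing: raising spark.sql.shuffle.partitions (default 200) spreads the shuffled data over more, smaller post-shuffle partitions, so each fetch moves less data. A minimal sketch as a hypothetical helper, reusing the HiveContext from the earlier example; 400 is a placeholder value:

    import org.apache.spark.sql.hive.HiveContext

    // Hypothetical helper; `hc` is the HiveContext from the earlier sketch.
    def cacheWithMorePartitions(hc: HiveContext): Unit = {
      // Default is 200; raising it shrinks each post-shuffle partition.
      hc.setConf("spark.sql.shuffle.partitions", "400") // placeholder value
      hc.sql("CACHE TABLE xyz_cached AS SELECT * FROM xyz where date_prefix = 20141112")
    }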