Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
We are running Spark on YARN with 1TB of combined memory, and when trying to
cache a table partition (which is 100G) we see a lot of failed collect stages
in the UI, and the operation never succeeds. Because of the failed collect,
the mapPartitions stage keeps getting resubmitted. We have more than enough
memory, so it's surprising that we are seeing this issue. Can someone please
help. Thanks!
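
For reference, the statement we run is the CACHE TABLE query that shows up in
the logs later in this thread; a rough Scala sketch of the equivalent
driver-side call would look like the code below. The SparkContext/HiveContext
setup here is illustrative only, since we actually submit the statement
through the Thrift server:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object CacheXyzPartition {
  def main(args: Array[String]): Unit = {
    // Illustrative setup only; in our case the statement goes through the
    // Spark SQL Thrift server rather than a standalone driver program.
    val sc = new SparkContext(new SparkConf().setAppName("cache-xyz-partition"))
    val hiveContext = new HiveContext(sc)

    // Same statement as in the log output below: materialize one date
    // partition of the source table into Spark SQL's in-memory cache.
    hiveContext.sql(
      "CACHE TABLE xyz_cached AS SELECT * FROM xyz where date_prefix = 20141112")
  }
}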

The stack trace of the failed collect stage, taken from the UI, is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


Re: Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
This is the log output:

2014-11-12 19:07:16,561 INFO  thriftserver.SparkExecuteStatementOperation
(Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
SELECT * FROM xyz where date_prefix = 20141112'

2014-11-12 19:07:17,455 INFO  Configuration.deprecation
(Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
deprecated. Instead, use mapreduce.job.maps

2014-11-12 19:07:17,756 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
TableReader.scala:68

2014-11-12 19:07:18,292 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84

2014-11-12 19:07:22,801 INFO  mapred.FileInputFormat
(FileInputFormat.java:listStatus(253)) - Total input paths to process : 200

2014-11-12 19:07:22,835 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
Exchange.scala:86)

2014-11-12 19:07:22,837 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
with 1 output partitions (allowLocal=false)

2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
SparkPlan.scala:84)

2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)

2014-11-12 19:07:22,842 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)

2014-11-12 19:07:22,871 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
mapPartitions at Exchange.scala:86), which has no missing parents

2014-11-12 19:07:22,916 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:07:22,963 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

2014-11-12 19:10:04,088 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 161.113 s

2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages

2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()

2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()

2014-11-12 19:10:04,094 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

2014-11-12 19:10:04,097 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable

2014-11-12 19:10:04,112 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:10:04,115 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)

2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 52 on
ip-10-61-175-167.ec2.internal: remote Akka client disassociated

2014-11-12 19:10:08,543 WARN  remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].

2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 52

2014-11-12 19:10:08,550 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)

2014-11-12 19:10:08,555 INFO  scheduler.Stage (Logging.scala:logInfo(59)) -
Stage 0 is now unavailable on executor 52 (460/461, false)

2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Marking Stage 1 (collect at
SparkPlan.scala:84) as failed due to a fetch failure from Stage 0
(mapPartitions at Exchange.scala:86)

2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84)
failed in 4.571 s

2014-11-12 19:10:08,687 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at
Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch
failure

2014-11-12 19:10:08,908 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Resubmitting failed stages

2014-11-12 19:10:08,974 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
mapPartitions at Exchange.scala:86), which has no missing parents

2014-11-12 19:10:08,989 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at

Re: Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
On re-running the cache statement, I see from the logs that whenever collect
(Stage 1) fails, it causes mapPartitions (Stage 0) to be re-run for one
partition. This can be seen in the collect log above as well as in the
container log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

The data is an LZO-compressed sequence file with a compressed size of ~26G.
Is there a way to understand why the shuffle keeps failing for one partition?
I believe we have enough memory to store the uncompressed data in memory.
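
For completeness, the memory-related settings we would be looking at are
roughly the ones sketched below; the values are placeholders for illustration,
not our actual configuration. One common cause of an executor being lost on
YARN is the container exceeding spark.executor.memory plus
spark.yarn.executor.memoryOverhead and getting killed:

import org.apache.spark.SparkConf

// Placeholder values for illustration only, not our actual settings.
// executor.memory + yarn.executor.memoryOverhead is what the YARN container
// is sized to; the storage/shuffle fractions control how much of the heap
// is available to cached columnar data and to shuffle aggregation.
val conf = new SparkConf()
  .set("spark.executor.memory", "16g")
  .set("spark.yarn.executor.memoryOverhead", "2048") // MB of off-heap headroom
  .set("spark.storage.memoryFraction", "0.5")
  .set("spark.shuffle.memoryFraction", "0.3")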
