Re: Spark SQL : Join operation failure
It might be a memory issue. Try adding .persist(StorageLevel.MEMORY_AND_DISK) so that partitions that can't fit in memory are spilled to disk:

    from pyspark import StorageLevel

    cm_go.registerTempTable("x")
    ko.registerTempTable("y")

    joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")
    joined_df.persist(StorageLevel.MEMORY_AND_DISK)
    joined_df.write.save("/user/data/output")
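If this runs inside a longer job, it is probably also worth releasing the cache once the last action that needs it has finished. A minimal sketch under the same assumptions (the sqlCtx context and the temp tables registered above):

    from pyspark import StorageLevel

    joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")

    # Persist before the first action so the join output is kept in memory,
    # spilling whatever does not fit to disk.
    joined_df.persist(StorageLevel.MEMORY_AND_DISK)
    joined_df.write.save("/user/data/output")

    # Release the cached blocks once the output has been written.
    joined_df.unpersist()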
Re: Spark SQL : Join operation failure
Your error message doesn't make clear what is really happening: was your container killed by YARN, or did it actually run out of memory? When I run Spark jobs on big data, here is what I normally do:

1) Enable GC output. You need to monitor the GC logs on the executors to understand the GC pressure. If you see frequent full GCs, you know your job is in danger.

2) Monitor the task statistics on the executors under frequent full GC: how many records have been processed so far, and what the spill read/write bytes are. Is the OOM happening in only one task with much higher statistics than the rest? That normally means data skew. If lots of tasks all show GC pressure, then your settings are simply not enough for the job.

3) In your case, you first want to know which kind of join Spark is using for your outer join. Does it make sense for your data? The wrong join strategy leads to doing the job the wrong way. (A sketch of checks 1 and 3 follows below.)
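Roughly, for 1) and 3) above (a sketch only: the GC flags are the standard HotSpot ones recommended in the Spark tuning guide, and the table and context names are taken from the job quoted below):

    # 1) Turn on executor GC logging via spark-submit, then watch the
    #    executor stdout for frequent "Full GC" entries:
    #    --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

    # 3) Ask Spark which join strategy it picked before running the full job:
    joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")
    joined_df.explain()  # e.g. a SortMergeJoin in the physical plan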
Yong

From: jatinpreet
Sent: Wednesday, February 22, 2017 1:11 AM
To: user@spark.apache.org
Subject: Spark SQL : Join operation failure
Spark SQL : Join operation failure
Hi,

I am having a hard time running an outer join on two Parquet datasets. The datasets are large, ~500 GB, with a lot of columns, on the order of 1000. Per the administrator-imposed limits on the YARN queue, I can have a total of 20 vcores and 8 GB of memory per executor. I specified the memory overhead and increased the number of shuffle partitions, to no avail.

This is how I submitted the job with pyspark:

    spark-submit --master yarn-cluster --executor-memory 5500m --num-executors 19 --executor-cores 1 --conf spark.yarn.executor.memoryOverhead=2000 --conf spark.sql.shuffle.partitions=2048 --driver-memory 7g --queue ./

The relevant code is:

    cm_go.registerTempTable("x")
    ko.registerTempTable("y")
    joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")
    joined_df.write.save("/user/data/output")

I am getting errors like these:

ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container marked as failed: container_e36_1487531133522_0058_01_06 on host: dn2.bigdatalab.org. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_e36_1487531133522_0058_01_06
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)
    at org.apache.hadoop.util.Shell.run(Shell.java:844)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1123)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:225)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 52

--

FetchFailed(null, shuffleId=0, mapId=-1, reduceId=508, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:695)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:691)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:691)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:145)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
)

I would appreciate it if someone could help me out with this.