Re: Custom return code
Any hint?

2016-08-31 20:40 GMT+02:00 Pierre Villard <pierre.villard...@gmail.com>:
> Hi,
>
> I am using Spark 1.5.2 and I am submitting a job (jar file) with the
> spark-submit command in yarn cluster mode. I'd like the command to return
> a custom return code.
>
> In the run method, if I do:
> sys.exit(myCode)
> the command always returns 0.
>
> The only way to get something other than 0 is to throw an exception, and
> this returns 1.
>
> Is there a way to have a custom return code from the job application?
>
> Thanks a lot!
Custom return code
Hi,

I am using Spark 1.5.2 and I am submitting a job (jar file) with the spark-submit command in yarn cluster mode. I'd like the command to return a custom return code.

In the run method, if I do:

    sys.exit(myCode)

the command always returns 0. The only way to get something other than 0 is to throw an exception, and this returns 1.

Is there a way to have a custom return code from the job application?

Thanks a lot!
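For context: in yarn-cluster mode the driver runs inside the YARN ApplicationMaster, so the exit code of spark-submit reflects the final application status (SUCCEEDED gives 0, FAILED gives 1), not the value passed to sys.exit. One workaround sketch, with the caveat that the specific code then has to be recovered out-of-band (e.g. from the driver log), is to carry the code in the failure exception. All names below are illustrative, not a Spark API:

```scala
// Illustrative sketch: a failure exception carrying the desired exit code.
// In cluster mode spark-submit will still exit with 1 on failure, but the
// specific code is recoverable from the driver log / exception message.
case class JobFailedException(exitCode: Int)
    extends RuntimeException(s"Job failed with exit code $exitCode")

object MyJob {
  def main(args: Array[String]): Unit = {
    val code = runJob(args)                       // hypothetical job logic
    if (code != 0) throw JobFailedException(code) // FAILED status, code in message
    // returning normally yields SUCCEEDED (spark-submit exits 0)
  }

  def runJob(args: Array[String]): Int = 0        // placeholder
}
```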
Spark driver memory keeps growing
Hi,

I'm running a job on Spark 1.5.2 and I get an OutOfMemoryError on broadcast variable access. The thing is, I am not sure I understand why the broadcast keeps growing and why it does at this place in the code.

Basically, I have a large input file, each line having a key. I group my lines by key to get one object with all the data related to a given key. Then I do a map to iterate over my objects, and for each object, I iterate over a collection which is a broadcast variable. The exception is thrown when iterating over the broadcast variable.

Here is a quick example.

Input file:
    key1,row1
    key2,row1
    key1,row2
    key2,row2

The broadcast variable is a list: List(1,2,3)

I group my input by key:
    key1, (row1,row2)
    key2, (row1,row2)

Then I do a map:
    myRdd.map( obj => for (item <- myBroadcast.value) executeFunction(obj, item) )

I know groupByKey is a very costly operation, but I am not sure I can avoid it, since executeFunction needs all the lines for a given key. Besides, the stage where the groupByKey is performed completes successfully before the exception is thrown.

Here is an extract from the logs:

16/08/04 03:17:50 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:18:37 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:19:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.4 GB so far)
16/08/04 03:20:07 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:20:53 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.6 GB so far)
16/08/04 03:21:11 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.7 GB so far)
16/08/04 03:21:15 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:44:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.4 GB so far)
16/08/04 03:53:03 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.8 GB so far)
16/08/04 04:02:00 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 4.1 GB so far)
16/08/04 04:20:52 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/08/04 04:20:52 ERROR executor.Executor: Exception in task 1.0 in stage 62.1 (TID 1109)
java.lang.OutOfMemoryError: Java heap space
    at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
    at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
    at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
    at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:550)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:168)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    ...
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.util.collection.CompactBuffer.map(CompactBuffer.scala:30)

Any suggestion regarding what could explain this behavior? Thanks!
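One observation on the trace above: the OutOfMemoryError is thrown inside TorrentBroadcast.readBroadcastBlock, i.e. while an executor re-reads and unrolls the broadcast block after it was evicted (hence the repeated "Not enough space to cache broadcast_90" warnings), not while the job's own objects grow. Calling .value once per partition instead of once per record at least bounds the number of unroll attempts. A minimal sketch under assumed types (the names mirror the pseudocode above and are illustrative):

```scala
// Sketch: myRdd is assumed to be the RDD[(String, Iterable[String])] produced
// by groupByKey, myBroadcast a Broadcast[List[Int]], and executeFunction the
// user's own function.
val results = myRdd.mapPartitions { iter =>
  val items = myBroadcast.value                 // one broadcast read per partition
  iter.map { case (key, rows) =>
    for (item <- items) executeFunction(rows, item)
    key                                         // placeholder per-key result
  }
}
```

If the broadcast itself is in the multi-GB range, it may simply not fit next to the grouped CompactBuffers under Spark 1.5's storage/execution memory split; increasing executor memory or serializing the broadcast more compactly are the other obvious levers.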
Re: Does saveAsHadoopFile depend on master?
Hi guys!

I solved my issue, it was indeed a permission problem. Thanks for your help.

2016-06-22 9:27 GMT+02:00 Spico Florin <spicoflo...@gmail.com>:
> Hi!
> I had a similar issue when the user that submitted the job to the Spark
> cluster didn't have permission to write into HDFS. If you have the HDFS
> GUI, you can check which users exist and what permissions they have. You
> can also check in the HDFS browser (
> http://stackoverflow.com/questions/27996034/opening-a-hdfs-file-in-browser )
> whether your folder structure was created.
>
> If you have YARN, an example could be:
> spark-submit --class "your.class" --master yarn-client --num-executors 12
> --executor-memory 16g --driver-memory 8g --executor-cores 8 .jar hdfs:output
>
> I hope it helps.
> Regards,
> Florin
>
> On Wed, Jun 22, 2016 at 4:57 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>> Please check the driver and executor logs; there should be log entries
>> about where the data is written.
>>
>> On Wed, Jun 22, 2016 at 2:03 AM, Pierre Villard <
>> pierre.villard...@gmail.com> wrote:
>>> Hi,
>>>
>>> I have a Spark job writing files to HDFS using the .saveAsHadoopFile
>>> method.
>>>
>>> If I run my job in local/client mode, it works as expected and all my
>>> files are written to HDFS. However, if I change to yarn/cluster mode, I
>>> don't see any error logs (the job is successful) but no files are
>>> written to HDFS.
>>>
>>> Is there any reason for this behavior? Any thoughts on how to track
>>> down what is happening here?
>>>
>>> Thanks!
>>>
>>> Pierre.
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
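For anyone hitting the same symptom: the effective HDFS user can differ between client and cluster mode (in cluster mode the driver runs as the YARN container's user), so it is worth confirming the output directory's owner and permissions before digging further. A small sketch using the Hadoop FileSystem API (the path is a placeholder):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder path -- point this at the job's actual output directory.
val fs = FileSystem.get(new Configuration())
val st = fs.getFileStatus(new Path("/user/pierre/output"))
println(s"owner=${st.getOwner} group=${st.getGroup} permissions=${st.getPermission}")
```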
Does saveAsHadoopFile depend on master?
Hi,

I have a Spark job writing files to HDFS using the .saveAsHadoopFile method.

If I run my job in local/client mode, it works as expected and all my files are written to HDFS. However, if I change to yarn/cluster mode, I don't see any error logs (the job is successful) but no files are written to HDFS.

Is there any reason for this behavior? Any thoughts on how to track down what is happening here?

Thanks!

Pierre.
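Two common causes of this symptom are a relative output path (which resolves against a different working directory, and possibly a different default filesystem, once the driver runs on the cluster) and a permissions mismatch for the user the driver runs as in cluster mode. Using a fully qualified URI rules out the first. A hedged sketch (the namenode address, path, and key/value types are placeholders):

```scala
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat

// rdd is assumed to be an RDD of (NullWritable, Text) pairs.
rdd.saveAsHadoopFile(
  "hdfs://namenode:8020/user/pierre/output",   // fully qualified target URI
  classOf[NullWritable],
  classOf[Text],
  classOf[TextOutputFormat[NullWritable, Text]]
)
```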
MetadataFetchFailedException: Missing an output location for shuffle 0
Hi,

I have set up a Spark job and it keeps failing even though I tried many different configurations for the memory parameters (as suggested in other threads I read).

My configuration:
- Cluster of 4 machines: 4 vCPU, 16 GB RAM each
- YARN version: 2.7.1
- Spark version: 1.5.2

I tried many configurations for the memory per executor and the YARN executor overhead. The last try was: 4 executors, 8 GB per executor, 4 GB overhead (spark.yarn.executor.memoryOverhead).

My data initially represents only about 10 GB, but I am joining multiple datasets, so the 10 GB are not representative of the final dataset on which I am trying to do some machine learning.

As far as I can tell from the logs, YARN does not seem to be killing the container. Besides, when persisting RDDs, I changed the storage level to MEMORY_AND_DISK_SER. At the end it keeps failing with the following stack trace:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

For the record, when executing the job with a much smaller input dataset, everything is OK.

Any idea? Thanks!

Pierre
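For readers of the archive: "Missing an output location for shuffle 0" usually means the executor that held that shuffle's map output was lost, most often because YARN killed it for exceeding the container's memory limit; such kills may only show up in the NodeManager logs, not in the Spark application logs. The usual levers are more executor memory overhead (the PythonRDD frames in the trace suggest a PySpark job, and the Python worker processes live in the overhead, not the JVM heap) and more partitions so each task handles a smaller shuffle block. A sketch of the kind of submit configuration typically tried for this failure mode (values and the script name are placeholders, not a recommendation):

```
spark-submit \
  --master yarn-cluster \
  --num-executors 4 \
  --executor-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf spark.default.parallelism=400 \
  my_job.py
```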