Sometimes a large number of partitions leads to memory problems. Something like
    val rdd1 = sc.textFile(file1).coalesce(500). ...
    val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.

On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

Everything works smoothly if I do the 99%-removal filter in Hive first. So, all the baggage from garbage collection was breaking it.

Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD?

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra <arun.lut...@gmail.com> wrote:

I tried a shorter, simpler version of the program, with just one RDD; essentially it is:

    sc.textFile(..., N).map().filter().map(blah => (id, 1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

    54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)] 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs]
    77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)] 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs]
    79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)] 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs]
    92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)] 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs]
    114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)] 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs]
    117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)] 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs]

So ~9GB is getting GC'ed every few seconds, which seems like a lot.

Question: the filter() is removing 99% of the data. Does this 99% of the data get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of executor-cores (to 2), based on suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html. This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor-memory and fewer executors (the most important thing was fewer executor-cores):

    --num-executors 150 \
    --driver-memory 15g \
    --executor-memory 110g \
    --executor-cores 32 \

But then, reduceByKey() fails with:

    java.lang.OutOfMemoryError: Java heap space

On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility) could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey), which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts before doing groupByKey.
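A minimal sketch of that countByKey-based hot-key check, under assumed names: keyedRdd stands for the keyed RDD built by the earlier map/filter steps, sc for the SparkContext, and the one-million-record threshold is purely illustrative:

    // Hypothetical names: keyedRdd is e.g. an RDD[(String, Long)] built by the
    // earlier map/filter steps; the threshold below is illustrative only.
    val keyCounts = keyedRdd.countByKey()
    val hotKeys = keyCounts.filter { case (_, n) => n > 1000000L }.keys.toSet
    hotKeys.foreach(k => println(s"Dropping skewed key: $k"))

    // Broadcast the (small) hot-key set and drop those records before the shuffle.
    val hotKeysBc = sc.broadcast(hotKeys)
    val trimmed = keyedRdd.filter { case (k, _) => !hotKeysBc.value.contains(k) }
    val grouped = trimmed.groupByKey(15000)   // continue with map/flatMap/saveAsTextFile as before

countByKey only brings one (key, count) pair per key back to the driver, so it is much cheaper than the groupByKey it is guarding.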
On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson <ilike...@gmail.com> wrote:

All stated symptoms are consistent with GC pressure (other nodes time out trying to connect because of a long stop-the-world pause), quite possibly due to groupByKey. groupByKey is a very expensive operation, as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this).

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:

But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler, or do you base that assumption only on logs?

On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

A correction to my first post:

There is also a repartition right before groupByKey to help avoid the too-many-open-files error:

    rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com> wrote:

The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

    15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
    akka.pattern.AskTimeoutException: Timed out

and

    15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
    java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

and some of these are followed by:

    15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...] disassociated! Shutting down.
    15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601)
    java.io.FileNotFoundException: ..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory)

On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:

I would first check whether there is any possibility that, after doing groupByKey, one of the groups does not fit in one of the executors' memory.

To back up my theory, instead of doing groupByKey + map, try reduceByKey + mapValues.

Let me know if that helped.

Pawel Szulc
http://rabbitonweb.com

On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind.

I just tried a few different combinations of number of executors, executor memory, and, more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textFile(..., N))*. Surely this is a strong clue to something?
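For reference, a minimal sketch of the reduceByKey + mapValues shape Paweł suggests above, assuming the per-key work can be phrased as an associative merge; the Stats type, the merge logic, keyedRdd's value type, and outputPath are hypothetical stand-ins for the real aggregation and output:

    // Hypothetical aggregate; replace Stats and the merge with whatever the real
    // groupByKey().map(...) step computes. keyedRdd is assumed RDD[(String, Double)].
    case class Stats(count: Long, sum: Double)

    val perKey = keyedRdd
      .mapValues(v => Stats(1L, v))                                    // one partial aggregate per record
      .reduceByKey((a, b) => Stats(a.count + b.count, a.sum + b.sum))  // merged map-side, per partition
      .mapValues(s => s.sum / s.count)                                 // final per-key value

    perKey.saveAsTextFile(outputPath)                                  // outputPath as in the real job

Because reduceByKey merges values map-side, no executor ever has to hold all of one key's values in memory at once, which avoids the skewed-key blow-up Aaron describes.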
On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:

Hi,

Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC times, causing the "executor lost", "heartbeat not received", and "GC overhead limit exceeded" messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon).

Burak
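A minimal sketch of the persist-level swap Burak suggests, applied to the rdd1 definition from the quoted pseudocode below; the split and filter steps are stand-ins for the real parsing and 99%-removal filter:

    import org.apache.spark.storage.StorageLevel

    // Same shape as the pseudocode below, but cached as deserialized objects
    // (MEMORY_AND_DISK) instead of serialized ones (MEMORY_AND_DISK_SER).
    val rdd1 = sc.textFile(file1)
      .persist(StorageLevel.MEMORY_AND_DISK)
      .map(_.split("\\|", -1))
      .filter(fields => fields.length > 1)   // stand-in for the real filter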
On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com> wrote:

My program in pseudocode looks like this:

    val conf = new SparkConf().setAppName("Test")
      .set("spark.storage.memoryFraction","0.2") // default 0.6
      .set("spark.shuffle.memoryFraction","0.12") // default 0.2
      .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins
      .set("spark.shuffle.consolidateFiles","true") // helpful for "too many files open"
      .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
      .set("spark.akka.frameSize","500") // helpful when using consolidateFiles=true
      .set("spark.akka.askTimeout", "30")
      .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
      .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
      .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
      .set("spark.speculation","true")
      .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
      .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
      .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
      .set("spark.driver.maxResultSize","2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
      .set("spark.kryo.registrationRequired", "true")

    val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)

    val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)

    rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()

I run the code with:

    --num-executors 500 \
    --driver-memory 20g \
    --executor-memory 20g \
    --executor-cores 32 \

I'm using kryo serialization on everything, including broadcast variables.

Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey. I have tried doubling and tripling the number of partitions when calling textFile, with no success.

Very similar code (trivial changes, to accommodate different input) worked on a smaller input (~8 TB)... not that it was easy to get that working.

Errors vary; here is what I am getting right now:

    ERROR SendingConnection: Exception while reading SendingConnection ... java.nio.channels.ClosedChannelException
    (^ guessing that is a symptom of something else)

    WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
    (^ guessing that is a symptom of something else)

    ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting down ActorSystem [sparkDriver]
    *java.lang.OutOfMemoryError: GC overhead limit exceeded*

Other times I will get messages about "executor lost..." about 1 message per second, after ~50k tasks complete, until there are almost no executors left and progress slows to nothing.

I ran with verbose GC info; I do see failing yarn containers that have multiple (like 30) "Full GC" messages, but I don't know how to interpret whether that is the problem. Typical Full GC time taken seems ok: [Times: user=23.30 sys=0.06, real=1.94 secs]

Suggestions, please?

Huge thanks for useful suggestions,
Arun
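On the question above about interpreting the Full GC messages, one way to get more detailed, timestamped GC output into the YARN container logs is to pass HotSpot GC flags through the executor JVM options. A minimal sketch; the flags shown assume a pre-Java-9 HotSpot JVM, and the same setting can also be supplied via --conf on spark-submit:

    import org.apache.spark.SparkConf

    // Ask each executor JVM to log every GC event with timestamps and heap details.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions",
           "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")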