But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler, or do you base that assumption only on the logs?
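For what it's worth, here is a rough, self-contained sketch of the reduceByKey + mapValues shape I suggested earlier (quoted below). The toy records and the sum are only placeholders, since I don't know your real key/value types or per-key logic, but it shows the difference: groupByKey ships every value for a key into one task, while reduceByKey merges values map-side first.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object ReduceByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeySketch"))

    // Toy stand-in for the parsed, filtered records; replace with your real (key, value) pairs.
    val records = sc.parallelize(Seq(("a", 1L), ("a", 2L), ("b", 5L)))

    // groupByKey + map: all values for one key are materialized in a single task.
    val viaGroup = records.groupByKey().map { case (k, vs) => (k, s"total=${vs.sum}") }

    // reduceByKey + mapValues: values are merged map-side, so no single key's
    // values ever have to fit in one executor's memory at once.
    val viaReduce = records.reduceByKey(_ + _).mapValues(total => s"total=$total")

    viaReduce.saveAsTextFile(args(0)) // hypothetical output path passed as the first argument
    sc.stop()
  }
}

Of course this only works if your per-key computation can be expressed as an associative merge; if it genuinely needs all values for a key at once, the sketch doesn't apply.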
Sat, 28 Feb 2015, 8:12 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> A correction to my first post:
>
> There is also a repartition right before groupByKey to help avoid the
> too-many-open-files error:
>
> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>
> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
>> The job fails before getting to groupByKey.
>>
>> I see a lot of timeout errors in the yarn logs, like:
>>
>> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
>> akka.pattern.AskTimeoutException: Timed out
>>
>> and
>>
>> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>
>> and some of these are followed by:
>>
>> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
>> Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
>> disassociated! Shutting down.
>> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
>> stage 1.0 (TID 336601)
>> java.io.FileNotFoundException:
>> ..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0
>> (No such file or directory)
>>
>> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:
>>
>>> I would first check whether there is any possibility that after doing
>>> groupByKey one of the groups does not fit in one of the executors' memory.
>>>
>>> To back up my theory, instead of doing groupByKey + map, try
>>> reduceByKey + mapValues.
>>>
>>> Let me know if that helped.
>>>
>>> Pawel Szulc
>>> http://rabbitonweb.com
>>>
>>> Sat, 28 Feb 2015, 6:22 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>>
>>>> So, actually I am removing the persist for now, because there is
>>>> significant filtering that happens after calling textFile()... but I will
>>>> keep that option in mind.
>>>>
>>>> I just tried a few different combinations of number of executors,
>>>> executor memory, and, more importantly, number of tasks... *all three
>>>> times it failed when approximately 75.1% of the tasks were completed (no
>>>> matter how many tasks resulted from repartitioning the data in
>>>> textFile(..., N))*. Surely this is a strong clue to something?
>>>>
>>>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
>>>>> many small objects that lead to very long GC times, causing the "executor
>>>>> lost", "heartbeat not received", and "GC overhead limit exceeded" messages.
>>>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
>>>>> try `OFF_HEAP` (and use Tachyon).
>>>>>
>>>>> Burak
>>>>>
>>>>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>>>>
>>>>>> My program in pseudocode looks like this:
>>>>>>
>>>>>> val conf = new SparkConf().setAppName("Test")
>>>>>>   .set("spark.storage.memoryFraction", "0.2") // default 0.6
>>>>>>   .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
>>>>>>   .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
>>>>>>   .set("spark.shuffle.consolidateFiles", "true") // helpful for "too many files open"
>>>>>>   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
>>>>>>   .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
>>>>>>   .set("spark.akka.askTimeout", "30")
>>>>>>   .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>   .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>   .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>   .set("spark.speculation", "true")
>>>>>>   .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>   .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>   .set("spark.storage.blockManagerSlaveTimeoutMs", "120000")
>>>>>>   .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>>   .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
>>>>>>   .set("spark.kryo.registrationRequired", "true")
>>>>>>
>>>>>> val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>   .map(_.split("\\|", -1))...filter(...)
>>>>>>
>>>>>> val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>   .map(_.split("\\|", -1))...filter(...)
>>>>>>
>>>>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>>>>
>>>>>> I run the code with:
>>>>>> --num-executors 500 \
>>>>>> --driver-memory 20g \
>>>>>> --executor-memory 20g \
>>>>>> --executor-cores 32 \
>>>>>>
>>>>>> I'm using Kryo serialization on everything, including broadcast variables.
>>>>>>
>>>>>> Spark creates 145k tasks, and the first stage includes everything before
>>>>>> groupByKey(). It fails before getting to groupByKey. I have tried doubling
>>>>>> and tripling the number of partitions when calling textFile, with no success.
>>>>>>
>>>>>> Very similar code (trivial changes, to accommodate different input) worked
>>>>>> on a smaller input (~8TB)... not that it was easy to get that working.
>>>>>>
>>>>>> Errors vary; here is what I am getting right now:
>>>>>>
>>>>>> ERROR SendingConnection: Exception while reading SendingConnection
>>>>>> ... java.nio.channels.ClosedChannelException
>>>>>> (^ guessing that is a symptom of something else)
>>>>>>
>>>>>> WARN BlockManagerMasterActor: Removing BlockManager
>>>>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
>>>>>> (^ guessing that is a symptom of something else)
>>>>>>
>>>>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...)
>>>>>> shutting down ActorSystem [sparkDriver]
>>>>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>>>>
>>>>>> Other times I will get "executor lost..." messages, about one per second
>>>>>> after ~50k tasks complete, until there are almost no executors left and
>>>>>> progress slows to nothing.
>>>>>>
>>>>>> I ran with verbose GC info; I do see failing yarn containers that have
>>>>>> multiple (like 30) "Full GC" messages, but I don't know how to interpret
>>>>>> whether that is the problem. Typical Full GC time taken seems OK:
>>>>>> [Times: user=23.30 sys=0.06, real=1.94 secs]
>>>>>>
>>>>>> Suggestions, please?
>>>>>>
>>>>>> Huge thanks for useful suggestions,
>>>>>> Arun
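One more thought on the repartition(15000).groupByKey() line from the correction quoted above: if the only reason for the repartition is to control the number of post-shuffle partitions (and hence the number of open shuffle files), groupByKey takes a partition count directly, which avoids shuffling the data twice. A minimal sketch with toy data, under that assumption (the input and key extraction here are placeholders, not your real pipeline):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object GroupByKeyPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GroupByKeyPartitionsSketch"))

    // Toy stand-in for the parsed, filtered union of rdd1 and rdd2.
    val keyed = sc.parallelize(1 to 1000000).map(i => (i % 1000, i.toLong))

    // Instead of .repartition(15000).groupByKey(), which shuffles the data twice,
    // pass the target partition count to groupByKey so there is a single shuffle
    // into 15000 post-shuffle partitions (independent of the number of distinct keys).
    val grouped = keyed.groupByKey(15000).mapValues(_.size)

    grouped.saveAsTextFile(args(0)) // hypothetical output path
    sc.stop()
  }
}

That doesn't change the memory profile of groupByKey itself, so the reduceByKey point still stands, but it does remove one full shuffle of the input.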