The Spark UI shows the line number and name of the operation (repartition, in this
case) that it is performing. Only if that information is wrong (just a possibility)
could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey (or simply
countByKey), which is relatively inexpensive. For the purposes of this algorithm I
can simply log and remove keys with huge counts before doing groupByKey.
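Roughly what I have in mind (just a sketch; "keyed" stands for the RDD[(K, V)] that
currently feeds groupByKey, and the threshold is an arbitrary placeholder):

import org.apache.spark.SparkContext._ // pair-RDD functions (Spark 1.x)

// countByKey collects a Map[K, Long] to the driver, so this assumes the
// number of distinct keys is manageable even if some counts are huge
val counts = keyed.countByKey()
val hotKeys = counts.filter { case (_, n) => n > 1000000L }.keySet
hotKeys.foreach(k => println(s"Dropping skewed key: $k (count = ${counts(k)})"))

// drop the skewed keys before the expensive groupByKey
val deskewed = keyed.filter { case (k, _) => !hotKeys.contains(k) }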
On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson <ilike...@gmail.com> wrote:

> All stated symptoms are consistent with GC pressure (other nodes time out
> trying to connect because of a long stop-the-world pause), quite possibly due
> to groupByKey. groupByKey is a very expensive operation, as it may bring all
> the data for a particular partition into memory (in particular, it cannot
> spill values for a single key, so if you have a single very skewed key you
> can get behavior like this).
>
> On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:
>
>> But groupByKey will repartition according to the number of keys, as I
>> understand how it works. How do you know that you haven't reached the
>> groupByKey phase? Are you using a profiler, or do you base that assumption
>> only on logs?
>>
>> On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>
>>> A correction to my first post:
>>>
>>> There is also a repartition right before groupByKey, to help avoid the
>>> too-many-open-files error:
>>>
>>> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>
>>> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>>
>>>> The job fails before getting to groupByKey.
>>>>
>>>> I see a lot of timeout errors in the yarn logs, like:
>>>>
>>>> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
>>>> akka.pattern.AskTimeoutException: Timed out
>>>>
>>>> and
>>>>
>>>> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
>>>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>
>>>> and some of these are followed by:
>>>>
>>>> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
>>>> Disassociated [akka.tcp://sparkExecutor@...] ->
>>>> [akka.tcp://sparkDriver@...] disassociated! Shutting down.
>>>> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
>>>> in stage 1.0 (TID 336601)
>>>> java.io.FileNotFoundException:
>>>> ..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0
>>>> (No such file or directory)
>>>>
>>>> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:
>>>>
>>>>> I would first check whether there is any possibility that, after doing
>>>>> groupByKey, one of the groups does not fit in one of the executors'
>>>>> memory.
>>>>>
>>>>> To back up my theory, instead of doing groupByKey + map, try
>>>>> reduceByKey + mapValues.
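>>>>>
>>>>> For example (just a sketch; "rdd" and "finish" are placeholders, and I
>>>>> am assuming the per-key work is an associative combine such as a sum of
>>>>> numeric values; whether this rewrite applies depends on what your map
>>>>> does with the grouped values):
>>>>>
>>>>> // groupByKey + map: all values for one key are materialized together
>>>>> rdd.groupByKey().map { case (k, vs) => (k, finish(vs.sum)) }
>>>>>
>>>>> // reduceByKey + mapValues: values are combined incrementally and
>>>>> // map-side, so the finishing step sees only one value per key
>>>>> rdd.reduceByKey(_ + _).mapValues(finish)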
>>>>>
>>>>> Let me know if that helped.
>>>>>
>>>>> Pawel Szulc
>>>>> http://rabbitonweb.com
>>>>>
>>>>> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>>>>
>>>>>> So, actually, I am removing the persist for now, because there is
>>>>>> significant filtering that happens after calling textFile()... but I
>>>>>> will keep that option in mind.
>>>>>>
>>>>>> I just tried a few different combinations of number of executors,
>>>>>> executor memory, and, more importantly, number of tasks... *all three
>>>>>> times it failed when approximately 75.1% of the tasks were completed
>>>>>> (no matter how many tasks resulted from repartitioning the data in
>>>>>> textFile(..., N))*. Surely this is a strong clue to something?
>>>>>>
>>>>>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>>>>>>> generates many small objects that lead to very long GC times, causing
>>>>>>> the executor-lost, heartbeat-not-received, and
>>>>>>> GC-overhead-limit-exceeded messages.
>>>>>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
>>>>>>> also try `OFF_HEAP` (and use Tachyon).
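>>>>>>>
>>>>>>> Concretely, that would just mean changing the storage level in the
>>>>>>> persist calls in your pseudocode below (a sketch; file1 is from your
>>>>>>> code):
>>>>>>>
>>>>>>> import org.apache.spark.storage.StorageLevel
>>>>>>>
>>>>>>> // current: serialized bytes on heap; reading back deserializes and
>>>>>>> // can generate the many short-lived objects mentioned above
>>>>>>> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>>
>>>>>>> // suggested: deserialized objects; bigger footprint, fewer
>>>>>>> // allocations on each read
>>>>>>> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK)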
>>>>>>>
>>>>>>> Burak
>>>>>>>
>>>>>>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My program in pseudocode looks like this:
>>>>>>>>
>>>>>>>> val conf = new SparkConf().setAppName("Test")
>>>>>>>>   .set("spark.storage.memoryFraction","0.2") // default 0.6
>>>>>>>>   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>>>>>>>   .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins
>>>>>>>>   .set("spark.shuffle.consolidateFiles","true") // helpful for "too many files open"
>>>>>>>>   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
>>>>>>>>   .set("spark.akka.frameSize","500") // helpful when using consolidateFiles=true
>>>>>>>>   .set("spark.akka.askTimeout", "30")
>>>>>>>>   .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>>>   .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>>>   .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>>>   .set("spark.speculation","true")
>>>>>>>>   .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>>>   .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>>>   .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>>>>>>>>   .set("spark.driver.maxResultSize","2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>>>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>>>>   .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>>>>>>>>   .set("spark.kryo.registrationRequired", "true")
>>>>>>>>
>>>>>>>> val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)
>>>>>>>>
>>>>>>>> val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)
>>>>>>>>
>>>>>>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>>>>>>
>>>>>>>> I run the code with:
>>>>>>>>   --num-executors 500 \
>>>>>>>>   --driver-memory 20g \
>>>>>>>>   --executor-memory 20g \
>>>>>>>>   --executor-cores 32
>>>>>>>>
>>>>>>>> I'm using Kryo serialization on everything, including broadcast
>>>>>>>> variables.
>>>>>>>>
>>>>>>>> Spark creates 145k tasks, and the first stage includes everything
>>>>>>>> before groupByKey(). It fails before getting to groupByKey. I have
>>>>>>>> tried doubling and tripling the number of partitions when calling
>>>>>>>> textFile, with no success.
>>>>>>>>
>>>>>>>> Very similar code (trivial changes, to accommodate different input)
>>>>>>>> worked on a smaller input (~8 TB)... not that it was easy to get that
>>>>>>>> working.
>>>>>>>>
>>>>>>>> Errors vary; here is what I am getting right now:
>>>>>>>>
>>>>>>>> ERROR SendingConnection: Exception while reading SendingConnection
>>>>>>>> ... java.nio.channels.ClosedChannelException
>>>>>>>> (^ guessing that is a symptom of something else)
>>>>>>>>
>>>>>>>> WARN BlockManagerMasterActor: Removing BlockManager
>>>>>>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
>>>>>>>> (^ guessing that is a symptom of something else)
>>>>>>>>
>>>>>>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...)
>>>>>>>> shutting down ActorSystem [sparkDriver]
>>>>>>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>>>>>>
>>>>>>>> Other times I will get "executor lost..." messages, about one per
>>>>>>>> second after roughly 50k tasks complete, until there are almost no
>>>>>>>> executors left and progress slows to nothing.
>>>>>>>>
>>>>>>>> I ran with verbose GC info; I do see failing yarn containers that
>>>>>>>> have multiple (like 30) "Full GC" messages, but I don't know how to
>>>>>>>> interpret whether that is the problem. Typical Full GC time taken
>>>>>>>> seems OK: [Times: user=23.30 sys=0.06, real=1.94 secs]
>>>>>>>>
>>>>>>>> Suggestions, please?
>>>>>>>>
>>>>>>>> Huge thanks for useful suggestions,
>>>>>>>> Arun
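>>>>>>>>
>>>>>>>> A note on the Kryo settings above: with spark.kryo.registrationRequired
>>>>>>>> set to "true", any class the serializer encounters that has not been
>>>>>>>> registered fails the job with an exception naming that class. A minimal
>>>>>>>> registrator of the kind pointed to by spark.kryo.registrator looks
>>>>>>>> roughly like the sketch below; the registered classes are illustrative
>>>>>>>> only, not the actual application's list:
>>>>>>>>
>>>>>>>> import com.esotericsoftware.kryo.Kryo
>>>>>>>> import org.apache.spark.serializer.KryoRegistrator
>>>>>>>>
>>>>>>>> class MyRegistrator extends KryoRegistrator {
>>>>>>>>   override def registerClasses(kryo: Kryo) {
>>>>>>>>     // register each class that appears in shuffled or broadcast data
>>>>>>>>     kryo.register(classOf[Array[String]])
>>>>>>>>     kryo.register(classOf[(String, Int)])
>>>>>>>>   }
>>>>>>>> }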