Re: Problem getting program to run on 15TB input

2015-06-09 Thread Arun Luthra
...April 2015 15:42 > To: Arun Luthra > Cc: Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org > Subject: Re: Problem getting program to run on 15TB input > > Sometimes a large number of partitions leads to memory problems. Something like ...

RE: Problem getting program to run on 15TB input

2015-06-06 Thread Kapil Malik
...April 2015 15:42 To: Arun Luthra Cc: Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org Subject: Re: Problem getting program to run on 15TB input > Sometimes a large number of partitions leads to memory problems. Something like val rdd1 = sc.textFile(file1).coalesce(500) ... val rdd2 = sc.textFile(file2).coalesce(500) ...

Re: Problem getting program to run on 15TB input

2015-04-13 Thread Daniel Mahler
Sometimes a large number of partitions leads to memory problems. Something like val rdd1 = sc.textFile(file1).coalesce(500) ... val rdd2 = sc.textFile(file2).coalesce(500) ... may help. On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra wrote: > Everything works smoothly if I do the 99%-removal filter in Hive first. ...
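
A minimal sketch of the coalesce approach, assuming two text inputs (the paths and the 500-partition target are illustrative, not from the thread):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("CoalesceSketch"))
  // coalesce(500) narrows the partition count without a full shuffle, so the
  // job tracks fewer tasks and each task reads a larger contiguous slice.
  val rdd1 = sc.textFile("hdfs:///data/file1").coalesce(500)
  val rdd2 = sc.textFile("hdfs:///data/file2").coalesce(500)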

Re: Problem getting program to run on 15TB input

2015-03-02 Thread Arun Luthra
Everything works smoothly if I do the 99%-removal filter in Hive first. So it was the garbage-collection load that was breaking it. Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD? On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra wrote: > I tried a shorter, simpler version of the program ...
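
One way to read the question: keep the 99%-removal filter in the same narrow stage as the read, so rejected lines die as short-lived young-generation garbage instead of being tenured and swept later. A sketch under that assumption (keepLine is a hypothetical stand-in for the Hive filter; the path is illustrative):

  def keepLine(line: String): Boolean = !line.startsWith("#")  // stand-in rule
  // Rejected lines never survive past this stage, so they are collected
  // cheaply rather than accumulating as long-lived RDD contents.
  val kept = sc.textFile("hdfs:///data/input").filter(keepLine _)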

Re: Problem getting program to run on 15TB input

2015-03-01 Thread Arun Luthra
I tried a shorter, simpler version of the program, with just 1 RDD; essentially it is: sc.textFile(..., N).map().filter().map(blah => (id, 1L)).reduceByKey().saveAsTextFile(...) Here is a typical GC log trace from one of the yarn container logs: 54.040: [GC [PSYoungGen: 9176064K->28206K(10704896 ...
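
The simplified job, written out as a compilable sketch (the path, the partition count, and the parse/filter/key functions are placeholders; the real logic is not in the post):

  def parse(line: String): String = line.trim             // placeholder parse
  def keep(rec: String): Boolean = rec.nonEmpty           // placeholder filter
  def extractId(rec: String): String = rec.split(',')(0)  // placeholder key

  sc.textFile("hdfs:///data/input", 5000)   // N = 5000 is illustrative
    .map(parse _)
    .filter(keep _)
    .map(rec => (extractId(rec), 1L))
    .reduceByKey(_ + _)
    .saveAsTextFile("hdfs:///data/output")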

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The Spark UI shows the line number and name of the operation (repartition, in this case) that it is performing. Only if this information is wrong (just a possibility) could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply coun ...
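
A sketch of that skew check, replacing the expensive grouping with a cheap per-key count (the RDD contents here are toy stand-ins for the real keyed data):

  // records stands in for the pair RDD that feeds groupByKey in the real job.
  val records = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
  val keyCounts = records
    .map { case (k, _) => (k, 1L) }
    .reduceByKey(_ + _)
  // Largest groups first; a few huge counts would confirm heavy skew.
  keyCounts.map(_.swap).top(20).foreach(println)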

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes time out trying to connect because of a long stop-the-world pause), quite possibly due to groupByKey. groupByKey is a very expensive operation, as it may bring all the data for a particular partition into memory (in particular, it cannot spill ...

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler, or do you base that assumption only on logs? Sat, 28 Feb 2015, 8:12 PM, Arun Luthra wrote: > A correction to my first post: ...

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
A correction to my first post: there is also a repartition right before groupByKey, to help avoid a too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra wrote: > The job fails before getting to groupByKey. ...
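
The corrected pipeline, reflowed as a sketch. The lambdas are elided in the original, so the bodies below are hypothetical stand-ins just to make the shape compile:

  val rdd1 = sc.textFile("hdfs:///data/file1")   // paths illustrative
  val rdd2 = sc.textFile("hdfs:///data/file2")

  rdd2.union(rdd1)
    .map(line => (line.take(8), line))        // hypothetical keying
    .filter { case (k, _) => k.nonEmpty }     // hypothetical filter
    .repartition(15000)   // per the post: added to avoid too-many-open-files
    .groupByKey()
    .map { case (k, vs) => (k, vs.size) }     // hypothetical per-group work
    .flatMap { case (k, n) => Seq(s"$k\t$n") }
    .saveAsTextFile("hdfs:///data/out")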

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
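
These ask timeouts are usually a downstream symptom of long GC pauses rather than the root cause, so raising them only buys headroom. For completeness, a hedged sketch of the Spark 1.x-era knobs (property names varied across releases; verify against your version's docs):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.network.timeout", "300s")  // umbrella network timeout
    .set("spark.akka.timeout", "300")      // Akka-level timeout in seconds (pre-Spark-2.0)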

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
I would first check whether there is any possibility that, after doing groupByKey, one of the groups does not fit in an executor's memory. To back up my theory, instead of doing groupByKey + map, try reduceByKey + mapValues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com ...
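
A sketch of the suggested rewrite, assuming the per-group work is an aggregation (here a hypothetical per-key average, since the real map logic is not shown in the thread):

  val pairs = sc.parallelize(Seq(("a", 1L), ("a", 3L), ("b", 2L)))

  // groupByKey materializes every value of a key in one executor before
  // mapValues runs; a single huge group can blow the heap.
  val viaGroup = pairs.groupByKey().mapValues(vs => vs.sum.toDouble / vs.size)

  // reduceByKey combines map-side, so only small (sum, count) pairs travel
  // through the shuffle and no full group ever sits in memory.
  val viaReduce = pairs
    .mapValues(v => (v, 1L))
    .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
    .mapValues { case (sum, count) => sum.toDouble / count }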

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
So, actually, I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and, more importantly, number of tasks... *all three ...
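
For reference, the three knobs being swept correspond to standard properties on YARN; a hypothetical combination (the values are placeholders, not what the thread actually used):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.executor.instances", "200")    // number of executors (YARN)
    .set("spark.executor.memory", "8g")        // heap per executor
    .set("spark.default.parallelism", "15000") // default task count for shuffles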

Re: Problem getting program to run on 15TB input

2015-02-27 Thread Burak Yavuz
Hi, not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC times, causing the "executor lost", "heartbeat not received", and "GC overhead limit exceeded" messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try ...
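
A sketch of the switch being suggested (the RDD name and path are illustrative):

  import org.apache.spark.storage.StorageLevel

  val lines = sc.textFile("hdfs:///data/input")
  // Deserialized objects in memory, spilling partitions to disk as needed:
  lines.persist(StorageLevel.MEMORY_AND_DISK)
  // The serialized variant being moved away from:
  // lines.persist(StorageLevel.MEMORY_AND_DISK_SER)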

Problem getting program to run on 15TB input

2015-02-27 Thread Arun Luthra
My program in pseudocode looks like this:

  val conf = new SparkConf().setAppName("Test")
    .set("spark.storage.memoryFraction", "0.2")  // default 0.6
    .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
    .set("spark.shuffle.manager", "SORT")        // preferred setting for optimized ...
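
For scale, with hypothetical numbers not taken from the thread: on an 8 GB executor heap, spark.storage.memoryFraction = 0.2 caps cached blocks at roughly 0.2 × 8 GB ≈ 1.6 GB, and spark.shuffle.memoryFraction = 0.12 caps in-memory shuffle aggregation at roughly 1 GB, leaving the rest of the heap for task objects, which is the point of lowering both defaults on a GC-bound job.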