Re: Problem getting program to run on 15TB input

2015-06-09 Thread Arun Luthra
I found that the problem was due to garbage collection in filter(). Using
Hive to do the filter solved the problem.

A lot of other problems went away when I upgraded to Spark 1.2.0, which
compresses various task overhead data (HighlyCompressedMapStatus etc.).

It has been running very very smoothly with these two changes.

I'm fairly sure that I tried coalesce(); it resulted in tasks that were too
big. The code has evolved too much to easily double-check it now.
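A minimal sketch of the "do the filter in Hive first" approach, for anyone
landing on this thread. It assumes the pre-filtered data is read back through
HiveContext (the thread does not say whether the filter ran as a standalone
Hive job or through Spark SQL), and the table name, column names, predicate
and output path are all hypothetical:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._             // pair-RDD implicits (Spark 1.x)
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext()                        // configured via spark-submit
val hiveCtx = new HiveContext(sc)

// The 99%-removal predicate is applied during the table scan, so the dropped
// rows never reach the user code of this job.
val kept = hiveCtx.sql("SELECT id FROM events WHERE status = 'keep'")

kept.map(row => (row.getString(0), 1L))
    .reduceByKey(_ + _)
    .saveAsTextFile("hdfs:///tmp/id_counts")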


RE: Problem getting program to run on 15TB input

2015-06-06 Thread Kapil Malik
Very interesting and relevant thread for production-level usage of Spark.

@Arun, can you kindly confirm if Daniel’s suggestion helped your use case?

Thanks,

Kapil Malik | kma...@adobe.com | 33430 / 8800836581

Re: Problem getting program to run on 15TB input

2015-04-13 Thread Daniel Mahler
Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.
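(A side note on the API, not from the thread: coalesce(n) only merges existing
partitions and avoids a shuffle, while coalesce(n, shuffle = true), which is
what repartition(n) does under the hood, redistributes the data evenly at the
cost of a full shuffle. The path below is a placeholder.)

val raw = sc.textFile("hdfs:///path/to/file1")

val merged   = raw.coalesce(500)                  // no shuffle; existing partitions are merged
val balanced = raw.coalesce(500, shuffle = true)  // same as raw.repartition(500)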



Re: Problem getting program to run on 15TB input

2015-03-02 Thread Arun Luthra
Everything works smoothly if I do the 99%-removal filter in Hive first. So,
all the baggage from garbage collection was breaking it.

Is there a way to filter() out 99% of the data without having to garbage
collect 99% of the RDD?
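(An aside, not something tried in this thread: one way to cut the allocation
is to reject records on the raw line before splitting it, so the 99% that get
dropped never allocate the per-field arrays. The path and the predicate below
are made up for illustration.)

val keptRecords = sc.textFile("hdfs:///path/to/file1")
  .filter(line => line.contains("|keep|"))   // hypothetical cheap test on the raw String
  .map(_.split("\\|", -1))                   // only the surviving ~1% is ever split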


Re: Problem getting program to run on 15TB input

2015-03-01 Thread Arun Luthra
I tried a shorter, simpler version of the program, with just 1 RDD;
essentially it is:

sc.textFile(..., N).map().filter().map( blah => (id,
1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
real=0.02 secs]

So ~9GB is getting GC'ed every few seconds, which seems like a lot.

Question: The filter() is removing 99% of the data. Does this 99% of the
data get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of
executor-cores (to 2), based on suggestions at
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
. This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor-memory and fewer executors (the most important
thing was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then, reduceByKey() fails with:

java.lang.OutOfMemoryError: Java heap space
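(A rough rule of thumb on why fewer executor-cores matters here: all of an
executor's concurrently running tasks share its one JVM heap, so

    110g executor heap / 32 concurrent tasks  ~=  3.4 GB per running task
    110g executor heap /  2 concurrent tasks  ~= 55   GB per running task

Dropping to --executor-cores 2 while keeping --executor-memory 110g gives each
reduce task far more headroom, at the cost of less parallelism per executor.
The division is not this clean in practice, since the shuffle and storage
memory fractions also apply.)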





Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The Spark UI shows the line number and the name of the operation (repartition
in this case) that it is performing. Only if this information is wrong (just a
possibility) could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey
(or simply countByKey) which is relatively inexpensive. For the purposes of
this algorithm I can simply log and remove keys with huge counts, before
doing groupByKey.
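(A hedged sketch of that skew survey; the keying, the threshold and the path
are made up, and sc is the job's SparkContext:)

import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.x)

val keyed = sc.textFile("hdfs:///path/to/input")
  .map(line => (line.split("\\|", -1)(0), line))          // hypothetical: key on the first field

// Cheap survey: reduceByKey combines the counts on the map side, so this is
// far lighter than running groupByKey over the same data.
val hotKeys = keyed.mapValues(_ => 1L)
  .reduceByKey(_ + _)
  .filter { case (_, n) => n > 1000000L }                 // arbitrary "huge count" threshold
  .keys
  .collect()
  .toSet

hotKeys.foreach(k => println(s"dropping skewed key: $k")) // log the offenders...
val grouped = keyed                                       // ...and group only the rest
  .filter { case (k, _) => !hotKeys.contains(k) }
  .groupByKey()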


Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes timeout
trying to connect because of a long stop-the-world), quite possibly due to
groupByKey. groupByKey is a very expensive operation as it may bring all
the data for a particular partition into memory (in particular, it cannot
spill values for a single key, so if you have a single very skewed key you
can get behavior like this).


Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
But groupByKey will repartition according to the number of keys, as I
understand how it works. How do you know that you haven't reached the
groupByKey phase? Are you using a profiler, or do you base that assumption
only on logs?


Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
A correction to my first post:

There is also a repartition right before groupByKey to help avoid the
too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()


Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
akka.pattern.AskTimeoutException: Timed out

and

15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

and some of these are followed by:

15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
disassociated! Shutting down.
15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
stage 1.0 (TID 336601)
java.io.FileNotFoundException:
/hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
(No such file or directory)





Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
I would first check whether there is any possibility that, after doing
groupByKey, one of the groups does not fit in one of the executors' memory.

To back up my theory, instead of doing groupByKey + map, try reduceByKey +
mapValues.

Let me know if that helped.

Pawel Szulc
http://rabbitonweb.com
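(A tiny sketch of that swap, assuming the per-key work is something reducible
such as a maximum; the keying and the path are hypothetical:)

import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.x)

val keyedRdd = sc.textFile("hdfs:///path/to/input")
  .map(line => (line.split("\\|", -1)(0), line))

val pairs = keyedRdd.mapValues(_.length)                  // some reducible per-record measure

// groupByKey + map: every value for a key is materialized in one task's memory.
val viaGroup = pairs.groupByKey()
  .map { case (k, lens) => (k, s"maxLen=${lens.max}") }

// reduceByKey + mapValues: values are combined on the map side, so even a
// heavily skewed key never needs all of its values in memory at once.
val viaReduce = pairs.reduceByKey((a, b) => math.max(a, b))
  .mapValues(len => s"maxLen=$len")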


Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
So, actually I am removing the persist for now, because there is
significant filtering that happens after calling textFile()... but I will
keep that option in mind.

I just tried a few different combinations of number of executors, executor
memory, and more importantly, number of tasks... *all three times it failed
when approximately 75.1% of the tasks were completed (no matter how many
tasks resulted from repartitioning the data in textfile(..., N))*. Surely
this is a strong clue to something?





Re: Problem getting program to run on 15TB input

2015-02-27 Thread Burak Yavuz
Hi,

Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
many small objects that lead to very long GC time, causing the "executor
lost", "heartbeat not received", and "GC overhead limit exceeded" messages.
Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
try `OFF_HEAP` (and use Tachyon).

Burak
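(For reference, the three options mentioned above, with a placeholder RDD; a
storage level can only be assigned to an RDD once, so you would pick one:)

import org.apache.spark.storage.StorageLevel

val rdd1 = sc.textFile("hdfs:///path/to/file1")

rdd1.persist(StorageLevel.MEMORY_AND_DISK)        // deserialized objects: larger footprint, fewer allocations on access
// rdd1.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized blocks: compact, but reads churn short-lived objects
// rdd1.persist(StorageLevel.OFF_HEAP)            // off-heap store (Tachyon in Spark 1.x)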

On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra  wrote:

> My program in pseudocode looks like this:
>
> val conf = new SparkConf().setAppName("Test")
>   .set("spark.storage.memoryFraction","0.2") // default 0.6
>   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>   .set("spark.shuffle.manager","SORT") // preferred setting for
> optimized joins
>   .set("spark.shuffle.consolidateFiles","true") // helpful for "too
> many files open"
>   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker
> errors?
>   .set("spark.akka.frameSize","500") // helpful when using
> consolidateFiles=true
>   .set("spark.akka.askTimeout", "30")
>   .set("spark.shuffle.compress","false") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.file.transferTo","false") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.core.connection.ack.wait.timeout","600") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.speculation","true")
>   .set("spark.worker.timeout","600") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>   .set("spark.akka.timeout","300") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>   .set("spark.storage.blockManagerSlaveTimeoutMs","12")
>   .set("spark.driver.maxResultSize","2048") // in response to error:
> Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
> spark.driver.maxResultSize (1024.0 MB)
>   .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>   .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>   .set("spark.kryo.registrationRequired", "true")
>
> val rdd1 = 
> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
> -1)...filter(...)
>
> val rdd2 =
> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
> -1)...filter(...)
>
>
> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>
>
> I run the code with:
>   --num-executors 500 \
>   --driver-memory 20g \
>   --executor-memory 20g \
>   --executor-cores 32 \
>
>
> I'm using kryo serialization on everything, including broadcast variables.
>
> Spark creates 145k tasks, and the first stage includes everything before
> groupByKey(). It fails before getting to groupByKey. I have tried doubling
> and tripling the number of partitions when calling textFile, with no
> success.
>
> Very similar code (trivial changes, to accommodate different input) worked
> on a smaller input (~8TB)... Not that it was easy to get that working.
>
>
>
> Errors vary, here is what I am getting right now:
>
> ERROR SendingConnection: Exception while reading SendingConnection
> ... java.nio.channels.ClosedChannelException
> (^ guessing that is symptom of something else)
>
> WARN BlockManagerMasterActor: Removing BlockManager
> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
> (^ guessing that is symptom of something else)
>
> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
> down ActorSystem [sparkDriver]
> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>
>
>
> Other times I will get messages about "executor lost..." about 1 message
> per second, after ~50k tasks complete, until there are almost no executors
> left and progress slows to nothing.
>
> I ran with verbose GC info; I do see failing yarn containers that have
> multiple (like 30) "Full GC" messages but I don't know how to interpret if
> that is the problem. Typical Full GC time taken seems ok: [Times:
> user=23.30 sys=0.06, real=1.94 secs]
>
>
>
> Suggestions, please?
>
> Huge thanks for useful suggestions,
> Arun
>