Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Patrick Wendell
Hey Yiannis,

If you just perform a count on each "name", "date" pair... can it succeed?
If so, can you do a count and then order by to find the largest one?

I'm wondering if there is a single pathologically large group here that is
somehow causing the OOM.
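
A minimal sketch of that diagnostic, assuming the DataFrame is named
"people" as in the original post and the Spark 1.3 DataFrame API:

val counts = people.groupBy("name", "date").count()
// Sort the largest groups first; a single pathological group should stand out.
counts.orderBy(counts("count").desc).show(10)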

Also, to be clear, you are getting GC limit warnings on the executors, not
the driver. Correct?

- Patrick

Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Martin Goodson
Have you tried to repartition() your original data to make more partitions
before you aggregate?
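
A rough sketch of that, assuming the original post's setup (the partition
count of 1000 is only an illustration, not a tuned value):

val people = sqlContext.parquetFile("/data.parquet").repartition(1000)
// The aggregation then runs over 1000 partitions instead of one per file.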


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240

Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Yiannis Gkoufas
Hi Yin,

Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
without any success.
I cannot figure out how to increase the number of mapPartitions tasks.

Thanks a lot

Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yin Huai
spark.sql.shuffle.partitions only controls the number of tasks in the second
stage (the number of reducers). For your case, I'd say that the number of
tasks in the first stage (the number of mappers) will be the number of files
you have.

Actually, have you changed "spark.executor.memory" (it controls the memory
for an executor of your application)? I did not see it in your original
email. The difference between worker memory and executor memory is
explained at http://spark.apache.org/docs/1.3.0/spark-standalone.html:

SPARK_WORKER_MEMORY
Total amount of memory to allow Spark applications to use on the machine,
e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
application's individual memory is configured using its
spark.executor.memory property.
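
As an illustration (the 8g is an example value, and the app name is
hypothetical), executor memory can also be set programmatically before
creating the context:

val conf = new org.apache.spark.SparkConf()
  .setAppName("parquet-aggregation") // hypothetical name
  .set("spark.executor.memory", "8g")
val sc = new org.apache.spark.SparkContext(conf)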


On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas 
wrote:

> Actually I realized that the correct way is:
>
> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>
> but I am still experiencing the same behavior/error.
>
> On 20 March 2015 at 16:04, Yiannis Gkoufas  wrote:
>
>> Hi Yin,
>>
>> the way I set the configuration is:
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>>
>> it is the correct way right?
>> In the mapPartitions task (the first task which is launched), I get again
>> the same number of tasks and again the same error. :(
>>
>> Thanks a lot!
>>
>> On 19 March 2015 at 17:40, Yiannis Gkoufas  wrote:
>>
>>> Hi Yin,
>>>
>>> thanks a lot for that! Will give it a shot and let you know.
>>>
>>> On 19 March 2015 at 16:30, Yin Huai  wrote:
>>>
 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas 
 wrote:

> Hi Yin,
>
> Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
> The number of tasks launched is equal to the number of parquet files. Do
> you have any idea on how to deal with this situation?
>
> Thanks a lot
> On 18 Mar 2015 17:35, "Yin Huai"  wrote:
>
>> Seems there are too many distinct groups processed in a task, which
>> trigger the problem.
>>
>> How many files do your dataset have and how large is a file? Seems
>> your query will be executed with two stages, table scan and map-side
>> aggregation in the first stage and the final round of reduce-side
>> aggregation in the second stage. Can you take a look at the numbers of
>> tasks launched in these two stages?
>>
>> Thanks,
>>
>> Yin
>>
>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
>> johngou...@gmail.com> wrote:
>>
>>> Hi there, I set the executor memory to 8g but it didn't help
>>>
>>> On 18 March 2015 at 13:59, Cheng Lian  wrote:
>>>
 You should probably increase executor memory by setting
 "spark.executor.memory".

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

> Hi there,
>
> I was trying the new DataFrame API with some basic operations on a
> parquet dataset.
> I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in
> a standalone cluster mode.
> The code is the following:
>
> val people = sqlContext.parquetFile("/data.parquet");
> val res = people.groupBy("name","date").
> agg(sum("power"),sum("supply")).take(10);
> System.out.println(res);
>
> The dataset consists of 16 billion entries.
> The error I get is java.lang.OutOfMemoryError: GC overhead limit
> exceeded
>
> My configuration is:
>
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.driver.memory6g
> spark.executor.extraJavaOptions -XX:+UseCompressedOops
> spark.shuffle.managersort
>
> Any idea how can I workaround this?
>
> Thanks a lot
>


>>>
>>

>>>
>>
>


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yiannis Gkoufas
Actually I realized that the correct way is:

sqlContext.sql("set spark.sql.shuffle.partitions=1000")

but I am still experiencing the same behavior/error.


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yiannis Gkoufas
Hi Yin,

the way I set the configuration is:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions","1000");

It is the correct way, right?
In the mapPartitions stage (the first one launched), I again get
the same number of tasks and the same error. :(

Thanks a lot!


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-19 Thread Yiannis Gkoufas
Hi Yin,

thanks a lot for that! Will give it a shot and let you know.


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-19 Thread Yin Huai
Was the OOM thrown during the execution of the first stage (map) or the second
stage (reduce)? If it was the second stage, can you increase the value
of spark.sql.shuffle.partitions and see if the OOM disappears?

This setting controls the number of reducers Spark SQL will use; the
default is 200. Maybe there are too many distinct values and the memory
pressure on every task (of those 200 reducers) is pretty high. You can
start with 400 and increase it until the OOM disappears. Hopefully this
will help.
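
For instance, as a one-line sketch (400 being the starting point suggested
above):

sqlContext.setConf("spark.sql.shuffle.partitions", "400")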

Thanks,

Yin



Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-18 Thread Yiannis Gkoufas
Hi Yin,

Thanks for your feedback. I have 1700 parquet files, sized 100 MB each. The
number of tasks launched is equal to the number of parquet files. Do you
have any idea how to deal with this situation?

Thanks a lot

Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-18 Thread Yin Huai
Seems there are too many distinct groups processed in a task, which triggers
the problem.

How many files does your dataset have, and how large is each file? It seems
your query will be executed in two stages: table scan and map-side
aggregation in the first stage, and the final round of reduce-side
aggregation in the second. Can you take a look at the numbers of tasks
launched in these two stages?
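
As a sketch of how to confirm that plan, assuming the query from the
original post (explain() prints the physical plan; the Exchange operator
marks the shuffle between the two stages):

import org.apache.spark.sql.functions.sum
people.groupBy("name", "date")
  .agg(sum("power"), sum("supply"))
  .explain()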

Thanks,

Yin


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-18 Thread Yiannis Gkoufas
Hi there, I set the executor memory to 8g but it didn't help


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-18 Thread Cheng Lian
You should probably increase executor memory by setting 
"spark.executor.memory".


The full list of available configurations can be found here:
http://spark.apache.org/docs/latest/configuration.html
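
For example (the 8g is purely illustrative, not a tuned value), in
conf/spark-defaults.conf:

spark.executor.memory 8g

or equivalently when submitting the application:

spark-submit --conf spark.executor.memory=8g <rest of your submit command>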


Cheng



DataFrame operation on parquet: GC overhead limit exceeded

2015-03-18 Thread Yiannis Gkoufas
Hi there,

I was trying the new DataFrame API with some basic operations on a parquet
dataset.
I have 7 nodes with 12 cores each and 8 GB of RAM allocated to each worker,
in standalone cluster mode.
The code is the following:

val people = sqlContext.parquetFile("/data.parquet");
val res = people.groupBy("name","date").agg(sum("power"),sum("supply")).take(10);
System.out.println(res);

The dataset consists of 16 billion entries.
The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded

My configuration is:

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort

Any idea how I can work around this?

Thanks a lot