Have you tried to repartition() your original data to make more partitions
before you aggregate?
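
A minimal sketch of what that could look like, assuming the Spark 1.3
DataFrame API used elsewhere in this thread. The object name, the
targetPartitions helper and the factor of 4 are hypothetical and only for
illustration; the path and column names are taken from the original
question. As far as I can tell, repartition() adds a shuffle of its own, so
the scan would still run one task per file and only the aggregation that
follows would be spread over more, smaller tasks.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.sum

object RepartitionBeforeAggregate {
  // Hypothetical helper: aim for `factor` partitions per input file,
  // so each task after the repartition handles a smaller slice of data.
  def targetPartitions(numFiles: Int, factor: Int): Int = numFiles * factor

  // Read the parquet data, redistribute it into `numPartitions` partitions,
  // and only then run the groupBy/agg from the original question.
  def aggregate(sqlContext: SQLContext, path: String, numPartitions: Int): DataFrame = {
    val people = sqlContext.parquetFile(path)
    people
      .repartition(numPartitions)
      .groupBy("name", "date")
      .agg(sum("power"), sum("supply"))
  }
}
```

With 1700 files, something like
RepartitionBeforeAggregate.aggregate(sqlContext, "/data.parquet",
RepartitionBeforeAggregate.targetPartitions(1700, 4)).take(10) would be a
starting point; the right number of partitions is something to find by
experiment.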


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240

On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas <johngou...@gmail.com>
wrote:

> Hi Yin,
>
> Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
> without any success.
> I cannot figure out how to increase the number of mapPartitions tasks.
>
> Thanks a lot
>
> On 20 March 2015 at 18:44, Yin Huai <yh...@databricks.com> wrote:
>
>> spark.sql.shuffle.partitions only controls the number of tasks in the
>> second stage (the number of reducers). For your case, I'd say that the
>> number of tasks in the first stage (number of mappers) will be the number
>> of files you have.
>>
>> Actually, have you changed "spark.executor.memory" (it controls the
>> memory for an executor of your application)? I did not see it in your
>> original email. The difference between worker memory and executor memory
>> is explained at http://spark.apache.org/docs/1.3.0/spark-standalone.html:
>>
>> SPARK_WORKER_MEMORY
>> Total amount of memory to allow Spark applications to use on the machine,
>> e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
>> application's individual memory is configured using its
>> spark.executor.memory property.
>>
>>
>> On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas <johngou...@gmail.com>
>> wrote:
>>
>>> Actually I realized that the correct way is:
>>>
>>> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>>>
>>> but I am still experiencing the same behavior/error.
>>>
>>> On 20 March 2015 at 16:04, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>
>>>> Hi Yin,
>>>>
>>>> the way I set the configuration is:
>>>>
>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>>>>
>>>> Is this the correct way?
>>>> In the mapPartitions task (the first task that is launched), I again
>>>> get the same number of tasks and the same error. :(
>>>>
>>>> Thanks a lot!
>>>>
>>>> On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Yin,
>>>>>
>>>>> thanks a lot for that! Will give it a shot and let you know.
>>>>>
>>>>> On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:
>>>>>
>>>>>> Was the OOM thrown during the execution of the first stage (map) or the
>>>>>> second stage (reduce)? If it was the second stage, can you increase the
>>>>>> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>>>>>
>>>>>> This setting controls the number of reducers Spark SQL will use, and
>>>>>> the default is 200. Maybe there are too many distinct values and the
>>>>>> memory pressure on every task (of those 200 reducers) is pretty high.
>>>>>> You can start with 400 and increase it until the OOM disappears.
>>>>>> Hopefully this will help.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Yin
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <
>>>>>> johngou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Yin,
>>>>>>>
>>>>>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB
>>>>>>> each. The number of tasks launched is equal to the number of parquet
>>>>>>> files. Do you have any idea how to deal with this situation?
>>>>>>>
>>>>>>> Thanks a lot
>>>>>>> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>>>>>>>
>>>>>>>> It seems there are too many distinct groups processed in a task, which
>>>>>>>> triggers the problem.
>>>>>>>>
>>>>>>>> How many files does your dataset have, and how large is each file? It
>>>>>>>> seems your query will be executed in two stages: table scan and map-side
>>>>>>>> aggregation in the first stage, and the final round of reduce-side
>>>>>>>> aggregation in the second stage. Can you take a look at the number of
>>>>>>>> tasks launched in these two stages?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Yin
>>>>>>>>
>>>>>>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
>>>>>>>> johngou...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi there, I set the executor memory to 8g but it didn't help
>>>>>>>>>
>>>>>>>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> You should probably increase executor memory by setting
>>>>>>>>>> "spark.executor.memory".
>>>>>>>>>>
>>>>>>>>>> The full list of available configurations can be found here:
>>>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>>>
>>>>>>>>>> Cheng
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi there,
>>>>>>>>>>>
>>>>>>>>>>> I was trying the new DataFrame API with some basic operations on
>>>>>>>>>>> a parquet dataset.
>>>>>>>>>>> I have 7 nodes with 12 cores each, and 8GB of RAM allocated to each
>>>>>>>>>>> worker, in standalone cluster mode.
>>>>>>>>>>> The code is the following:
>>>>>>>>>>>
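>>>>>>>>>>> import org.apache.spark.sql.functions.sum
>>>>>>>>>>>
>>>>>>>>>>> val people = sqlContext.parquetFile("/data.parquet")
>>>>>>>>>>> val res = people.groupBy("name", "date").
>>>>>>>>>>>   agg(sum("power"), sum("supply")).take(10)
>>>>>>>>>>> // take(10) returns an Array[Row]; print each row, not the array reference
>>>>>>>>>>> res.foreach(println)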
>>>>>>>>>>>
>>>>>>>>>>> The dataset consists of 16 billion entries.
>>>>>>>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead limit
>>>>>>>>>>> exceeded
>>>>>>>>>>>
>>>>>>>>>>> My configuration is:
>>>>>>>>>>>
>>>>>>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>>>>>>> spark.driver.memory    6g
>>>>>>>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>>>>>>>> spark.shuffle.manager    sort
>>>>>>>>>>>
>>>>>>>>>>> Any idea how I can work around this?
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
