Re: partitionBy causing OOM

2017-09-26 Thread Amit Sela
Thanks for all the answers!
It looks like increasing the heap a little and setting
spark.sql.shuffle.partitions to a much lower number (I used the recommended
input_size_mb/128 formula) did the trick.
As for partitionBy, unless I use repartition("dt") before the writer, it
actually writes more than one output file per "dt" partition so I guess the
same "dt" value is spread across multiple partitions, right?
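For anyone hitting the same thing, here is a minimal PySpark sketch of that
combination (lower spark.sql.shuffle.partitions plus repartition("dt") before
the writer). The paths, the "events" array column, and the value 48 are
placeholders, not taken from the actual job:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, col

    spark = (
        SparkSession.builder
        .appName("partitionBy-oom-example")
        # Roughly input_size_mb / 128, e.g. ~6 GB of input -> ~48 shuffle partitions.
        .config("spark.sql.shuffle.partitions", 48)
        .getOrCreate()
    )

    df = spark.read.json("s3://bucket/input/")                      # placeholder path
    flat = df.select("dt", explode(col("events")).alias("event"))   # placeholder schema

    # repartition("dt") hashes rows by "dt" so every row for a given date ends up
    # in one task; without it, several tasks can hold the same "dt" and each one
    # writes its own file into that partition directory.
    (flat.repartition("dt")
         .write
         .partitionBy("dt")
         .mode("overwrite")
         .json("s3://bucket/output/"))

Note that after repartition("dt") every row for a given date goes through a
single task, so one very heavy date can still make that task large.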

On Mon, Sep 25, 2017 at 11:07 PM ayan guha  wrote:

> Another possible option would be creating a partitioned table in Hive and
> using dynamic partitioning while inserting. This will not require Spark to
> do an explicit partitionBy.
>
> On Tue, 26 Sep 2017 at 12:39 pm, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi Amit,
>>
>> Spark keeps the partition that it is working on in memory (and does not
>> spill to disk even if it is running out of memory). Also, since you are
>> getting OOM when using partitionBy (and not when you just use flatMap),
>> there should be one (or a few) dates for which your partition size is
>> bigger than the heap. You can do a count on dates to check if there is
>> skewness in your data.
>>
>> The way out would be to increase the heap size or to use more columns in
>> partitionBy (like date + hour) to distribute the data better.
>>
>> Hope this helps!
>>
>> Thanks
>> Ankur
>>
>> On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩  wrote:
>>
>>> Hi, Amit,
>>>
>>> Maybe you can change this configuration spark.sql.shuffle.partitions.
>>> The default is 200; changing this property changes the number of tasks
>>> when you are using the DataFrame API.
>>>
>>> On 26 Sep 2017, at 1:25 AM, Amit Sela  wrote:
>>>
>>> I'm trying to run a simple pyspark application that reads from file
>>> (json), flattens it (explode) and writes back to file (json) partitioned by
>>> date using DataFrameWriter.partitionBy(*cols).
>>>
>>> I keep getting OOMEs like:
>>> java.lang.OutOfMemoryError: Java heap space
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>>> at
>>> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>>> ...
>>>
>>> Explode could make the underlying RDD grow a lot, sometimes in an
>>> unbalanced way, and adding partitioning by date on top of that (in daily
>>> ETLs, for instance) would probably cause data skew (right?). But why am I
>>> getting OOMs? Isn't Spark supposed to spill to disk if the underlying RDD
>>> is too big to fit in memory?
>>>
>>> If I'm not using "partitionBy" with the writer (still exploding)
>>> everything works fine.
>>>
>>> This happens both in EMR and in local (mac) pyspark/spark shell (tried
>>> both in python and scala).
>>>
>>> Thanks!
>>>
>>>
>>>
>> --
> Best Regards,
> Ayan Guha
>


Re: partitionBy causing OOM

2017-09-25 Thread ayan guha
Another possible option would be creating a partitioned table in Hive and
using dynamic partitioning while inserting. This will not require Spark to do
an explicit partitionBy.
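For illustration, a rough sketch of that route from PySpark; the table name,
columns, and configs below are assumptions about the setup, not something
taken from the original job:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("hive-dynamic-partition-example")
        .enableHiveSupport()
        # Allow dynamic partitioning without naming a static partition value.
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .getOrCreate()
    )

    # Placeholder: already-flattened data that contains a "dt" column.
    flat = spark.read.json("s3://bucket/flattened/")
    flat.createOrReplaceTempView("staging")

    spark.sql("""
        CREATE TABLE IF NOT EXISTS events_by_date (payload STRING)
        PARTITIONED BY (dt STRING)
        STORED AS PARQUET
    """)

    # The dynamic partition column must be the last column in the SELECT list.
    spark.sql("""
        INSERT OVERWRITE TABLE events_by_date PARTITION (dt)
        SELECT payload, dt FROM staging
    """)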

On Tue, 26 Sep 2017 at 12:39 pm, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi Amit,
>
> Spark keeps the partition that it is working on in memory (and does not
> spill to disk even if it is running out of memory). Also, since you are
> getting OOM when using partitionBy (and not when you just use flatMap),
> there should be one (or a few) dates for which your partition size is
> bigger than the heap. You can do a count on dates to check if there is
> skewness in your data.
>
> The way out would be to increase the heap size or to use more columns in
> partitionBy (like date + hour) to distribute the data better.
>
> Hope this helps!
>
> Thanks
> Ankur
>
> On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩  wrote:
>
>> Hi, Amit,
>>
>> Maybe you can change this configuration spark.sql.shuffle.partitions.
>> The default is 200; changing this property changes the number of tasks
>> when you are using the DataFrame API.
>>
>> On 26 Sep 2017, at 1:25 AM, Amit Sela  wrote:
>>
>> I'm trying to run a simple pyspark application that reads from file
>> (json), flattens it (explode) and writes back to file (json) partitioned by
>> date using DataFrameWriter.partitionBy(*cols).
>>
>> I keep getting OOMEs like:
>> java.lang.OutOfMemoryError: Java heap space
>> at
>> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>> at
>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>> at
>> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>> ...
>>
>> Explode could make the underlying RDD grow a lot, sometimes in an
>> unbalanced way, and adding partitioning by date on top of that (in daily
>> ETLs, for instance) would probably cause data skew (right?). But why am I
>> getting OOMs? Isn't Spark supposed to spill to disk if the underlying RDD
>> is too big to fit in memory?
>>
>> If I'm not using "partitionBy" with the writer (still exploding)
>> everything works fine.
>>
>> This happens both in EMR and in local (mac) pyspark/spark shell (tried
>> both in python and scala).
>>
>> Thanks!
>>
>>
>>
> --
Best Regards,
Ayan Guha


Re: partitionBy causing OOM

2017-09-25 Thread Ankur Srivastava
Hi Amit,

Spark keeps the partition that it is working on in memory (and does not
spill to disk even if it is running out of memory). Also, since you are
getting OOM when using partitionBy (and not when you just use flatMap), there
should be one (or a few) dates for which your partition size is bigger than
the heap. You can do a count on dates to check if there is skewness in your
data.

The way out would be to increase the heap size or to use more columns in
partitionBy (like date + hour) to distribute the data better.
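For illustration, a rough PySpark sketch of both suggestions (the count per
date and the finer-grained partitionBy); the paths and the "event_ts"
timestamp column are assumptions:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import hour, col

    spark = SparkSession.builder.appName("skew-check-example").getOrCreate()
    flat = spark.read.json("s3://bucket/flattened/")   # placeholder path

    # Count rows per date to spot dates that are far larger than the rest.
    flat.groupBy("dt").count().orderBy(col("count").desc()).show(20, truncate=False)

    # Partitioning by date plus hour spreads a heavy date over up to 24 output
    # partitions, so no single partition has to fit in one task's heap.
    (flat.withColumn("hour", hour(col("event_ts")))
         .write
         .partitionBy("dt", "hour")
         .mode("overwrite")
         .json("s3://bucket/output/"))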

Hope this helps!

Thanks
Ankur

On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩  wrote:

> Hi, Amit,
>
> Maybe you can change this configuration spark.sql.shuffle.partitions.
> The default is 200; changing this property changes the number of tasks
> when you are using the DataFrame API.
>
> On 26 Sep 2017, at 1:25 AM, Amit Sela  wrote:
>
> I'm trying to run a simple pyspark application that reads from file
> (json), flattens it (explode) and writes back to file (json) partitioned by
> date using DataFrameWriter.partitionBy(*cols).
>
> I keep getting OOMEs like:
> java.lang.OutOfMemoryError: Java heap space
> at org.apache.spark.util.collection.unsafe.sort.
> UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
> at org.apache.spark.util.collection.unsafe.sort.
> UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
> at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(
> TaskMemoryManager.java:203)
> ...
>
> Explode could make the underlying RDD grow a lot, sometimes in an
> unbalanced way, and adding partitioning by date on top of that (in daily
> ETLs, for instance) would probably cause data skew (right?). But why am I
> getting OOMs? Isn't Spark supposed to spill to disk if the underlying RDD
> is too big to fit in memory?
>
> If I'm not using "partitionBy" with the writer (still exploding)
> everything works fine.
>
> This happens both in EMR and in local (mac) pyspark/spark shell (tried
> both in python and scala).
>
> Thanks!
>
>
>


Re: partitionBy causing OOM

2017-09-25 Thread 孫澤恩
Hi, Amit,

Maybe you can change this configuration spark.sql.shuffle.partitions.
The default is 200; changing this property changes the number of tasks when
you are using the DataFrame API.
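For reference, in the pyspark shell (where a SparkSession is already
available as `spark`) it can be set like this; 48 is only an illustrative
value:

    # Number of partitions used for shuffles in DataFrame/SQL operations.
    spark.conf.set("spark.sql.shuffle.partitions", 48)

    # Or when building the session:
    # SparkSession.builder.config("spark.sql.shuffle.partitions", 48).getOrCreate()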

> On 26 Sep 2017, at 1:25 AM, Amit Sela  wrote:
> 
> I'm trying to run a simple pyspark application that reads from file (json), 
> flattens it (explode) and writes back to file (json) partitioned by date 
> using DataFrameWriter.partitionBy(*cols).
> 
> I keep getting OOMEs like:
> java.lang.OutOfMemoryError: Java heap space
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
> ...
> 
> Explode could make the underlying RDD grow a lot, sometimes in an unbalanced
> way, and adding partitioning by date on top of that (in daily ETLs, for
> instance) would probably cause data skew (right?). But why am I getting
> OOMs? Isn't Spark supposed to spill to disk if the underlying RDD is too big
> to fit in memory?
> 
> If I'm not using "partitionBy" with the writer (still exploding) everything 
> works fine.
> 
> This happens both in EMR and in local (mac) pyspark/spark shell (tried both 
> in python and scala).
> 
> Thanks!



partitionBy causing OOM

2017-09-25 Thread Amit Sela
I'm trying to run a simple pyspark application that reads from file (json),
flattens it (explode) and writes back to file (json) partitioned by date
using DataFrameWriter.partitionBy(*cols).
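For context, a minimal sketch of the kind of pipeline I mean; the input path
and the "records" array column are placeholders:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, col

    spark = SparkSession.builder.appName("explode-partitionBy").getOrCreate()

    df = spark.read.json("s3://bucket/raw/")                          # placeholder path
    flat = df.select("dt", explode(col("records")).alias("record"))   # placeholder schema

    # The partitionBy("dt") step below is what triggers the OOM shown next.
    flat.write.partitionBy("dt").mode("overwrite").json("s3://bucket/out/")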

I keep getting OOMEs like:
java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
at
org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
...

Explode could make the underlying RDD grow a lot, sometimes in an unbalanced
way, and adding partitioning by date on top of that (in daily ETLs, for
instance) would probably cause data skew (right?). But why am I getting OOMs?
Isn't Spark supposed to spill to disk if the underlying RDD is too big to fit
in memory?

If I'm not using "partitionBy" with the writer (still exploding) everything
works fine.

This happens both in EMR and in local (mac) pyspark/spark shell (tried both
in python and scala).

Thanks!