Re: Hive Query on Spark fails with OOM

2016-03-15 Thread Sabarish Sasidharan
Yes, I suggested increasing shuffle partitions to address this problem. The
other suggestion, to increase the shuffle fraction, was not for this, but it
makes sense given that you are reserving all that memory and doing nothing
with it. By diverting more of it to shuffles you can improve your shuffle
performance.
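
As a hedged sketch (not a tested recommendation), swapping the fractions on
the OP's Spark 1.2.1 would look like this; the 0.6/0.2 values simply reverse
the defaults:

from pyspark import SparkConf

# Legacy (pre-1.6) memory settings: since this job explicitly caches
# nothing, hand the shuffle pool the storage pool's share and vice versa.
conf = (SparkConf()
        .setAppName("Hive_Join")
        .set("spark.shuffle.memoryFraction", "0.6")   # default is 0.2
        .set("spark.storage.memoryFraction", "0.2"))  # default is 0.6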

Regards
Sab
On 14-Mar-2016 2:33 pm, "Prabhu Joseph" wrote:

> The issue is that the query hits OOM on a stage while reading shuffle
> output from the previous stage. How does increasing shuffle memory help
> avoid the OOM?

Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Michael Armbrust
On Mon, Mar 14, 2016 at 1:30 PM, Prabhu Joseph wrote:

> Thanks for the recommendation. But can you share what improvements were
> made after Spark 1.2.1, and which of them specifically address the issue
> observed here?

Memory used for query execution is now explicitly accounted for:

https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
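
As a hedged aside: if this means upgrading to 1.5+, Tungsten's explicit
memory accounting is on by default there; the flag below (present in 1.5,
removed in later releases) only makes that assumption visible.

from pyspark import SparkConf

# Spark 1.5+: Tungsten manages and accounts for execution memory
# explicitly instead of relying on JVM heap estimates.
conf = (SparkConf()
        .setAppName("Hive_Join")
        .set("spark.sql.tungsten.enabled", "true"))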


Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Prabhu Joseph
Michael,

Thanks for the recommendation. But can you share what improvements were
made after Spark 1.2.1, and which of them specifically address the issue
observed here?



Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Jörn Franke
I am not sure about this. At least Hortonworks ships its distribution with
Hive and Spark 1.6.

> On 14 Mar 2016, at 09:25, Mich Talebzadeh wrote:
>
> I think the only version of Spark that works OK with Hive (Hive on Spark
> engine) is version 1.3.1. I also get OOM from time to time and have to
> revert to using MR.


Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Michael Armbrust
+1 to upgrading Spark. 1.2.1 has none of the memory management improvements
that were added in 1.4 through 1.6.
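
As a hedged illustration of what changed: Spark 1.6 replaced the separate
storage/shuffle fractions with a single unified pool that execution and
storage borrow from, so the fraction tuning discussed elsewhere in this
thread largely goes away (the values below are the 1.6 defaults):

from pyspark import SparkConf

# Spark 1.6 unified memory management: execution and storage share one
# pool, so an executor no longer OOMs while reserved cache memory idles.
conf = (SparkConf()
        .setAppName("Hive_Join")
        .set("spark.memory.fraction", "0.75")
        .set("spark.memory.storageFraction", "0.5"))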


Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Prabhu Joseph
The issue is that the query hits OOM on a stage while reading shuffle output
from the previous stage. How does increasing shuffle memory help avoid the
OOM?


Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Sabarish Sasidharan
That's a pretty old version of Spark SQL. It is devoid of all the
improvements introduced in the last few releases.

You should try bumping spark.sql.shuffle.partitions to a value higher than
the default (5x or 10x). Also increase your shuffle memory fraction, as you
really are not explicitly caching anything; you could simply swap the two
fractions in your case.
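
A rough sketch against the OP's script, assuming the SET command is honored
through hql() as it is through sql() in later releases; 2000 is an
illustrative 10x of the Spark SQL default of 200, not a tuned value:

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

sc = SparkContext(conf=SparkConf().setAppName("Hive_Join"))
hiveCtx = HiveContext(sc)
# More, smaller shuffle partitions cut the per-task memory needed when
# reading shuffle output in the join stages.
hiveCtx.hql("SET spark.sql.shuffle.partitions=2000")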

Regards
Sab



Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Prabhu Joseph
It is Spark SQL, and the version used is Spark 1.2.1.



Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Sabarish Sasidharan
I believe the OP is using Spark SQL and not Hive on Spark.
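
(For context on the distinction: Hive on Spark means Hive itself executes
its query plans with Spark as the engine, typically enabled on the Hive side
with hive.execution.engine=spark, whereas the OP's hivejoin.py drives Spark
SQL's HiveContext from PySpark and uses Hive only for the metastore and
query syntax.)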

Regards
Sab



Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Mich Talebzadeh
I think the only version of Spark that works OK with Hive (Hive on Spark
engine) is version 1.3.1. I also get OOM from time to time and have to
revert to using MR.

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com




Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Sabarish Sasidharan
Which version of Spark are you using? The configuration varies by version.

Regards
Sab



--

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan
India ICT)
+++


Hive Query on Spark fails with OOM

2016-03-13 Thread Prabhu Joseph
Hi All,

A Hive join query that runs fine, and faster, in MapReduce takes a lot of
time with Spark and finally fails with OOM.

Query: hivejoin.py

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf().setAppName("Hive_Join")
sc = SparkContext(conf=conf)
hiveCtx = HiveContext(sc)
# Join fact table A against B and C and materialize the result into D
hiveCtx.hql("INSERT OVERWRITE TABLE D SELECT <80 columns> FROM A a "
            "INNER JOIN B b ON a.item_id = b.item_id "
            "LEFT JOIN C c ON c.instance_id = a.instance_id")
results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
print results


Data Study:

Number of rows:

A table has 1002093508
B table has 5371668
C table has 1000

No data skewness:

item_id is unique in B, and A has multiple rows with the same item_id, so
after the first INNER JOIN the result set is still 1002093508 rows.

instance_id is unique in C, and A has multiple rows with the same
instance_id (the maximum number of rows sharing an instance_id is 250).

The Spark job runs with 90 executors, each with 2 cores and 6GB memory. YARN
allotted all the requested resources immediately, and no other job is
running on the cluster.

spark.storage.memoryFraction 0.6
spark.shuffle.memoryFraction 0.2

Stage 2 - reads data from Hadoop; tasks have NODE_LOCAL locality and
shuffle-write 500GB of intermediate data.

Stage 3 - does a shuffle read of the 500GB; tasks have PROCESS_LOCAL
locality and shuffle-write 400GB of output.

Stage 4 - tasks fail with OOM while reading the shuffled output, after
reading only about 40GB.

First of all, what kinds of Hive queries get better performance when run on
Spark than on MapReduce, and which Hive queries won't perform well on Spark?

How do we calculate the optimal heap for executor memory, and the number of
executors, for a given input data size? We don't ask the Spark executors to
cache any data, so how come the Stage 3 tasks show PROCESS_LOCAL? And why
does Stage 4 fail immediately after reading just 40GB: is it caching data in
memory?

Finally, in a Spark job some stages need a lot of memory for shuffle and
some need a lot for cache. When a Spark executor has plenty of memory set
aside for cache that it never uses, but then needs to do a large shuffle,
will it use only the shuffle fraction configured for shuffles, or will it
also use the free memory reserved for caching?


Thanks,
Prabhu Joseph