Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Thanks, Yong, for the response. I'm adding my responses inline.

On Tue, Jan 17, 2017 at 10:27 PM, Yong Zhang  wrote:

> What DB are you using for your Hive metastore, and what types are your
> partition columns?
>
I am using MySQL for the Hive metastore. The partition columns are a combination
of INT and STRING types.
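
For context, the layout is roughly like the hypothetical DDL below (real names differ; the relevant part is the mix of INT and STRING partition columns, which is what later shows up in the plan). This assumes a HiveContext-backed sqlContext:

// Hypothetical DDL, only illustrating the mixed INT/STRING partition columns.
sqlContext.sql(
  """CREATE TABLE IF NOT EXISTS dbname.tablename (rejection_reason STRING)
    |PARTITIONED BY (year INT, month INT, day INT, hour INT, venture STRING)
    |STORED AS PARQUET""".stripMargin)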

>
> You may want to read the discussion in SPARK-6910, and especially the
> comments in the PR. There are some limitations around partition pruning in
> Hive/Spark; maybe yours is one of them.
>
It seems I had already gone through SPARK-6910 and all of the corresponding PRs,
and I had set the properties below based on them:

spark.sql.hive.convertMetastoreParquet   false
spark.sql.hive.metastorePartitionPruning   true
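
Roughly, this is how I am setting them (a minimal sketch assuming a Spark 1.x HiveContext, as used in this thread; dbname/tablename and the partition columns just mirror the plan quoted further down):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("partition-pruning-check")
  .set("spark.sql.hive.convertMetastoreParquet", "false")   // stay on the Hive serde path
  .set("spark.sql.hive.metastorePartitionPruning", "true")  // push partition predicates to the metastore

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)

// Same filters as in the plan: year/month/day/hour/venture.
val rdf = sqlContext.sql(
  """SELECT rejection_reason FROM dbname.tablename
    |WHERE year = 2016 AND month = 12 AND day = 28
    |  AND hour = 2 AND venture = 'DEFAULT'""".stripMargin)
rdf.explain()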


>

> Yong
>
>
> --
> From: Raju Bairishetti
> Sent: Tuesday, January 17, 2017 3:00 AM
> To: user @spark
> Subject: Re: Spark sql query plan contains all the partitions from hive
> table even though filtering of partitions is provided
>
> I had a high-level look into the code. It seems the getHiveQlPartitions method
> from HiveMetastoreCatalog is getting called irrespective of the
> metastorePartitionPruning conf value.
>
> It should not fetch all partitions if we set metastorePartitionPruning to
> true (the default value is false).
>
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>     table.getPartitions(predicates)
>   } else {
>     allPartitions
>   }
>
> ...
>
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
>
> lazy val allPartitions = table.getAllPartitions
>
> But somehow getAllPartitions is getting called even after setting
> metastorePartitionPruning to true.
>
> Am I missing something, or am I looking in the wrong place?
>
>
> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti 
> wrote:
>
>> Waiting for suggestions/help on this...
>>
>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
>> wrote:
>>
>>> Hello,
>>>
>>>    Spark SQL is generating the query plan with information for all partitions
>>> even though we apply filters on the partition columns in the query. Due to
>>> this, the Spark driver/Hive metastore is hitting OOM, as each table has lots
>>> of partitions.
>>>
>>> We can confirm from the Hive audit logs that it tries to fetch all
>>> partitions from the Hive metastore.
>>>
>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>>> cmd=get_partitions : db= tbl=x
>>>
>>>
>>> Configured the following parameters in the Spark conf to fix the above
>>> issue (source: the Spark JIRA & GitHub pull request):
>>>
>>> spark.sql.hive.convertMetastoreParquet   false
>>> spark.sql.hive.metastorePartitionPruning   true
>>>
>>>
>>>    plan: rdf.explain
>>>    == Physical Plan ==
>>>    HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>
>>> The get_partitions_by_filter method is called, fetching only the
>>> required partitions.
>>>
>>> But after this we are frequently seeing Parquet decoding errors in our
>>> applications. It looks like these decoding errors were caused by the serde
>>> changing from the Spark built-in serde to the Hive serde.
>>>
>>> I feel that fixing query plan generation in Spark SQL is the
>>> right approach, instead of forcing users to use the Hive serde.
>>>
>>> Is there any workaround/way to fix this issue? I would like to hear more
>>> thoughts on this :)
>>>
>>> --
>>> Thanks,
>>> Raju Bairishetti,
>>> www.lazada.com
>>>
>>
>>
>>
>> --
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>
>
> --
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Yong Zhang
What DB are you using for your Hive metastore, and what types are your
partition columns?


You may want to read the discussion in SPARK-6910, and especially the
comments in the PR. There are some limitations around partition pruning in
Hive/Spark; maybe yours is one of them.


Yong



From: Raju Bairishetti 
Sent: Tuesday, January 17, 2017 3:00 AM
To: user @spark
Subject: Re: Spark sql query plan contains all the partitions from hive table 
even though filtering of partitions is provided

I had a high-level look into the code. It seems the getHiveQlPartitions method from
HiveMetastoreCatalog is getting called irrespective of the
metastorePartitionPruning conf value.

It should not fetch all partitions if we set metastorePartitionPruning to true
(the default value is false).

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
  val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
    table.getPartitions(predicates)
  } else {
    allPartitions
  }

...

def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
  client.getPartitionsByFilter(this, predicates)

lazy val allPartitions = table.getAllPartitions

But somehow getAllPartitions is getting called even after setting
metastorePartitionPruning to true.

Am I missing something, or am I looking in the wrong place?

On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti 
> wrote:
Waiting for suggestions/help on this...

On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
> wrote:
Hello,

   Spark SQL is generating the query plan with information for all partitions even
though we apply filters on the partition columns in the query. Due to this, the
Spark driver/Hive metastore is hitting OOM, as each table has lots of
partitions.

We can confirm from the Hive audit logs that it tries to fetch all partitions from
the Hive metastore.

 2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
(HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x   
cmd=get_partitions : db= tbl=x


Configured the following parameters in the Spark conf to fix the above
issue (source: the Spark JIRA & GitHub pull request):
spark.sql.hive.convertMetastoreParquet   false
spark.sql.hive.metastorePartitionPruning   true

   plan:  rdf.explain
   == Physical Plan ==
   HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 28),(hour#317 
= 2),(venture#318 = DEFAULT)]

The get_partitions_by_filter method is called, fetching only the required
partitions.

But after this we are frequently seeing Parquet decoding errors in our
applications. It looks like these decoding errors were caused by the serde
changing from the Spark built-in serde to the Hive serde.

I feel that fixing query plan generation in Spark SQL is the right
approach, instead of forcing users to use the Hive serde.

Is there any workaround/way to fix this issue? I would like to hear more 
thoughts on this :)

--
Thanks,
Raju Bairishetti,
www.lazada.com



--

--
Thanks,
Raju Bairishetti,
www.lazada.com



--

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
I had a high-level look into the code. It seems the getHiveQlPartitions method from
HiveMetastoreCatalog is getting called irrespective of the
metastorePartitionPruning conf value.

It should not fetch all partitions if we set metastorePartitionPruning to
true (the default value is false).

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
  val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
    table.getPartitions(predicates)
  } else {
    allPartitions
  }

...

def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
  client.getPartitionsByFilter(this, predicates)

lazy val allPartitions = table.getAllPartitions

But somehow getAllPartitions is getting called even after setting
metastorePartitionPruning to true.

Am I missing something, or am I looking in the wrong place?
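
To rule out the setting simply not reaching the session, a quick sanity check (a sketch, assuming the same HiveContext/sqlContext that runs the query) is to read the values back from the SQL conf:

// Expect "true" and "false" respectively; the second argument is only a fallback default.
println(sqlContext.getConf("spark.sql.hive.metastorePartitionPruning", "(not set)"))
println(sqlContext.getConf("spark.sql.hive.convertMetastoreParquet", "(not set)"))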


On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti  wrote:

> Waiting for suggestions/help on this...
>
> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
> wrote:
>
>> Hello,
>>
>>    Spark SQL is generating the query plan with information for all partitions
>> even though we apply filters on the partition columns in the query. Due to
>> this, the Spark driver/Hive metastore is hitting OOM, as each table has lots
>> of partitions.
>>
>> We can confirm from the Hive audit logs that it tries to fetch all
>> partitions from the Hive metastore.
>>
>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>> cmd=get_partitions : db= tbl=x
>>
>>
>> Configured the following parameters in the Spark conf to fix the above
>> issue (source: the Spark JIRA & GitHub pull request):
>>
>> spark.sql.hive.convertMetastoreParquet   false
>> spark.sql.hive.metastorePartitionPruning   true
>>
>>
>>    plan: rdf.explain
>>    == Physical Plan ==
>>    HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>
>> The get_partitions_by_filter method is called, fetching only the
>> required partitions.
>>
>> But after this we are frequently seeing Parquet decoding errors in our
>> applications. It looks like these decoding errors were caused by the serde
>> changing from the Spark built-in serde to the Hive serde.
>>
>> I feel that fixing query plan generation in Spark SQL is the
>> right approach, instead of forcing users to use the Hive serde.
>>
>> Is there any workaround/way to fix this issue? I would like to hear more
>> thoughts on this :)
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>
>
> --
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-15 Thread Raju Bairishetti
Waiting for suggestions/help on this...

On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti  wrote:

> Hello,
>
>    Spark SQL is generating the query plan with information for all partitions
> even though we apply filters on the partition columns in the query. Due to
> this, the Spark driver/Hive metastore is hitting OOM, as each table has lots
> of partitions.
>
> We can confirm from the Hive audit logs that it tries to fetch all
> partitions from the Hive metastore.
>
>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
> cmd=get_partitions : db= tbl=x
>
>
> Configured the following parameters in the Spark conf to fix the above
> issue (source: the Spark JIRA & GitHub pull request):
>
> spark.sql.hive.convertMetastoreParquet   false
> spark.sql.hive.metastorePartitionPruning   true
>
>
>    plan: rdf.explain
>    == Physical Plan ==
>    HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>
> The get_partitions_by_filter method is called, fetching only the
> required partitions.
>
> But after this we are frequently seeing Parquet decoding errors in our
> applications. It looks like these decoding errors were caused by the serde
> changing from the Spark built-in serde to the Hive serde.
>
> I feel that fixing query plan generation in Spark SQL is the right
> approach, instead of forcing users to use the Hive serde.
>
> Is there any workaround/way to fix this issue? I would like to hear more
> thoughts on this :)
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-10 Thread Raju Bairishetti
Hello,

   Spark SQL is generating the query plan with information for all partitions even
though we apply filters on the partition columns in the query. Due to this, the
Spark driver/Hive metastore is hitting OOM, as each table has lots of
partitions.

We can confirm from the Hive audit logs that it tries to fetch all partitions
from the Hive metastore.

 2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
cmd=get_partitions : db= tbl=x


Configured the following parameters in the Spark conf to fix the above
issue (source: the Spark JIRA & GitHub pull request):

spark.sql.hive.convertMetastoreParquet   false
spark.sql.hive.metastorePartitionPruning   true


   plan: rdf.explain
   == Physical Plan ==
   HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
28),(hour#317 = 2),(venture#318 = DEFAULT)]

The get_partitions_by_filter method is called, fetching only the required
partitions.
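
For reference, this is roughly what the pruned path boils down to on the metastore side (a sketch, not Spark's actual internals; whether non-string partition columns such as year/month can appear in the filter depends on the Hive version and metastore settings, which is one of the limitations discussed around SPARK-6910):

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

// The pushed-down predicates become a metastore filter string; only matching
// partitions come back, and the call shows up as get_partitions_by_filter
// in HiveMetaStore.audit.
val client = new HiveMetaStoreClient(new HiveConf())
val filter = "venture = \"DEFAULT\""   // string partition columns are the safe case
val maxParts: Short = -1               // -1 means no limit
val parts = client.listPartitionsByFilter("dbname", "tablename", filter, maxParts)
println(s"matched ${parts.size()} partitions")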

But after this we are frequently seeing Parquet decoding errors in our
applications. It looks like these decoding errors were caused by the serde
changing from the Spark built-in serde to the Hive serde.
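
One way to relate the decoding errors to the reader path (a sketch; query is a hypothetical val holding the same filtered SELECT used to produce the plan above, and it assumes re-issuing the query after changing the session conf, since the plan is resolved when the DataFrame is created) is to compare the physical plans under the two settings:

// With convertMetastoreParquet = true, Spark's built-in Parquet reader is used
// and the plan shows a Parquet scan instead of HiveTableScan.
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")
sqlContext.sql(query).explain()

// With convertMetastoreParquet = false, the Hive serde path is used and the
// plan shows HiveTableScan, as in the output above.
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
sqlContext.sql(query).explain()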

I feel that fixing query plan generation in Spark SQL is the right
approach, instead of forcing users to use the Hive serde.

Is there any workaround/way to fix this issue? I would like to hear more
thoughts on this :)

--
Thanks,
Raju Bairishetti,
www.lazada.com