[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-28 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r228758621
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

> > but after the create table command, when we do an insert command within the same session, Hive statistics are not getting updated
> 
> This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the `DetermineTableStats` rule.

@cloud-fan `DetermineTableStats` just initializes the stats if they are not set. Only if `session.sessionState.conf.fallBackToHdfsForStatsEnabled` is true does the rule derive the stats from the file system and update them, as shown in the code snippet below. In the insert flow this condition never gets executed, so the stats will still be none.

![image](https://user-images.githubusercontent.com/12999161/47619998-e3096600-db0a-11e8-9315-fa0d18be0860.png)
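For readers without access to the screenshot, here is a minimal sketch of the rule's logic, paraphrased from the Spark 2.x `HiveStrategies.scala` (an approximation of the shape, not the verbatim source; names may differ slightly between versions):

```
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Paraphrased sketch of the DetermineTableStats rule (Spark 2.x); not verbatim.
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // Only fires when the catalog table has no stats at all.
    case relation: HiveTableRelation if relation.tableMeta.stats.isEmpty =>
      val conf = session.sessionState.conf
      val sizeInBytes = if (conf.fallBackToHdfsForStatsEnabled) {
        // Only with the HDFS fallback enabled is the file system consulted.
        val tablePath = new Path(relation.tableMeta.location)
        val fs = tablePath.getFileSystem(session.sessionState.newHadoopConf())
        fs.getContentSummary(tablePath).getLength
      } else {
        // Otherwise a deliberately huge default is used, which effectively
        // disables broadcast-join selection for this table.
        conf.defaultSizeInBytes
      }
      relation.copy(tableMeta = relation.tableMeta.copy(
        stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))))
  }
}
```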



---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226515799
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

You are right, it's about considering the default size, but I am not very sure whether we should invalidate the cache. I will explain my understanding below.


---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226203589
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

The table created in the current session does not have stats. In this situation it gets `sizeInBytes` from

https://github.com/apache/spark/blob/1ff4a77be498615ee7216fd9cc2d510ecbd43b27/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L42-L46

https://github.com/apache/spark/blob/25c2776dd9ae3f9792048c78be2cbd958fd99841/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala#L88-L91
It's the real size, which is why it's a `broadcast join`. In fact, we should invalidate this table so that the `DetermineTableStats` rule takes effect. I am doing that here: https://github.com/apache/spark/pull/22721
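For context, the two linked snippets boil down to roughly the following (paraphrased, not verbatim; see the pinned revisions above for the exact code):

```
// LogicalRelation.computeStats (paraphrased): catalog stats win when present;
// otherwise the relation's own file-based estimate is used.
override def computeStats(): Statistics = {
  catalogTable
    .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled)))
    .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
}

// HadoopFsRelation.sizeInBytes (paraphrased): the real on-disk size, scaled
// by the configured compression factor.
override def sizeInBytes: Long = {
  val compressionFactor = sqlContext.conf.fileCompressionFactor
  (location.sizeInBytes * compressionFactor).toLong
}
```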


---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226203075
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

> > but after the create table command, when we do an insert command within the same session, Hive statistics are not getting updated
> 
> This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the `DetermineTableStats` rule.

Right, but I think the rule will return a default value for the stats. It will always return default stats unless we set `session.sessionState.conf.fallBackToHdfsForStatsEnabled` to true; I will reconfirm this behaviour.


![image](https://user-images.githubusercontent.com/12999161/47139545-b3bc5300-d2d9-11e8-9ae9-b13ee0dac970.png)
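For reference, the configuration involved (key names per Spark 2.x; treat the exact keys as an assumption for other versions):

```
// With the fallback ON, DetermineTableStats derives missing stats from HDFS:
spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
// With it OFF (the default), plans fall back to the huge internal default
// size (spark.sql.defaultSizeInBytes), i.e. effectively "no usable stats".
```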



---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226201756
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

Yes, it's the default setting, which means false, but I think it should be fine to keep the default setting in this scenario.


---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226199284
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

@sujith71955 What if `spark.sql.statistics.size.autoUpdate.enabled=false` or `hive.stats.autogather=false`? Does it still update stats?


---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226198591
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

> but after the create table command, when we do an insert command within the same session, Hive statistics are not getting updated

This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the `DetermineTableStats` rule.


---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226197174
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -1051,7 +1051,8 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 
   test("test statistics of LogicalRelation converted from Hive serde 
tables") {
 Seq("orc", "parquet").foreach { format =>
-  Seq(true, false).foreach { isConverted =>
+  // Both parquet and orc will have Hive statistics; both are convertible to LogicalRelation.
+  Seq(true, true).foreach { isConverted =>
--- End diff --

Right. I think there was some misunderstanding; I will recheck this. Thanks.


---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226192210
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

Thanks for your valuable feedback.
My observations:
1) In the insert flow we always try to update the Hive stats, per the statement below in `InsertIntoHadoopFsRelationCommand`:
```
if (catalogTable.nonEmpty) {
  CommandUtils.updateTableStats(sparkSession, catalogTable.get)
}
```
but after the create table command, when we do an insert command within the same session, Hive statistics are not updated, because the validation below expects the stats to be non-empty (see the fuller sketch after this list):

```
def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
  if (table.stats.nonEmpty) {
```
But if we re-launch spark-shell and run the insert command, the Hive statistics will be saved, and from then on the stats will be taken from the Hive stats; the flow will never again try to estimate the data size from the files.

2) Currently the system does not always estimate the data size from the files when executing the insert command; as noted above, if we launch the query from a new context, the system will read the stats from Hive. I think there is a behavior-consistency problem here, and if we can always get the stats from Hive, do we really need to recalculate the stats from the files every time?

>> I think we may need to update the flow so that it always reads the data size from the files and never depends on the Hive stats,
>> or, if we are recording the Hive stats, then it should read the Hive stats every time.
Please let me know whether I am going in the right direction, and let me know if any clarification is needed.



---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226151421
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Initialize the catalogTable stats if it's not defined. An initial value has to be defined
--- End diff --

I don't quite understand why the table must have stats. For both file sources and Hive tables, we estimate the data size from the files if the table doesn't have stats.


---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226150341
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -1051,7 +1051,8 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 
   test("test statistics of LogicalRelation converted from Hive serde 
tables") {
 Seq("orc", "parquet").foreach { format =>
-  Seq(true, false).foreach { isConverted =>
+  // Both parquet and orc will have Hive statistics; both are convertible to LogicalRelation.
+  Seq(true, true).foreach { isConverted =>
--- End diff --

This is to test when the conversion is on and off. We shouldn't change it.


---




[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-17 Thread sujith71955
GitHub user sujith71955 opened a pull request:

https://github.com/apache/spark/pull/22758

[SPARK-25332][SQL] Instead of broadcast hash join, sort merge join gets selected when restarting spark-shell/spark-JDBC for the Hive provider

## What changes were proposed in this pull request?
Problem:
Below are the steps, in sequence, to reproduce the issue.
```
a. Create a parquet table with a STORED AS clause.
b. Run an insert statement => this will not update the Hive stats.
c. Run a select query that needs stats (or EXPLAIN COST a select statement) => the stats are evaluated from HadoopFsRelation.
d. Since the correct stats (sizeInBytes) are available, the plan selects a broadcast node when joining with any table.
e. Exit (come out of the shell).
f. Now run step c again. This gives wrong stats in the plan (sizeInBytes gets the default value), because they are calculated from the Hive CatalogTable, which has no stats since they were not updated in step b.
g. Since incorrect stats (sizeInBytes = default value) are available, the plan selects a SortMergeJoin node when joining with any table.
h. Now run an insert statement => this will update the Hive stats.
i. Run step c again. This gives the correct stats (sizeInBytes) in the plan, because it can read the Hive stats updated in step h.
j. From now on the stats are always available, so the correct plan is displayed, which always picks the broadcast join node (based on the threshold size).
```
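A minimal spark-shell repro of the steps above (table names and schemas are illustrative, not from the PR):

```
// Session 1: sizeInBytes comes from HadoopFsRelation's file-based estimate.
spark.sql("CREATE TABLE big (id INT, payload STRING) STORED AS PARQUET")
spark.sql("CREATE TABLE small (id INT) STORED AS PARQUET")
spark.sql("INSERT INTO small VALUES (1)")        // step b: Hive stats NOT updated
spark.sql(
  "EXPLAIN COST SELECT * FROM big b JOIN small s ON b.id = s.id"
).show(false)                                    // steps c/d: BroadcastHashJoin

// Session 2: relaunch spark-shell, then rerun only the EXPLAIN. The relation
// cache is gone, the metastore has no stats, and sizeInBytes falls back to
// the default => SortMergeJoin (steps e/f/g).
```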
## Proposed fix
The main problem here is that the Hive stats are not recorded after the insert command, because of the condition `if (table.stats.nonEmpty)` in `updateTableStats()`, which is executed as part of the `InsertIntoHadoopFsRelationCommand` command.
So as part of the fix we initialize a default value for the `CatalogTable` stats if there is no cache for a particular `LogicalRelation` (a sketch follows below).
It is also observed in the test case "test statistics of LogicalRelation converted from Hive serde tables" in `StatisticsSuite` that orc and parquet are both convertible, yet the test expects that only orc should get/record Hive stats and not parquet. Since both relations are convertible, we should have the same expectation for both. The same is corrected in this PR.
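A minimal sketch of the proposed initialization, under the assumption that it reflects the PR's intent (`updatedTable` and `sparkSession` are the names in the diff context above; this is not the exact patch):

```
// Sketch only -- placed in HiveMetastoreCatalog after inferIfNeeded(...),
// where `updatedTable` and `sparkSession` are in scope.
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

val tableWithStats = if (updatedTable.stats.isEmpty) {
  // Seed the stats with the default size so that updateTableStats() sees
  // non-empty stats on the next INSERT and records the real size.
  updatedTable.copy(stats = Some(CatalogStatistics(
    sizeInBytes = BigInt(sparkSession.sessionState.conf.defaultSizeInBytes))))
} else {
  updatedTable
}
```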

## How was this patch tested?
Manually tested; attaching the snapshots. Also corrected a UT, as mentioned above in the description, which complements this PR's changes.
Step 1:
Log in to spark-shell => create 2 tables => run the insert command => explain and check the plan => the plan contains a broadcast join => exit


![step-1_spark-25332](https://user-images.githubusercontent.com/12999161/47113009-83db6400-d275-11e8-8439-0b9cba0cb413.PNG)

Step 2:
Relaunch spark-shell => run the explain command for the particular select statement => verify the plan => the plan contains a SortMergeJoin - this is an incorrect result.


![step-2_spark-25332](https://user-images.githubusercontent.com/12999161/47113119-d288fe00-d275-11e8-9c8c-971f02fddda7.PNG)

Step 3:
Again run the insert command => run the explain command for the particular select statement => verify the plan.
We can observe that the node has changed to BroadcastJoin - this makes the flow inconsistent.

**After Fix**
Log in to spark-shell => create 2 tables => run the insert command => explain and check the plan => the plan contains a broadcast join => exit


![step-1-afterfix-spark-25332](https://user-images.githubusercontent.com/12999161/47113323-52af6380-d276-11e8-9eb9-71d1076d7e38.PNG)

Step 2:
Relaunch spark-shell => run the explain command for the particular select statement => verify the plan => the plan still contains a broadcast join, since after the fix the Hive stats are available for the table.

![step-2-afterfix-spark-25332](https://user-images.githubusercontent.com/12999161/47113407-94400e80-d276-11e8-99c1-66fa0c333beb.PNG)

Step 3:
Again run the insert command => run the explain command for the particular select statement => verify the plan.
We can observe that the plan still retains the BroadcastJoin - from now on the results are always consistent.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sujith71955/spark master_statistics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22758


commit 469f3693b641dae161bbb673599f55f20a60767b
Author: s71955 
Date:   2018-10-17T19:43:39Z

[SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join has 
selected when restart spark-shell/spark-JDBC for hive provider
