[jira] [Commented] (SPARK-29549) Union of DataSourceV2 datasources leads to duplicated results

2019-10-28 Thread Miguel Molina (Jira)


[ https://issues.apache.org/jira/browse/SPARK-29549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961043#comment-16961043 ]

Miguel Molina commented on SPARK-29549:
---

Hi [~hyukjin.kwon]. In Spark 2.4 it works correctly; this only affects 2.3.x.

> Union of DataSourceV2 datasources leads to duplicated results
> -
>
> Key: SPARK-29549
> URL: https://issues.apache.org/jira/browse/SPARK-29549
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4
>Reporter: Miguel Molina
>Priority: Major
>
> Hello!
> I've discovered that when two DataSourceV2 data frames of the exact same
> shape are combined in a union and the query contains an aggregation, only
> the first data frame's results are used. The other branches are rewritten
> by the ReuseExchange rule to reuse the first exchange's output, yielding N
> copies of the first data frame's results.
>  
> I've put together a repository with an example project where this can be 
> reproduced: [https://github.com/erizocosmico/spark-union-issue]
>  
> Basically, doing this:
>  
> {code:java}
> val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY name")
> val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")
>
> products.union(users)
>   .select("name")
>   .show(truncate = false, numRows = 50){code}
>  
>  
> Where products is:
> {noformat}
> +---------+---+
> |name     |id |
> +---------+---+
> |candy    |1  |
> |chocolate|2  |
> |milk     |3  |
> |cinnamon |4  |
> |pizza    |5  |
> |pineapple|6  |
> +---------+---+{noformat}
> And users is:
> {noformat}
> +-------+---+
> |name   |id |
> +-------+---+
> |andy   |1  |
> |alice  |2  |
> |mike   |3  |
> |mariah |4  |
> |eleanor|5  |
> |matthew|6  |
> +-------+---+{noformat}
>  
> Results are incorrect:
> {noformat}
> +---------+
> |name     |
> +---------+
> |candy    |
> |pizza    |
> |chocolate|
> |cinnamon |
> |pineapple|
> |milk     |
> |candy    |
> |pizza    |
> |chocolate|
> |cinnamon |
> |pineapple|
> |milk     |
> +---------+{noformat}
>  
> This is the plan explained:
>  
> {noformat}
> == Parsed Logical Plan ==
> 'Project [unresolvedalias('name, None)]
> +- AnalysisBarrier
>    +- Union
>       :- Aggregate [name#0], [name#0, count(1) AS count#8L]
>       :  +- SubqueryAlias products
>       :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
>       +- Aggregate [name#4], [name#4, count(1) AS count#12L]
>          +- SubqueryAlias users
>             +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
>
> == Analyzed Logical Plan ==
> name: string
> Project [name#0]
> +- Union
>    :- Aggregate [name#0], [name#0, count(1) AS count#8L]
>    :  +- SubqueryAlias products
>    :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
>    +- Aggregate [name#4], [name#4, count(1) AS count#12L]
>       +- SubqueryAlias users
>          +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
>
> == Optimized Logical Plan ==
> Union
> :- Aggregate [name#0], [name#0]
> :  +- Project [name#0]
> :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
> +- Aggregate [name#4], [name#4]
>    +- Project [name#4]
>       +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
>
> == Physical Plan ==
> Union
> :- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :  +- Exchange hashpartitioning(name#0, 200)
> :     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :        +- *(1) Project [name#0]
> :           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
> +- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
>    +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
> {noformat}
>  
>  
> In the physical plan, the first exchange is reused, but it shouldn't be,
> because the two sources are not the same.
>  
> {noformat}
> == Physical Plan ==
> Union
> :- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :  +- Exchange hashpartitioning(name#0, 200)
> :     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :        +- *(1) Project [name#0]
> :           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
> +- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
>    +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200){noformat}

[jira] [Created] (SPARK-29549) Union of DataSourceV2 datasources leads to duplicated results

2019-10-22 Thread Miguel Molina (Jira)
Miguel Molina created SPARK-29549:
-

 Summary: Union of DataSourceV2 datasources leads to duplicated results
 Key: SPARK-29549
 URL: https://issues.apache.org/jira/browse/SPARK-29549
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.4, 2.3.3, 2.3.2, 2.3.1, 2.3.0
Reporter: Miguel Molina


Hello!

I've discovered that when two DataSourceV2 data frames of the exact same
shape are combined in a union and the query contains an aggregation, only
the first data frame's results are used. The other branches are rewritten by
the ReuseExchange rule to reuse the first exchange's output, yielding N
copies of the first data frame's results.

 

I've put together a repository with an example project where this can be 
reproduced: [https://github.com/erizocosmico/spark-union-issue]

 

Basically, doing this:

 
{code:java}
val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY name")
val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")

products.union(users)
  .select("name")
  .show(truncate = false, numRows = 50){code}
 

 

Where products is:
{noformat}
+---------+---+
|name     |id |
+---------+---+
|candy    |1  |
|chocolate|2  |
|milk     |3  |
|cinnamon |4  |
|pizza    |5  |
|pineapple|6  |
+---------+---+{noformat}
And users is:
{noformat}
+-------+---+
|name   |id |
+-------+---+
|andy   |1  |
|alice  |2  |
|mike   |3  |
|mariah |4  |
|eleanor|5  |
|matthew|6  |
+-------+---+{noformat}
 

Results are incorrect:
{noformat}
+---------+
|name     |
+---------+
|candy    |
|pizza    |
|chocolate|
|cinnamon |
|pineapple|
|milk     |
|candy    |
|pizza    |
|chocolate|
|cinnamon |
|pineapple|
|milk     |
+---------+{noformat}
 

This is the plan explained:

 
{noformat}
== Parsed Logical Plan ==
'Project [unresolvedalias('name, None)]
+- AnalysisBarrier
   +- Union
      :- Aggregate [name#0], [name#0, count(1) AS count#8L]
      :  +- SubqueryAlias products
      :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
      +- Aggregate [name#4], [name#4, count(1) AS count#12L]
         +- SubqueryAlias users
            +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

== Analyzed Logical Plan ==
name: string
Project [name#0]
+- Union
   :- Aggregate [name#0], [name#0, count(1) AS count#8L]
   :  +- SubqueryAlias products
   :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
   +- Aggregate [name#4], [name#4, count(1) AS count#12L]
      +- SubqueryAlias users
         +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

== Optimized Logical Plan ==
Union
:- Aggregate [name#0], [name#0]
:  +- Project [name#0]
:     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- Aggregate [name#4], [name#4]
   +- Project [name#4]
      +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
:  +- Exchange hashpartitioning(name#0, 200)
:     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
:        +- *(1) Project [name#0]
:           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
   +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
{noformat}
 

 

In the physical plan, the first exchange is reused, but it shouldn't be,
because the two sources are not the same.

 
{noformat}
== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
:  +- Exchange hashpartitioning(name#0, 200)
:     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
:        +- *(1) Project [name#0]
:           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
   +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200){noformat}
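The plan above suggests the two aggregate branches are matched as interchangeable even though they scan different relations. As a simplified, hypothetical sketch (these are not Spark's real classes), this is what happens when exchanges are deduplicated by a canonical form that ignores the underlying reader's data:

```scala
// Hypothetical, simplified model of the bug: exchange reuse keyed on a
// canonical form (here just the output schema) that ignores the reader.
case class Scan(schema: Seq[String], rows: Seq[(String, Int)])
case class Exchange(child: Scan) {
  // The canonical form drops the rows, like a relation whose equality does
  // not account for the underlying DataSourceV2 reader.
  def canonicalized: Seq[String] = child.schema
}

// Mimics a ReuseExchange-style rule: the first exchange seen with a given
// canonical form replaces every later exchange with the same form.
def reuseExchanges(branches: Seq[Exchange]): Seq[Exchange] = {
  val seen = scala.collection.mutable.Map.empty[Seq[String], Exchange]
  branches.map(e => seen.getOrElseUpdate(e.canonicalized, e))
}

val products = Exchange(Scan(Seq("name", "id"), Seq(("candy", 1), ("pizza", 5))))
val users    = Exchange(Scan(Seq("name", "id"), Seq(("andy", 1), ("mike", 3))))

val rewritten = reuseExchanges(Seq(products, users))
// The users branch now points at the products exchange, so its rows are
// lost and the union produces the products rows twice.
assert(rewritten(1) eq products)
```

This only illustrates the shape of the problem; the real rule compares canonicalized physical plans, so the question is why the two DataSourceV2 scans canonicalize to the same thing on 2.3.x.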
 

This seems to be fixed in 2.4.x, but it affects 2.3.x versions.
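In case it helps anyone stuck on 2.3.x in the meantime: assuming the internal `spark.sql.exchange.reuse` flag does what I understand it to do (disable the ReuseExchange rule, so exchanges are recomputed instead of shared), turning it off should avoid the incorrect plan at the cost of extra computation:

{code:java}
// Possible workaround on 2.3.x (assumption: this flag disables ReuseExchange)
spark.conf.set("spark.sql.exchange.reuse", "false")

products.union(users)
  .select("name")
  .show(truncate = false, numRows = 50)
{code}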



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org