Miguel Molina created SPARK-29549:
-------------------------------------

             Summary: Union of DataSourceV2 datasources leads to duplicated 
results
                 Key: SPARK-29549
                 URL: https://issues.apache.org/jira/browse/SPARK-29549
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.4, 2.3.3, 2.3.2, 2.3.1, 2.3.0
            Reporter: Miguel Molina


Hello!

I've discovered that when two DataSourceV2 data frames in a query of the exact 
same shape are joined and there is an aggregation in the query, only the first 
results are used. The rest get removed by the ReuseExchange rule and reuse the 
results of the first data frame, leading to N times the first data frame 
results.

 

I've put together a repository with an example project where this can be 
reproduced: [https://github.com/erizocosmico/spark-union-issue]

 

Basically, doing this:

 
{code:java}
val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY 
name")
val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")

products.union(users)
 .select("name")
 .show(truncate = false, numRows = 50){code}
 

 

Where products is:
{noformat}
+---------+---+
|name |id |
+---------+---+
|candy |1 |
|chocolate|2 |
|milk |3 |
|cinnamon |4 |
|pizza |5 |
|pineapple|6 |
+---------+---+{noformat}
And users is:
{noformat}
+-------+---+
|name |id |
+-------+---+
|andy |1 |
|alice |2 |
|mike |3 |
|mariah |4 |
|eleanor|5 |
|matthew|6 |
+-------+---+ {noformat}
 

Results are incorrect:
{noformat}
+---------+
|name |
+---------+
|candy |
|pizza |
|chocolate|
|cinnamon |
|pineapple|
|milk |
|candy |
|pizza |
|chocolate|
|cinnamon |
|pineapple|
|milk |
+---------+{noformat}
 

This is the plan explained:

 
{noformat}
== Parsed Logical Plan ==
'Project [unresolvedalias('name, None)]
+- AnalysisBarrier
 +- Union
 :- Aggregate [name#0], [name#0, count(1) AS count#8L]
 : +- SubqueryAlias products
 : +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], 
[chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
 +- Aggregate [name#4], [name#4, count(1) AS count#12L]
 +- SubqueryAlias users
 +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], 
[alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
== Analyzed Logical Plan ==
name: string
Project [name#0]
+- Union
 :- Aggregate [name#0], [name#0, count(1) AS count#8L]
 : +- SubqueryAlias products
 : +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], 
[chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
 +- Aggregate [name#4], [name#4, count(1) AS count#12L]
 +- SubqueryAlias users
 +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], 
[alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
== Optimized Logical Plan ==
Union
:- Aggregate [name#0], [name#0]
: +- Project [name#0]
: +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], 
[chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- Aggregate [name#4], [name#4]
 +- Project [name#4]
 +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], 
[alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
: +- Exchange hashpartitioning(name#0, 200)
: +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
: +- *(1) Project [name#0]
: +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], 
[chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
 +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
{noformat}
 

 

In the physical plan, the first exchange is reused, but it shouldn't be because 
both sources are not the same.

 
{noformat}
== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
: +- Exchange hashpartitioning(name#0, 200)
: +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
: +- *(1) Project [name#0]
: +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], 
[chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
 +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200){noformat}
 

This seems to be fixed in 2.4.x, but affects, 2.3.x versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to