maropu commented on a change in pull request #31404:
URL: https://github.com/apache/spark/pull/31404#discussion_r573803578



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##########
@@ -385,6 +385,90 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(unionDF, unionAllDF)
   }
 
+  test("SQL-style union using Dataset: remove unnecessary deduplicate in multiple unions") {
+    val unionDF = testData.union(testData).distinct().union(testData).distinct()
+      .union(testData).distinct().union(testData).distinct()
+
+    // Before optimizer, there are three 'union.deduplicate' operations should be combined.
+    assert(unionDF.queryExecution.analyzed.collect {
+      case j: Union if j.children.size == 4 => j
+    }.size === 1)
+
+    // After optimizer, four 'union.deduplicate' operations should be combined.
+    assert(unionDF.queryExecution.optimizedPlan.collect {
+      case j: Union if j.children.size == 5 => j

Review comment:
       nit: why `j`? Maybe `u` (for `Union`) instead?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##########
@@ -385,6 +385,90 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(unionDF, unionAllDF)
   }
 
+  test("SQL-style union using Dataset: remove unnecessary deduplicate in multiple unions") {
+    val unionDF = testData.union(testData).distinct().union(testData).distinct()
+      .union(testData).distinct().union(testData).distinct()
+
+    // Before optimizer, there are three 'union.deduplicate' operations should be combined.
+    assert(unionDF.queryExecution.analyzed.collect {
+      case j: Union if j.children.size == 4 => j
+    }.size === 1)
+
+    // After optimizer, four 'union.deduplicate' operations should be combined.
+    assert(unionDF.queryExecution.optimizedPlan.collect {
+      case j: Union if j.children.size == 5 => j
+    }.size === 1)
+
+    checkAnswer(
+      unionDF.agg(avg("key"), max("key"), min("key"), sum("key")),
+      Row(50.5, 100, 1, 5050) :: Nil
+    )
+
+    // The result of SQL-style union
+    val unionSQLResult = sql(
+      """
+        | select key, value from testData
+        | union
+        | select key, value from testData
+        | union
+        | select key, value from testData
+        | union
+        | select key, value from testData
+        | union
+        | select key, value from testData
+        |""".stripMargin)
+    checkAnswer(unionDF, unionSQLResult)

Review comment:
       Do we need this test? What is its purpose?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##########
@@ -385,6 +385,90 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(unionDF, unionAllDF)
   }
 
+  test("SQL-style union using Dataset: remove unnecessary deduplicate in multiple unions") {
+    val unionDF = testData.union(testData).distinct().union(testData).distinct()
+      .union(testData).distinct().union(testData).distinct()
+
+    // Before optimizer, there are three 'union.deduplicate' operations should be combined.
+    assert(unionDF.queryExecution.analyzed.collect {
+      case j: Union if j.children.size == 4 => j

Review comment:
       nit: isn't it more intuitive to simply count the number of aggregates 
for `distinct`?
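
A minimal sketch of what counting the `distinct` nodes could look like. It assumes `distinct()` appears as a `Deduplicate` node in the analyzed plan and is rewritten into an `Aggregate` by the optimizer. The object name, helper names, and the locally built `testData` are hypothetical stand-ins, not code from the PR. No expected optimized-plan count is implied, since that depends on the rule under review.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate}

// Hypothetical helper object for illustration only; not part of the PR.
object DistinctNodeCounts {
  // Assumption: each distinct() call shows up as a Deduplicate node before optimization.
  def deduplicatesInAnalyzed(df: DataFrame): Int =
    df.queryExecution.analyzed.collect { case d: Deduplicate => d }.size

  // Assumption: the optimizer rewrites remaining Deduplicate nodes into Aggregate nodes.
  def aggregatesInOptimized(df: DataFrame): Int =
    df.queryExecution.optimizedPlan.collect { case a: Aggregate => a }.size

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("distinct-node-counts")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the suite's testData fixture (key: Int, value: String).
    val testData = (1 to 100).map(i => (i, i.toString)).toDF("key", "value")
    val unionDF = testData.union(testData).distinct().union(testData).distinct()

    println(s"Deduplicate nodes in analyzed plan: ${deduplicatesInAnalyzed(unionDF)}")
    println(s"Aggregate nodes in optimized plan: ${aggregatesInOptimized(unionDF)}")

    spark.stop()
  }
}
```

In the test itself, the two helpers would replace the `case j: Union if j.children.size == N` matches, so the assertion states directly how many deduplications survive.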




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
