peter-toth commented on code in PR #54751:
URL: https://github.com/apache/spark/pull/54751#discussion_r2923796943


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -2892,4 +2892,128 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
         Row("ccc", 30, 400.50)))
     }
   }
+
+  test("SPARK-55848: dropDuplicates after SPJ with partial clustering should 
produce " +
+      "correct results") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+        "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+        "(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+        "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+        "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    val purchases_partitions = Array(identity("item_id"))
+    createTable(purchases, purchasesColumns, purchases_partitions)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+        "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+        "(1, 50.0, cast('2020-01-02' as timestamp)), " +
+        "(2, 11.0, cast('2020-01-01' as timestamp)), " +
+        "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+    withSQLConf(
+        SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> 
true.toString) {
+      // dropDuplicates on the join key after a partially-clustered SPJ must 
still
+      // produce the correct number of distinct ids.  Before the fix, the
+      // partially-clustered partitioning was incorrectly treated as satisfying
+      // ClusteredDistribution, so EnsureRequirements did not insert an 
Exchange
+      // before the dedup, leading to duplicate rows.
+      val df = sql(
+        s"""
+           |SELECT DISTINCT i.id
+           |FROM testcat.ns.$items i
+           |JOIN testcat.ns.$purchases p ON i.id = p.item_id
+           |""".stripMargin)
+      checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
+
+      val allShuffles = collectAllShuffles(df.queryExecution.executedPlan)
+      assert(allShuffles.nonEmpty,
+        "should contain a shuffle for the post-join dedup with partial 
clustering")
+    }
+  }
+
+  test("SPARK-55848: Window dedup after SPJ with partial clustering should 
produce " +
+      "correct results") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+        "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+        "(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+        "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+        "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    val purchases_partitions = Array(identity("item_id"))
+    createTable(purchases, purchasesColumns, purchases_partitions)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+        "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+        "(1, 50.0, cast('2020-01-02' as timestamp)), " +
+        "(2, 11.0, cast('2020-01-01' as timestamp)), " +
+        "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+    withSQLConf(
+        SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> 
true.toString) {
+      // Use ROW_NUMBER() OVER to dedup joined rows per id after a 
partially-clustered
+      // SPJ.  The WINDOW operator requires ClusteredDistribution on i.id; 
with partial
+      // clustering the plan must insert a shuffle so that the window
+      // produces exactly one row per id.
+      val df = sql(
+        s"""
+           |SELECT id, price FROM (
+           |  SELECT i.id, i.price,
+           |    ROW_NUMBER() OVER (PARTITION BY i.id ORDER BY i.price DESC) AS 
rn
+           |  FROM testcat.ns.$items i
+           |  JOIN testcat.ns.$purchases p ON i.id = p.item_id
+           |) t WHERE rn = 1
+           |""".stripMargin)
+      checkAnswer(df, Seq(Row(1, 41.0f), Row(2, 10.0f), Row(3, 15.5f)))
+
+      val allShuffles = collectAllShuffles(df.queryExecution.executedPlan)
+      assert(allShuffles.nonEmpty,

Review Comment:
   Besides checking for the presence of a shuffle, could you please also assert 
that the scans actually report a partially clustered `KeyGroupedPartitioning`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to