szehon-ho commented on code in PR #54714:
URL: https://github.com/apache/spark/pull/54714#discussion_r2928048296
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -1210,6 +1210,142 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}
+ test("SPARK-55848: dropDuplicates after SPJ with partial clustering should produce " +
+ "correct results") {
+ val items_partitions = Array(identity("id"))
+ createTable(items, itemsColumns, items_partitions)
+ // Two rows for id=1 so partial clustering may split them across tasks
+ sql(s"INSERT INTO testcat.ns.$items VALUES " +
+ "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+ "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+ val purchases_partitions = Array(identity("item_id"))
+ createTable(purchases, purchasesColumns, purchases_partitions)
+ sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+ "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 50.0, cast('2020-01-02' as timestamp)), " +
+ "(2, 11.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+ withSQLConf(
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> true.toString) {
+ // dropDuplicates on the join key after a partially-clustered SPJ must still
+ // produce the correct number of distinct ids. Before the fix, the
+ // partially-clustered partitioning was incorrectly treated as satisfying
+ // ClusteredDistribution, so EnsureRequirements did not insert an Exchange
+ // before the dedup, leading to duplicate rows.
+ val df = sql(
+ s"""
+ |SELECT DISTINCT i.id
+ |FROM testcat.ns.$items i
+ |JOIN testcat.ns.$purchases p ON i.id = p.item_id
+ |""".stripMargin)
+ checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
+
+ // One GroupPartitionsExec per join child to align the partially-clustered
+ // partitions, and one above the join to group for the aggregate.
+ val joinGP = collectGroupPartitions(df.queryExecution.executedPlan)
+ assert(joinGP.size === 2,
+ "expected 2 GroupPartitionsExec under the join")
+ val allGP = collectAllGroupPartitions(df.queryExecution.executedPlan)
+ assert(allGP.size === 3,
+ "expected 3 GroupPartitionsExec total (2 under join + 1 above for aggregate)")
+ }
+ }
+
+ test("SPARK-55848: Window dedup after SPJ with partial clustering should produce " +
Review Comment:
nit: the 'should produce correct results' suffix in the test name is redundant :)
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -1210,6 +1210,142 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}
+ test("SPARK-55848: dropDuplicates after SPJ with partial clustering should produce " +
+ "correct results") {
+ val items_partitions = Array(identity("id"))
+ createTable(items, itemsColumns, items_partitions)
+ // Two rows for id=1 so partial clustering may split them across tasks
+ sql(s"INSERT INTO testcat.ns.$items VALUES " +
+ "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+ "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+ val purchases_partitions = Array(identity("item_id"))
+ createTable(purchases, purchasesColumns, purchases_partitions)
+ sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+ "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 50.0, cast('2020-01-02' as timestamp)), " +
+ "(2, 11.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+ withSQLConf(
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> true.toString) {
+ // dropDuplicates on the join key after a partially-clustered SPJ must still
+ // produce the correct number of distinct ids. Before the fix, the
+ // partially-clustered partitioning was incorrectly treated as satisfying
+ // ClusteredDistribution, so EnsureRequirements did not insert an Exchange
+ // before the dedup, leading to duplicate rows.
+ val df = sql(
+ s"""
+ |SELECT DISTINCT i.id
+ |FROM testcat.ns.$items i
+ |JOIN testcat.ns.$purchases p ON i.id = p.item_id
+ |""".stripMargin)
+ checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
+
+ // One GroupPartitionsExec per join child to align the partially-clustered
+ // partitions, and one above the join to group for the aggregate.
+ val joinGP = collectGroupPartitions(df.queryExecution.executedPlan)
+ assert(joinGP.size === 2,
+ "expected 2 GroupPartitionsExec under the join")
+ val allGP = collectAllGroupPartitions(df.queryExecution.executedPlan)
+ assert(allGP.size === 3,
+ "expected 3 GroupPartitionsExec total (2 under join + 1 above for aggregate)")
+ }
+ }
+
+ test("SPARK-55848: Window dedup after SPJ with partial clustering should produce " +
+ "correct results") {
+ val items_partitions = Array(identity("id"))
+ createTable(items, itemsColumns, items_partitions)
+ sql(s"INSERT INTO testcat.ns.$items VALUES " +
+ "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+ "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+ val purchases_partitions = Array(identity("item_id"))
+ createTable(purchases, purchasesColumns, purchases_partitions)
+ sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+ "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 50.0, cast('2020-01-02' as timestamp)), " +
+ "(2, 11.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+ withSQLConf(
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> true.toString) {
+ // Use ROW_NUMBER() OVER to dedup joined rows per id after a partially-clustered
+ // SPJ. The WINDOW operator requires ClusteredDistribution on i.id; with partial
+ // clustering the plan must insert the right exchange/group so that the window
+ // produces exactly one row per id.
+ val df = sql(
Review Comment:
Should we use 'selectWithMergeJoinHint' here for consistency?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]