sunchao commented on code in PR #53098:
URL: https://github.com/apache/spark/pull/53098#discussion_r2536530710
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -140,6 +140,13 @@ case class EnsureRequirements(
// Choose all the specs that can be used to shuffle other children
val candidateSpecs = specs
.filter(_._2.canCreatePartitioning)
+ .filter {
Review Comment:
Trying to understand this too. In `checkKeyGroupCompatible` we already make
sure that both children use `KeyGroupedPartitioning`. Does this new check
additionally require that the leaf nodes of both children are all
`KeyGroupedPartitionedScan`?
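If that's the intent, my reading of the new filter is roughly the sketch below. This is just to spell out my understanding, not the actual diff; `children` here stands for the local `Seq[SparkPlan]` in `ensureDistributionAndOrdering`, and the `collectLeaves()` / `isInstanceOf` combination is illustrative:
```scala
// Sketch of my reading of the new check (illustrative, not the actual diff):
// on top of canCreatePartitioning, keep a spec only if every leaf of the
// corresponding child plan is a KeyGroupedPartitionedScan.
val candidateSpecs = specs
  .filter(_._2.canCreatePartitioning)
  .filter { case (childIndex, _) =>
    children(childIndex).collectLeaves()
      .forall(_.isInstanceOf[KeyGroupedPartitionedScan])
  }
```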
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -2626,4 +2627,148 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
assert(scans.forall(_.inputRDD.partitions.length == 2))
}
}
+
+ test("SPARK-53322: checkpointed scans avoid shuffles for aggregates") {
+ withTempDir { dir =>
+ spark.sparkContext.setCheckpointDir(dir.getPath)
+ val items_partitions = Array(identity("id"))
Review Comment:
nit: use camelCase?
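e.g. (purely a naming suggestion, the rest of the line unchanged):
```scala
val itemsPartitions = Array(identity("id"))
```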