HeartSaVioR commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r810560773



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -524,10 +543,7 @@ case class HashShuffleSpec(
     // will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
     // the join keys.
     if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) {
-      partitioning.expressions.length == distribution.clustering.length &&
-        partitioning.expressions.zip(distribution.clustering).forall {
-          case (l, r) => l.semanticEquals(r)
-        }
+      partitioning.isPartitionedOnFullKeys(distribution)

Review comment:
       Although it is beyond the scope of this PR, the same question applies here. 
Do we need to require strict ordering of the keys?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
           expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, _) =>
-          expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, _) =>
+          if (SQLConf.get.requireAllClusterKeysForHashPartition) {
+            // Checks `HashPartitioning` is partitioned on exactly full clustering keys of
+            // `ClusteredDistribution`. Opt in this feature with enabling
+            // "spark.sql.requireAllClusterKeysForHashPartition", can help avoid potential data
+            // skewness for some jobs.
+            isPartitionedOnFullKeys(c)

Review comment:
       If we end up with strict ordering, we could state in the method doc of 
isPartitionedOnFullKeys that it also requires the exact order, and replace the 
condition for StatefulOpClusteredDistribution with isPartitionedOnFullKeys. I'm 
wondering whether we would care about ordering for the cases we described.
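
For reference, the hunk above moves the `ClusteredDistribution` case from a subset test to a full-key test when the config is enabled. The following is a minimal sketch of that difference, assuming a simplified model: keys are plain strings rather than Catalyst `Expression`s, `==` stands in for `semanticEquals`, and the object and method names are hypothetical, not Spark APIs.

```scala
// Simplified stand-in for the two checks discussed in this hunk.
// Assumption: keys are strings and `==` replaces `semanticEquals`.
object ClusterKeyChecks {
  // Old condition: every partitioning key appears somewhere in the
  // required clustering keys (a subset is enough).
  def satisfiesSubset(partitioning: Seq[String], clustering: Seq[String]): Boolean =
    partitioning.forall(p => clustering.contains(p))

  // Condition modelled on the zip-based comparison in this PR:
  // same number of keys, compared pairwise in order.
  def satisfiesFullKeys(partitioning: Seq[String], clustering: Seq[String]): Boolean =
    partitioning.length == clustering.length &&
      partitioning.zip(clustering).forall { case (l, r) => l == r }
}
```

Under this model, a partitioning on `Seq("a")` passes the subset test against clustering keys `Seq("a", "b")` but fails the full-key test, which is the case the new config is meant to catch.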

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
##########
@@ -1453,6 +1455,57 @@ class DataFrameAggregateSuite extends QueryTest
     val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
     checkAnswer(df, Row(2, 3, 1))
   }
+
+  test("SPARK-38237: require all cluster keys for child required distribution") {

Review comment:
       Thanks! Please feel free to adjust the test case, or use it as a basis 
for deriving additional test cases.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -271,6 +279,17 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
     HashShuffleSpec(this, distribution)
 
+  /**
+   * Checks if [[HashPartitioning]] is partitioned on exactly same full `clustering` keys of
+   * [[ClusteredDistribution]].
+   */
+  def isPartitionedOnFullKeys(distribution: ClusteredDistribution): Boolean = {
+    expressions.length == distribution.clustering.length &&

Review comment:
       The condition is more restrictive than what we explain in the config 
(e.g. is the ordering important here?), but I'm fine with it if we are all OK 
with it, as my proposal is technically the same.
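
To make the ordering question concrete, here is a minimal sketch contrasting an order-sensitive check with an order-insensitive one. It assumes a simplified model: keys are plain strings rather than Catalyst `Expression`s, `==` stands in for `semanticEquals`, and the object and method names are hypothetical and only for illustration.

```scala
// Assumption: keys are strings and `==` replaces `semanticEquals`.
object KeyOrderChecks {
  // Order-sensitive: same length and pairwise equal, mirroring the
  // zip-based comparison shown in the diff.
  def sameKeysInOrder(partitioning: Seq[String], clustering: Seq[String]): Boolean =
    partitioning.length == clustering.length &&
      partitioning.zip(clustering).forall { case (l, r) => l == r }

  // Order-insensitive alternative: same length and same set of keys.
  // Note this ignores how often a key is repeated.
  def sameKeysAnyOrder(partitioning: Seq[String], clustering: Seq[String]): Boolean =
    partitioning.length == clustering.length &&
      partitioning.toSet == clustering.toSet
}
```

With partitioning keys `Seq("b", "a")` and clustering keys `Seq("a", "b")`, the order-sensitive check rejects the match while the order-insensitive one accepts it. That gap is what the config description does not currently mention.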




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


