sunchao commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r810663212



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
           expressions.length == h.expressions.length && 
expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, _) =>
-          expressions.forall(x => 
requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, _) =>
+          if (SQLConf.get.requireAllClusterKeysForHashPartition) {

Review comment:
       I'm also inclined toward option 1) for now, and agree with the points that 
@c21 raised above. 
   
   As a Spark developer, I was originally confused when I saw both 
`HashClusteredDistribution` and `ClusteredDistribution`, and had to navigate the 
code base to reason about their behavioral differences. Combined with the newly 
introduced config, a developer now has to remember to check the value of the 
config and choose `HashClusteredDistribution` or `ClusteredDistribution` 
accordingly, which is an extra burden. In addition, it's better to have a 
separate `StatefulOpClusteredDistribution` dedicated to Structured Streaming 
(SS) use cases, as it makes them more distinct.
   
   Of course, having a separate `HashClusteredDistribution` opens up more 
opportunities for it to evolve independently. But I'd suggest only considering 
that once we have some concrete ideas. So far, I don't see anything that can't 
be done with `ClusteredDistribution` alone.
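
   To illustrate the shape being discussed, here is a minimal sketch of a single 
clustered-distribution check that branches on a flag, so callers do not have to 
pick between two distribution classes. All types are simplified stand-ins, not 
Spark's classes; `requireAllKeys` is a hypothetical parameter standing in for 
`SQLConf.get.requireAllClusterKeysForHashPartition`, and the strict branch 
assumes "full keys" means the same keys in the same order.

```scala
// Simplified stand-ins for illustration; these are not Spark's classes.
final case class Key(name: String)
final case class SimpleClusteredDistribution(clustering: Seq[Key])

final case class SimpleHashPartitioning(expressions: Seq[Key]) {
  // requireAllKeys is a hypothetical stand-in for the config flag in the diff.
  def satisfies(d: SimpleClusteredDistribution, requireAllKeys: Boolean): Boolean =
    if (requireAllKeys) {
      // Strict mode (assumed): partition keys equal the clustering keys, in order.
      expressions == d.clustering
    } else {
      // Default mode: every partition key appears among the clustering keys.
      expressions.forall(x => d.clustering.contains(x))
    }
}
```

   With clustering keys `(a, b)`, a partitioning on `(a)` alone passes the default 
check but not the strict one.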

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -271,6 +279,17 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
   override def createShuffleSpec(distribution: ClusteredDistribution): 
ShuffleSpec =
     HashShuffleSpec(this, distribution)
 
+  /**
+   * Checks if [[HashPartitioning]] is partitioned on exactly same full 
`clustering` keys of
+   * [[ClusteredDistribution]].
+   */
+  def isPartitionedOnFullKeys(distribution: ClusteredDistribution): Boolean = {
+    expressions.length == distribution.clustering.length &&

Review comment:
       I'm not sure ordering is important here: is it common for data skew to 
be introduced by changing the order of the hash keys? I'm not sure murmur3 
hash exhibits this kind of property.
   
   This also makes it harder for the optimization to kick in (imagine users 
having to carefully align join or aggregation keys with the order of the bucket 
keys in the table). It is also a behavior change for bucketed joins, since 
Spark currently reorders the hash keys w.r.t. join keys in 
`EnsureRequirements.reorderJoinPredicates`.
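
   To make the ordering concern concrete, here is a minimal sketch contrasting an 
order-sensitive full-key check with an order-insensitive one. It assumes the 
check under review compares keys pairwise in order (the diff is truncated, so 
this is a guess). `Col`, `FullKeyMatch`, and both method names are hypothetical 
stand-ins; plain equality replaces `semanticEquals`.

```scala
// Simplified stand-in for an expression; not Spark's Expression class.
final case class Col(name: String)

object FullKeyMatch {
  // Order-sensitive: same length and pairwise-equal keys.
  def sameKeysInOrder(partitionKeys: Seq[Col], clusterKeys: Seq[Col]): Boolean =
    partitionKeys.length == clusterKeys.length &&
      partitionKeys.zip(clusterKeys).forall { case (l, r) => l == r }

  // Order-insensitive: same length and each side's keys all appear on the other.
  def sameKeysAnyOrder(partitionKeys: Seq[Col], clusterKeys: Seq[Col]): Boolean =
    partitionKeys.length == clusterKeys.length &&
      partitionKeys.forall(clusterKeys.contains) &&
      clusterKeys.forall(partitionKeys.contains)
}
```

   For a table bucketed by `(a, b)` and a join on `(b, a)`, `sameKeysInOrder` 
rejects the match while `sameKeysAnyOrder` accepts it, which is the case where 
users would otherwise have to align key order by hand.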




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


