HeartSaVioR commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r812318442



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
           expressions.length == h.expressions.length && 
expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, _) =>
-          expressions.forall(x => 
requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, _) =>
+          if (SQLConf.get.requireAllClusterKeysForHashPartition) {

Review comment:
       > Just to make sure we are on the same page. The operators such as 
aggregate, window are always using ClusteredDistribution for years. So the 
problem of having data skew in those operators, is a new problem we are aiming 
to fix, not a regression coming from recent PRs.
   
   I agree with this, but the difference is that we dropped 
HashClusteredDistribution so we have less feature to leverage. In the SPIP doc 
we said "unify" two classes, but what we had done is "removing" the 
HashClusteredDistribution without alternatives.
   
   Regarding the problem, there are two perspectives on the problem, 1) data 
skew 2) insufficient number of partitions. 2) applies to any partitioning.
   
   Let's think about end user's perspective. They run the batch query, and 
expect Spark to finish it as quick as possible (or be resource-efficient). 
Spark produces the general config - default number of shuffle partitions - 
which defines the general parallelism whenever shuffle is introduced.
   
   The point is when the config takes effect. More and more output partitioning 
ClusteredDistribution could match, less and less shuffle could be kicked in, 
while end users may expect like "a set of known operators would introduce 
shuffles, which adjusts parallelism as I set the config". For example, for the 
case ClusteredDistribution with DataSourcePartitioning, it could be 
hypothetically "nowhere". The max parallelism could be tied to source's 
parallelism and nothing except manual change of the query could help since 
there could be no shuffle at all. AQE won't help the parallelism/skew issue 
within the stage.
   
   Shuffle may not be something we try hard to eliminate at all. We also need 
to think when the shuffle would be likely help. We can't leverage stats in 
physical execution, so my fall back goes to be heuristic, like the different 
view and resolution on this problem 
https://github.com/apache/spark/pull/35574#issuecomment-1047303366.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to