HeartSaVioR commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r811472916



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
           expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, _) =>
-          expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, _) =>
+          if (SQLConf.get.requireAllClusterKeysForHashPartition) {

Review comment:
       OK, I 100% agree that the class names do not make the behavior very
clear, especially since both ClusteredDistribution and
HashClusteredDistribution exist. On the other hand, the classdocs were very
clear for both classes, so I didn't have to look into HashPartitioning.satisfy
when figuring out the problem with the required clustered distribution for
stateful operators. But I agree that everyone has a different view, so both can
be right.
   
   The concern comes from the fact that the unique functionality in
HashClusteredDistribution is lost when unifying the classes. I'm not strongly
insisting that we bring back HashClusteredDistribution. My concern is that
requiring the full keys is a valid requirement that we shouldn't simply drop,
at least judging from the inputs so far.
   
   > They have to first remember that these two configs exist, understand the 
differences between the two distributions and then write code to choose them 
accordingly, while in most cases it should just be transparent.
   
   It seems you'd like to say these are exceptional cases, but I wonder how we
can call them "exceptional" given that we had to make exceptions twice in a
couple of months, and the impact of the latter is considerably broader (it does
not pinpoint a specific scope). I'd rather say this is no longer an exceptional
case.
   
   The need to require the full grouping keys doesn't seem trivial. In the
comment https://github.com/apache/spark/pull/35552#issuecomment-1045149556,
@sigmod pointed out a valid case and said AQE can't kick in because they are
in the same stage. And the reason they are in the same stage is the loose
requirement of ClusteredDistribution. We can't simply pray that AQE deals with
this nicely; at the very least we need to split the stages and ask AQE to
handle the proper number of partitions.
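   
   To make the "loose requirement" concrete, here is a minimal, self-contained
sketch. It does not use the Spark classes: `HashPart`, `looseSatisfies` and
`strictSatisfies` are hypothetical stand-ins, expressions are modeled as plain
column names, and `semanticEquals` is replaced by `==`. The loose rule mirrors
the removed lines in the diff above; the strict rule is only an assumption
about what "requiring full keys" would mean.

```scala
// Simplified stand-ins for illustration only; these are NOT the Spark classes.
object LooseVsStrict {
  // Expressions are modeled as plain column names.
  final case class HashPart(expressions: Seq[String], numPartitions: Int)

  // Loose rule (as in the removed diff lines): every partitioning expression
  // must appear somewhere among the required clustering keys.
  def looseSatisfies(p: HashPart, requiredClustering: Seq[String]): Boolean =
    p.expressions.forall(x => requiredClustering.contains(x))

  // Strict rule (assumed meaning of "requiring full keys"): the partitioning
  // expressions must match the required keys one-to-one, in order.
  def strictSatisfies(p: HashPart, requiredClustering: Seq[String]): Boolean =
    p.expressions.length == requiredClustering.length &&
      p.expressions.zip(requiredClustering).forall { case (l, r) => l == r }

  def main(args: Array[String]): Unit = {
    val child = HashPart(Seq("a"), 200) // child output hashed on `a` only
    val required = Seq("a", "b")        // operator groups by (a, b)
    println(looseSatisfies(child, required))  // true: no shuffle, same stage
    println(strictSatisfies(child, required)) // false: a shuffle is required
  }
}
```

   Under the loose rule the child already satisfies the requirement, so no
exchange is inserted and both operators stay in one stage. Under the strict
rule an exchange on (a, b) would be required, which creates the stage boundary
discussed above.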
   
   All the exceptions we are making are about the requirement of
ClusteredDistribution. There is no problem with HashPartitioning. The only
reason we are fixing HashPartitioning is the design in which we use `satisfy`.
Limiting the change to Partitioning and saying "We change nothing on
ClusteredDistribution" doesn't seem right to me. We are technically changing
ClusteredDistribution, and we need to make that clear. Otherwise this is also
going to be undocumented behavior. Instead of just documenting it, I prefer
having the flag explicitly, so that any partitioning knows the detailed
requirement clearly. You don't need to remember the config when you work on
DataSourcePartitioning. ClusteredDistribution will provide it for you.
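   
   As a rough illustration of carrying the flag on the distribution, the sketch
below uses simplified stand-ins rather than the Spark classes. The field name
`requireAllClusterKeys` and the types `ClusteredDist`, `HashPart` and
`SourcePart` are hypothetical, and expressions are again modeled as column
names. It only shows the shape of the idea: each partitioning reads the
requirement from the distribution it is asked to satisfy and never consults a
config itself.

```scala
// Simplified stand-ins for illustration only; these are NOT the Spark classes.
object FlagOnDistribution {
  // Hypothetical: the distribution itself states whether all keys are required.
  final case class ClusteredDist(
      clustering: Seq[String],
      requireAllClusterKeys: Boolean = false)

  sealed trait Part {
    def keys: Seq[String]

    // Shared check: the flag comes from the distribution, not from a config.
    def satisfies(d: ClusteredDist): Boolean =
      if (d.requireAllClusterKeys) keys == d.clustering
      else keys.forall(k => d.clustering.contains(k))
  }

  final case class HashPart(keys: Seq[String]) extends Part

  // Stand-in for a partitioning reported by a data source.
  final case class SourcePart(keys: Seq[String]) extends Part

  def main(args: Array[String]): Unit = {
    val loose = ClusteredDist(Seq("a", "b"))
    val strict = ClusteredDist(Seq("a", "b"), requireAllClusterKeys = true)
    println(HashPart(Seq("a")).satisfies(loose))         // true
    println(HashPart(Seq("a")).satisfies(strict))        // false
    println(SourcePart(Seq("a", "b")).satisfies(strict)) // true
  }
}
```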




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


