c21 commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r810572093



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
           expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, _) =>
-          expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, _) =>
+          if (SQLConf.get.requireAllClusterKeysForHashPartition) {

Review comment:
       I also thought about the pros and cons of these two approaches: (1) change the behavior of `HashPartitioning`, vs. (2) change the behavior of `ClusteredDistribution`. I am more inclined to (1), changing the behavior of `HashPartitioning`, for the following reasons:
   
   `ClusteredDistribution`'s current definition is pretty clean and flexible, 
so let's not move backward.
   
   ``` scala
   /**
    * Represents data where tuples that share the same values for the 
`clustering`
    * [[Expression Expressions]] will be co-located in the same partition.
    */
   case class ClusteredDistribution
   ```
   
   As long as data is partitioned such that tuples/rows having the same values for `clustering` are in the same partition, the partitioning can satisfy `ClusteredDistribution`. It tolerates both the full set of keys and a subset of the keys, so it's flexible enough to work for a range of operators - aggregate, window, join (together with the recently introduced `ShuffleSpec` for co-partitioning). It has no implicit requirement on hash expressions or a hash function (so it avoids the drawback of `HashClusteredDistribution`). Partitionings other than `HashPartitioning` can also satisfy `ClusteredDistribution` (e.g. `RangePartitioning` and `DataSourcePartitioning`). Adding a flag such as `requiresFullKeysMatch` to `ClusteredDistribution` would make every `Partitioning` implementation unnecessarily more complicated, since this is currently only a problem for `HashPartitioning`.
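   To make the subset-of-keys point concrete, here is a minimal standalone sketch (not the actual Spark classes; plain strings stand in for Catalyst `Expression`s):
   
   ``` scala
   // Toy stand-ins for the real Catalyst classes, just to illustrate the semantics.
   case class ClusteredDistribution(clustering: Seq[String])
   
   case class HashPartitioning(expressions: Seq[String], numPartitions: Int) {
     // Current behavior: satisfied as long as every partitioning expression is one of
     // the required clustering keys. A subset is enough, because rows that agree on
     // all clustering keys also agree on any subset of them, so they stay co-located.
     def satisfies(required: ClusteredDistribution): Boolean =
       expressions.forall(e => required.clustering.contains(e))
   }
   
   object SubsetKeysExample extends App {
     val dist = ClusteredDistribution(Seq("a", "b"))
     println(HashPartitioning(Seq("a"), 10).satisfies(dist)) // true: subset of the keys
     println(HashPartitioning(Seq("c"), 10).satisfies(dist)) // false: unrelated key
   }
   ```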
   
   `HashPartitioning` can decide flexibly by itself when it should satisfy `ClusteredDistribution`: either with a subset of the keys (current behavior) or with the full set of keys (with the config introduced in this PR). This leaves other `Partitioning`s (`RangePartitioning` and `DataSourcePartitioning`) and `ClusteredDistribution` untouched. Indeed this is just a local decision made by `HashPartitioning`, which I think is more flexible and extensible. In the future, if another `Partitioning` has a similar requirement, e.g. `DataSourcePartitioning`, similar logic can be introduced locally inside `DataSourcePartitioning.satisfies0()` without any intrusive interface change.
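   As a rough illustration of that local decision (a sketch only, not the exact code in this PR; the `requireAllClusterKeys` parameter stands in for the SQLConf flag, and plain strings again stand in for Catalyst `Expression`s):
   
   ``` scala
   case class HashPartitioningSketch(expressions: Seq[String]) {
     def satisfiesClustering(
         requiredClustering: Seq[String],
         requireAllClusterKeys: Boolean): Boolean = {
       if (requireAllClusterKeys) {
         // Full-keys mode: the partitioning expressions must match the required
         // clustering keys exactly (same length, pairwise equal).
         expressions.length == requiredClustering.length &&
           expressions.zip(requiredClustering).forall { case (l, r) => l == r }
       } else {
         // Default behavior (unchanged): any subset of the clustering keys is enough.
         expressions.forall(e => requiredClustering.contains(e))
       }
     }
   }
   
   // With the flag on, partitioning by only "a" no longer satisfies a required
   // clustering of ("a", "b"); with it off, it still does.
   object FullKeysExample extends App {
     val p = HashPartitioningSketch(Seq("a"))
     println(p.satisfiesClustering(Seq("a", "b"), requireAllClusterKeys = true))  // false
     println(p.satisfiesClustering(Seq("a", "b"), requireAllClusterKeys = false)) // true
   }
   ```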
   
   > For latter, it's probably the simplest change, but here we have no idea about the operator so we are restricted to apply the change in future as global manner.
   
   It's true, but I think the right granularity is tricky to decide, so let's start with the solution that best keeps our interfaces clean. We can discuss it later if there is a strong requirement. One could further argue that a user who wants even finer-grained control would want to specify the exact operator in the query (e.g. the query has 3 `aggregate`s and they only want to enable the feature for 1 of them).



