c21 commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r810572093
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
```diff
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
       expressions.length == h.expressions.length &&
         expressions.zip(h.expressions).forall {
           case (l, r) => l.semanticEquals(r)
         }
-    case ClusteredDistribution(requiredClustering, _) =>
-      expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+    case c @ ClusteredDistribution(requiredClustering, _) =>
+      if (SQLConf.get.requireAllClusterKeysForHashPartition) {
```
Review comment:
I actually also thought about the pros and cons of these two approaches: 1) change the behavior of `HashPartitioning`, vs. 2) change the behavior of `ClusteredDistribution`. I am more inclined to 1) change the behavior of `HashPartitioning`, for the following reason:

`ClusteredDistribution`'s current definition is pretty clean and flexible, so let's not move backward:
```scala
/**
 * Represents data where tuples that share the same values for the `clustering`
 * [[Expression Expressions]] will be co-located in the same partition.
 */
case class ClusteredDistribution(
    clustering: Seq[Expression],
    requiredNumPartitions: Option[Int] = None) extends Distribution
```
As long as the data is partitioned such that tuples/rows having the same values for `clustering` are in the same partition, the partitioning can satisfy `ClusteredDistribution`. It tolerates both the full key set and a subset of the keys, so it's flexible enough to work for a range of operators: aggregate, window, and join (together with the recently introduced `ShuffleSpec` for co-partitioning). It has no implicit requirement on hash expressions or hash functions (so it avoids the drawback of `HashClusteredDistribution`). Partitionings other than `HashPartitioning` can also satisfy `ClusteredDistribution` (e.g. `RangePartitioning` and `DataSourcePartitioning`). Adding a flag such as `requiresFullKeysMatch` to `ClusteredDistribution` would make every `Partitioning` implementation unnecessarily more complicated, as this is currently a problem only for `HashPartitioning`.
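To make the subset tolerance concrete, here is a minimal, self-contained sketch (plain-Scala stand-ins, not Spark's actual classes): any two rows that agree on all clustering keys also agree on any subset of them, so hashing on a subset still co-locates them.

```scala
// Hypothetical sketch (not Spark code): rows equal on the full clustering
// keys are always co-located by a hash on a SUBSET of those keys.
object SubsetKeySketch {
  type Row = Map[String, Any]

  // Partition id from hashing the values of the given keys.
  def pid(row: Row, keys: Seq[String], numPartitions: Int): Int =
    Math.floorMod(keys.map(row).hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(
      Map("a" -> 1, "b" -> 10), Map("a" -> 1, "b" -> 10),
      Map("a" -> 1, "b" -> 20), Map("a" -> 2, "b" -> 10))
    val fullKeys = Seq("a", "b") // the distribution's clustering keys
    val subsetKeys = Seq("a")    // HashPartitioning on a subset of them

    // Every group of rows sharing the full clustering keys lands in a
    // single partition, which is all ClusteredDistribution requires.
    val ok = rows.groupBy(r => fullKeys.map(r)).values.forall { group =>
      group.map(r => pid(r, subsetKeys, 4)).distinct.size == 1
    }
    println(s"subset-key partitioning satisfies the clustering: $ok") // true
  }
}
```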
`HashPartitioning` can flexibly decide by itself when it should satisfy `ClusteredDistribution`: either with a subset of the keys (current behavior) or with the full key set (with the config introduced in this PR). This leaves the other `Partitioning`s (`RangePartitioning` and `DataSourcePartitioning`) and `ClusteredDistribution` untouched. Indeed, this is just a local decision made by `HashPartitioning`. I think this is more flexible and extensible. In the future, if another `Partitioning` has a similar requirement, e.g. `DataSourcePartitioning`, similar logic can be introduced locally inside `DataSourcePartitioning.satisfies0()` without any intrusive interface change.
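As a rough illustration of that local decision, here is a simplified, self-contained model (hypothetical names, not Spark's actual classes; the real check compares expressions with `semanticEquals`, as in the diff above):

```scala
// Hypothetical, simplified model of the satisfies0() decision: the
// partitioning itself chooses between subset-of-keys and full-key matching,
// so Distribution and the other Partitionings stay untouched.
object SatisfiesSketch {
  final case class ClusteredDistribution(clustering: Seq[String])

  final case class HashPartitioning(expressions: Seq[String]) {
    def satisfies(c: ClusteredDistribution, requireAllClusterKeys: Boolean): Boolean =
      if (requireAllClusterKeys) {
        // full-key mode (the new config): hash expressions must cover
        // exactly the clustering keys
        expressions.length == c.clustering.length &&
          c.clustering.forall(expressions.contains)
      } else {
        // default mode: any subset of the clustering keys is enough
        expressions.forall(c.clustering.contains)
      }
  }

  def main(args: Array[String]): Unit = {
    val dist = ClusteredDistribution(Seq("a", "b"))
    val partial = HashPartitioning(Seq("a"))
    println(partial.satisfies(dist, requireAllClusterKeys = false)) // true
    println(partial.satisfies(dist, requireAllClusterKeys = true))  // false
  }
}
```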
> For the latter, it's probably the simplest change, but here we have no idea about the operator, so we are restricted to applying the change globally in the future.
It's true, but I think the right granularity is tricky to decide, so let's start with the solution that best keeps our interface clean. We can discuss later if there is a strong requirement. One could even argue that a user who wants finer-grained control would want to specify the exact operator in the query (e.g. the query has 3 `aggregate`s, and the user only wants to enable the feature for 1 of them).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]