HeartSaVioR commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r812318442
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
         expressions.length == h.expressions.length &&
           expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-      case ClusteredDistribution(requiredClustering, _) =>
-        expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+      case c @ ClusteredDistribution(requiredClustering, _) =>
+        if (SQLConf.get.requireAllClusterKeysForHashPartition) {
Review comment:
> Just to make sure we are on the same page. Operators such as aggregate and window have been using ClusteredDistribution for years. So the problem of data skew in those operators is a new problem we are aiming to fix, not a regression coming from recent PRs.
I agree with this, but the difference is that we dropped HashClusteredDistribution, so we have one less mechanism to leverage. In the SPIP doc we said we would "unify" the two classes, but what we actually did was "remove" HashClusteredDistribution without providing an alternative.
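To make the difference concrete, here is a rough sketch (not the actual Spark classes; plain strings stand in for Expression, and the object/method names are made up for illustration):

```scala
// Rough sketch of the two "satisfies" semantics; not the real Spark classes.
object SatisfiesSketch {
  type Expr = String // stand-in for Expression; real code uses semanticEquals

  // What the removed HashClusteredDistribution demanded:
  // the partitioning keys must match the required keys exactly, pairwise and in full.
  def satisfiesExactMatch(partKeys: Seq[Expr], requiredKeys: Seq[Expr]): Boolean =
    partKeys.length == requiredKeys.length &&
      partKeys.zip(requiredKeys).forall { case (l, r) => l == r }

  // What ClusteredDistribution asks for:
  // the partitioning keys only need to be a subset of the required keys.
  def satisfiesSubset(partKeys: Seq[Expr], requiredKeys: Seq[Expr]): Boolean =
    partKeys.forall(k => requiredKeys.contains(k))

  def main(args: Array[String]): Unit = {
    val partitioning = Seq("a")       // data already hash-partitioned by a
    val required     = Seq("a", "b")  // operator clusters by (a, b)
    println(satisfiesExactMatch(partitioning, required)) // false -> shuffle
    println(satisfiesSubset(partitioning, required))     // true  -> no shuffle
  }
}
```

With only the subset rule left, an existing hash partitioning on any subset of the clustering keys is treated as good enough and the shuffle is suppressed.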
Regarding the problem itself, there are two perspectives on it: 1) data skew, and 2) an insufficient number of partitions. 2) applies to any partitioning.
Let's think about the end user's perspective. They run a batch query and expect Spark to finish it as quickly as possible (or be resource-efficient). Spark provides a general config - the default number of shuffle partitions - which defines the overall parallelism whenever a shuffle is introduced.
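For reference, this is the knob I mean (a minimal example; the app name and values here are arbitrary, the config key is spark.sql.shuffle.partitions):

```scala
import org.apache.spark.sql.SparkSession

// Minimal example of setting the default shuffle parallelism.
// It only matters where the planner actually inserts an exchange.
val spark = SparkSession.builder()
  .appName("shuffle-partitions-example") // arbitrary name for illustration
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "200")
  .getOrCreate()

// It can also be adjusted at runtime:
spark.conf.set("spark.sql.shuffle.partitions", "400")
```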
The point is when the config actually takes effect. The more output partitionings ClusteredDistribution can match, the fewer shuffles get kicked in, while end users may expect something like "a known set of operators introduces shuffles, and those shuffles pick up the parallelism from the config I set". For example, in the case of ClusteredDistribution matched by DataSourcePartitioning, the config could hypothetically apply "nowhere": the maximum parallelism would be tied to the source's parallelism, and nothing except a manual change of the query could help, since there might be no shuffle at all. AQE won't help the parallelism/skew issue within the stage.
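Here is a hypothetical illustration of that concern, assuming the current subset-matching rule for ClusteredDistribution (the column names and data sizes below are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Made-up data: "a" has only 3 distinct values, "b" has many.
val df = spark.range(0, 1000000)
  .select(($"id" % 3).as("a"), ($"id" % 1000).as("b"))

// Repartition by "a": HashPartitioning(a), so at most 3 partitions carry data.
val byA = df.repartition($"a")

// The aggregate requires ClusteredDistribution(a, b). HashPartitioning(a)
// satisfies it under the subset rule, so no new exchange is inserted and the
// default shuffle partitions config has no effect here: effective parallelism
// stays capped by the 3 non-empty partitions inherited from the child.
byA.groupBy($"a", $"b").count().explain()
```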
Shuffle may not be something we should try hard to eliminate at all costs. We also need to think about when a shuffle is likely to help. We can't leverage stats in physical planning, so my fallback is a heuristic, like the different view of, and resolution for, this problem in https://github.com/apache/spark/pull/35574#issuecomment-1047303366.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]