HeartSaVioR commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r812440933
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
         expressions.length == h.expressions.length &&
           expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-      case ClusteredDistribution(requiredClustering, _) =>
-        expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+      case c @ ClusteredDistribution(requiredClustering, _) =>
+        if (SQLConf.get.requireAllClusterKeysForHashPartition) {
Review comment:
> I guess this PR is aiming for the same goal - add proper shuffle on full clustering keys based on config, right?

Yes, but there is still an open argument about why it is needed. I wouldn't say it is due to HashPartitioning; as I mentioned, there are two different perspectives. Even if the hash function of HashPartitioning distributes some combinations of grouping keys and data poorly, that only contributes to data skew. In any partitioning, the number of partitions is the physical limit of parallelism. It is ClusteredDistribution that prevents a shuffle between two operators, coupling them into the same stage with the same partitioning and parallelism.
(Please correct me if AQE can decide to split a single stage into multiple stages by injecting shuffles in between.)
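To make the contrast concrete, here is a standalone Scala sketch. The stand-in types and names (`Expr`, `satisfiesSubset`, `satisfiesAllKeys`) are mine, not the Catalyst API; it only mirrors the subset check from the diff above against a "require all cluster keys" check, and the real config-gated code may differ in details.

```scala
// Standalone sketch, NOT the actual Catalyst classes: contrasts the existing
// subset check with a "require all cluster keys" check.
object ClusterKeySketch {
  // Stand-in for Catalyst expressions; semanticEquals approximated by name equality.
  final case class Expr(name: String) {
    def semanticEquals(other: Expr): Boolean = name == other.name
  }

  // Current behavior: hash-partitioning on any subset of the clustering keys
  // satisfies the ClusteredDistribution, so no extra shuffle is inserted.
  def satisfiesSubset(hashExprs: Seq[Expr], requiredClustering: Seq[Expr]): Boolean =
    hashExprs.forall(x => requiredClustering.exists(_.semanticEquals(x)))

  // Config-gated behavior: the partitioning keys must cover the full set of
  // clustering keys (order-insensitive here; the real check may be stricter).
  def satisfiesAllKeys(hashExprs: Seq[Expr], requiredClustering: Seq[Expr]): Boolean =
    hashExprs.length == requiredClustering.length &&
      requiredClustering.forall(k => hashExprs.exists(_.semanticEquals(k)))

  def main(args: Array[String]): Unit = {
    val childPartitioning = Seq(Expr("a"))            // hashed on a subset of the keys
    val requiredKeys      = Seq(Expr("a"), Expr("b")) // grouping/join keys

    println(satisfiesSubset(childPartitioning, requiredKeys))  // true  -> no shuffle today
    println(satisfiesAllKeys(childPartitioning, requiredKeys)) // false -> a shuffle would be added
  }
}
```

With the subset check the child stays coupled into the same stage even though only `a` is hashed; the full-key check would force a shuffle on `a, b`.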
Considering the two different perspectives, there are multiple cases of the child partitioning of each physical node that we may need to deal with:
1) the clustered keys are fully covered and the number of partitions is sufficient (ideal)
2) the clustered keys are fully covered but the number of partitions is insufficient
3) only a subset of the clustered keys is covered and the number of partitions is insufficient
4) only a subset of the clustered keys is covered but the number of partitions is sufficient
Since we are going with a manual / heuristic way to deal with this, I would like to find the approach that addresses more cases with fewer side effects. That is the main reason I tried to think about alternatives.
Requiring the full clustered keys can deal with 3) and 4); in case 4) the data may be skewed (a good case for a shuffle) or not (a shuffle may not be needed), so the benefit of adding a shuffle there is conditional. Requiring a minimum number of partitions can deal with 2) and 3), which is good in general to ensure minimum parallelism for grouping/joining operators. It misses case 4), but as just mentioned, the benefit of a shuffle in 4) is conditional. In addition, this constraint is no longer bound only to data skew, hence it applies to any partitioning.
Implementation-wise, I imagine it is simple, as we just add another constraint to ClusteredDistribution. If the required number of partitions is specified, it is strictly followed; otherwise we compare the partitioning's numPartitions against a threshold defined in ClusteredDistribution. The threshold could be optional if we doubt that a good default value would work for the majority of queries.
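As a rough illustration of that idea, here is a standalone sketch with hypothetical names (`ClusteredDistributionLike`, `minNumPartitions`); it is not the actual Catalyst API, just the shape of the check I have in mind:

```scala
// Standalone sketch (hypothetical field names, NOT the Catalyst API) of an
// optional minimum-partitions constraint carried by the distribution.
object MinPartitionsSketch {
  // Simplified stand-ins for Catalyst's ClusteredDistribution / HashPartitioning.
  final case class ClusteredDistributionLike(
      clustering: Seq[String],
      requiredNumPartitions: Option[Int] = None,
      minNumPartitions: Option[Int] = None) // hypothetical optional threshold

  final case class HashPartitioningLike(expressions: Seq[String], numPartitions: Int) {
    def satisfies(d: ClusteredDistributionLike): Boolean = {
      // Key check kept as the existing subset semantics for simplicity.
      val keysOk = expressions.forall(d.clustering.contains)
      val numOk = d.requiredNumPartitions match {
        // If the distribution pins an exact number, follow it strictly.
        case Some(n) => numPartitions == n
        // Otherwise only require meeting the optional minimum-parallelism threshold.
        case None    => d.minNumPartitions.forall(numPartitions >= _)
      }
      keysOk && numOk
    }
  }

  def main(args: Array[String]): Unit = {
    val dist = ClusteredDistributionLike(Seq("a", "b"), minNumPartitions = Some(200))
    println(HashPartitioningLike(Seq("a"), numPartitions = 10).satisfies(dist))  // false: too few partitions
    println(HashPartitioningLike(Seq("a"), numPartitions = 400).satisfies(dist)) // true: meets the threshold
  }
}
```

The point of this shape is that an exact requiredNumPartitions keeps its strict meaning, while the optional threshold only sets a lower bound, so behavior is unchanged whenever the threshold is unset.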
For sure, these constraints can be used together to cover as many cases as possible. Requiring the full clustered keys is still needed to deal with 4). If we are not sure whether this applies only to HashPartitioning or to other partitionings as well, and want to defer making it an official constraint of ClusteredDistribution, I'd agree to defer the decision until we deal with DataSourcePartitioning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.