cloud-fan commented on a change in pull request #35138:
URL: https://github.com/apache/spark/pull/35138#discussion_r780751016
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -451,30 +468,24 @@ case class HashShuffleSpec(
       false
   }
-  override def canCreatePartitioning: Boolean = true
+  override def canCreatePartitioning: Boolean = {
+    // To avoid potential data skew, we don't allow `HashShuffleSpec` to create partitioning if
+    // the hash partition keys are not the full join keys (the cluster keys). Then the planner
+    // will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
+    // the join keys.
+    if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS)) {
+      distribution.clustering.forall(x => partitioning.expressions.exists(_.semanticEquals(x)))
Review comment:
Good point. To fully restore the previous behavior, we should require an
exact match between the hash partition keys and the join keys, though I think
the current check already covers the data skew issues.
I'll make the change to be conservative.
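
To make the difference concrete, here is a minimal sketch of the two checks, not the actual Spark code. Expressions are modeled as plain strings and `semanticEquals` as `==`. The names `coversAllKeys` and `exactMatch` are hypothetical, and "exact match" is assumed here to mean the same keys in the same order.

```scala
object KeyMatchSketch {
  // Hypothetical stand-in for the check in the diff: every cluster key must
  // appear somewhere among the partition keys (order and duplicates ignored).
  def coversAllKeys(clusterKeys: Seq[String], partitionKeys: Seq[String]): Boolean =
    clusterKeys.forall(k => partitionKeys.exists(_ == k))

  // Hypothetical stricter check (assumed meaning of "exact match"):
  // same number of keys, pairwise equal, in the same order.
  def exactMatch(clusterKeys: Seq[String], partitionKeys: Seq[String]): Boolean =
    clusterKeys.length == partitionKeys.length &&
      clusterKeys.zip(partitionKeys).forall { case (c, p) => c == p }

  def main(args: Array[String]): Unit = {
    val clusterKeys = Seq("a", "b")

    // Reordered partition keys pass the covering check but not the exact one.
    println(coversAllKeys(clusterKeys, Seq("b", "a"))) // true
    println(exactMatch(clusterKeys, Seq("b", "a")))    // false

    // Partition keys that are a strict subset of the join keys fail both.
    println(coversAllKeys(clusterKeys, Seq("a")))      // false
    println(exactMatch(clusterKeys, Seq("a")))         // false

    // Identical key lists pass both.
    println(exactMatch(clusterKeys, Seq("a", "b")))    // true
  }
}
```

Under these assumptions, the covering check only differs from the exact one for reordered or duplicated partition keys, which is why it should already handle the subset case that causes skew.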
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]