cloud-fan commented on code in PR #39633:
URL: https://github.com/apache/spark/pull/39633#discussion_r1097319853
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -411,6 +336,235 @@ case class EnsureRequirements(
}
}
+ /**
+ * Checks whether two children, `left` and `right`, of a join operator have compatible
+ * `KeyGroupedPartitioning`, and can benefit from storage-partitioned join.
+ *
+ * Returns the updated new children if the check is successful, otherwise `None`.
+ */
+ private def checkKeyGroupCompatible(
+ parent: SparkPlan,
+ left: SparkPlan,
+ right: SparkPlan,
+ requiredChildDistribution: Seq[Distribution]): Option[Seq[SparkPlan]] = {
+ parent match {
+ case smj: SortMergeJoinExec =>
+ checkKeyGroupCompatible(left, right, smj.joinType,
requiredChildDistribution)
+ case sj: ShuffledHashJoinExec =>
+ checkKeyGroupCompatible(left, right, sj.joinType,
requiredChildDistribution)
+ case _ =>
+ None
+ }
+ }
+
+ private def checkKeyGroupCompatible(
+ left: SparkPlan,
+ right: SparkPlan,
+ joinType: JoinType,
+ requiredChildDistribution: Seq[Distribution]): Option[Seq[SparkPlan]] = {
+ assert(requiredChildDistribution.length == 2)
+
+ var newLeft = left
+ var newRight = right
+
+ val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p,
d) =>
+ if (!d.isInstanceOf[ClusteredDistribution]) return None
+ val cd = d.asInstanceOf[ClusteredDistribution]
+ val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd)
+ if (specOpt.isEmpty) return None
+ specOpt.get
+ }
+
+ val leftSpec = specs.head
+ val rightSpec = specs(1)
+
+ var isCompatible = false
+ if (!conf.v2BucketingPushPartValuesEnabled) {
+ isCompatible = leftSpec.isCompatibleWith(rightSpec)
+ } else {
+ logInfo("Pushing common partition values for storage-partitioned join")
+ isCompatible = leftSpec.areKeysCompatible(rightSpec)
+
+ // Partition expressions are compatible. Regardless of whether partition values
+ // match from both sides of children, we can we can calculate a superset of
Review Comment:
```suggestion
// match from both sides of children, we can calculate a superset of
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]