Hisoka-X commented on code in PR #42194:
URL: https://github.com/apache/spark/pull/42194#discussion_r1288037738


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -368,150 +368,197 @@ case class EnsureRequirements(
     var newLeft = left
     var newRight = right
 
-    val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) =>
+    val specsOpts = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) =>
       if (!d.isInstanceOf[ClusteredDistribution]) return None
       val cd = d.asInstanceOf[ClusteredDistribution]
       val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd)
-      if (specOpt.isEmpty) return None
-      specOpt.get
+      specOpt
+    }
+    val specsAllExist = specsOpts.forall(_.nonEmpty)
+    if ((!conf.v2BucketingShuffleOneSideEnabled && !specsAllExist)
+      || specsOpts.count(_.isEmpty) > 1) {
+      return None
     }
-
-    val leftSpec = specs.head
-    val rightSpec = specs(1)
 
     var isCompatible = false
-    if (!conf.v2BucketingPushPartValuesEnabled) {
-      isCompatible = leftSpec.isCompatibleWith(rightSpec)
-    } else {
-      logInfo("Pushing common partition values for storage-partitioned join")
-      isCompatible = leftSpec.areKeysCompatible(rightSpec)
-
-      // Partition expressions are compatible. Regardless of whether partition values
-      // match from both sides of children, we can calculate a superset of partition values and
-      // push-down to respective data sources so they can adjust their output partitioning by
-      // filling missing partition keys with empty partitions. As result, we can still avoid
-      // shuffle.
-      //
-      // For instance, if two sides of a join have partition expressions
-      // `day(a)` and `day(b)` respectively
-      // (the join query could be `SELECT ... FROM t1 JOIN t2 on t1.a = t2.b`), but
-      // with different partition values:
-      //   `day(a)`: [0, 1]
-      //   `day(b)`: [1, 2, 3]
-      // Following the case 2 above, we don't have to shuffle both sides, but instead can
-      // just push the common set of partition values: `[0, 1, 2, 3]` down to the two data
-      // sources.
-      if (isCompatible) {
-        val leftPartValues = leftSpec.partitioning.partitionValues
-        val rightPartValues = rightSpec.partitioning.partitionValues
-
-        logInfo(
-          s"""
-             |Left side # of partitions: ${leftPartValues.size}
-             |Right side # of partitions: ${rightPartValues.size}
-             |""".stripMargin)
-
-        // As partition keys are compatible, we can pick either left or right as partition
-        // expressions
-        val partitionExprs = leftSpec.partitioning.expressions
-
-        var mergedPartValues = InternalRowComparableWrapper
+
+    if (specsAllExist) {

Review Comment:
   Ohhhhh, cool. The code is much clearer now.
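   For readers skimming the hunk above: the refactor drops the early `return None` exits inside the `map` and instead collects a `Seq[Option[...]]`, then bails out only when a missing spec cannot be tolerated, i.e. when `conf.v2BucketingShuffleOneSideEnabled` is off and any spec is missing, or when more than one is missing. A minimal standalone sketch of that control flow (the `createSpec` helper and `String` specs below are hypothetical stand-ins, not Spark's actual types):

   ```scala
   // Hypothetical stand-ins: in Spark these would be key-grouped
   // ShuffleSpecs built by createKeyGroupedShuffleSpec; here a
   // positive Int simply "has a spec".
   object SpecCollection {
     def createSpec(p: Int): Option[String] =
       if (p > 0) Some(s"spec-$p") else None

     // Mirrors the new control flow: collect the Options first, then
     // return None only if (a) one-side shuffle is disabled and any
     // spec is missing, or (b) more than one spec is missing.
     def collect(children: Seq[Int], oneSideEnabled: Boolean): Option[Seq[Option[String]]] = {
       val specsOpts = children.map(createSpec)
       val specsAllExist = specsOpts.forall(_.nonEmpty)
       if ((!oneSideEnabled && !specsAllExist) || specsOpts.count(_.isEmpty) > 1) None
       else Some(specsOpts)
     }
   }
   ```

   Compared with the old `return None` inside the closure, this keeps the per-child results around so the later `specsAllExist` branch can still handle the one-missing-spec case.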


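The deleted comment block in the hunk also carries the key storage-partitioned-join idea: with `day(a)` having partition values `[0, 1]` and `day(b)` having `[1, 2, 3]`, both sides are pushed the superset `[0, 1, 2, 3]` and fill the missing keys with empty partitions, so neither side needs a shuffle. A toy sketch of that merge (plain `Int`s standing in for Spark's `InternalRowComparableWrapper`-keyed partition values):

```scala
// Toy version of merging partition values from both join sides into a
// sorted superset; Spark performs this over InternalRow values, here Ints.
def mergePartValues(left: Seq[Int], right: Seq[Int]): Seq[Int] =
  (left ++ right).distinct.sorted

// day(a) = [0, 1], day(b) = [1, 2, 3]  =>  [0, 1, 2, 3]
```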

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

