cloud-fan commented on code in PR #38950:
URL: https://github.com/apache/spark/pull/38950#discussion_r1043344672


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -142,12 +145,66 @@ case class EnsureRequirements(
         Some(finalCandidateSpecs.values.maxBy(_.numPartitions))
       }
 
+      def getKeyGroupedSpec(idx: Int): ShuffleSpec = specs(idx) match {
+        case ShuffleSpecCollection(specs) => specs.head
+        case spec => spec
+      }
+
+      def populatePartitionKeys(plan: SparkPlan, keys: Seq[InternalRow]): SparkPlan =
+        plan match {
+          case scan: BatchScanExec =>
+            scan.copy(commonPartitionKeys = Some(keys))
+          case node =>
+            node.mapChildren(child => populatePartitionKeys(child, keys))
+        }
+
       // Check if 1) all children are of `KeyGroupedPartitioning` and 2) they are all compatible
       // with each other. If both are true, skip shuffle.
-      val allCompatible = childrenIndexes.sliding(2).forall {
-        case Seq(a, b) =>
-          checkKeyGroupedSpec(specs(a)) && checkKeyGroupedSpec(specs(b)) &&
-            specs(a).isCompatibleWith(specs(b))
+      val allCompatible = childrenIndexes.length == 2 && {
+        val left = childrenIndexes.head
+        val right = childrenIndexes(1)
+        var isCompatible: Boolean = false
+
+        if (checkKeyGroupedSpec(specs(left)) && checkKeyGroupedSpec(specs(right))) {
+          isCompatible = specs(left).isCompatibleWith(specs(right))
+
+          // If `isCompatible` is false, it could mean:
+          //   1. Partition expressions are not compatible: we have to shuffle in this case.
+          //   2. Partition expressions are compatible, but partition keys are not: in this case we
+          //      can compute a superset of partition keys and push-down to respective
+          //      data sources, which can then adjust their respective output partitioning by
+          //      filling missing partition keys with empty partitions. As result, Spark can still
+          //      avoid shuffle.

Review Comment:
   Can we give a join query example to help people understand the optimization?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to