sunchao commented on a change in pull request #32875:
URL: https://github.com/apache/spark/pull/32875#discussion_r753389528



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -70,13 +70,63 @@ case class EnsureRequirements(
     val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
       case (UnspecifiedDistribution, _) => false
       case (_: BroadcastDistribution, _) => false
+      case (AllTuples, _) => false
       case _ => true
     }.map(_._2)
 
-    val childrenNumPartitions =
-      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
+    // If there are more than one children, we'll need to check partitioning & distribution of them
+    // and see if extra shuffles are necessary.
+    if (childrenIndexes.length > 1) {
+      childrenIndexes.map(requiredChildDistributions(_)).foreach { d =>
+        if (!d.isInstanceOf[ClusteredDistribution]) {
+          throw new IllegalStateException(s"Expected ClusteredDistribution but found " +
+              s"${d.getClass.getSimpleName}")
+        }
+      }
+      val specs = childrenIndexes.map(i =>
+        i -> children(i).outputPartitioning.createShuffleSpec(
+          requiredChildDistributions(i).requiredNumPartitions.getOrElse(conf.numShufflePartitions),
+          requiredChildDistributions(i).asInstanceOf[ClusteredDistribution])
+      ).toMap
+
+      // Find out the shuffle spec that gives better parallelism.
+      //
+      // NOTE: this is not optimal for the case when there are more than 2 children. Consider:
+      //   (10, 10, 11)
+      // it's better to pick 10 in this case since we only need to shuffle one side - we'd need to
+      // shuffle two sides if we pick 11.
+      //
+      // However this should be sufficient for now since in Spark nodes with multiple children
+      // always have exactly 2 children.
+      //
+      // Also when choosing the spec, we should consider those children with no `Exchange` node
+      // first. For instance, if we have:
+      //   A: (No_Exchange, 100) <---> B: (Exchange, 120)
+      // it's better to pick A and change B to (Exchange, 100) instead of picking B and insert a
+      // new shuffle for A.

Review comment:
   Thanks for the detailed description. Yes, I tried an approach similar to the one you describe above, but the implementation turned out to be more complex than I expected, so I went with this simpler approach, which is tailored to the case of exactly two children.

   Let me try it again and see whether it can be made simple, general, and easy to understand.
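
   For reference, the selection heuristic described in the code comment of the diff above can be sketched roughly as follows. This is a simplified, standalone illustration and not the code in the PR: `ChildInfo`, `hasExchange`, `pickTarget`, and `childrenToShuffle` are hypothetical names, and the sketch compares plain partition counts, whereas the PR works with shuffle specs created from each child's output partitioning.

```scala
// Hypothetical, simplified model of a child plan. These names are illustrative
// only and are not Spark classes.
final case class ChildInfo(index: Int, numPartitions: Int, hasExchange: Boolean)

object PickShuffleTarget {
  // Prefer children that have no Exchange node, since following their
  // partitioning avoids inserting a new shuffle for them. Among the
  // candidates, pick the larger partition count for better parallelism.
  def pickTarget(children: Seq[ChildInfo]): ChildInfo = {
    require(children.nonEmpty, "need at least one child")
    val withoutExchange = children.filterNot(_.hasExchange)
    val candidates = if (withoutExchange.nonEmpty) withoutExchange else children
    candidates.maxBy(_.numPartitions)
  }

  // Indexes of the children whose partition count differs from the chosen
  // target; in this simplified model these are the sides to (re)shuffle.
  def childrenToShuffle(children: Seq[ChildInfo]): Seq[Int] = {
    val target = pickTarget(children)
    children.filter(_.numPartitions != target.numPartitions).map(_.index)
  }
}
```

   With `A: (No_Exchange, 100)` and `B: (Exchange, 120)`, this sketch picks A and marks only B for reshuffling. With three children at `(10, 10, 11)` and no exchanges, it picks 11 and marks two sides, which is the non-optimal case the NOTE in the diff calls out.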




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


