cloud-fan commented on a change in pull request #32875:
URL: https://github.com/apache/spark/pull/32875#discussion_r753319752



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -70,13 +70,63 @@ case class EnsureRequirements(
     val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
       case (UnspecifiedDistribution, _) => false
       case (_: BroadcastDistribution, _) => false
+      case (AllTuples, _) => false
       case _ => true
     }.map(_._2)
 
-    val childrenNumPartitions =
-      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
+    // If there are more than one children, we'll need to check partitioning & distribution of them
+    // and see if extra shuffles are necessary.
+    if (childrenIndexes.length > 1) {
+      childrenIndexes.map(requiredChildDistributions(_)).foreach { d =>
+        if (!d.isInstanceOf[ClusteredDistribution]) {
+          throw new IllegalStateException(s"Expected ClusteredDistribution but found " +
+              s"${d.getClass.getSimpleName}")
+        }
+      }
+      val specs = childrenIndexes.map(i =>
+        i -> children(i).outputPartitioning.createShuffleSpec(
+          requiredChildDistributions(i).requiredNumPartitions.getOrElse(conf.numShufflePartitions),
+          requiredChildDistributions(i).asInstanceOf[ClusteredDistribution])
+      ).toMap
+
+      // Find out the shuffle spec that gives better parallelism.
+      //
+      // NOTE: this is not optimal for the case when there are more than 2 children. Consider:
+      //   (10, 10, 11)
+      // it's better to pick 10 in this case since we only need to shuffle one side - we'd need to
+      // shuffle two sides if we pick 11.
+      //
+      // However this should be sufficient for now since in Spark nodes with multiple children
+      // always have exactly 2 children.
+      //
+      // Also when choosing the spec, we should consider those children with no `Exchange` node
+      // first. For instance, if we have:
+      //   A: (No_Exchange, 100) <---> B: (Exchange, 120)
+      // it's better to pick A and change B to (Exchange, 100) instead of picking B and insert a
+      // new shuffle for A.

Review comment:
       I'm trying to figure out the requirement here. We want to pick the best shuffle spec, which should:
   1. be able to create co-partitioned partitions (which excludes `RangeShuffleSpec`)
   2. maximize parallelism
   3. minimize the number of shuffles that need to be added (prefer to shuffle the child with an existing `Exchange` node)
   
   I think the algorithm can be (a simplified sketch follows at the end of this comment):
   1. filter out shuffle specs that can't create co-partitioned partitions (we can add a boolean flag in `ShuffleSpec` to report this)
   2. filter out shuffle specs whose numPartitions is smaller than `spark.sql.shuffle.partitions`
   3. sort the remaining shuffle specs by two preferences: 1) prefer a shuffle spec whose child is not an Exchange, 2) prefer a shuffle spec with a larger numPartitions
   4. pick the best shuffle spec according to the sort result. The best shuffle spec can be None if all shuffle specs get filtered out
   
   If there is no best shuffle spec, add a shuffle to all the children. Otherwise, create the partitioning with the best shuffle spec.
   
   Note, the best shuffle spec should also be compatible with most of the other shuffle specs, to reduce the number of shuffles to add. But in reality we only have two children, so I think we can skip this dimension for now.
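   
   To make the ordering concrete, here is a minimal, self-contained sketch of that filter-then-sort selection. `SpecInfo`, `pickBestSpec` and the boolean flags are hypothetical placeholders for illustration only, not Spark's actual `ShuffleSpec` API, and the `spark.sql.shuffle.partitions` check is reduced to comparing against a plain number:
   
   ```scala
   // Hypothetical, simplified sketch of the selection described above.
   // None of these names exist in Spark; they only illustrate the four steps.
   object PickBestShuffleSpecSketch {
     // Per-child summary of the information the selection needs.
     case class SpecInfo(
         childIndex: Int,
         canCreateCoPartitioning: Boolean, // step 1: false for a RangeShuffleSpec-like spec
         numPartitions: Int,
         childHasExchange: Boolean)        // step 3: prefer children without an Exchange
   
     // Returns the best spec, or None if every spec is filtered out
     // (in which case the caller would add a shuffle to all children).
     def pickBestSpec(specs: Seq[SpecInfo], defaultShufflePartitions: Int): Option[SpecInfo] = {
       specs
         .filter(_.canCreateCoPartitioning)                   // 1. must support co-partitioning
         .filter(_.numPartitions >= defaultShufflePartitions) // 2. drop specs below the default
         .sortBy(s => (s.childHasExchange, -s.numPartitions)) // 3. no-Exchange first, then larger numPartitions
         .headOption                                          // 4. best spec, or None
     }
   
     def main(args: Array[String]): Unit = {
       // Mirrors the "A: (No_Exchange, 100) <---> B: (Exchange, 120)" example above:
       // A wins because it has no Exchange, even though B has more partitions.
       val specs = Seq(
         SpecInfo(0, canCreateCoPartitioning = true, numPartitions = 100, childHasExchange = false),
         SpecInfo(1, canCreateCoPartitioning = true, numPartitions = 120, childHasExchange = true))
       println(pickBestSpec(specs, defaultShufflePartitions = 100)) // Some(SpecInfo(0, ...))
     }
   }
   ```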




