ulysses-you commented on code in PR #36438:
URL: https://github.com/apache/spark/pull/36438#discussion_r878992276


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -117,6 +118,112 @@ object ShufflePartitionsUtil extends Logging {
       return Seq.empty
     }
 
+    val emptyIndexSet = collection.mutable.Set.empty[Int]
+    inputPartitionSpecs.foreach(_.get.iterator.zipWithIndex.foreach {
+      case (EmptyPartitionSpec, i) => emptyIndexSet.add(i)
+      case _ =>
+    })
+
+    if (emptyIndexSet.isEmpty) {
+      return coalescePartitionsWithSkew(mapOutputStatistics, inputPartitionSpecs,
+        targetSize, minPartitionSize, true)
+    }
+
+    // when all partitions are empty, return single EmptyPartitionSpec here to satisfy
+    // SPARK-32083 (AQE coalesce should at least return one partition).
+    if (inputPartitionSpecs.flatten.flatten.forall(_ == EmptyPartitionSpec)) {
+      return inputPartitionSpecs.map(_ => Seq(EmptyPartitionSpec))
+    }
+
+    // ShufflePartitionSpecs at these emptyIndices can NOT be coalesced
+    // split inputPartitionSpecs into sub-sequences by the empty indices, and
+    // call coalescePartitionsWithSkew to optimize each sub-sequence.
+    // let inputPartitionSpecs are:
+    //   [A0(empty), A1, A2, A3(empty), A4(empty), A5, A6, A7, A8, A9, A10]
+    //   [B0, B1, B2, B3, B4(empty), B5, B6, B7, B8(empty), B9, B10]
+    // then:
+    // 1, specs at index (0, 3, 8) are kept: (A0(empty)-B0), (A3(empty)-B3), (A8-B8(empty))
+    // 2, specs at index 4 are discarded, since they are all empty: (A4(empty)-B4(empty))
+    // 3, sub-sequences [A1-B1, A2-B2], [A5-B5, A6-B6, A7-B7], [A9-B9, A10-B10] are optimized
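A minimal Scala sketch of the split described in the diff comment, under simplifying assumptions. Each join side is modeled as a `Seq[Long]` of per-partition sizes in which `0L` stands in for `EmptyPartitionSpec`. `Split` and `splitByEmptyIndices` are hypothetical names and are not part of `ShufflePartitionsUtil`. The sketch only classifies indices; the early returns in the diff (no empty index, or every index empty) and the actual call to `coalescePartitionsWithSkew` are not modeled.

```scala
// Hypothetical, simplified model of the split described in the diff comment.
// Each side is a Seq[Long] of per-partition sizes; 0L stands in for EmptyPartitionSpec.
final case class Split(
    kept: Seq[Int],       // some (but not every) side is empty: kept as-is, never coalesced
    discarded: Seq[Int],  // every side is empty: dropped
    runs: Seq[Seq[Int]])  // maximal runs of all-non-empty indices: coalesced independently

def splitByEmptyIndices(sides: Seq[Seq[Long]]): Split = {
  require(sides.nonEmpty && sides.map(_.length).distinct.size == 1,
    "all sides must have the same number of partitions")
  val n = sides.head.length
  val kept = Seq.newBuilder[Int]
  val discarded = Seq.newBuilder[Int]
  val runs = Seq.newBuilder[Seq[Int]]
  var current = Vector.empty[Int]

  // Close the current run of non-empty indices, if there is one.
  def flush(): Unit = {
    if (current.nonEmpty) runs += current
    current = Vector.empty
  }

  for (i <- 0 until n) {
    val emptySides = sides.count(_(i) == 0L)
    if (emptySides == 0) {
      current :+= i // every side has data: extend the current run
    } else {
      flush()       // an empty index always ends the current run
      if (emptySides == sides.length) discarded += i else kept += i
    }
  }
  flush()
  Split(kept.result(), discarded.result(), runs.result())
}
```

On the example from the comment (A empty at 0, 3 and 4; B empty at 4 and 8), it keeps indices 0, 3 and 8, discards index 4, and yields the runs [1, 2], [5, 6, 7] and [9, 10].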

Review Comment:
   A simpler way may be to put this rule after `CoalesceShufflePartitions`: then, if we match an empty partition, we can always skip it without harm. Compared with applying the rule before `CoalesceShufflePartitions`, though, we may miss optimizing some empty partitions, since they may already have been coalesced.
   
   But I think that is acceptable. If an empty partition can be coalesced with other partitions, the corresponding partition on the other side of the join is also small. So if we prefer to coalesce, the overhead is small, since the shuffle data is small enough (less than the advisory size). It may even be faster than skipping the empty partition, since we end up with fewer reduce partitions.
   
   So what we really want to skip is the case where one side's partition is empty and the other side's is big (it may be slightly skewed, or not something the skew handling can optimize). What do you think?
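The ordering proposed in this comment can be illustrated with a toy model. It assumes an inner join (where a partition with no rows on one side produces no output) and uses a simplified greedy packing of adjacent partitions up to a target size. This is not Spark's `CoalesceShufflePartitions` logic, which applies further rules; `Group`, `coalesce` and `skippable` are hypothetical names. Coalescing runs first, and only groups that still have an empty side are skipped.

```scala
// Toy model (assumption): each side is a Seq[Long] of per-reduce-partition sizes.
// A Group covers reduce partitions [start, end) and records each side's total bytes.
final case class Group(start: Int, end: Int, sideBytes: Seq[Long])

// Simplified greedy packing, NOT Spark's CoalesceShufflePartitions: start a new group
// whenever adding the next partition would push the combined size past targetSize.
def coalesce(sides: Seq[Seq[Long]], targetSize: Long): Seq[Group] = {
  require(sides.nonEmpty && sides.map(_.length).distinct.size == 1,
    "all sides must have the same number of partitions")
  val n = sides.head.length
  def sideTotals(from: Int, until: Int): Seq[Long] = sides.map(_.slice(from, until).sum)
  val groups = Seq.newBuilder[Group]
  var start = 0
  var acc = 0L
  for (i <- 0 until n) {
    val size = sides.map(_(i)).sum // combined size of partition i across all sides
    if (i > start && acc + size > targetSize) {
      groups += Group(start, i, sideTotals(start, i))
      start = i
      acc = 0L
    }
    acc += size
  }
  if (n > 0) groups += Group(start, n, sideTotals(start, n))
  groups.result()
}

// Assuming an inner join: a group with no data on some side produces no output,
// so it can be skipped after coalescing without affecting correctness.
def skippable(g: Group): Boolean = g.sideBytes.contains(0L)
```

With `left = Seq(0L, 5L, 0L, 200L, 10L)`, `right = Seq(10L, 5L, 300L, 0L, 10L)` and a target of 64 bytes, the empty left partition at index 0 is packed together with index 1 (20 bytes across both sides), so it can no longer be skipped; that is the missed optimization the comment accepts as cheap. The groups that stay skippable, at indices 2 and 3, are those where one side is empty and the other side exceeds the target.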



-- 
This is an automated message from the Apache Git Service.