zhengruifeng commented on code in PR #36438:
URL: https://github.com/apache/spark/pull/36438#discussion_r872300669
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -117,6 +118,112 @@ object ShufflePartitionsUtil extends Logging {
return Seq.empty
}
+ val emptyIndexSet = collection.mutable.Set.empty[Int]
+ inputPartitionSpecs.foreach(_.get.iterator.zipWithIndex.foreach {
+ case (EmptyPartitionSpec, i) => emptyIndexSet.add(i)
+ case _ =>
+ })
+
+ if (emptyIndexSet.isEmpty) {
+ return coalescePartitionsWithSkew(mapOutputStatistics, inputPartitionSpecs,
+ targetSize, minPartitionSize, true)
+ }
+
+ // when all partitions are empty, return single EmptyPartitionSpec here to satisfy
+ // SPARK-32083 (AQE coalesce should at least return one partition).
+ if (inputPartitionSpecs.flatten.flatten.forall(_ == EmptyPartitionSpec)) {
+ return inputPartitionSpecs.map(_ => Seq(EmptyPartitionSpec))
+ }
+
+ // ShufflePartitionSpecs at these emptyIndices can NOT be coalesced
+ // split inputPartitionSpecs into sub-sequences by the empty indices, and
+ // call coalescePartitionsWithSkew to optimize each sub-sequence.
+ // let inputPartitionSpecs are:
+ // [A0(empty), A1, A2, A3(empty), A4(empty), A5, A6, A7, A8, A9, A10]
+ // [B0, B1, B2, B3, B4(empty), B5, B6, B7, B8(empty), B9, B10]
+ // then:
+ // 1, specs at index (0, 3, 8) are kept: (A0(empty)-B0), (A3(empty)-B3), (A8-B8(empty))
+ // 2, specs at index 4 are discarded, since they are all empty: (A4(empty)-B4(empty))
+ // 3, sub-sequences [A1-B1, A2-B2], [A5-B5, A6-B6, A7-B7], [A9-B9, A10-B10] are optimized
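The splitting step described in the diff comment can be sketched in isolation. This is a minimal sketch under a simplified model, not the PR's implementation: each side's specs are reduced to a `Seq[Boolean]` (`true` standing in for `EmptyPartitionSpec`), and `SplitByEmptySketch`, `splitByEmptyIndices`, `Kept` and `Coalescible` are hypothetical names introduced only for illustration.

```scala
// Simplified model (assumption): each join side is a Seq[Boolean] where
// `true` marks an empty partition. All names here are hypothetical.
object SplitByEmptySketch {
  sealed trait Segment
  // Index where some, but not all, sides are empty: kept as-is, never coalesced.
  final case class Kept(index: Int) extends Segment
  // Maximal run of indices where no side is empty: a candidate for coalescing.
  final case class Coalescible(indices: Seq[Int]) extends Segment

  def splitByEmptyIndices(sides: Seq[Seq[Boolean]]): Seq[Segment] = {
    val n = sides.head.length
    val out = Seq.newBuilder[Segment]
    var run = Vector.empty[Int]
    // Close the current run of non-empty indices, if any.
    def flush(): Unit = if (run.nonEmpty) { out += Coalescible(run); run = Vector.empty }
    for (i <- 0 until n) {
      val empties = sides.count(_(i))
      if (empties == 0) run :+= i
      else {
        flush()
        // Indices where every side is empty are dropped entirely.
        if (empties < sides.length) out += Kept(i)
      }
    }
    flush()
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // The A/B example from the diff comment: A is empty at 0, 3, 4; B at 4, 8.
    val a = Set(0, 3, 4)
    val b = Set(4, 8)
    val sideA = (0 to 10).map(a.contains)
    val sideB = (0 to 10).map(b.contains)
    splitByEmptyIndices(Seq(sideA, sideB)).foreach(println)
  }
}
```

Running `main` prints `Kept(0)`, `Coalescible(Vector(1, 2))`, `Kept(3)`, `Coalescible(Vector(5, 6, 7))`, `Kept(8)`, `Coalescible(Vector(9, 10))`, one per line, which matches the kept, discarded, and optimized indices listed in the diff comment.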
Review Comment:
Yes, this may be a regression.
This rule may skip the second and third partitions (if the join type is
`Inner`), but it will split one task into more tasks.
If partitions can be coalesced by `CoalesceShufflePartitions`, we may need to
measure whether it is worthwhile to apply this rule (maybe we should only mark a
partition empty when its size is larger than a threshold).
If `CoalesceShufflePartitions` is not triggered, it should always be
beneficial to mark some non-empty partitions empty.
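The trade-off above can be made concrete with a toy cost model. This is an illustrative sketch under stated assumptions, not Spark's `CoalesceShufflePartitions`: partition sizes are plain byte counts, coalescing is modeled as greedy contiguous packing up to a target size, and `MarkEmptyTradeoffSketch`, `countTasks`, `countTasksWithBarriers`, and `shouldMarkEmpty` (with its `threshold`) are hypothetical names. It shows how skipping one small partition in the middle of a coalescible run can turn one task into two.

```scala
// Toy cost model (assumption): sizes are per-partition bytes and coalescing is
// greedy contiguous packing. Not Spark's actual coalescing logic.
object MarkEmptyTradeoffSketch {
  // Greedy packing: start a new task when the next partition would overflow targetSize.
  def countTasks(sizes: Seq[Long], targetSize: Long): Int =
    if (sizes.isEmpty) 0
    else sizes.tail.foldLeft((1, sizes.head)) { case ((tasks, current), s) =>
      if (current + s > targetSize) (tasks + 1, s) else (tasks, current + s)
    }._1

  // Partitions marked empty are skipped and act as barriers: the runs between
  // them are packed independently, so they can no longer share a task.
  def countTasksWithBarriers(sizes: Seq[Long], markedEmpty: Set[Int], targetSize: Long): Int = {
    val runs = sizes.indices.foldLeft(Vector(Vector.empty[Long])) { (acc, i) =>
      if (markedEmpty.contains(i)) acc :+ Vector.empty[Long]
      else acc.init :+ (acc.last :+ sizes(i))
    }
    runs.map(countTasks(_, targetSize)).sum
  }

  // Hypothetical heuristic following the review: when coalescing is active, only
  // mark a partition empty if it is larger than `threshold`; otherwise always mark it.
  def shouldMarkEmpty(bytes: Long, coalesceEnabled: Boolean, threshold: Long): Boolean =
    !coalesceEnabled || bytes > threshold

  def main(args: Array[String]): Unit = {
    val sizes = Seq.fill(5)(10L)
    println(countTasks(sizes, 64L))                     // all five fit in one task
    println(countTasksWithBarriers(sizes, Set(2), 64L)) // skipping index 2 splits the run
  }
}
```

With five 10-byte partitions and a 64-byte target, `main` prints `1` and then `2`. When coalescing is not triggered, each partition is already its own task, so skipping one only removes work; that is why the sketched heuristic returns `true` unconditionally in that case.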