ulysses-you commented on code in PR #36438:
URL: https://github.com/apache/spark/pull/36438#discussion_r878992276
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -117,6 +118,112 @@ object ShufflePartitionsUtil extends Logging {
return Seq.empty
}
+ val emptyIndexSet = collection.mutable.Set.empty[Int]
+ inputPartitionSpecs.foreach(_.get.iterator.zipWithIndex.foreach {
+ case (EmptyPartitionSpec, i) => emptyIndexSet.add(i)
+ case _ =>
+ })
+
+ if (emptyIndexSet.isEmpty) {
+ return coalescePartitionsWithSkew(mapOutputStatistics, inputPartitionSpecs,
+ targetSize, minPartitionSize, true)
+ }
+
+ // when all partitions are empty, return single EmptyPartitionSpec here to satisfy
+ // SPARK-32083 (AQE coalesce should at least return one partition).
+ if (inputPartitionSpecs.flatten.flatten.forall(_ == EmptyPartitionSpec)) {
+ return inputPartitionSpecs.map(_ => Seq(EmptyPartitionSpec))
+ }
+
+ // ShufflePartitionSpecs at these emptyIndices can NOT be coalesced
+ // split inputPartitionSpecs into sub-sequences by the empty indices, and
+ // call coalescePartitionsWithSkew to optimize each sub-sequence.
+ // let inputPartitionSpecs are:
+ // [A0(empty), A1, A2, A3(empty), A4(empty), A5, A6, A7, A8, A9, A10]
+ // [B0, B1, B2, B3, B4(empty), B5, B6, B7, B8(empty), B9, B10]
+ // then:
+ // 1, specs at index (0, 3, 8) are kept: (A0(empty)-B0), (A3(empty)-B3), (A8-B8(empty))
+ // 2, specs at index 4 are discarded, since they are all empty: (A4(empty)-B4(empty))
+ // 3, sub-sequences [A1-B1, A2-B2], [A5-B5, A6-B6, A7-B7], [A9-B9, A10-B10] are optimized
Review Comment:
A simpler approach might be to put this rule after
`CoalesceShufflePartitions`, so that whenever we match an empty partition we
can always skip it without harm. Compared with applying the rule before
`CoalesceShufflePartitions`, we may miss some empty partitions, since they may
already have been coalesced.
But I think that is acceptable. If an empty partition can be coalesced with
another partition, that means the corresponding partition on the other side of
the join is also small. So if we prefer to coalesce, the overhead is small,
since the shuffle data is small enough (less than the advisory size). It may
even be faster than skipping the empty partition, since we end up with fewer
reduce partitions.
So what we really want to skip is the case where the partition on one side is
empty and the partition on the other side is big (it may be slightly skewed,
or may not be optimizable through skew handling). What do you think?
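To make the trade-off concrete, here is a minimal, self-contained sketch of
"coalesce first, then skip empty". It does not use Spark's real classes:
`SkipEmptyAfterCoalesce`, `Span`, `coalesce` and `skipEmpty` are hypothetical
names, the greedy merge is only an approximation of what
`CoalesceShufflePartitions` does, and the skip condition assumes an inner join.

```scala
// Simplified model, NOT the actual Spark implementation.
// Each Seq[Long] holds the per-reducer shuffle sizes (in bytes) of one join side.
object SkipEmptyAfterCoalesce {
  // Half-open range [start, end) of reducer indices read by one task.
  final case class Span(start: Int, end: Int)

  // Greedily merge adjacent reducer indices while the combined size of both
  // sides stays within targetSize (assumed stand-in for the advisory size).
  def coalesce(left: Seq[Long], right: Seq[Long], targetSize: Long): Seq[Span] = {
    require(left.length == right.length, "both sides need the same partition count")
    val spans = Seq.newBuilder[Span]
    var start = 0
    var acc = 0L
    for (i <- left.indices) {
      val size = left(i) + right(i)
      // Close the current span if adding this index would exceed the target.
      if (i > start && acc + size > targetSize) {
        spans += Span(start, i)
        start = i
        acc = 0L
      }
      acc += size
    }
    if (left.nonEmpty) spans += Span(start, left.length)
    spans.result()
  }

  // Assuming an inner join: a span produces no output when either side has
  // no data in it, so it can be dropped.
  def skipEmpty(left: Seq[Long], right: Seq[Long], spans: Seq[Span]): Seq[Span] =
    spans.filterNot { s =>
      left.slice(s.start, s.end).sum == 0L || right.slice(s.start, s.end).sum == 0L
    }
}
```

With left sizes `Seq(0L, 10L, 0L, 5L)`, right sizes `Seq(8L, 12L, 900L, 7L)`
and a target of 64, the empty left partition at index 0 is merged with its
small neighbour and stays in the plan, while the empty left partition at
index 2, which faces the 900-byte right partition, ends up in its own span and
is the one that gets skipped.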
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]